From c00246738bd8000ce5b5ea1273569f5679f437c0 Mon Sep 17 00:00:00 2001 From: marcelmarcelD <84496406+marcelmarcelD@users.noreply.github.com> Date: Mon, 7 Jun 2021 14:11:37 +0200 Subject: [PATCH 1/5] add extension for start secure OAuth2 consumer --- src/ConfluentKafkaLibrary/consumer.py | 76 +++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/src/ConfluentKafkaLibrary/consumer.py b/src/ConfluentKafkaLibrary/consumer.py index 100d137..28daa6d 100644 --- a/src/ConfluentKafkaLibrary/consumer.py +++ b/src/ConfluentKafkaLibrary/consumer.py @@ -1,9 +1,17 @@ import uuid +import json +import requests +import base64 +import time +import functools +import os from threading import Thread from confluent_kafka import Consumer, KafkaException, KafkaError, TopicPartition from confluent_kafka import DeserializingConsumer from confluent_kafka.avro.serializer import SerializerError from confluent_kafka.avro import AvroConsumer +from requests_oauthlib import OAuth2Session +from bs4 import BeautifulSoup class GetMessagesThread(Thread): @@ -51,10 +59,59 @@ def clear_messages(self): self.messages.clear() +class TokenProvider(object): + + def __init__(self): + self.redirect_url = "" + self.clientid = "" + self.clientsecret = "" + self.tokenEndpoint = "" + self.refreshToken = "" + self.cert_file = "" + + def init_token(self, auth_uri, redirect_url, user_name, user_password, verify_ssl=True): + oauth = OAuth2Session(redirect_uri=auth_uri) + response = oauth.get(url=redirect_url, verify=verify_ssl) + response_cookies = response.cookies + html_response = BeautifulSoup(response.text, "html.parser") + input_text = html_response.findAll("input", {"type": "hidden"}) + token = input_text[0]["value"] + data = {"__RequestVerificationToken": token, "UserLogin": user_name, "Password": user_password} + response = requests.post(response.url, data=data, cookies=response_cookies, verify=verify_ssl) + data_response = json.loads(response.text) + self.redirect_url = redirect_url + 
self.clientid = data_response["clientId"] + self.clientsecret = data_response["clientSecret"] + self.tokenEndpoint = data_response["tokenEndpoint"] + self.refreshToken = data_response["refreshToken"] + self.cert_file = user_name + "cert.pem" + with open(self.cert_file, 'w') as f: + f.write(data_response["caCertificate"]) + + def access_token_refresh_cb(self, config): + base64_encoded_clientid_clientsecret = base64.b64encode( + str.encode(f'{self.clientid}:{self.clientsecret}')) + base64_encoded_clientid_clientsecret = base64_encoded_clientid_clientsecret.decode('ascii') + + headers = {'Content-Type': 'application/x-www-form-urlencoded', + 'Authorization': f'Basic {base64_encoded_clientid_clientsecret}'} + + data = {"grant_type": 'refresh_token', "redirect_uri": self.redirect_url, "refresh_token": self.refreshToken} + r = requests.post(self.tokenEndpoint, headers=headers, data=data, verify=os.path.abspath(self.cert_file)) + print(r.text) + response = r.json() + """The server may issue a new refresh token in the response, but if the response does not include + a new refresh token the client assumes the existing refresh token will still be valid""" + if 'refresh_token' in response: + self.refreshToken = response['refresh_token'] + return response['access_token'], time.time() + float(response['expires_in']) + + class KafkaConsumer(): def __init__(self): self.consumers = {} + self.tokenProvider = None def create_consumer( self, @@ -68,6 +125,12 @@ def create_consumer( key_deserializer=None, value_deserializer=None, legacy=True, + oauth_callback=False, + auth_uri=None, + redirect_url=None, + user_name=None, + user_password=None, + verify_ssl=True, **kwargs ): """Create Kafka Consumer and returns its `group_id` as string. 
@@ -120,6 +183,19 @@ def create_consumer( 'key.deserializer': key_deserializer, 'value.deserializer': value_deserializer, **kwargs}) + elif oauth_callback == True: + self.tokenProvider = TokenProvider() + self.tokenProvider.init_token(auth_uri=auth_uri, redirect_url=redirect_url, user_name=user_name, user_password=user_password, verify_ssl=verify_ssl) + consumer = Consumer({ + 'bootstrap.servers': '{}:{}'.format(server, port), + 'group.id': group_id, + 'enable.auto.commit': enable_auto_commit, + 'allow.auto.create.topics': auto_create_topics, + 'auto.offset.reset': auto_offset_reset, + 'security.protocol': 'sasl_ssl', + 'sasl.mechanisms': 'OAUTHBEARER', + 'oauth_cb': functools.partial(TokenProvider.access_token_refresh_cb, self.tokenProvider), + **kwargs}) else: consumer = Consumer({ 'bootstrap.servers': '{}:{}'.format(server, port), From 55d56274b142805819e3c277b098565baf0f060a Mon Sep 17 00:00:00 2001 From: marcelmarcelD <84496406+marcelmarcelD@users.noreply.github.com> Date: Mon, 7 Jun 2021 14:15:20 +0200 Subject: [PATCH 2/5] Increase version for confluent-kafka library --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index e1de4d6..f5819a9 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ ], install_requires = [ 'robotframework >= 3.2.1', - 'confluent-kafka == 1.5.0', + 'confluent-kafka == 1.6.1', 'requests >= 2.25.1', 'avro-python3 >= 1.10.1', 'fastavro >= 1.3.2', From 4c76d2eaeb9ebdcb930f069cb7f37b09f1ccb0fc Mon Sep 17 00:00:00 2001 From: marcelmarcelD <84496406+marcelmarcelD@users.noreply.github.com> Date: Mon, 7 Jun 2021 14:16:54 +0200 Subject: [PATCH 3/5] Increase version for lib confluent-kafka --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 5204b4f..075a5c7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ robotframework>=3.2.1 -confluent-kafka==1.5.0 +confluent-kafka==1.6.1 From 
2e166a7ee954c5012dbed025e0ab98f7f5d3c3a5 Mon Sep 17 00:00:00 2001 From: marcelmarcelD <84496406+marcelmarcelD@users.noreply.github.com> Date: Mon, 7 Jun 2021 14:18:25 +0200 Subject: [PATCH 4/5] add new version --- src/ConfluentKafkaLibrary/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ConfluentKafkaLibrary/version.py b/src/ConfluentKafkaLibrary/version.py index ce4761d..26e0f58 100644 --- a/src/ConfluentKafkaLibrary/version.py +++ b/src/ConfluentKafkaLibrary/version.py @@ -1 +1 @@ -VERSION = '1.5.0-7' +VERSION = '1.6.1' From 00cefc72eeb424630443183108657a4781e70170 Mon Sep 17 00:00:00 2001 From: marcelmarcelD <84496406+marcelmarcelD@users.noreply.github.com> Date: Mon, 7 Jun 2021 14:29:10 +0200 Subject: [PATCH 5/5] update documentation for consumer keyword arguments --- src/ConfluentKafkaLibrary/consumer.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/ConfluentKafkaLibrary/consumer.py b/src/ConfluentKafkaLibrary/consumer.py index 28daa6d..f6508da 100644 --- a/src/ConfluentKafkaLibrary/consumer.py +++ b/src/ConfluentKafkaLibrary/consumer.py @@ -157,6 +157,15 @@ def create_consumer( - ``legacy`` (bool): Activate SerializingConsumer if 'False' else AvroConsumer (legacy) is used. Will be removed when confluent-kafka will deprecate this. Default: `True`. + - ``oauth_callback`` (bool): If True, the secure OAuth2 consumer will start. + Default: `False`. + - ``auth_uri`` (str): URL to identity server. + - ``redirect_url`` (str): URL to server/endpoint to receive an access token. + - ``user_name`` (str): User ID on identity server. + - ``user_password`` (str): User password on identity server. + - ``verify_ssl`` (optional): Either a boolean, in which case it controls whether we verify the server's certificate, or a string, in which case it must be a path to a CA bundle to use. Default: ``True``. Note: Configuration parameters are described in more detail at