Skip to content

Extend consumer to support SASL/OAuth #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
robotframework>=3.2.1
confluent-kafka==1.5.0
confluent-kafka==1.6.1
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
],
install_requires = [
'robotframework >= 3.2.1',
'confluent-kafka == 1.5.0',
'confluent-kafka == 1.6.1',
'requests >= 2.25.1',
'avro-python3 >= 1.10.1',
'fastavro >= 1.3.2',
Expand Down
85 changes: 85 additions & 0 deletions src/ConfluentKafkaLibrary/consumer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import uuid
import json
import requests
import base64
import time
import functools
import os
from threading import Thread
from confluent_kafka import Consumer, KafkaException, KafkaError, TopicPartition
from confluent_kafka import DeserializingConsumer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka.avro import AvroConsumer
from requests_oauthlib import OAuth2Session
from bs4 import BeautifulSoup


class GetMessagesThread(Thread):
Expand Down Expand Up @@ -51,10 +59,59 @@ def clear_messages(self):
self.messages.clear()


class TokenProvider(object):

def __init__(self):
self.redirect_url = ""
self.clientid = ""
self.clientsecret = ""
self.tokenEndpoint = ""
self.refreshToken = ""
self.cert_file = ""

def init_token(self, auth_uri, redirect_url, user_name, user_password, verify_ssl=True):
oauth = OAuth2Session(redirect_uri=auth_uri)
response = oauth.get(url=redirect_url, verify=verify_ssl)
response_cookies = response.cookies
html_response = BeautifulSoup(response.text, "html.parser")
input_text = html_response.findAll("input", {"type": "hidden"})
token = input_text[0]["value"]
data = {"__RequestVerificationToken": token, "UserLogin": user_name, "Password": user_password}
response = requests.post(response.url, data=data, cookies=response_cookies, verify=verify_ssl)
data_response = json.loads(response.text)
self.redirect_url = redirect_url
self.clientid = data_response["clientId"]
self.clientsecret = data_response["clientSecret"]
self.tokenEndpoint = data_response["tokenEndpoint"]
self.refreshToken = data_response["refreshToken"]
self.cert_file = user_name + "cert.pem"
with open(self.cert_file, 'w') as f:
f.write(data_response["caCertificate"])

def access_token_refresh_cb(self, config):
base64_encoded_clientid_clientsecret = base64.b64encode(
str.encode(f'{self.clientid}:{self.clientsecret}'))
base64_encoded_clientid_clientsecret = base64_encoded_clientid_clientsecret.decode('ascii')

headers = {'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': f'Basic {base64_encoded_clientid_clientsecret}'}

data = {"grant_type": 'refresh_token', "redirect_uri": self.redirect_url, "refresh_token": self.refreshToken}
r = requests.post(self.tokenEndpoint, headers=headers, data=data, verify=os.path.abspath(self.cert_file))
print(r.text)
response = r.json()
"""The server may issue a new refresh token in the response, but if the response does not include
a new refresh token the client assumes the existing refresh token will still be valid"""
if 'refresh_token' in response:
self.refreshToken = response['refresh_token']
return response['access_token'], time.time() + float(response['expires_in'])


class KafkaConsumer():

def __init__(self):
self.consumers = {}
self.tokenProvider = None

def create_consumer(
self,
Expand All @@ -68,6 +125,12 @@ def create_consumer(
key_deserializer=None,
value_deserializer=None,
legacy=True,
oauth_callback=False,
auth_uri=None,
redirect_url=None,
user_name=None,
user_password=None,
verify_ssl=True,
**kwargs
):
"""Create Kafka Consumer and returns its `group_id` as string.
Expand All @@ -94,6 +157,15 @@ def create_consumer(
- ``legacy`` (bool): Activate SerializingConsumer if 'False' else
AvroConsumer (legacy) is used. Will be removed when confluent-kafka will deprecate this.
Default: `True`.
- ``oauth_callback`` (bool): If True the secure OAuth2 consumer will start.
Default: `False`.
- ``auth_uri`` (str): URL to identity server.
- ``redirect_url`` (str): URL to server/endpoint to receive a access token.
- ``user_name`` (str): User ID on identity server.
- ``user_password`` (str): User password on identity server.
- ``verify_ssl`` (optional): Either a boolean, in which case it controls whether we verify
the server's certificate, or a string, in which case it must be a path
to a CA bundle to use. Default: ``True``.

Note:
Configuration parameters are described in more detail at
Expand All @@ -120,6 +192,19 @@ def create_consumer(
'key.deserializer': key_deserializer,
'value.deserializer': value_deserializer,
**kwargs})
elif oauth_callback == True:
self.tokenProvider = TokenProvider()
self.tokenProvider.init_token(auth_uri=auth_uri, redirect_url=redirect_url, user_name=user_name, user_password=user_password, verify_ssl=verify_ssl)
consumer = Consumer({
'bootstrap.servers': '{}:{}'.format(server, port),
'group.id': group_id,
'enable.auto.commit': enable_auto_commit,
'allow.auto.create.topics': auto_create_topics,
'auto.offset.reset': auto_offset_reset,
'security.protocol': 'sasl_ssl',
'sasl.mechanisms': 'OAUTHBEARER',
'oauth_cb': functools.partial(TokenProvider.access_token_refresh_cb, self.tokenProvider),
**kwargs})
else:
consumer = Consumer({
'bootstrap.servers': '{}:{}'.format(server, port),
Expand Down
2 changes: 1 addition & 1 deletion src/ConfluentKafkaLibrary/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '1.5.0-7'
VERSION = '1.6.1'