Description
Description
The get_watermark_offsets ignores the timeout and blocks forever when the kafka broker for a selected partition is down. Once the broker is back up, the function returns. I suspect this is a bug in rdkafka, but I'm posting it here first in case it's an issue with the python bindings.
How to reproduce
Create a topic with one partition. Run the below example. Once the example has consumed a few messages, kill the broker hosting the partition. The call to get_watermark_offsets will block until the broker comes back up.
import time
import sys
import confluent_kafka
from confluent_kafka import Consumer, KafkaError
from uuid import uuid4
if __name__ == '__main__':
debug_thread = threading.Thread(target=debug_thread_func)
debug_thread.start()
client = Consumer({'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()),
'default.topic.config': {'auto.offset.reset': 'smallest'}})
def assigned(consumer, partitions):
print("Assigned:", partitions)
client.subscribe(['ibbot'], on_assign=assigned)
while True:
msg = client.poll(timeout=1)
for partition in client.assignment():
print(client.get_watermark_offsets(partition, timeout=1))
if msg is not None:
if msg.error():
print("Error: ", msg.error())
else:
print("Data")
client.close()
Note that this example is probably dependent on a bug in rdkafka, so it may not be reproducible 100% of the time. You can also reproduce this with a multi-partition and broker setup. In this case, the function only blocks for as long as it takes for a new leader to be elected.
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (
confluent_kafka.version()
andconfluent_kafka.libversion()
): kafka-python: ('0.11.4', 721920) rdkafka: ('0.11.4', 722175) - Apache Kafka broker version: 0.11.0.2
- Client configuration:
{'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()), 'default.topic.config': {'auto.offset.reset': 'smallest'}}
- Operating system: RHEL 7
- Provide client logs (with
'debug': '..'
as necessary) - Provide broker log excerpts
- Critical issue