Description
Description
I am using confluent-kafka-python 1.3. when the stats_cb enabled, but the consumer stops consuming messages. there may be memory leak.
Please refer the following section codes.
I fount that when one message causes handler_fn(batch) to raise exception, the offsets in the batch cannot be committed. The batch will store 500 messages but cannot commit the offsets. The consumer will be stuck here. It cannot consume and commit messages. And the buffer queue should be full soon. The consumer finally did not poll any messages because batch if full. And the memory usage should be bounded by partition number * 1 M, which is set by "queued.max.messages.kbytes". The memory usage should be bounded.
But in fact, I found the memory usage increased up to 100%.
I have no idea why. I guess because consumer stops consuming or polling new messages and the queue may be full. And stats is queued up and is not dispatched finally.
Any ideas welcomed. Thanks
How to reproduce
Config:
config = {
"client.id": socket.gethostname()
"bootstrap.servers": kafka_hosts,
"group.id": group_id,
"enable.auto.commit": False,
"auto.offset.reset": "latest" if reset_to_latest else "earliest",
"statistics.interval.ms": 15000,
"stats_cb": simple_print_function,
"queued.max.messages.kbytes": 1024,
}
rdkafka_config={
'queued.max.messages.kbytes': 1024,
'debug': 'all',
'queued.min.messages': 10,
'queued.min.messages': 20,
'fetch.message.max.bytes': 1024*512,
},
Consumer codes:
def run(self):
batch = []
while not self.stopped:
...
try:
events = self.consumer.consume(
num_messages = 500 - len(batch),
timeout = max(0, ...),
)
except Exception:
continue
messages = [event for event in events if not event.error()]
batch += messages
if not batch:
continue
try:
handler_fn(batch)
except Exception:
continue
# Successful handler, clear batch and send a commit
batch = []
self.commit(...)
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (
confluent_kafka.version()
andconfluent_kafka.libversion()
): 1.3 - Apache Kafka broker version:
- Client configuration:
{...}
- Operating system: ubuntu
- Provide client logs (with
'debug': '..'
as necessary) - Provide broker log excerpts
- Critical issue