Description
consumer.consume(num_messages=100, timeout=1) takes about 5 seconds to return, even though the timeout is 1 second.
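To put a number on the delay, the call can be wrapped in a small timing helper. This is a minimal sketch: `timed` is a hypothetical helper name, not part of confluent_kafka, and the commented usage assumes the consumer object from consumer.py in the reproduction steps.

```python
import time

def timed(fn, *args, **kwargs):
    """Call fn and return (result, elapsed_seconds), measured with a monotonic clock."""
    start = time.monotonic()
    result = fn(*args, **kwargs)
    return result, time.monotonic() - start

# Hypothetical usage with the consumer from consumer.py:
#   batch, elapsed = timed(consumer.consume, num_messages=100, timeout=1)
#   print(len(batch), round(elapsed, 2))
```

A monotonic clock is used so that wall-clock adjustments do not distort the measured duration.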
How to reproduce
consumer.py
import time
from confluent_kafka import Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost',
    'group.id': 'example_consumer',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
    }
})
consumer.subscribe(['test_q'])
while True:
    while True:
        print('.')
        batch = consumer.consume(num_messages=100, timeout=1)
        if len(batch) > 0:
            break
    print(len(batch))
    print('---')
    time.sleep(1)
producer.py
import time
from confluent_kafka import Producer
producer = Producer({
    'bootstrap.servers': 'localhost',
    'queue.buffering.max.messages': 10000000,  # 1e7
})
i = 0
while True:
    i += 1
    print(i)
    msg = 'msg ' + str(i)
    producer.poll(0)
    producer.produce('test_q', msg.encode(), callback=None)
    time.sleep(0.1)
Open three terminals and run one command in each:
python consumer.py
python producer.py
./bin/kafka-console-consumer.sh --zookeeper localhost --topic test_q
Terminal 3 shows each message being consumed immediately.
The producer sends 10 messages per second, so I expect the consumer to return batches of about 10 messages.
However, terminal 1 prints only every 5 seconds: consumer.consume doesn't return for 5 seconds.
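As a point of comparison while the cause is unclear, a batch can also be assembled from repeated poll() calls under a single deadline. This is a sketch, not a confirmed fix: `poll_batch` is a hypothetical helper, it assumes only that the consumer exposes poll(timeout) returning a message or None (as confluent_kafka.Consumer does), and whether it avoids the 5-second delay has not been verified.

```python
import time

def poll_batch(consumer, max_messages=100, timeout=1.0):
    """Collect up to max_messages by polling until a shared deadline passes."""
    deadline = time.monotonic() + timeout
    batch = []
    while len(batch) < max_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # overall timeout reached; return whatever was collected
        msg = consumer.poll(remaining)
        if msg is None:
            continue  # nothing arrived; the loop re-checks the deadline
        if msg.error():
            continue  # this sketch skips error events instead of handling them
        batch.append(msg)
    return batch

# Hypothetical usage in consumer.py, replacing the consume() call:
#   batch = poll_batch(consumer, max_messages=100, timeout=1)
```

If this returns roughly 10 messages per second while consume() still blocks for 5 seconds, that would narrow the problem to the batch call itself.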
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('0.11.4', 721920), ('0.11.4', 722175)
- Apache Kafka broker version: 2.12-1.1.0
- Client configuration: {...}
- Operating system:
- Provide client logs (with 'debug': '..' as necessary)
- Provide broker log excerpts
- Critical issue
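
For the client-log item, one way to get librdkafka debug output is to add a 'debug' entry to the consumer configuration from the reproduction script. The sketch below reflects an assumption about which contexts are useful here: 'cgrp' and 'fetch' are librdkafka debug contexts, picked because the symptom involves group membership and fetching; 'all' would be a broader alternative. The name `debug_consumer_config` is hypothetical.

```python
# Consumer configuration from consumer.py with librdkafka debug logging added.
# The choice of debug contexts is an assumption; the issue does not specify one.
debug_consumer_config = {
    'bootstrap.servers': 'localhost',
    'group.id': 'example_consumer',
    'debug': 'cgrp,fetch',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
    }
}

# Hypothetical usage: consumer = Consumer(debug_consumer_config)
```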