Skip to content

consume(timeout=1) takes 5 seconds #407

Open
@elbaro

Description

@elbaro

Description

consume(num_messages=100, timeout=1) takes about 5 seconds.

How to reproduce

consumer.py

import time
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost',
    'group.id': 'example_consumer',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
     }
})

consumer.subscribe(['test_q'])

while True:
    while True:
        print('.')
        batch = consumer.consume(num_messages=100, timeout=1)
        if len(batch)>0:
            break
    print(len(batch))
    print('---')
    time.sleep(1)

producer.py

import time
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost',
    'queue.buffering.max.messages': 10000000,  # 1e7
})

i = 0

while True:
    i += 1
    print(i)

    msg = 'msg ' + str(i)
    producer.poll(0)
    producer.produce('test_q', msg.encode(), callback=None)
    time.sleep(0.1)

open 3 terminals and run

  1. python consumer.py
  2. python producer.py
  3. ./bin/kafka-console-consumer.sh --zookeeper localhost --topic test_q

You can see the immediate consumption in the terminal3.
The producer sends 10 messages/sec so I expect the consumer to return the batch of size 10.

However, terminal1 prints every 5 seconds. consumer.consume doesn't return for 5 seconds.

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('0.11.4', 721920), ('0.11.4', 722175)
  • Apache Kafka broker version: 2.12-1.1.0
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions