Skip to content

KafkaConsumer and current positions #118

Closed
@AlexeyRaga

Description

@AlexeyRaga

When the consumer starts and there are no new messages for the group it is impossible to know current positions and to commit current offsets.

Committing offsets even if there are no new messages can be important because offsets in Kafka can expire. To prevent offsets from expiring we need to commit them periodically, even is there are no changes.

Normal commit() doesn't help because it won't do anything and would return KafkaError.NO_OFFSETS.

It is possible to use commit(offsets) and give it specific offsets, but how to get them?

Currently even if the consumer gets partitions assigned, consumer.assignment() returns an empty list when there are no new messages. I strongly suspect that even if I receive messages it will only give me partitions for the received messages, not all of them.

OK, I can get a list of assigned partitions from on_assign callback. But then when I call consumer.position(_assigned) it won't give me current offsets for the group, only for the partitions I received messages from.

Adding the ability to query currently assigned offsets would be very helpful.
Making commit() committing offsets even if nothing changed would be even more helpful.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions