Description
When the consumer starts and there are no new messages for the group, it is impossible to find out the current positions or to commit the current offsets.
Committing offsets even when there are no new messages can be important because offsets in Kafka can expire. To prevent offsets from expiring we need to commit them periodically, even if there are no changes.
Normal commit() doesn't help because it won't do anything and would return KafkaError.NO_OFFSETS.
It is possible to use commit(offsets) and give it specific offsets, but how to get them?
Currently, even if the consumer gets partitions assigned, consumer.assignment() returns an empty list when there are no new messages. I strongly suspect that even if I receive messages it will only give me the partitions of the received messages, not all of them.
OK, I can get a list of assigned partitions from the on_assign callback. But then when I call consumer.position(_assigned) it won't give me the current offsets for the group, only for the partitions I received messages from.
Adding the ability to query currently assigned offsets would be very helpful.
Making commit() commit offsets even if nothing changed would be even more helpful.
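A possible workaround, sketched below under some assumptions: instead of position(), ask the broker for the group's last committed offsets with consumer.committed() and pass the valid ones back to commit(offsets=...). The helper name recommit_committed_offsets is hypothetical, and the partition list is assumed to be the one saved from the on_assign callback. Whether re-committing an unchanged offset actually resets its expiry depends on the broker version and its retention settings, so this is untested against a real cluster and not a confirmed fix.

```python
def recommit_committed_offsets(consumer, partitions, timeout=10.0):
    """Re-commit the group's last committed offsets for `partitions`.

    `consumer` is assumed to behave like confluent_kafka.Consumer, and
    `partitions` is assumed to be the TopicPartition list saved from the
    on_assign callback. Returns the TopicPartition objects that were committed.
    """
    if not partitions:
        return []

    # committed() asks the broker for the stored group offsets, so it does
    # not depend on any message having been consumed in this session.
    committed = consumer.committed(partitions, timeout=timeout)

    # Partitions without a stored offset carry a negative sentinel value
    # (OFFSET_INVALID); skip those, and skip per-partition errors.
    to_commit = [
        tp for tp in committed
        if tp.offset >= 0 and getattr(tp, "error", None) is None
    ]

    if to_commit:
        # Synchronous commit so a failure raises here instead of being lost.
        consumer.commit(offsets=to_commit, asynchronous=False)
    return to_commit
```

In use, the on_assign callback would store its partitions argument somewhere (for example in a list named _assigned), and the poll loop would call recommit_committed_offsets(consumer, _assigned) on a timer. Note that this re-commits what the group last committed, not the consumer's in-memory position, so it should only be used for partitions where nothing new has been consumed.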