feat: support consuming records from Kafka #17691
Conversation
pkg/limits/partition_lifecycler.go
Outdated
```go
if err := l.checkOffsets(ctx, partition); err != nil {
	level.Error(l.logger).Log(
		"msg", "failed to check offsets, partition is ready",
		"partition", partition,
		"err", err,
	)
	l.partitionManager.SetReady(partition)
}
```
FWIW the method checkOffsets returns an error in three cases:
- On failing to fetch the start offset
- On failing to fetch the last produced offset
- On failing to fetch the next offset

All three fetch calls return a specific offset that is used for one of the following checks:
- Knowing the start and the last produced offset, we can check whether the partition has no records.
- Knowing the next and the last produced offset, we can check whether the partition has any records within the window size.

Can you elaborate on why it is valid to set the partition ready if we cannot perform any of the above checks? If I understand correctly, if we fail any of the offset fetches, we skip the partition replay, right?
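For context, here is a minimal sketch of how the three fetches and two checks described above could fit together. The offsetFetcher interface, the helper names, and the return values are hypothetical stand-ins, not the actual checkOffsets implementation, and the sketch omits the window-size bound mentioned above:

```go
import (
	"context"
	"fmt"
)

// offsetFetcher abstracts the three offset lookups discussed above.
// The method names are illustrative; the real code fetches these via the Kafka client.
type offsetFetcher interface {
	StartOffset(ctx context.Context, partition int32) (int64, error)
	LastProducedOffset(ctx context.Context, partition int32) (int64, error)
	NextOffset(ctx context.Context, partition int32) (int64, error)
}

// checkOffsetsSketch returns (needsReplay, targetOffset, error). Any fetch
// failure is returned to the caller, which (as in the snippet above) then
// decides whether to mark the partition ready anyway.
func checkOffsetsSketch(ctx context.Context, f offsetFetcher, partition int32) (bool, int64, error) {
	start, err := f.StartOffset(ctx, partition)
	if err != nil {
		return false, 0, fmt.Errorf("fetch start offset: %w", err)
	}
	last, err := f.LastProducedOffset(ctx, partition)
	if err != nil {
		return false, 0, fmt.Errorf("fetch last produced offset: %w", err)
	}
	next, err := f.NextOffset(ctx, partition)
	if err != nil {
		return false, 0, fmt.Errorf("fetch next offset: %w", err)
	}
	// Check 1: the partition has no records, so it can be ready immediately.
	if start >= last {
		return false, 0, nil
	}
	// Check 2: records exist and we are behind the last produced offset, so
	// they must be replayed before accepting synchronous writes. (The real
	// check also bounds this by the replay window size.)
	if next < last {
		return true, last, nil
	}
	return false, 0, nil
}
```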
We don't want to block kgo.OnPartitionsAssigned by retrying in a loop, as that also blocks other callbacks (like kgo.OnPartitionsRevoked). The idea here was that if we can't figure out the state of a partition, we must set it to ready so it can accept writes. Otherwise we cannot guarantee that it will eventually become ready (consider the case where the partition is empty).

> If I understand correctly, if we fail any of the offset fetching, we skip the partition replay, right?

We don't skip partition replay. That would require us to advance the offset to the end of the partition, which we can't do, because the call to LastProducedOffset might have failed. Replay will still happen if there are records to consume, but it means we start accepting synchronous writes before replay may have finished.
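For illustration, a hedged sketch of a fail-open OnPartitionsAssigned callback along the lines described above; readySetter, offsetChecker, and assignedCallback are hypothetical names, and only the kgo and go-kit log APIs are real:

```go
import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/twmb/franz-go/pkg/kgo"
)

// readySetter and offsetChecker are illustrative stand-ins for the partition
// manager and partition lifecycler referenced in the discussion.
type readySetter interface{ SetReady(partition int32) }

type offsetChecker interface {
	CheckOffsets(ctx context.Context, partition int32) error
}

// assignedCallback builds an OnPartitionsAssigned handler that fails open: if
// the offset check errors, the partition is marked ready instead of retrying,
// so other callbacks (e.g. OnPartitionsRevoked) are not blocked.
func assignedCallback(logger log.Logger, checker offsetChecker, pm readySetter) kgo.Opt {
	return kgo.OnPartitionsAssigned(func(ctx context.Context, _ *kgo.Client, assigned map[string][]int32) {
		for topic, partitions := range assigned {
			for _, partition := range partitions {
				if err := checker.CheckOffsets(ctx, partition); err != nil {
					level.Error(logger).Log(
						"msg", "failed to check offsets, partition is ready",
						"topic", topic,
						"partition", partition,
						"err", err,
					)
					pm.SetReady(partition)
				}
			}
		}
	})
}
```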
Thanks for the clarification. I missed the point during my initial review that PlaybackManager.run() runs in another goroutine. Nevertheless I want to stress the point (not blocking, though) that returning early on an error fetching any of the offsets allows producing new records on the partition. The particular case I am interested in: the lastProducedOffset used in the check will move as new records are appended, right? If I understand correctly, this moves our sliding replay window too?

```
# t0: Records before replay:
| Rec 1 | Rec 2 | Rec 3 |
  ^
  |---- Sliding window starts here because nextOffset (= Rec 1) < lastProducedOffset (= Rec 3)

# t1: Records without replay but appending resumed
| Rec 1 | Rec 2 | Rec 3 | Rec 4 | Rec 5 | ...
          ^
          |---- Sliding window slightly moved and starts here with nextOffset (= Rec 2) < lastProducedOffset (= Rec 4)
```

Considering the above, are we going to replay only Rec 2 and Rec 3? If yes, which part below saves us from replaying Rec 4 and Rec 5 too?
I'm not sure I'm following here; we can discuss on Slack if it's easier.

> The particular case that I am interested in is the check lastProducedOffset will move with the new records appended right?

If you mean lastProducedOffset will be continuously refreshed while replaying a partition, then the answer is no, it won't move with new records appended. It is loaded once when the partition is assigned and never again. If I've misunderstood, I'm happy to discuss on Slack and then we can summarize what we discussed here 😊
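To illustrate the point about lastProducedOffset being captured once, a minimal sketch; the type and names are hypothetical, and the exact comparison depends on whether the snapshot is the last record's offset or the end offset:

```go
// replayTarget is captured once when the partition is assigned and never
// refreshed. The replay loop compares consumed offsets against this fixed
// value, so records appended after assignment (Rec 4 and Rec 5 in the example
// above) never extend the replay window; they are consumed later as regular
// traffic.
type replayTarget struct {
	lastProducedOffset int64 // snapshot taken at partition assignment
}

// caughtUp reports whether replay can finish and the partition can start
// accepting synchronous writes.
func (t replayTarget) caughtUp(consumedOffset int64) bool {
	return consumedOffset >= t.lastProducedOffset
}
```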
The purpose of this code is that, when a partition is assigned, we need to figure out whether we can accept synchronous writes for it straight away, or whether there are records we need to replay first before we can accept synchronous writes. That's all it does, nothing else.
```go
	return nil
}

func (m *PlaybackManager) processFetchTopicPartition(ctx context.Context) func(kgo.FetchTopicPartition) {
```
What happens on rebalance with this method? AFAIU the partition lifecycler kicks in and evicts partitions from the partition manager as well as the records replayed into the usage store, right? Is this enough, or do we need BlockRebalanceOnPoll to revert the work done in this method and start replay anew after a rebalance?
There is a race here where a record could be inserted between calling m.partitionManager.GetState (which, if the partition has been revoked, causes all remaining records to be discarded with the message discarding records for partition as the partition is not assigned to this client) and m.usage.Update. However, I think we can solve this with a) scheduled evictions of the usage store and b) evicting the partition also on assignment, ensuring that no records are left if a partition is unassigned and then re-assigned in succession.

BlockRebalanceOnPoll has a number of disadvantages that I think we need to consider carefully before we start to use it.
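For illustration only, a hedged sketch of the check-then-update window being described here; the interfaces and function are hypothetical stand-ins (GetState is simplified to an Assigned check), not the actual PlaybackManager code:

```go
import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/twmb/franz-go/pkg/kgo"
)

// partitionStates and usageStore stand in for the partition manager and usage
// store referenced in the discussion.
type partitionStates interface {
	// Assigned reports whether the partition is currently assigned to this client.
	Assigned(partition int32) bool
}

type usageStore interface {
	Update(ctx context.Context, rec *kgo.Record)
}

// processRecord shows the race window: the partition can be revoked after the
// state check but before the usage update, so a stale record could still land
// in the usage store. Evicting the partition on (re)assignment plus scheduled
// evictions of the usage store bounds how long such a record can survive.
func processRecord(ctx context.Context, states partitionStates, usage usageStore, logger log.Logger, rec *kgo.Record) {
	if !states.Assigned(rec.Partition) {
		level.Debug(logger).Log(
			"msg", "discarding records for partition as the partition is not assigned to this client",
			"partition", rec.Partition,
		)
		return
	}
	// A revoke can happen here, between the check and the update.
	usage.Update(ctx, rec)
}
```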
This commit supports replaying records for newly assigned partitions so an instance can rebuild its in-memory usage data before serving queries.
What this PR does / why we need it:
This pull request supports consuming records from Kafka, including:
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Checklist
- Reviewed the CONTRIBUTING.md guide (required)
- feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- Upgrade-relevant changes are documented in docs/sources/setup/upgrade/_index.md
- If the change deprecates or removes a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR