
feat: support consuming records from Kafka #17691


Open · wants to merge 6 commits into main from grobinson/add-state-to-partitions

Conversation

grobinson-grafana (Contributor) commented on May 13, 2025

What this PR does / why we need it:

This pull request supports consuming records from Kafka, including:

  1. Replaying newly assigned partitions so an instance can rebuild its in-memory state before serving queries (see the sketch below)
  2. Consuming records from other zones to synchronize state
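
As a rough illustration of the first point, the sketch below shows how a kgo consumer might mark newly assigned partitions as replaying before they serve queries. It is not the PR's actual code: the partitionManager type, its SetReplaying method, and the consumer group name are hypothetical stand-ins.

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// partitionManager is a hypothetical stand-in for the PR's PartitionManager.
type partitionManager struct{ /* per-partition state, e.g. replaying or ready */ }

func (m *partitionManager) SetReplaying(partition int32) { /* ... */ }

// newConsumer wires the assignment callback so that newly assigned partitions
// start in a replaying state and only become ready once their backlog has
// been consumed (or they are found to be empty).
func newConsumer(pm *partitionManager, topic string) (*kgo.Client, error) {
	return kgo.NewClient(
		kgo.ConsumeTopics(topic),
		kgo.ConsumerGroup("example-group"), // placeholder group name
		kgo.OnPartitionsAssigned(func(_ context.Context, _ *kgo.Client, assigned map[string][]int32) {
			for _, partitions := range assigned {
				for _, partition := range partitions {
					// Replay must finish before this partition serves queries.
					pm.SetReplaying(partition)
				}
			}
		}),
	)
}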

Which issue(s) this PR fixes:
Fixes #

Special notes for your reviewer:

Checklist

  • Reviewed the CONTRIBUTING.md guide (required)
  • Documentation added
  • Tests updated
  • Title matches the required conventional commits format, see here
    • Note that Promtail is considered to be feature complete, and future development for logs collection will be in Grafana Alloy. As such, feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
  • Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
  • If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR

@grobinson-grafana requested a review from a team as a code owner on May 13, 2025 13:40
@grobinson-grafana force-pushed the grobinson/add-state-to-partitions branch 4 times, most recently from 2023ac2 to 38c5c4f on May 14, 2025 17:25
@grobinson-grafana force-pushed the grobinson/add-state-to-partitions branch from 38c5c4f to bc22aaf on May 14, 2025 22:56
@grobinson-grafana changed the title from "feat: add state to PartitionManager" to "feat: Support replaying via Kafka" on May 14, 2025
@grobinson-grafana force-pushed the grobinson/add-state-to-partitions branch 12 times, most recently from 9447271 to db69b8e on May 15, 2025 08:06
@grobinson-grafana changed the title from "feat: Support replaying via Kafka" to "feat: support replaying records for newly assigned partitions" on May 15, 2025
@grobinson-grafana force-pushed the grobinson/add-state-to-partitions branch 4 times, most recently from de57183 to 75b6dc4 on May 15, 2025 08:43
Comment on lines 46 to 55
if err := l.checkOffsets(ctx, partition); err != nil {
	level.Error(l.logger).Log(
		"msg", "failed to check offsets, partition is ready",
		"partition", partition,
		"err", err,
	)
	l.partitionManager.SetReady(partition)
}
Collaborator

FWIW, the method checkOffsets returns an error in three cases:

  • On failing to fetch the start offset
  • On failing to fetch the last produced offset
  • On failing to fetch the next offset

All three fetch calls return a specific offset that is used for one of the following checks:

  • Knowing the start and the last produced offsets, we can check whether the partition has no records.
  • Knowing the next and the last produced offsets, we can check whether the partition has any records within the window size.

Can you elaborate on why it is valid to set the partition ready if we cannot perform any of the above checks? If I understand correctly, if we fail to fetch any of the offsets, we skip the partition replay, right?
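
For readers following the thread, the two checks above amount to something like this sketch; names are illustrative and the window handling is simplified, so this is not the PR's actual checkOffsets:

// offsets bundles the three offsets referred to above; how they are fetched
// (and the error handling when a fetch fails) is elided.
type offsets struct {
	start        int64 // first offset still retained in the partition
	next         int64 // next offset this consumer would read
	lastProduced int64 // offset of the last record produced to the partition
}

// needsReplay applies the two checks: an empty partition is ready right away,
// otherwise the partition needs replay while records remain between the next
// offset and the last produced offset. The real code also bounds this by a
// window size, which is omitted here.
func needsReplay(o offsets) bool {
	if o.start == o.lastProduced {
		return false // the partition has no records
	}
	return o.next < o.lastProduced
}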

Contributor Author

We don't want to block kgo.OnPartitionsAssigned by retrying in a loop, as that also blocks other callbacks (like kgo.OnPartitionsRevoked). The idea here is that if we can't determine the state of a partition, we must set it to ready so it can accept writes. Otherwise we cannot guarantee that it will eventually become ready (consider the case where the partition is empty).

If I understand correctly if we fail any of the offset fetching, we skip the partition replay, right?

We don't skip partition replay. That would require us to advance the offset to the end of the partition, which we can't do because the call to LastProducedOffset might have failed. Replay will still happen if there are records to consume, but it means we may start accepting synchronous writes before replay has finished.

Collaborator

Thanks for the clarification. I missed during my initial review that PlaybackManager.run() runs in a separate goroutine. Nevertheless I want to stress the point (not a blocker) that returning early on an error fetching any of the offsets allows new records to be produced to the partition. The particular case I am interested in: lastProducedOffset will move as new records are appended, right? If I understand correctly, this moves our sliding replay window too?

# t0: Records before replay:
| Rec 1 | Rec 2 | Rec 3 |
^
|---- Sliding Window starts here because nextOffset (= Rec 1) < lastProducedOffset (= Rec 3)


# t1: Records without replay but appending resumed
| Rec 1 | Rec 2 | Rec 3 | Rec 4 | Rec 5 | ...
        ^
        |---- Sliding Window has moved slightly and starts here with nextOffset (= Rec 2) < lastProducedOffset (= Rec 4)

Considering the above, are we going to replay only Rec 2 and Rec 3? If yes, which part below saves us from replaying Rec 4 and Rec 5 too?

Contributor Author

I'm not sure I'm following here, we can discuss on Slack if it's easier.

The particular case that I am interested in is the check lastProducedOffset will move with the new records appended right?

If you mean that lastProducedOffset will be continuously refreshed while replaying a partition, then the answer is no, it won't move as new records are appended. It is loaded once when the partition is assigned and never again.
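
In other words, the replay target is fixed at assignment time. A tiny sketch of that termination rule (names illustrative, not the PR's code):

// doneReplaying reports whether replay of a partition can finish.
// targetOffset is the lastProducedOffset captured once, when the partition
// was assigned; it is never refreshed, so records appended afterwards
// (Rec 4 and Rec 5 in the diagram above) do not extend the replay. Whether
// targetOffset points at the last record or one past it is an off-by-one
// detail elided here.
func doneReplaying(consumedOffset, targetOffset int64) bool {
	return consumedOffset >= targetOffset
}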

If I've misunderstood, I'm happy to discuss on Slack and then we can summarize what we discussed here 😊

Contributor Author

The purpose of this code is: when a partition is assigned, we need to figure out whether we can accept synchronous writes for it straight away, or whether there are records we need to replay first before accepting synchronous writes. That's all it does, nothing else.

	return nil
}

func (m *PlaybackManager) processFetchTopicPartition(ctx context.Context) func(kgo.FetchTopicPartition) {
Collaborator

What happens on rebalance with this method? AFAIU the partition lifecycler kicks in and evicts the partitions from the partition manager as well as the records replayed into the usage store, right?

Is this enough, or do we need BlockRebalanceOnPoll to revert the work done in this method and start replay anew after the rebalance?

Contributor Author

There is a race here where a record could be inserted between calling m.partitionManager.GetState (which, if the partition has been revoked, causes all remaining records to be discarded with the message "discarding records for partition as the partition is not assigned to this client") and m.usage.Update. However, I think we can solve this with (a) scheduled evictions of the usage store and (b) evicting the partition on assignment as well, ensuring that no records are left over if a partition is unassigned and then re-assigned in quick succession.

BlockRebalanceOnPoll has a number of disadvantages that I think we need to consider carefully before we start to use it.
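
To make the race concrete, here is a rough sketch of the fetch-handling path being discussed (assuming imports of context and github.com/twmb/franz-go/pkg/kgo); all type and method names are hypothetical stand-ins, and only the ordering matters:

// partitionStates is a stand-in for the PR's PartitionManager.
type partitionStates interface {
	// Assigned reports whether the partition is still assigned to this client.
	Assigned(partition int32) bool
}

// usageStore is a stand-in for the usage store that is updated during replay.
type usageStore interface {
	Update(ctx context.Context, record *kgo.Record)
}

// handleFetch checks the partition state, then applies records to the usage
// store. A revocation that lands between the Assigned check and the Update
// calls is the race window described above; scheduled evictions plus evicting
// on assignment keep leftover records from surviving a quick re-assignment.
func handleFetch(ctx context.Context, pm partitionStates, usage usageStore, ftp kgo.FetchTopicPartition) {
	if !pm.Assigned(ftp.Partition) {
		// Discard records for partitions no longer assigned to this client.
		return
	}
	for _, record := range ftp.Records {
		usage.Update(ctx, record)
	}
}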

@grobinson-grafana force-pushed the grobinson/add-state-to-partitions branch 2 times, most recently from 875f608 to 1fb7082 on May 16, 2025 10:06
@grobinson-grafana force-pushed the grobinson/add-state-to-partitions branch 2 times, most recently from 0c46f60 to 442f38b on May 16, 2025 11:49
This commit supports replaying records for newly assigned partitions
so an instance can rebuild its in-memory usage data before serving
queries.
@grobinson-grafana force-pushed the grobinson/add-state-to-partitions branch from 442f38b to f6f23b4 on May 16, 2025 12:46
@grobinson-grafana changed the title from "feat: support replaying records for newly assigned partitions" to "feat: support consuming records from Kafka" on May 16, 2025
@grobinson-grafana force-pushed the grobinson/add-state-to-partitions branch from 6ce655c to 9e7f6cd on May 16, 2025 13:50
@grobinson-grafana force-pushed the grobinson/add-state-to-partitions branch from 9e7f6cd to 3939ecc on May 16, 2025 14:04
@periklis self-requested a review on May 19, 2025 11:09
@grobinson-grafana force-pushed the grobinson/add-state-to-partitions branch 2 times, most recently from 5303e64 to c2ed183 on May 19, 2025 12:52
@grobinson-grafana force-pushed the grobinson/add-state-to-partitions branch from c2ed183 to 2538568 on May 19, 2025 12:53