StickyPartitioner implemented #589
base: main
Conversation
Hello, thanks for the PR. I'm learning about the sticky partitioning concept myself, so I've asked some questions to help me understand.
func (spc *stickyPartitionCache) getIndex(msg Message) (int, bool) {
	spc.lock.Lock()
	defer spc.lock.Unlock()
	index, prs := spc.indexCache[msg.Topic]
We should make a note (maybe up top in the header comment for StickyPartitioner) that if the topic is set in the writer, the topic will not be set in the message. That's OK because in that case there is only one topic, so having the empty string "" as the only key in indexCache is fine.
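For illustration, here is a minimal usage sketch of that scenario, assuming this PR lands as kafka.StickyPartitioner (the broker address and topic name are placeholders): with Topic set on the Writer, the Message handed to the balancer has an empty Topic, so indexCache only ever holds the key "".

package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Topic is configured on the Writer, so messages are written without a
	// Topic and the sticky partition cache is keyed by the empty string "".
	w := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9092"),
		Topic:    "events",
		Balancer: &kafka.StickyPartitioner{},
	}
	defer w.Close()

	// The message deliberately leaves Topic empty; Writer.Topic applies.
	err := w.WriteMessages(context.Background(),
		kafka.Message{Value: []byte("hello")},
	)
	if err != nil {
		log.Fatal(err)
	}
}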
	return sp.balance(msg, partitions)
}

func (sp *StickyPartitioner) balance(msg Message, partitions []int) int {
Could we write the contents of this function directly into Balance()?
	if spc.indexCache == nil {
		spc.indexCache = make(map[string]int)
	}
Thoughts about adding this to a constructor (e.g. NewStickyPartitioner) instead?
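A minimal sketch of what that could look like; NewStickyPartitioner and the exact struct layout are assumptions for illustration, mirroring the lock/indexCache fields visible in the diff rather than the PR's actual code:

package kafka

import "sync"

// stickyPartitionCache holds the currently chosen partition per topic,
// abbreviated here to the fields shown in the diff.
type stickyPartitionCache struct {
	lock       sync.Mutex
	indexCache map[string]int
}

// StickyPartitioner is shown with a single cache field for illustration.
type StickyPartitioner struct {
	cache stickyPartitionCache
}

// NewStickyPartitioner allocates the cache map up front, so getIndex never
// sees a nil map and the lazy "if spc.indexCache == nil" check can be dropped.
func NewStickyPartitioner() *StickyPartitioner {
	return &StickyPartitioner{
		cache: stickyPartitionCache{
			indexCache: make(map[string]int),
		},
	}
}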
func (spc *stickyPartitionCache) getIndex(msg Message) (int, bool) {
	spc.lock.Lock()
	defer spc.lock.Unlock()
	index, prs := spc.indexCache[msg.Topic]
It's more common for the second boolean variable to be named ok (I personally find it easier to read too); this applies to the other usages in this code as well.
Suggested change:
-	index, prs := spc.indexCache[msg.Topic]
+	index, ok := spc.indexCache[msg.Topic]
	indexCache map[string]int
}

func (spc *stickyPartitionCache) nextPartition(msg Message, partitions []int, prevPartition int) int {
I'm not quite sure I understand the purpose of the prevPartition argument; I think the code might work without it.
The way I understand it, the logic for selecting the next partition is (see the sketch after this comment):
- if there is only 1 partition, return that partition
- otherwise, pick a random partition, save it, and return that
BTW my only source of knowledge is https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/
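A rough sketch of exactly that two-step logic, with no prevPartition argument. The struct mirrors the lock/indexCache fields from the diff, Message is the kafka-go message type, and the random choice is an assumption based on the blog post above, not the PR's actual code:

package kafka

import (
	"math/rand"
	"sync"
)

// stickyPartitionCache remembers the current sticky partition per topic,
// abbreviated to the fields visible in the diff.
type stickyPartitionCache struct {
	lock       sync.Mutex
	indexCache map[string]int
}

func (spc *stickyPartitionCache) nextPartition(msg Message, partitions []int) int {
	spc.lock.Lock()
	defer spc.lock.Unlock()

	// Only one partition available: nothing to choose.
	if len(partitions) == 1 {
		spc.indexCache[msg.Topic] = partitions[0]
		return partitions[0]
	}

	// Otherwise pick a random partition, remember it for this topic, and return it.
	p := partitions[rand.Intn(len(partitions))]
	spc.indexCache[msg.Topic] = p
	return p
}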
// OnNewBatch changes the sticky partition If a batch completed for the current sticky partition.
// Alternately, if no sticky partition has been determined, set one.
func (sp *StickyPartitioner) OnNewBatch(msg Message, partitions []int, prevPartition int) int {
It looks like sticky partitioning should keep track of a batch size per topic and, upon filling the batch, change to a new partition.
It looks like here you intend to have the user of this StickyPartitioner change the partition themselves? Thoughts on keeping the batch size counter internally instead? Then the user can just write and not worry about the details.
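A rough sketch of that alternative, keeping the counter inside the balancer. The type name, field names, and the fixed batchSize threshold are all assumptions for illustration; Balance follows the kafka-go Balancer interface signature:

package kafka

import (
	"math/rand"
	"sync"
)

// stickyBatchPartitioner rotates the sticky partition for a topic after
// batchSize messages have been balanced, so callers never manage batches.
type stickyBatchPartitioner struct {
	lock      sync.Mutex
	batchSize int            // messages per sticky partition before rotating (assumed > 0)
	counts    map[string]int // messages balanced so far, keyed by topic
	sticky    map[string]int // current sticky partition, keyed by topic
}

func (p *stickyBatchPartitioner) Balance(msg Message, partitions ...int) int {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.counts == nil {
		p.counts = make(map[string]int)
		p.sticky = make(map[string]int)
	}

	part, ok := p.sticky[msg.Topic]
	if !ok || p.counts[msg.Topic] >= p.batchSize {
		// Start a new "batch": pick a fresh random partition and reset the count.
		part = partitions[rand.Intn(len(partitions))]
		p.sticky[msg.Topic] = part
		p.counts[msg.Topic] = 0
	}
	p.counts[msg.Topic]++
	return part
}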
hi @alrz1999, any updates here?
@nmerkulov go for it!
Hello @nmerkulov, do you have any updates to share on that front?
Hey folks! Any update wrt this PR? Is there any timeline for a release with the sticky partitioner available as a re-balancing protocol? To add, any thoughts about implementing the CooperativeStickyAssignor?
@kislerdm Hello! We welcome contributions to both this StickyPartitioner and CooperativeStickyAssignor.
@nlsun would it be OK to spin up a new PR with improvements/applied reviews on top of this one? I spotted a few nits and I'm happy to make this land.
@zekth yes, go for it!
Add StickyPartitioner to Balancer.go
closes #584