
StickyPartitioner implemented #589


Open · wants to merge 5 commits into main

Conversation

@alrz1999 commented:

Add StickyPartitioner to Balancer.go
closes #584

@alrz1999 alrz1999 marked this pull request as ready for review January 21, 2021 11:46
@nlsun (Contributor) left a comment:

Hello, thanks for the PR. I'm learning about the sticky partitioning concept myself, so I've asked some questions to help me understand.

func (spc *stickyPartitionCache) getIndex(msg Message) (int, bool) {
    spc.lock.Lock()
    defer spc.lock.Unlock()
    index, prs := spc.indexCache[msg.Topic]
@nlsun (Contributor):

We should make a note (maybe up top in the header comment for StickyPartitioner) that if the topic is set in the writer, the topic will not be set in the message.

But that's OK, because in that case there is only one topic, so having only the empty-string key "" in indexCache is fine.
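
For illustration, such a header note might read something like this (the wording is only a suggestion):

// StickyPartitioner keeps routing all messages for a topic to the same
// partition until a new batch starts. Note: when Topic is set on the Writer,
// the Messages passed to Balance carry an empty Topic, so the cache is keyed
// by the empty string "". That is still correct, because such a Writer only
// ever produces to a single topic.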

    return sp.balance(msg, partitions)
}

func (sp *StickyPartitioner) balance(msg Message, partitions []int) int {
@nlsun (Contributor):

Could we write the contents of this function directly into Balance()?
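
For illustration, the inlined version might look something like this. The sketch assumes the cache API from this diff (getIndex, nextPartition), guesses that StickyPartitioner holds the cache in a field named cache, and treats the cached value as a partition number; whether it is a partition number or an index into partitions depends on the rest of the diff:

func (sp *StickyPartitioner) Balance(msg Message, partitions ...int) int {
    // Former balance() body, folded in: reuse the cached sticky partition
    // for this topic when we already have one.
    if partition, ok := sp.cache.getIndex(msg); ok {
        return partition
    }
    // Otherwise ask the cache to pick and remember a new one;
    // -1 stands in for "no previous partition".
    return sp.cache.nextPartition(msg, partitions, -1)
}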

Comment on lines +348 to +350
if spc.indexCache == nil {
    spc.indexCache = make(map[string]int)
}
@nlsun (Contributor) · Jan 30, 2021:

Thoughts about adding this to a constructor (e.g. NewStickyPartitioner) instead?
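
Something like this, for example; NewStickyPartitioner and the cache field name are assumptions, while stickyPartitionCache and indexCache come from the diff:

// Allocate the map once up front so getIndex/nextPartition never need the
// nil check above.
func NewStickyPartitioner() *StickyPartitioner {
    return &StickyPartitioner{
        cache: &stickyPartitionCache{
            indexCache: make(map[string]int),
        },
    }
}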

func (spc *stickyPartitionCache) getIndex(msg Message) (int, bool) {
    spc.lock.Lock()
    defer spc.lock.Unlock()
    index, prs := spc.indexCache[msg.Topic]
@nlsun (Contributor) · Jan 30, 2021:

It's more common for the second boolean variable to be named ok (I personally find it easier to read, too); this applies to the other usages in this code as well.

Suggested change:
-   index, prs := spc.indexCache[msg.Topic]
+   index, ok := spc.indexCache[msg.Topic]

    indexCache map[string]int
}

func (spc *stickyPartitionCache) nextPartition(msg Message, partitions []int, prevPartition int) int {
@nlsun (Contributor) · Jan 30, 2021:

I'm not quite sure I understand the purpose of the prevPartition argument; I think the code might work without it.

The way I understand it, the logic for selecting the next partition is:

  • If there is only one partition, return that partition.
  • Otherwise, pick a random partition, save it, and return it (see the sketch below).

BTW, my only source of knowledge is https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/
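
A minimal sketch of that two-bullet logic with prevPartition dropped; it assumes indexCache stores a concrete partition number per topic and uses math/rand for the random pick (the Java client also avoids re-picking the current partition when changing, which is left out here for simplicity):

func (spc *stickyPartitionCache) nextPartition(msg Message, partitions []int) int {
    spc.lock.Lock()
    defer spc.lock.Unlock()
    // If there is only one partition, that is the sticky partition.
    p := partitions[0]
    if len(partitions) > 1 {
        // Otherwise pick a random partition.
        p = partitions[rand.Intn(len(partitions))]
    }
    // Save it and return it.
    spc.indexCache[msg.Topic] = p
    return p
}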


// OnNewBatch changes the sticky partition if a batch completed for the
// current sticky partition. Alternatively, if no sticky partition has been
// determined, it sets one.
func (sp *StickyPartitioner) OnNewBatch(msg Message, partitions []int, prevPartition int) int {
@nlsun (Contributor) · Jan 30, 2021:

It looks like sticky partitioning should keep track of a batch size per topic and, upon filling the batch, change to a new partition.

It looks like here you intend to have the user of this StickyPartitioner change the partition themselves? Thoughts on keeping the batch-size counter internal instead? Then the user can just write and not worry about the details.
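
A rough sketch of what an internal counter could look like. Everything below beyond the Message type and the Balance signature is an assumption; in particular, BatchSize and the per-topic maps are invented for illustration, and counting messages is only an approximation of "a batch completed" (the real signal would come from the writer's batching). Imports needed: math/rand, sync.

type StickyPartitioner struct {
    BatchSize int // messages per sticky partition before rotating (assumed knob)

    mu      sync.Mutex
    counts  map[string]int // topic -> messages sent to the current sticky partition
    current map[string]int // topic -> current sticky partition
}

func (sp *StickyPartitioner) Balance(msg Message, partitions ...int) int {
    sp.mu.Lock()
    defer sp.mu.Unlock()
    if sp.current == nil { // lazy init; a constructor works too (see above)
        sp.current = make(map[string]int)
        sp.counts = make(map[string]int)
    }
    p, ok := sp.current[msg.Topic]
    if !ok || sp.counts[msg.Topic] >= sp.BatchSize {
        // Batch filled, or no sticky partition yet: rotate internally so the
        // caller never has to call OnNewBatch themselves.
        p = partitions[rand.Intn(len(partitions))]
        sp.current[msg.Topic] = p
        sp.counts[msg.Topic] = 0
    }
    sp.counts[msg.Topic]++
    return p
}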

@nlsun (Contributor) commented Feb 19, 2021:

Hi @alrz1999, any updates here?

@nmerkulov commented:
@nlsun @alrz1999 Hey, I can continue with that branch. I need this feature, and for now it seems impossible to implement a StickyBalancer as an external balancer.

@nlsun (Contributor) commented Jun 1, 2021:

@nmerkulov go for it!

@achille-roussel (Contributor) commented:
Hello @nmerkulov, do you have any updates to share on that front?

@kislerdm (Contributor) commented:
Hey folks! Any update on this PR? Is there any timeline for a release with the sticky partitioner available as a rebalancing protocol? Also, any thoughts about implementing the CooperativeStickyAssignor to cover incremental cooperative rebalancing (find details here)?

@nlsun (Contributor) commented Jul 1, 2022:

@kislerdm Hello! We welcome contributions to both this StickyPartitioner and the CooperativeStickyAssignor.

@zekth commented Mar 20, 2024:

@nlsun would it be OK to spin up a new PR with improvements/applied reviews on this one? I spotted a few nits and I'm happy to make this land.

@nlsun (Contributor) commented Mar 25, 2024:

@zekth yes, go for it!
