StickyPartitioner implemented #589
@@ -288,3 +288,82 @@ func murmur2(data []byte) uint32 {

```go
	return h
}

// StickyPartitioner is a Balancer implementation in which the Message key is
// NOT used as part of the balancing strategy; messages with the same key are
// not guaranteed to be sent to the same partition.
// If a partition is specified in the Message, it is used.
// Otherwise, the sticky partition is chosen, and it changes whenever a batch
// for the current sticky partition completes.
type StickyPartitioner struct {
	spc stickyPartitionCache
}

// Balance satisfies the Balancer interface.
func (sp *StickyPartitioner) Balance(msg Message, partitions ...int) int {
	return sp.balance(msg, partitions)
}

func (sp *StickyPartitioner) balance(msg Message, partitions []int) int {
	return sp.spc.partition(msg, partitions)
}
```
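For context, here is a minimal usage sketch (not part of the diff) showing how this balancer would plug into a kafka-go `Writer`. The broker address and topic are placeholders, and it assumes the `Writer`/`TCP` API plus a `context` import:

```go
// Hypothetical wiring of the new balancer into a Writer.
w := &Writer{
	Addr:     TCP("localhost:9092"), // placeholder broker address
	Topic:    "events",              // placeholder topic
	Balancer: &StickyPartitioner{},  // messages stick to one partition per batch
}
defer w.Close()

err := w.WriteMessages(context.Background(),
	Message{Value: []byte("one")},
	Message{Value: []byte("two")},
)
```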
```go
// OnNewBatch changes the sticky partition if a batch completed for the
// current sticky partition. Alternatively, if no sticky partition has been
// determined, it sets one.
func (sp *StickyPartitioner) OnNewBatch(msg Message, partitions []int, prevPartition int) int {
	return sp.spc.nextPartition(msg, partitions, prevPartition)
}
```

> **Review comment (on `OnNewBatch`):** it looks like sticky partitioning should keep track of a batch size per topic, and upon filling the batch, change to a new partition. It looks like here you intend to have the user of this StickyPartitioner change the partition themselves? Thoughts on keeping the batch size counter internally instead? Then the user can just write and not worry about the details.
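A rough sketch of what the reviewer's suggestion could look like. The `batchSize` field, `counts` map, and rotation logic are all hypothetical, not part of this PR:

```go
// internalStickyPartitioner is a hypothetical variant that tracks how many
// messages were assigned per topic and rotates the sticky partition itself
// once batchSize messages have been balanced. batchSize is assumed > 0.
type internalStickyPartitioner struct {
	spc       stickyPartitionCache
	batchSize int // per-topic batch threshold (assumption)

	mu     sync.Mutex
	counts map[string]int // messages assigned since the last rotation
}

func (p *internalStickyPartitioner) Balance(msg Message, partitions ...int) int {
	p.mu.Lock()
	if p.counts == nil {
		p.counts = make(map[string]int)
	}
	p.counts[msg.Topic]++
	rotate := p.counts[msg.Topic] >= p.batchSize
	if rotate {
		p.counts[msg.Topic] = 0
	}
	p.mu.Unlock()

	current := p.spc.partition(msg, partitions)
	if rotate {
		// Threshold reached: pick a new sticky partition for this message.
		return p.spc.nextPartition(msg, partitions, current)
	}
	return current
}
```

With a counter like this, callers never need to invoke `OnNewBatch`; the partitioner rotates on its own.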
```go
// stickyPartitionCache implements a cache used for sticky partitioning
// behavior. The cache tracks the current sticky partition for any given
// topic.
type stickyPartitionCache struct {
	lock       sync.Mutex
	indexCache map[string]int
}

func (spc *stickyPartitionCache) nextPartition(msg Message, partitions []int, prevPartition int) int {
	oldPartition, prs := spc.getIndex(msg)
	newPartition := oldPartition
	if !prs {
		// No sticky partition has been chosen for this topic yet.
		newPartition = -1
	}

	// Only rotate if the cached partition is still the one the completed
	// batch was produced to; otherwise it has already been changed.
	if prs && oldPartition != prevPartition {
		finalPartition, _ := spc.getIndex(msg)
		return finalPartition
	}

	if len(partitions) == 1 {
		newPartition = partitions[0]
	} else {
		// Pick a random partition ID from the available partitions,
		// retrying until it differs from the previous sticky partition.
		for newPartition == -1 || newPartition == oldPartition {
			newPartition = partitions[rand.Intn(len(partitions))]
		}
	}
	spc.setIndex(msg, newPartition)

	// Re-read the cache so a concurrent update wins over our own write.
	finalPartition, _ := spc.getIndex(msg)
	return finalPartition
}
```

> **Review comment (on `nextPartition`):** I'm not quite sure I understand the purpose of the […]. The way I understand it, the logic for selecting the next partition is: […] BTW my only source of knowledge is https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/
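To make the flow above concrete, here is an illustrative walk-through (not part of the diff) of how the cache behaves across `Balance` and `OnNewBatch` calls. It assumes the package's `Message` type and an `fmt` import:

```go
func ExampleStickyPartitioner() {
	sp := &StickyPartitioner{}
	partitions := []int{0, 1, 2}
	msg := Message{Topic: "events"} // hypothetical topic

	p1 := sp.Balance(msg, partitions...) // picks and caches a random sticky partition
	p2 := sp.Balance(msg, partitions...) // hits the cache: same partition as p1
	fmt.Println(p1 == p2)

	// Simulate a completed batch on p1: the sticky partition rotates.
	p3 := sp.OnNewBatch(msg, partitions, p1)
	fmt.Println(p3 != p1)

	// Output:
	// true
	// true
}
```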
```go
func (spc *stickyPartitionCache) partition(msg Message, partitions []int) int {
	// Guard the lazy initialization with the lock so that concurrent first
	// calls do not race on the map.
	spc.lock.Lock()
	if spc.indexCache == nil {
		spc.indexCache = make(map[string]int)
	}
	spc.lock.Unlock()

	partition, prs := spc.getIndex(msg)
	if prs {
		return partition
	}
	return spc.nextPartition(msg, partitions, -1)
}
```

> **Review comment (on lines +348 to +350, the `indexCache` initialization):** thoughts about adding this to a constructor (e.g. […])?
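Along the lines of the review suggestion, a constructor could replace the lazy initialization entirely. The name `NewStickyPartitioner` is hypothetical:

```go
// NewStickyPartitioner returns a StickyPartitioner whose cache map is
// allocated up front, so no lazy initialization is needed on the hot path.
func NewStickyPartitioner() *StickyPartitioner {
	return &StickyPartitioner{
		spc: stickyPartitionCache{
			indexCache: make(map[string]int),
		},
	}
}
```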
```go
func (spc *stickyPartitionCache) getIndex(msg Message) (int, bool) {
	spc.lock.Lock()
	defer spc.lock.Unlock()
	index, prs := spc.indexCache[msg.Topic]
	return index, prs
}
```

> **Review comment (on keying the cache by topic):** we should make a note (maybe up top in the header comment for […]) that `msg.Topic` may be empty when the topic is set on the Writer instead of the message. But that's ok because in that case there is only 1 topic, so only having a key of empty string […]

> **Review comment:** it's more common for the second boolean variable to be named `ok`.
> Suggested change: `index, prs := spc.indexCache[msg.Topic]` → `index, ok := spc.indexCache[msg.Topic]`
```go
func (spc *stickyPartitionCache) setIndex(msg Message, index int) {
	spc.lock.Lock()
	defer spc.lock.Unlock()
	spc.indexCache[msg.Topic] = index
}
```
> **Review comment (on the `balance` helper):** could we write the contents of this function directly into `Balance()`?
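A sketch of what that last suggestion would look like, inlining the one-line helper:

```go
// Balance satisfies the Balancer interface, with the balance helper inlined.
func (sp *StickyPartitioner) Balance(msg Message, partitions ...int) int {
	return sp.spc.partition(msg, partitions)
}
```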