StickyPartitioner implemented #589

Open
wants to merge 5 commits into base: main
79 changes: 79 additions & 0 deletions balancer.go
@@ -288,3 +288,82 @@ func murmur2(data []byte) uint32 {

return h
}

// StickyPartitioner is a Balancer implementation in which the Message key is NOT used as part of
// the balancing strategy; messages with the same key are not guaranteed to be sent to the same partition.
// If a partition is specified in the Message, it is used.
// Otherwise, the sticky partition is chosen, and it changes whenever the current batch is full.
type StickyPartitioner struct {
spc stickyPartitionCache
}

// Balance satisfies the Balancer interface.
func (sp *StickyPartitioner) Balance(msg Message, partitions ...int) int {
return sp.balance(msg, partitions)
}

func (sp *StickyPartitioner) balance(msg Message, partitions []int) int {
Contributor:
could we write the contents of this function directly into Balance()?

return sp.spc.partition(msg, partitions)
}
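
A minimal sketch of what that inlining could look like, assuming the unexported balance helper is dropped; illustrative only, not part of this PR:

// Hypothetical inlined form of Balance, delegating straight to the cache.
func (sp *StickyPartitioner) Balance(msg Message, partitions ...int) int {
	return sp.spc.partition(msg, partitions)
}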

// OnNewBatch changes the sticky partition if a batch has completed for the current sticky partition.
// Alternatively, if no sticky partition has been determined yet, it sets one.
func (sp *StickyPartitioner) OnNewBatch(msg Message, partitions []int, prevPartition int) int {
Contributor @nlsun, Jan 30, 2021:
it looks like sticky partitioning should keep track of a batch size per topic, and upon filling the batch, change to a new partition

It looks like here you intend to have the user of this StickyPartitioner change the partition themselves? Thoughts on keeping the batch size counter internally instead? Then the user can just write and not worry about the details

return sp.spc.nextPartition(msg, partitions, prevPartition)
}
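
A minimal sketch of the reviewer's suggestion to keep the batch-size counter internal, assuming a hypothetical countingStickyPartitioner with a configurable batchSize; the names are illustrative and not part of this PR:

// Hypothetical variant that rotates the sticky partition on its own once
// batchSize messages have been balanced for a topic, so callers never need
// to invoke OnNewBatch. Illustrative only; not part of this PR.
type countingStickyPartitioner struct {
	mu        sync.Mutex
	batchSize int            // messages per sticky partition before rotating (assumed > 0)
	counts    map[string]int // per-topic count of messages sent to the current partition
	sticky    map[string]int // per-topic current sticky partition
}

func (p *countingStickyPartitioner) Balance(msg Message, partitions ...int) int {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.counts == nil {
		p.counts = make(map[string]int)
		p.sticky = make(map[string]int)
	}
	part, ok := p.sticky[msg.Topic]
	// Pick a new sticky partition when none is set yet or the batch is full.
	if !ok || p.counts[msg.Topic] >= p.batchSize {
		part = partitions[rand.Intn(len(partitions))]
		p.sticky[msg.Topic] = part
		p.counts[msg.Topic] = 0
	}
	p.counts[msg.Topic]++
	return part
}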

// stickyPartitionCache implements a cache used for sticky partitioning behavior. The cache tracks the current sticky
// partition for any given topic.
type stickyPartitionCache struct {
lock sync.Mutex
indexCache map[string]int
}

func (spc *stickyPartitionCache) nextPartition(msg Message, partitions []int, prevPartition int) int {
Contributor @nlsun, Jan 30, 2021:
I'm not quite sure I understand the purpose of the prevPartition argument, I think the code might work without it

The way I understand it the logic for selecting the next partition is:

  • If there is only 1 partition return that partition
  • otherwise, pick a random partition, save it, and return that

BTW my only source of knowledge is https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

oldPartition, prs := spc.getIndex(msg)
newPartition := oldPartition
if !prs {
newPartition = -1
}

if prs && oldPartition != prevPartition {
finalPartition, _ := spc.getIndex(msg)
return finalPartition
}

if len(partitions) == 1 {
newPartition = partitions[0]
} else {
for newPartition == -1 || newPartition == oldPartition {
newPartition = rand.Intn(len(partitions))
}
}
spc.setIndex(msg, newPartition)

finalPartition, _ := spc.getIndex(msg)
return finalPartition
}
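
A minimal sketch of the simplification described in the review comment above, assuming prevPartition is dropped entirely; rotatePartition is a hypothetical name, not part of this PR:

// Hypothetical simplified rotation without prevPartition: pick a random
// partition (different from the current sticky one when possible), cache it,
// and return it. Illustrative only; not part of this PR.
func (spc *stickyPartitionCache) rotatePartition(msg Message, partitions []int) int {
	oldPartition, ok := spc.getIndex(msg)
	newPartition := partitions[0]
	if len(partitions) > 1 {
		for {
			newPartition = partitions[rand.Intn(len(partitions))]
			if !ok || newPartition != oldPartition {
				break
			}
		}
	}
	spc.setIndex(msg, newPartition)
	return newPartition
}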

func (spc *stickyPartitionCache) partition(msg Message, partitions []int) int {
if spc.indexCache == nil {
spc.indexCache = make(map[string]int)
}
Comment on lines +348 to +350, Contributor @nlsun, Jan 30, 2021:
thoughts about adding this to a constructor (e.g. NewStickyPartitioner) instead?

partition, prs := spc.getIndex(msg)
if prs {
return partition
}
return spc.nextPartition(msg, partitions, -1)
}
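
A minimal sketch of the constructor the reviewer suggests, assuming the map is allocated up front so the nil check in partition() becomes unnecessary; NewStickyPartitioner is the reviewer's proposed name:

// Hypothetical constructor that pre-allocates the index cache.
// Illustrative only; not part of this PR.
func NewStickyPartitioner() *StickyPartitioner {
	return &StickyPartitioner{
		spc: stickyPartitionCache{
			indexCache: make(map[string]int),
		},
	}
}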

func (spc *stickyPartitionCache) getIndex(msg Message) (int, bool) {
spc.lock.Lock()
defer spc.lock.Unlock()
index, prs := spc.indexCache[msg.Topic]
Contributor:
we should make a note (maybe up top in the header comment for StickyPartitioner) that if topic is set in the writer, the topic will not be set in the message.

But that's ok because in that case there is only 1 topic, so only having a key of empty string "" in indexCache is ok

Contributor @nlsun, Jan 30, 2021:
it's more common for the second boolean variable to be named ok (i personally find it easier to read too), this applies to the other usages in this code as well

Suggested change:
-	index, prs := spc.indexCache[msg.Topic]
+	index, ok := spc.indexCache[msg.Topic]

return index, prs
}

func (spc *stickyPartitionCache) setIndex(msg Message, index int) {
spc.lock.Lock()
defer spc.lock.Unlock()
spc.indexCache[msg.Topic] = index
}
102 changes: 102 additions & 0 deletions balancer_test.go
@@ -352,3 +352,105 @@ func TestLeastBytes(t *testing.T) {
})
}
}

func TestStickyPartitionerWithTwoPartitions(t *testing.T) {
testCases := map[string]struct {
messages []Message
Partitions []int
}{
"first test": {
messages: []Message{
{
Topic: "test",
},
},
Partitions: []int{
0, 1,
},
},
}

for label, test := range testCases {
t.Run(label, func(t *testing.T) {
sp := &StickyPartitioner{}
partitionCounter := make(map[int]int)

part := 0
for i := 0; i < 50; i++ {
part = sp.Balance(test.messages[0], test.Partitions...)
partitionCounter[part]++
}
sp.OnNewBatch(test.messages[0], test.Partitions, part)
for i := 0; i < 50; i++ {
part = sp.Balance(test.messages[0], test.Partitions...)
partitionCounter[part]++
}
if partitionCounter[0] != partitionCounter[1] || partitionCounter[0] != 50 {
t.Errorf("The distribution between two available partitions should be even")
}
})
}
}

func TestStickyPartitionerWithThreePartitions(t *testing.T) {
testCases := map[string]struct {
messages []Message
Partitions []int
}{
"first test": {
messages: []Message{
{
Topic: "A",
},
{
Topic: "B",
},
},
Partitions: []int{
0, 1, 2,
},
},
}

for label, test := range testCases {
t.Run(label, func(t *testing.T) {
sp := &StickyPartitioner{}
partitionCounter := make(map[int]int)

part := 0
for i := 0; i < 30; i++ {
part = sp.Balance(test.messages[0], test.Partitions...)
partitionCounter[part]++
if i%5 == 0 {
sp.Balance(test.messages[1], test.Partitions...)
}
}
sp.OnNewBatch(test.messages[0], test.Partitions, part)
oldPartition := part
for i := 0; i < 30; i++ {
part = sp.Balance(test.messages[0], test.Partitions...)
partitionCounter[part]++
if i%5 == 0 {
sp.Balance(test.messages[1], test.Partitions...)
}
}
newPartition := part

sp.OnNewBatch(test.messages[0], test.Partitions, oldPartition)
for i := 0; i < 30; i++ {
part = sp.Balance(test.messages[0], test.Partitions...)
partitionCounter[part]++
if i%5 == 0 {
sp.Balance(test.messages[1], test.Partitions...)
}
}

if partitionCounter[oldPartition] != 30 {
t.Errorf("Old partition batch must contains 30 messages")
}
if partitionCounter[newPartition] != 60 {
t.Errorf("New partition batch must contains 60 messages")
}
})
}
}