From fe170334601669bde20a7a91b147455301ca2702 Mon Sep 17 00:00:00 2001 From: Alireza Ziaee Date: Sun, 17 Jan 2021 00:30:52 +0330 Subject: [PATCH 1/5] StickyPartitioner implemented --- balancer.go | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/balancer.go b/balancer.go index 7a50cc1ce..30052fd8a 100644 --- a/balancer.go +++ b/balancer.go @@ -288,3 +288,84 @@ func murmur2(data []byte) uint32 { return h } + +// StickyPartitioner is a Balancer implementation in which the Message key is NOT used as part of the balancing strategy. +// Messages with the same key are therefore not guaranteed to be sent to the same partition. +// If a partition is specified in the Message, use it. +// Otherwise, choose the sticky partition, which changes when the batch is full. +type StickyPartitioner struct { + spc stickyPartitionCache +} + +// Balance satisfies the Balancer interface. +func (sp *StickyPartitioner) Balance(msg Message, partitions ...int) int { + return sp.balance(msg, partitions) +} + +func (sp *StickyPartitioner) balance(msg Message, partitions []int) int { + return sp.spc.partition(msg, partitions) +} + +// OnNewBatch changes the sticky partition if a batch has completed for the current sticky partition. +// Alternatively, if no sticky partition has been determined, set one. +func (sp *StickyPartitioner) OnNewBatch(msg Message, partitions []int, prevPartition int) int { + return sp.spc.nextPartition(msg, partitions, prevPartition) +} + +// stickyPartitionCache implements a cache used for sticky partitioning behavior. The cache tracks the current sticky +// partition for any given topic. 
+type stickyPartitionCache struct { + lock sync.Mutex + indexCache map[string]int +} + +func (spc *stickyPartitionCache) nextPartition(msg Message, partitions []int, prevPartition int) int { + oldPartition, prs := spc.getIndex(msg) + newPartition := oldPartition + if !prs { + newPartition = -1 + } + + if prs && oldPartition != prevPartition { + finalPartition, _ := spc.getIndex(msg) + return finalPartition + } + + if len(partitions) < 1 { + newPartition = rand.Intn(len(partitions)) + } else if len(partitions) == 1 { + newPartition = partitions[0] + } else { + for newPartition == -1 || newPartition == oldPartition { + newPartition = rand.Intn(len(partitions)) + } + } + spc.setIndex(msg, newPartition) + + finalPartition, _ := spc.getIndex(msg) + return finalPartition +} + +func (spc *stickyPartitionCache) partition(msg Message, partitions []int) int { + if spc.indexCache == nil { + spc.indexCache = make(map[string]int) + } + partition, prs := spc.getIndex(msg) + if prs { + return partition + } + return spc.nextPartition(msg, partitions, -1) +} + +func (spc *stickyPartitionCache) getIndex(msg Message) (int, bool) { + spc.lock.Lock() + defer spc.lock.Unlock() + index, prs := spc.indexCache[msg.Topic] + return index, prs +} + +func (spc *stickyPartitionCache) setIndex(msg Message, index int) { + spc.lock.Lock() + defer spc.lock.Unlock() + spc.indexCache[msg.Topic] = index +} From 45603e97c2df1e75007233109e41302eab4eaa40 Mon Sep 17 00:00:00 2001 From: Alireza Ziaee Date: Thu, 21 Jan 2021 14:39:36 +0330 Subject: [PATCH 2/5] tests added for StickyPartitioner --- balancer.go | 4 +- balancer_test.go | 101 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 3 deletions(-) diff --git a/balancer.go b/balancer.go index 30052fd8a..7076078d4 100644 --- a/balancer.go +++ b/balancer.go @@ -331,9 +331,7 @@ func (spc *stickyPartitionCache) nextPartition(msg Message, partitions []int, pr return finalPartition } - if len(partitions) < 1 { - newPartition = 
rand.Intn(len(partitions)) - } else if len(partitions) == 1 { + if len(partitions) == 1 { newPartition = partitions[0] } else { for newPartition == -1 || newPartition == oldPartition { diff --git a/balancer_test.go b/balancer_test.go index acdfe54eb..093818d5f 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -352,3 +352,104 @@ func TestLeastBytes(t *testing.T) { }) } } +func TestStickyPartitionerWithTwoPartitions(t *testing.T) { + testCases := map[string]struct { + messages []Message + Partitions []int + }{ + "first test": { + messages: []Message{ + { + Topic: "test", + }, + }, + Partitions: []int{ + 0, 1, + }, + }, + } + + for label, test := range testCases { + t.Run(label, func(t *testing.T) { + sp := &StickyPartitioner{} + partitionCounter := make(map[int]int) + + part := 0 + for i := 0; i < 50; i++ { + part = sp.Balance(test.messages[0], test.Partitions...) + partitionCounter[part]++ + } + sp.OnNewBatch(test.messages[0], test.Partitions, part) + for i := 0; i < 50; i++ { + part = sp.Balance(test.messages[0], test.Partitions...) + partitionCounter[part]++ + } + if partitionCounter[0] != partitionCounter[1] || partitionCounter[0] != 50 { + t.Errorf("The distribution between two available partitions should be even") + } + }) + } +} + +func TestStickyPartitionerWithThreePartitions(t *testing.T) { + testCases := map[string]struct { + messages []Message + Partitions []int + }{ + "first test": { + messages: []Message{ + { + Topic: "A", + }, + { + Topic: "B", + }, + }, + Partitions: []int{ + 0, 1, 2, + }, + }, + } + + for label, test := range testCases { + t.Run(label, func(t *testing.T) { + sp := &StickyPartitioner{} + partitionCounter := make(map[int]int) + + part := 0 + for i := 0; i < 30; i++ { + part = sp.Balance(test.messages[0], test.Partitions...) + partitionCounter[part]++ + if i%5 == 0 { + sp.Balance(test.messages[1], test.Partitions...) 
+ } + } + sp.OnNewBatch(test.messages[0], test.Partitions, part) + oldPartition := part + for i := 0; i < 30; i++ { + part = sp.Balance(test.messages[0], test.Partitions...) + partitionCounter[part]++ + if i%5 == 0 { + sp.Balance(test.messages[1], test.Partitions...) + } + } + newPartition := part + + sp.OnNewBatch(test.messages[0], test.Partitions, oldPartition) + for i := 0; i < 30; i++ { + part = sp.Balance(test.messages[0], test.Partitions...) + partitionCounter[part]++ + if i%5 == 0 { + sp.Balance(test.messages[1], test.Partitions...) + } + } + + if partitionCounter[oldPartition] != 30 { + t.Errorf("Old partition batch must contains 30 messages") + } + if partitionCounter[newPartition] != 60 { + t.Errorf("New partition batch must contains 60 messages") + } + }) + } +} From ca60dc1448158e4a4261183d71d60b4aacd0239e Mon Sep 17 00:00:00 2001 From: Alireza Ziaee Date: Thu, 21 Jan 2021 15:12:43 +0330 Subject: [PATCH 3/5] comment tests to check build --- balancer_test.go | 189 ++++++++++++++++++++++++----------------------- 1 file changed, 95 insertions(+), 94 deletions(-) diff --git a/balancer_test.go b/balancer_test.go index 093818d5f..8aed894a9 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -352,104 +352,105 @@ func TestLeastBytes(t *testing.T) { }) } } -func TestStickyPartitionerWithTwoPartitions(t *testing.T) { - testCases := map[string]struct { - messages []Message - Partitions []int - }{ - "first test": { - messages: []Message{ - { - Topic: "test", - }, - }, - Partitions: []int{ - 0, 1, - }, - }, - } - for label, test := range testCases { - t.Run(label, func(t *testing.T) { - sp := &StickyPartitioner{} - partitionCounter := make(map[int]int) +// func TestStickyPartitionerWithTwoPartitions(t *testing.T) { +// testCases := map[string]struct { +// messages []Message +// Partitions []int +// }{ +// "first test": { +// messages: []Message{ +// { +// Topic: "test", +// }, +// }, +// Partitions: []int{ +// 0, 1, +// }, +// }, +// } - part := 0 - for i := 0; 
i < 50; i++ { - part = sp.Balance(test.messages[0], test.Partitions...) - partitionCounter[part]++ - } - sp.OnNewBatch(test.messages[0], test.Partitions, part) - for i := 0; i < 50; i++ { - part = sp.Balance(test.messages[0], test.Partitions...) - partitionCounter[part]++ - } - if partitionCounter[0] != partitionCounter[1] || partitionCounter[0] != 50 { - t.Errorf("The distribution between two available partitions should be even") - } - }) - } -} +// for label, test := range testCases { +// t.Run(label, func(t *testing.T) { +// sp := &StickyPartitioner{} +// partitionCounter := make(map[int]int) -func TestStickyPartitionerWithThreePartitions(t *testing.T) { - testCases := map[string]struct { - messages []Message - Partitions []int - }{ - "first test": { - messages: []Message{ - { - Topic: "A", - }, - { - Topic: "B", - }, - }, - Partitions: []int{ - 0, 1, 2, - }, - }, - } +// part := 0 +// for i := 0; i < 50; i++ { +// part = sp.Balance(test.messages[0], test.Partitions...) +// partitionCounter[part]++ +// } +// sp.OnNewBatch(test.messages[0], test.Partitions, part) +// for i := 0; i < 50; i++ { +// part = sp.Balance(test.messages[0], test.Partitions...) +// partitionCounter[part]++ +// } +// if partitionCounter[0] != partitionCounter[1] || partitionCounter[0] != 50 { +// t.Errorf("The distribution between two available partitions should be even") +// } +// }) +// } +// } - for label, test := range testCases { - t.Run(label, func(t *testing.T) { - sp := &StickyPartitioner{} - partitionCounter := make(map[int]int) +// func TestStickyPartitionerWithThreePartitions(t *testing.T) { +// testCases := map[string]struct { +// messages []Message +// Partitions []int +// }{ +// "first test": { +// messages: []Message{ +// { +// Topic: "A", +// }, +// { +// Topic: "B", +// }, +// }, +// Partitions: []int{ +// 0, 1, 2, +// }, +// }, +// } - part := 0 - for i := 0; i < 30; i++ { - part = sp.Balance(test.messages[0], test.Partitions...) 
- partitionCounter[part]++ - if i%5 == 0 { - sp.Balance(test.messages[1], test.Partitions...) - } - } - sp.OnNewBatch(test.messages[0], test.Partitions, part) - oldPartition := part - for i := 0; i < 30; i++ { - part = sp.Balance(test.messages[0], test.Partitions...) - partitionCounter[part]++ - if i%5 == 0 { - sp.Balance(test.messages[1], test.Partitions...) - } - } - newPartition := part +// for label, test := range testCases { +// t.Run(label, func(t *testing.T) { +// sp := &StickyPartitioner{} +// partitionCounter := make(map[int]int) - sp.OnNewBatch(test.messages[0], test.Partitions, oldPartition) - for i := 0; i < 30; i++ { - part = sp.Balance(test.messages[0], test.Partitions...) - partitionCounter[part]++ - if i%5 == 0 { - sp.Balance(test.messages[1], test.Partitions...) - } - } +// part := 0 +// for i := 0; i < 30; i++ { +// part = sp.Balance(test.messages[0], test.Partitions...) +// partitionCounter[part]++ +// if i%5 == 0 { +// sp.Balance(test.messages[1], test.Partitions...) +// } +// } +// sp.OnNewBatch(test.messages[0], test.Partitions, part) +// oldPartition := part +// for i := 0; i < 30; i++ { +// part = sp.Balance(test.messages[0], test.Partitions...) +// partitionCounter[part]++ +// if i%5 == 0 { +// sp.Balance(test.messages[1], test.Partitions...) +// } +// } +// newPartition := part - if partitionCounter[oldPartition] != 30 { - t.Errorf("Old partition batch must contains 30 messages") - } - if partitionCounter[newPartition] != 60 { - t.Errorf("New partition batch must contains 60 messages") - } - }) - } -} +// sp.OnNewBatch(test.messages[0], test.Partitions, oldPartition) +// for i := 0; i < 30; i++ { +// part = sp.Balance(test.messages[0], test.Partitions...) +// partitionCounter[part]++ +// if i%5 == 0 { +// sp.Balance(test.messages[1], test.Partitions...) 
+// } +// } + +// if partitionCounter[oldPartition] != 30 { +// t.Errorf("Old partition batch must contains 30 messages") +// } +// if partitionCounter[newPartition] != 60 { +// t.Errorf("New partition batch must contains 60 messages") +// } +// }) +// } +// } From f51d2f212b1d1c822d37ba5fb73e571722d81e72 Mon Sep 17 00:00:00 2001 From: Alireza Ziaee Date: Thu, 21 Jan 2021 15:14:59 +0330 Subject: [PATCH 4/5] first test added --- balancer_test.go | 72 ++++++++++++++++++++++++------------------------ 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/balancer_test.go b/balancer_test.go index 8aed894a9..b83645560 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -353,44 +353,44 @@ func TestLeastBytes(t *testing.T) { } } -// func TestStickyPartitionerWithTwoPartitions(t *testing.T) { -// testCases := map[string]struct { -// messages []Message -// Partitions []int -// }{ -// "first test": { -// messages: []Message{ -// { -// Topic: "test", -// }, -// }, -// Partitions: []int{ -// 0, 1, -// }, -// }, -// } +func TestStickyPartitionerWithTwoPartitions(t *testing.T) { + testCases := map[string]struct { + messages []Message + Partitions []int + }{ + "first test": { + messages: []Message{ + { + Topic: "test", + }, + }, + Partitions: []int{ + 0, 1, + }, + }, + } -// for label, test := range testCases { -// t.Run(label, func(t *testing.T) { -// sp := &StickyPartitioner{} -// partitionCounter := make(map[int]int) + for label, test := range testCases { + t.Run(label, func(t *testing.T) { + sp := &StickyPartitioner{} + partitionCounter := make(map[int]int) -// part := 0 -// for i := 0; i < 50; i++ { -// part = sp.Balance(test.messages[0], test.Partitions...) -// partitionCounter[part]++ -// } -// sp.OnNewBatch(test.messages[0], test.Partitions, part) -// for i := 0; i < 50; i++ { -// part = sp.Balance(test.messages[0], test.Partitions...) 
-// partitionCounter[part]++ -// } -// if partitionCounter[0] != partitionCounter[1] || partitionCounter[0] != 50 { -// t.Errorf("The distribution between two available partitions should be even") -// } -// }) -// } -// } + part := 0 + for i := 0; i < 50; i++ { + part = sp.Balance(test.messages[0], test.Partitions...) + partitionCounter[part]++ + } + sp.OnNewBatch(test.messages[0], test.Partitions, part) + for i := 0; i < 50; i++ { + part = sp.Balance(test.messages[0], test.Partitions...) + partitionCounter[part]++ + } + if partitionCounter[0] != partitionCounter[1] || partitionCounter[0] != 50 { + t.Errorf("The distribution between two available partitions should be even") + } + }) + } +} // func TestStickyPartitionerWithThreePartitions(t *testing.T) { // testCases := map[string]struct { From 2d53664abb1848508fbce51a1fb14a20a0e3a603 Mon Sep 17 00:00:00 2001 From: Alireza Ziaee Date: Thu, 21 Jan 2021 15:17:04 +0330 Subject: [PATCH 5/5] second test added --- balancer_test.go | 116 +++++++++++++++++++++++------------------------ 1 file changed, 58 insertions(+), 58 deletions(-) diff --git a/balancer_test.go b/balancer_test.go index b83645560..8d021b261 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -392,65 +392,65 @@ func TestStickyPartitionerWithTwoPartitions(t *testing.T) { } } -// func TestStickyPartitionerWithThreePartitions(t *testing.T) { -// testCases := map[string]struct { -// messages []Message -// Partitions []int -// }{ -// "first test": { -// messages: []Message{ -// { -// Topic: "A", -// }, -// { -// Topic: "B", -// }, -// }, -// Partitions: []int{ -// 0, 1, 2, -// }, -// }, -// } +func TestStickyPartitionerWithThreePartitions(t *testing.T) { + testCases := map[string]struct { + messages []Message + Partitions []int + }{ + "first test": { + messages: []Message{ + { + Topic: "A", + }, + { + Topic: "B", + }, + }, + Partitions: []int{ + 0, 1, 2, + }, + }, + } -// for label, test := range testCases { -// t.Run(label, func(t *testing.T) { -// sp := 
&StickyPartitioner{} -// partitionCounter := make(map[int]int) + for label, test := range testCases { + t.Run(label, func(t *testing.T) { + sp := &StickyPartitioner{} + partitionCounter := make(map[int]int) -// part := 0 -// for i := 0; i < 30; i++ { -// part = sp.Balance(test.messages[0], test.Partitions...) -// partitionCounter[part]++ -// if i%5 == 0 { -// sp.Balance(test.messages[1], test.Partitions...) -// } -// } -// sp.OnNewBatch(test.messages[0], test.Partitions, part) -// oldPartition := part -// for i := 0; i < 30; i++ { -// part = sp.Balance(test.messages[0], test.Partitions...) -// partitionCounter[part]++ -// if i%5 == 0 { -// sp.Balance(test.messages[1], test.Partitions...) -// } -// } -// newPartition := part + part := 0 + for i := 0; i < 30; i++ { + part = sp.Balance(test.messages[0], test.Partitions...) + partitionCounter[part]++ + if i%5 == 0 { + sp.Balance(test.messages[1], test.Partitions...) + } + } + sp.OnNewBatch(test.messages[0], test.Partitions, part) + oldPartition := part + for i := 0; i < 30; i++ { + part = sp.Balance(test.messages[0], test.Partitions...) + partitionCounter[part]++ + if i%5 == 0 { + sp.Balance(test.messages[1], test.Partitions...) + } + } + newPartition := part -// sp.OnNewBatch(test.messages[0], test.Partitions, oldPartition) -// for i := 0; i < 30; i++ { -// part = sp.Balance(test.messages[0], test.Partitions...) -// partitionCounter[part]++ -// if i%5 == 0 { -// sp.Balance(test.messages[1], test.Partitions...) -// } -// } + sp.OnNewBatch(test.messages[0], test.Partitions, oldPartition) + for i := 0; i < 30; i++ { + part = sp.Balance(test.messages[0], test.Partitions...) + partitionCounter[part]++ + if i%5 == 0 { + sp.Balance(test.messages[1], test.Partitions...) 
+ } + } -// if partitionCounter[oldPartition] != 30 { -// t.Errorf("Old partition batch must contains 30 messages") -// } -// if partitionCounter[newPartition] != 60 { -// t.Errorf("New partition batch must contains 60 messages") -// } -// }) -// } -// } + if partitionCounter[oldPartition] != 30 { + t.Errorf("Old partition batch must contains 30 messages") + } + if partitionCounter[newPartition] != 60 { + t.Errorf("New partition batch must contains 60 messages") + } + }) + } +}