Skip to content

Commit ebaae58

Browse files
committed
Improve performance of endpoints deduplication
We use a map as a set to deduplicate endpoints. Before deduplicating, we can calculate the total number of the endpoints in the input and assume that most of those endpoints are unique. Then, we can use that number when initializing the map. That will improve the performance, as it will help to reduce the cost of growing the map to accommodate all the endpoints. The benchmarks are included.
1 parent e48728a commit ebaae58

File tree

2 files changed

+160
-2
lines changed

2 files changed

+160
-2
lines changed

internal/state/resolver/resolver.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,38 @@ func (e *ServiceResolverImpl) Resolve(ctx context.Context, svc *v1.Service, port
5959
return nil, fmt.Errorf("no endpoints found for Service %s", client.ObjectKeyFromObject(svc))
6060
}
6161

62-
return resolveEndpoints(svc, port, endpointSliceList)
62+
return resolveEndpoints(svc, port, endpointSliceList, initEndpointSetWithCalculatedSize)
63+
}
64+
65+
type initEndpointSetFunc func([]discoveryV1.EndpointSlice) map[Endpoint]struct{}
66+
67+
func initEndpointSetWithCalculatedSize(endpointSlices []discoveryV1.EndpointSlice) map[Endpoint]struct{} {
68+
// performance optimization to reduce the cost of growing the map. See the benchamarks for performance comparison.
69+
return make(map[Endpoint]struct{}, calculateReadyEndpoints(endpointSlices))
70+
}
71+
72+
func calculateReadyEndpoints(endpointSlices []discoveryV1.EndpointSlice) int {
73+
total := 0
74+
75+
for _, eps := range endpointSlices {
76+
for _, endpoint := range eps.Endpoints {
77+
78+
if !endpointReady(endpoint) {
79+
continue
80+
}
81+
82+
total += len(endpoint.Addresses)
83+
}
84+
}
85+
86+
return total
6387
}
6488

6589
func resolveEndpoints(
6690
svc *v1.Service,
6791
port int32,
6892
endpointSliceList discoveryV1.EndpointSliceList,
93+
initEndpointsSet initEndpointSetFunc,
6994
) ([]Endpoint, error) {
7095
svcPort, err := getServicePort(svc, port)
7196
if err != nil {
@@ -81,7 +106,7 @@ func resolveEndpoints(
81106

82107
// Endpoints may be duplicated across multiple EndpointSlices.
83108
// Using a set to prevent returning duplicate endpoints.
84-
endpointSet := make(map[Endpoint]struct{})
109+
endpointSet := initEndpointsSet(filteredSlices)
85110

86111
for _, eps := range filteredSlices {
87112
for _, endpoint := range eps.Endpoints {

internal/state/resolver/resolver_test.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package resolver
22

33
import (
4+
"fmt"
45
"testing"
56

67
"github.com/google/go-cmp/cmp"
8+
. "github.com/onsi/gomega"
79
v1 "k8s.io/api/core/v1"
810
discoveryV1 "k8s.io/api/discovery/v1"
911
"k8s.io/apimachinery/pkg/util/intstr"
@@ -470,3 +472,134 @@ func TestFindPort(t *testing.T) {
470472
}
471473
}
472474
}
475+
476+
func TestCalculateReadyEndpoints(t *testing.T) {
477+
g := NewGomegaWithT(t)
478+
479+
slices := []discoveryV1.EndpointSlice{
480+
{
481+
Endpoints: []discoveryV1.Endpoint{
482+
{
483+
Addresses: []string{"1.0.0.1"},
484+
Conditions: discoveryV1.EndpointConditions{
485+
Ready: helpers.GetBoolPointer(true),
486+
},
487+
},
488+
{
489+
Addresses: []string{"1.1.0.1", "1.1.0.2", "1.1.0.3, 1.1.0.4, 1.1.0.5"},
490+
Conditions: discoveryV1.EndpointConditions{
491+
// nil conditions should be treated as not ready
492+
},
493+
},
494+
},
495+
},
496+
{
497+
Endpoints: []discoveryV1.Endpoint{
498+
{
499+
Addresses: []string{"2.0.0.1", "2.0.0.2", "2.0.0.3"},
500+
Conditions: discoveryV1.EndpointConditions{
501+
Ready: helpers.GetBoolPointer(true),
502+
},
503+
},
504+
},
505+
},
506+
}
507+
508+
result := calculateReadyEndpoints(slices)
509+
510+
g.Expect(result).To(Equal(4))
511+
}
512+
513+
func generateEndpointSliceList(n int) discoveryV1.EndpointSliceList {
514+
const maxEndpointsPerSlice = 100 // use the Kubernetes default max for endpoints in a slice.
515+
516+
slicesCount := (n + maxEndpointsPerSlice - 1) / maxEndpointsPerSlice
517+
518+
result := discoveryV1.EndpointSliceList{
519+
Items: make([]discoveryV1.EndpointSlice, 0, slicesCount),
520+
}
521+
522+
ready := true
523+
524+
for i := 0; n > 0; i++ {
525+
c := maxEndpointsPerSlice
526+
if n < maxEndpointsPerSlice {
527+
c = n
528+
}
529+
n -= maxEndpointsPerSlice
530+
531+
slice := discoveryV1.EndpointSlice{
532+
Endpoints: make([]discoveryV1.Endpoint, c),
533+
AddressType: discoveryV1.AddressTypeIPv4,
534+
Ports: []discoveryV1.EndpointPort{
535+
{
536+
Port: nil, // will match any port in the service
537+
},
538+
},
539+
}
540+
541+
for j := 0; j < c; j++ {
542+
slice.Endpoints[j] = discoveryV1.Endpoint{
543+
Addresses: []string{fmt.Sprintf("10.0.%d.%d", i, j)},
544+
Conditions: discoveryV1.EndpointConditions{
545+
Ready: &ready,
546+
},
547+
}
548+
}
549+
550+
result.Items = append(result.Items, slice)
551+
}
552+
553+
return result
554+
}
555+
556+
func BenchmarkResolve(b *testing.B) {
557+
counts := []int{
558+
1,
559+
2,
560+
5,
561+
10,
562+
25,
563+
50,
564+
100,
565+
500,
566+
1000,
567+
}
568+
569+
svc := &v1.Service{
570+
Spec: v1.ServiceSpec{
571+
Ports: []v1.ServicePort{
572+
{
573+
Port: 80,
574+
},
575+
},
576+
},
577+
}
578+
579+
initEndpointSet := func([]discoveryV1.EndpointSlice) map[Endpoint]struct{} {
580+
return make(map[Endpoint]struct{})
581+
}
582+
583+
for _, count := range counts {
584+
list := generateEndpointSliceList(count)
585+
586+
b.Run(fmt.Sprintf("%d endpoints", count), func(b *testing.B) {
587+
bench(b, svc, list, initEndpointSet, count)
588+
})
589+
b.Run(fmt.Sprintf("%d endpoints with optimization", count), func(b *testing.B) {
590+
bench(b, svc, list, initEndpointSetWithCalculatedSize, count)
591+
})
592+
}
593+
}
594+
595+
func bench(b *testing.B, svc *v1.Service, list discoveryV1.EndpointSliceList, initSet initEndpointSetFunc, n int) {
596+
for i := 0; i < b.N; i++ {
597+
res, err := resolveEndpoints(svc, 80, list, initSet)
598+
if len(res) != n {
599+
b.Fatalf("expected %d endpoints, got %d", n, len(res))
600+
}
601+
if err != nil {
602+
b.Fatal(err)
603+
}
604+
}
605+
}

0 commit comments

Comments
 (0)