Skip to content

Commit 9d5f139

Browse files
authored
Deduplicate endpoints (#253)
EndpointSlices may contain duplicate endpoints. This commit deduplicates endpoints when resolving a Service:Port.
1 parent 8a6adbf commit 9d5f139

File tree

3 files changed

+33
-71
lines changed

3 files changed

+33
-71
lines changed

internal/state/resolver/resolver.go

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,15 @@ func resolveEndpoints(
7373
}
7474

7575
filteredSlices := filterEndpointSliceList(endpointSliceList, svcPort)
76-
capacity := calculateEndpointSliceCapacity(filteredSlices)
7776

78-
if capacity == 0 {
77+
if len(filteredSlices) == 0 {
7978
svcNsName := client.ObjectKeyFromObject(svc)
8079
return nil, fmt.Errorf("no valid endpoints found for Service %s and port %+v", svcNsName, svcPort)
8180
}
8281

83-
endpoints := make([]Endpoint, 0, capacity)
82+
// Endpoints may be duplicated across multiple EndpointSlices.
83+
// Using a set to prevent returning duplicate endpoints.
84+
endpointSet := make(map[Endpoint]struct{})
8485

8586
for _, eps := range filteredSlices {
8687
for _, endpoint := range eps.Endpoints {
@@ -95,11 +96,16 @@ func resolveEndpoints(
9596

9697
for _, address := range endpoint.Addresses {
9798
ep := Endpoint{Address: address, Port: endpointPort}
98-
endpoints = append(endpoints, ep)
99+
endpointSet[ep] = struct{}{}
99100
}
100101
}
101102
}
102103

104+
endpoints := make([]Endpoint, 0, len(endpointSet))
105+
for ep := range endpointSet {
106+
endpoints = append(endpoints, ep)
107+
}
108+
103109
return endpoints, nil
104110
}
105111

@@ -137,19 +143,6 @@ func ignoreEndpointSlice(endpointSlice discoveryV1.EndpointSlice, port v1.Servic
137143
return findPort(endpointSlice.Ports, port) == 0
138144
}
139145

140-
func calculateEndpointSliceCapacity(endpointSlices []discoveryV1.EndpointSlice) (capacity int) {
141-
for _, es := range endpointSlices {
142-
for _, e := range es.Endpoints {
143-
if !endpointReady(e) {
144-
continue
145-
}
146-
capacity += len(e.Addresses)
147-
}
148-
}
149-
150-
return
151-
}
152-
153146
func endpointReady(endpoint discoveryV1.Endpoint) bool {
154147
ready := endpoint.Conditions.Ready
155148
return ready != nil && *ready

internal/state/resolver/resolver_test.go

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,6 @@ var (
2626
Conditions: discoveryV1.EndpointConditions{Ready: helpers.GetBoolPointer(false)},
2727
}
2828

29-
notReadyEndpointSlice = discoveryV1.EndpointSlice{
30-
AddressType: discoveryV1.AddressTypeIPv4,
31-
Endpoints: []discoveryV1.Endpoint{
32-
notReadyEndpoint,
33-
notReadyEndpoint,
34-
}, // in reality these endpoints would be different but for this test it doesn't matter
35-
Ports: []discoveryV1.EndpointPort{
36-
{
37-
Name: &svcPortName,
38-
Port: helpers.GetInt32Pointer(80),
39-
},
40-
},
41-
}
42-
4329
mixedValidityEndpointSlice = discoveryV1.EndpointSlice{
4430
AddressType: discoveryV1.AddressTypeIPv4,
4531
Endpoints: []discoveryV1.Endpoint{readyEndpoint1, notReadyEndpoint, readyEndpoint1}, // 6 valid endpoints
@@ -157,45 +143,6 @@ func TestGetServicePort(t *testing.T) {
157143
}
158144
}
159145

160-
func TestCalculateEndpointSliceCapacity(t *testing.T) {
161-
testcases := []struct {
162-
msg string
163-
endpointSlices []discoveryV1.EndpointSlice
164-
targetPort int32
165-
expCapacity int
166-
}{
167-
{
168-
msg: "EndpointSlices with no ready endpoints",
169-
endpointSlices: []discoveryV1.EndpointSlice{
170-
notReadyEndpointSlice,
171-
notReadyEndpointSlice,
172-
},
173-
expCapacity: 0,
174-
},
175-
{
176-
msg: "EndpointSlices with some ready endpoints",
177-
endpointSlices: []discoveryV1.EndpointSlice{
178-
mixedValidityEndpointSlice,
179-
mixedValidityEndpointSlice,
180-
mixedValidityEndpointSlice,
181-
},
182-
expCapacity: 18,
183-
},
184-
{
185-
msg: "Empty EndpointSlice array",
186-
endpointSlices: []discoveryV1.EndpointSlice{},
187-
expCapacity: 0,
188-
},
189-
}
190-
191-
for _, tc := range testcases {
192-
capacity := calculateEndpointSliceCapacity(tc.endpointSlices)
193-
if capacity != tc.expCapacity {
194-
t.Errorf("calculateEndpointSliceCapacity() mismatch for %q; expected %d, got %d", tc.msg, capacity, tc.expCapacity)
195-
}
196-
}
197-
}
198-
199146
func TestGetDefaultPort(t *testing.T) {
200147
testcases := []struct {
201148
msg string

internal/state/resolver/service_resolver_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ var _ = Describe("ServiceResolver", func() {
9090
addresses2 = []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}
9191
ipv6Addresses = []string{"FE80:CD00:0:CDE:1257:0:211E:729C"}
9292
diffPortAddresses = []string{"11.0.0.1", "11.0.0.2"}
93+
dupeAddresses = []string{"9.0.0.1", "12.0.0.1", "9.0.0.2"}
9394

9495
svc = &v1.Service{
9596
ObjectMeta: metav1.ObjectMeta{
@@ -135,6 +136,16 @@ var _ = Describe("ServiceResolver", func() {
135136
httpPortName,
136137
discoveryV1.AddressTypeIPv4,
137138
)
139+
// contains some duplicate endpoints as slice1.
140+
// only unique endpoints should be returned.
141+
dupeEndpointSlice = createSlice(
142+
"svc-dupe-endpoints",
143+
dupeAddresses,
144+
8080,
145+
httpPortName,
146+
discoveryV1.AddressTypeIPv4,
147+
)
148+
138149
sliceIPV6 = createSlice(
139150
"svc-ipv6",
140151
ipv6Addresses,
@@ -160,7 +171,13 @@ var _ = Describe("ServiceResolver", func() {
160171
var err error
161172
// The fake K8s client does not honor the Service Name Index Field,
162173
// so it will return all the EndpointSlices regardless of the Service Name.
163-
fakeK8sClient, err = createFakeK8sClient(slice1, slice2, sliceIPV6, sliceNoMatchingPortName)
174+
fakeK8sClient, err = createFakeK8sClient(
175+
slice1,
176+
slice2,
177+
dupeEndpointSlice,
178+
sliceIPV6,
179+
sliceNoMatchingPortName,
180+
)
164181
Expect(err).ToNot(HaveOccurred())
165182

166183
serviceResolver = resolver.NewServiceResolverImpl(fakeK8sClient)
@@ -187,6 +204,10 @@ var _ = Describe("ServiceResolver", func() {
187204
Address: "10.0.0.3",
188205
Port: 8081,
189206
},
207+
{
208+
Address: "12.0.0.1",
209+
Port: 8080,
210+
},
190211
}
191212

192213
endpoints, err := serviceResolver.Resolve(context.TODO(), svc, 80)
@@ -202,6 +223,7 @@ var _ = Describe("ServiceResolver", func() {
202223
// delete valid endpoint slices
203224
Expect(fakeK8sClient.Delete(context.TODO(), slice1)).To(Succeed())
204225
Expect(fakeK8sClient.Delete(context.TODO(), slice2)).To(Succeed())
226+
Expect(fakeK8sClient.Delete(context.TODO(), dupeEndpointSlice)).To(Succeed())
205227

206228
endpoints, err := serviceResolver.Resolve(context.TODO(), svc, 80)
207229
Expect(err).To(HaveOccurred())

0 commit comments

Comments
 (0)