Skip to content

Deduplicate endpoints #253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 10 additions & 17 deletions internal/state/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,15 @@ func resolveEndpoints(
}

filteredSlices := filterEndpointSliceList(endpointSliceList, svcPort)
capacity := calculateEndpointSliceCapacity(filteredSlices)

if capacity == 0 {
if len(filteredSlices) == 0 {
svcNsName := client.ObjectKeyFromObject(svc)
return nil, fmt.Errorf("no valid endpoints found for Service %s and port %+v", svcNsName, svcPort)
}

endpoints := make([]Endpoint, 0, capacity)
// Endpoints may be duplicated across multiple EndpointSlices.
// Using a set to prevent returning duplicate endpoints.
endpointSet := make(map[Endpoint]struct{})

for _, eps := range filteredSlices {
for _, endpoint := range eps.Endpoints {
Expand All @@ -95,11 +96,16 @@ func resolveEndpoints(

for _, address := range endpoint.Addresses {
ep := Endpoint{Address: address, Port: endpointPort}
endpoints = append(endpoints, ep)
endpointSet[ep] = struct{}{}
}
}
}

endpoints := make([]Endpoint, 0, len(endpointSet))
for ep := range endpointSet {
endpoints = append(endpoints, ep)
}

return endpoints, nil
}

Expand Down Expand Up @@ -137,19 +143,6 @@ func ignoreEndpointSlice(endpointSlice discoveryV1.EndpointSlice, port v1.Servic
return findPort(endpointSlice.Ports, port) == 0
}

func calculateEndpointSliceCapacity(endpointSlices []discoveryV1.EndpointSlice) (capacity int) {
for _, es := range endpointSlices {
for _, e := range es.Endpoints {
if !endpointReady(e) {
continue
}
capacity += len(e.Addresses)
}
}

return
}

func endpointReady(endpoint discoveryV1.Endpoint) bool {
ready := endpoint.Conditions.Ready
return ready != nil && *ready
Expand Down
53 changes: 0 additions & 53 deletions internal/state/resolver/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,6 @@ var (
Conditions: discoveryV1.EndpointConditions{Ready: helpers.GetBoolPointer(false)},
}

notReadyEndpointSlice = discoveryV1.EndpointSlice{
AddressType: discoveryV1.AddressTypeIPv4,
Endpoints: []discoveryV1.Endpoint{
notReadyEndpoint,
notReadyEndpoint,
}, // in reality these endpoints would be different but for this test it doesn't matter
Ports: []discoveryV1.EndpointPort{
{
Name: &svcPortName,
Port: helpers.GetInt32Pointer(80),
},
},
}

mixedValidityEndpointSlice = discoveryV1.EndpointSlice{
AddressType: discoveryV1.AddressTypeIPv4,
Endpoints: []discoveryV1.Endpoint{readyEndpoint1, notReadyEndpoint, readyEndpoint1}, // 6 valid endpoints
Expand Down Expand Up @@ -157,45 +143,6 @@ func TestGetServicePort(t *testing.T) {
}
}

func TestCalculateEndpointSliceCapacity(t *testing.T) {
testcases := []struct {
msg string
endpointSlices []discoveryV1.EndpointSlice
targetPort int32
expCapacity int
}{
{
msg: "EndpointSlices with no ready endpoints",
endpointSlices: []discoveryV1.EndpointSlice{
notReadyEndpointSlice,
notReadyEndpointSlice,
},
expCapacity: 0,
},
{
msg: "EndpointSlices with some ready endpoints",
endpointSlices: []discoveryV1.EndpointSlice{
mixedValidityEndpointSlice,
mixedValidityEndpointSlice,
mixedValidityEndpointSlice,
},
expCapacity: 18,
},
{
msg: "Empty EndpointSlice array",
endpointSlices: []discoveryV1.EndpointSlice{},
expCapacity: 0,
},
}

for _, tc := range testcases {
capacity := calculateEndpointSliceCapacity(tc.endpointSlices)
if capacity != tc.expCapacity {
t.Errorf("calculateEndpointSliceCapacity() mismatch for %q; expected %d, got %d", tc.msg, capacity, tc.expCapacity)
}
}
}

func TestGetDefaultPort(t *testing.T) {
testcases := []struct {
msg string
Expand Down
24 changes: 23 additions & 1 deletion internal/state/resolver/service_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var _ = Describe("ServiceResolver", func() {
addresses2 = []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}
ipv6Addresses = []string{"FE80:CD00:0:CDE:1257:0:211E:729C"}
diffPortAddresses = []string{"11.0.0.1", "11.0.0.2"}
dupeAddresses = []string{"9.0.0.1", "12.0.0.1", "9.0.0.2"}

svc = &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -135,6 +136,16 @@ var _ = Describe("ServiceResolver", func() {
httpPortName,
discoveryV1.AddressTypeIPv4,
)
// contains some duplicate endpoints as slice1.
// only unique endpoints should be returned.
dupeEndpointSlice = createSlice(
"svc-dupe-endpoints",
dupeAddresses,
8080,
httpPortName,
discoveryV1.AddressTypeIPv4,
)

sliceIPV6 = createSlice(
"svc-ipv6",
ipv6Addresses,
Expand All @@ -160,7 +171,13 @@ var _ = Describe("ServiceResolver", func() {
var err error
// The fake K8s client does not honor the Service Name Index Field,
// so it will return all the EndpointSlices regardless of the Service Name.
fakeK8sClient, err = createFakeK8sClient(slice1, slice2, sliceIPV6, sliceNoMatchingPortName)
fakeK8sClient, err = createFakeK8sClient(
slice1,
slice2,
dupeEndpointSlice,
sliceIPV6,
sliceNoMatchingPortName,
)
Expect(err).ToNot(HaveOccurred())

serviceResolver = resolver.NewServiceResolverImpl(fakeK8sClient)
Expand All @@ -187,6 +204,10 @@ var _ = Describe("ServiceResolver", func() {
Address: "10.0.0.3",
Port: 8081,
},
{
Address: "12.0.0.1",
Port: 8080,
},
}

endpoints, err := serviceResolver.Resolve(context.TODO(), svc, 80)
Expand All @@ -202,6 +223,7 @@ var _ = Describe("ServiceResolver", func() {
// delete valid endpoint slices
Expect(fakeK8sClient.Delete(context.TODO(), slice1)).To(Succeed())
Expect(fakeK8sClient.Delete(context.TODO(), slice2)).To(Succeed())
Expect(fakeK8sClient.Delete(context.TODO(), dupeEndpointSlice)).To(Succeed())

endpoints, err := serviceResolver.Resolve(context.TODO(), svc, 80)
Expect(err).To(HaveOccurred())
Expand Down