Skip to content

Commit 9ac2936

Browse files
Vighneswar Rao BojjaRulox
Vighneswar Rao Bojja
authored andcommitted
Add support for updating server parameters
1 parent 7ab5c57 commit 9ac2936

File tree

3 files changed

+676
-78
lines changed

3 files changed

+676
-78
lines changed

client/nginx.go

Lines changed: 209 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,30 @@ import (
77
"io"
88
"io/ioutil"
99
"net/http"
10+
"reflect"
11+
"strings"
1012
)
1113

12-
// APIVersion is a version of NGINX Plus API.
13-
const APIVersion = 5
14+
const (
15+
// APIVersion is a version of NGINX Plus API.
16+
APIVersion = 5
1417

15-
const pathNotFoundCode = "PathNotFound"
18+
pathNotFoundCode = "PathNotFound"
19+
streamContext = true
20+
httpContext = false
21+
defaultServerPort = "80"
22+
)
1623

17-
const streamContext = true
18-
const httpContext = false
24+
// Default values for servers in Upstreams.
25+
var (
26+
defaultMaxConns = 0
27+
defaultMaxFails = 1
28+
defaultFailTimeout = "10s"
29+
defaultSlowStart = "0s"
30+
defaultBackup = false
31+
defaultDown = false
32+
defaultWeight = 1
33+
)
1934

2035
// NginxClient lets you access NGINX Plus API.
2136
type NginxClient struct {
@@ -29,13 +44,13 @@ type versions []int
2944
type UpstreamServer struct {
3045
ID int `json:"id,omitempty"`
3146
Server string `json:"server"`
32-
MaxConns int `json:"max_conns"`
47+
MaxConns *int `json:"max_conns,omitempty"`
3348
MaxFails *int `json:"max_fails,omitempty"`
3449
FailTimeout string `json:"fail_timeout,omitempty"`
3550
SlowStart string `json:"slow_start,omitempty"`
36-
Route string `json:"route"`
37-
Backup bool `json:"backup"`
38-
Down bool `json:"down"`
51+
Route string `json:"route,omitempty"`
52+
Backup *bool `json:"backup,omitempty"`
53+
Down *bool `json:"down,omitempty"`
3954
Drain bool `json:"drain,omitempty"`
4055
Weight *int `json:"weight,omitempty"`
4156
Service string `json:"service,omitempty"`
@@ -45,12 +60,12 @@ type UpstreamServer struct {
4560
type StreamUpstreamServer struct {
4661
ID int `json:"id,omitempty"`
4762
Server string `json:"server"`
48-
MaxConns int `json:"max_conns"`
63+
MaxConns *int `json:"max_conns,omitempty"`
4964
MaxFails *int `json:"max_fails,omitempty"`
5065
FailTimeout string `json:"fail_timeout,omitempty"`
5166
SlowStart string `json:"slow_start,omitempty"`
52-
Backup bool `json:"backup"`
53-
Down bool `json:"down"`
67+
Backup *bool `json:"backup,omitempty"`
68+
Down *bool `json:"down,omitempty"`
5469
Weight *int `json:"weight,omitempty"`
5570
Service string `json:"service,omitempty"`
5671
}
@@ -469,32 +484,93 @@ func (client *NginxClient) DeleteHTTPServer(upstream string, server string) erro
469484
// UpdateHTTPServers updates the servers of the upstream.
470485
// Servers that are in the slice, but don't exist in NGINX will be added to NGINX.
471486
// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX.
472-
func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) ([]UpstreamServer, []UpstreamServer, error) {
487+
// Servers that are in the slice and exist in NGINX, but have different parameters, will be updated.
488+
func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) (added []UpstreamServer, deleted []UpstreamServer, updated []UpstreamServer, err error) {
473489
serversInNginx, err := client.GetHTTPServers(upstream)
474490
if err != nil {
475-
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
491+
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
492+
}
493+
494+
// We assume port 80 if no port is set for servers.
495+
for _, server := range servers {
496+
server.Server = addPortToServer(server.Server)
476497
}
477498

478-
toAdd, toDelete := determineUpdates(servers, serversInNginx)
499+
toAdd, toDelete, toUpdate := determineUpdates(servers, serversInNginx)
479500

480501
for _, server := range toAdd {
481502
err := client.AddHTTPServer(upstream, server)
482503
if err != nil {
483-
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
504+
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
484505
}
485506
}
486507

487508
for _, server := range toDelete {
488509
err := client.DeleteHTTPServer(upstream, server.Server)
489510
if err != nil {
490-
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
511+
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
491512
}
492513
}
493514

494-
return toAdd, toDelete, nil
515+
for _, server := range toUpdate {
516+
err := client.UpdateHTTPServer(upstream, server)
517+
if err != nil {
518+
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
519+
}
520+
}
521+
522+
return toAdd, toDelete, toUpdate, nil
495523
}
496524

497-
func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer) {
525+
// haveSameParameters checks if a given server has the same parameters an NGINX server already present. Order matters
526+
func haveSameParameters(newServer UpstreamServer, serverNGX UpstreamServer) bool {
527+
newServer.ID = serverNGX.ID
528+
529+
if serverNGX.MaxConns != nil && newServer.MaxConns == nil {
530+
newServer.MaxConns = &defaultMaxConns
531+
}
532+
533+
if serverNGX.MaxFails != nil && newServer.MaxFails == nil {
534+
newServer.MaxFails = &defaultMaxFails
535+
}
536+
537+
if serverNGX.FailTimeout != "" && newServer.FailTimeout == "" {
538+
newServer.FailTimeout = defaultFailTimeout
539+
}
540+
541+
if serverNGX.SlowStart != "" && newServer.SlowStart == "" {
542+
newServer.SlowStart = defaultSlowStart
543+
}
544+
545+
if serverNGX.Backup != nil && newServer.Backup == nil {
546+
newServer.Backup = &defaultBackup
547+
}
548+
549+
if serverNGX.Down != nil && newServer.Down == nil {
550+
newServer.Down = &defaultDown
551+
}
552+
553+
if serverNGX.Weight != nil && newServer.Weight == nil {
554+
newServer.Weight = &defaultWeight
555+
}
556+
557+
return reflect.DeepEqual(newServer, serverNGX)
558+
}
559+
560+
func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer, toUpdate []UpstreamServer) {
561+
for _, server := range updatedServers {
562+
updateFound := false
563+
for _, serverNGX := range nginxServers {
564+
if server.Server == serverNGX.Server && !haveSameParameters(server, serverNGX) {
565+
updateFound = true
566+
break
567+
}
568+
}
569+
if updateFound {
570+
toUpdate = append(toUpdate, server)
571+
}
572+
}
573+
498574
for _, server := range updatedServers {
499575
found := false
500576
for _, serverNGX := range nginxServers {
@@ -608,7 +684,7 @@ func (client *NginxClient) delete(path string, expectedStatusCode int) error {
608684
return nil
609685
}
610686

611-
func (client *NginxClient) patch(path string, input interface{}) error {
687+
func (client *NginxClient) patch(path string, input interface{}, expectedStatusCode int) error {
612688
path = fmt.Sprintf("%v/%v/%v/", client.apiEndpoint, APIVersion, path)
613689

614690
jsonInput, err := json.Marshal(input)
@@ -627,10 +703,10 @@ func (client *NginxClient) patch(path string, input interface{}) error {
627703
}
628704
defer resp.Body.Close()
629705

630-
if resp.StatusCode != http.StatusNoContent {
706+
if resp.StatusCode != expectedStatusCode {
631707
return createResponseMismatchError(resp.Body).Wrap(fmt.Sprintf(
632708
"failed to complete patch request: expected %v response, got %v",
633-
http.StatusNoContent, resp.StatusCode))
709+
expectedStatusCode, resp.StatusCode))
634710
}
635711
return nil
636712
}
@@ -655,6 +731,7 @@ func (client *NginxClient) GetStreamServers(upstream string) ([]StreamUpstreamSe
655731

656732
// AddStreamServer adds the stream server to the upstream.
657733
func (client *NginxClient) AddStreamServer(upstream string, server StreamUpstreamServer) error {
734+
server.Server = addPortToServer(server.Server)
658735
id, err := client.getIDOfStreamServer(upstream, server.Server)
659736
if err != nil {
660737
return fmt.Errorf("failed to add %v stream server to %v upstream: %v", server.Server, upstream, err)
@@ -673,6 +750,7 @@ func (client *NginxClient) AddStreamServer(upstream string, server StreamUpstrea
673750

674751
// DeleteStreamServer the server from the upstream.
675752
func (client *NginxClient) DeleteStreamServer(upstream string, server string) error {
753+
server = addPortToServer(server)
676754
id, err := client.getIDOfStreamServer(upstream, server)
677755
if err != nil {
678756
return fmt.Errorf("failed to remove %v stream server from %v upstream: %v", server, upstream, err)
@@ -692,29 +770,42 @@ func (client *NginxClient) DeleteStreamServer(upstream string, server string) er
692770
// UpdateStreamServers updates the servers of the upstream.
693771
// Servers that are in the slice, but don't exist in NGINX will be added to NGINX.
694772
// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX.
695-
func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) ([]StreamUpstreamServer, []StreamUpstreamServer, error) {
773+
// Servers that are in the slice and exist in NGINX, but have different parameters, will be updated.
774+
func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) (added []StreamUpstreamServer, deleted []StreamUpstreamServer, updated []StreamUpstreamServer, err error) {
696775
serversInNginx, err := client.GetStreamServers(upstream)
697776
if err != nil {
698-
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
777+
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
699778
}
700779

701-
toAdd, toDelete := determineStreamUpdates(servers, serversInNginx)
780+
// We assume port 80 if no port is set for servers.
781+
for _, server := range servers {
782+
server.Server = addPortToServer(server.Server)
783+
}
784+
785+
toAdd, toDelete, toUpdate := determineStreamUpdates(servers, serversInNginx)
702786

703787
for _, server := range toAdd {
704788
err := client.AddStreamServer(upstream, server)
705789
if err != nil {
706-
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
790+
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
707791
}
708792
}
709793

710794
for _, server := range toDelete {
711795
err := client.DeleteStreamServer(upstream, server.Server)
712796
if err != nil {
713-
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
797+
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
798+
}
799+
}
800+
801+
for _, server := range toUpdate {
802+
err := client.UpdateStreamServer(upstream, server)
803+
if err != nil {
804+
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
714805
}
715806
}
716807

717-
return toAdd, toDelete, nil
808+
return toAdd, toDelete, toUpdate, nil
718809
}
719810

720811
func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (int, error) {
@@ -732,7 +823,54 @@ func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (in
732823
return -1, nil
733824
}
734825

735-
func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer) {
826+
// haveStreamSameParameters checks if a given stream server has the same parameters an NGINX server already present. Order matters
827+
func haveStreamSameParameters(newServer StreamUpstreamServer, serverNGX StreamUpstreamServer) bool {
828+
newServer.ID = serverNGX.ID
829+
if serverNGX.MaxConns != nil && newServer.MaxConns == nil {
830+
newServer.MaxConns = &defaultMaxConns
831+
}
832+
833+
if serverNGX.MaxFails != nil && newServer.MaxFails == nil {
834+
newServer.MaxFails = &defaultMaxFails
835+
}
836+
837+
if serverNGX.FailTimeout != "" && newServer.FailTimeout == "" {
838+
newServer.FailTimeout = defaultFailTimeout
839+
}
840+
841+
if serverNGX.SlowStart != "" && newServer.SlowStart == "" {
842+
newServer.SlowStart = defaultSlowStart
843+
}
844+
845+
if serverNGX.Backup != nil && newServer.Backup == nil {
846+
newServer.Backup = &defaultBackup
847+
}
848+
849+
if serverNGX.Down != nil && newServer.Down == nil {
850+
newServer.Down = &defaultDown
851+
}
852+
853+
if serverNGX.Weight != nil && newServer.Weight == nil {
854+
newServer.Weight = &defaultWeight
855+
}
856+
857+
return reflect.DeepEqual(newServer, serverNGX)
858+
}
859+
860+
func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer, toUpdate []StreamUpstreamServer) {
861+
for _, server := range updatedServers {
862+
updateFound := false
863+
for _, serverNGX := range nginxServers {
864+
if server.Server == serverNGX.Server && !haveStreamSameParameters(server, serverNGX) {
865+
updateFound = true
866+
break
867+
}
868+
}
869+
if updateFound {
870+
toUpdate = append(toUpdate, server)
871+
}
872+
}
873+
736874
for _, server := range updatedServers {
737875
found := false
738876
for _, serverNGX := range nginxServers {
@@ -1059,7 +1197,7 @@ func (client *NginxClient) modifyKeyValPair(zone string, key string, val string,
10591197

10601198
path := fmt.Sprintf("%v/keyvals/%v", base, zone)
10611199
input := KeyValPairs{key: val}
1062-
err := client.patch(path, &input)
1200+
err := client.patch(path, &input, http.StatusNoContent)
10631201
if err != nil {
10641202
return fmt.Errorf("failed to update key value pair for %v/%v zone: %v", base, zone, err)
10651203
}
@@ -1092,7 +1230,7 @@ func (client *NginxClient) deleteKeyValuePair(zone string, key string, stream bo
10921230
keyval[key] = nil
10931231

10941232
path := fmt.Sprintf("%v/keyvals/%v", base, zone)
1095-
err := client.patch(path, &keyval)
1233+
err := client.patch(path, &keyval, http.StatusNoContent)
10961234
if err != nil {
10971235
return fmt.Errorf("failed to remove key values pair for %v/%v zone: %v", base, zone, err)
10981236
}
@@ -1125,3 +1263,43 @@ func (client *NginxClient) deleteKeyValPairs(zone string, stream bool) error {
11251263
}
11261264
return nil
11271265
}
1266+
1267+
// UpdateHTTPServer updates the server of the upstream.
1268+
func (client *NginxClient) UpdateHTTPServer(upstream string, server UpstreamServer) error {
1269+
path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, server.ID)
1270+
server.ID = 0
1271+
err := client.patch(path, &server, http.StatusOK)
1272+
if err != nil {
1273+
return fmt.Errorf("failed to update %v server to %v upstream: %v", server.Server, upstream, err)
1274+
}
1275+
1276+
return nil
1277+
}
1278+
1279+
// UpdateStreamServer updates the stream server of the upstream.
1280+
func (client *NginxClient) UpdateStreamServer(upstream string, server StreamUpstreamServer) error {
1281+
path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, server.ID)
1282+
server.ID = 0
1283+
err := client.patch(path, &server, http.StatusOK)
1284+
if err != nil {
1285+
return fmt.Errorf("failed to update %v stream server to %v upstream: %v", server.Server, upstream, err)
1286+
}
1287+
1288+
return nil
1289+
}
1290+
1291+
func addPortToServer(server string) string {
1292+
if len(strings.Split(server, ":")) == 2 {
1293+
return server
1294+
}
1295+
1296+
if len(strings.Split(server, "]:")) == 2 {
1297+
return server
1298+
}
1299+
1300+
if strings.HasPrefix(server, "unix:") {
1301+
return server
1302+
}
1303+
1304+
return fmt.Sprintf("%v:%v", server, defaultServerPort)
1305+
}

0 commit comments

Comments
 (0)