Skip to content

Commit e5ee079

Browse files
Vighneswar Rao BojjaRulox
Vighneswar Rao Bojja
authored andcommitted
Add support for updating server parameters
1 parent 631554e commit e5ee079

File tree

3 files changed

+713
-78
lines changed

3 files changed

+713
-78
lines changed

client/nginx.go

Lines changed: 212 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,30 @@ import (
77
"io"
88
"io/ioutil"
99
"net/http"
10+
"reflect"
11+
"strings"
1012
)
1113

12-
// APIVersion is a version of NGINX Plus API.
13-
const APIVersion = 5
14+
const (
15+
// APIVersion is a version of NGINX Plus API.
16+
APIVersion = 5
1417

15-
const pathNotFoundCode = "PathNotFound"
18+
pathNotFoundCode = "PathNotFound"
19+
streamContext = true
20+
httpContext = false
21+
defaultServerPort = "80"
22+
)
1623

17-
const streamContext = true
18-
const httpContext = false
24+
// Default values for servers in Upstreams.
25+
var (
26+
defaultMaxConns = 0
27+
defaultMaxFails = 1
28+
defaultFailTimeout = "10s"
29+
defaultSlowStart = "0s"
30+
defaultBackup = false
31+
defaultDown = false
32+
defaultWeight = 1
33+
)
1934

2035
// NginxClient lets you access NGINX Plus API.
2136
type NginxClient struct {
@@ -29,13 +44,13 @@ type versions []int
2944
type UpstreamServer struct {
3045
ID int `json:"id,omitempty"`
3146
Server string `json:"server"`
32-
MaxConns int `json:"max_conns"`
47+
MaxConns *int `json:"max_conns,omitempty"`
3348
MaxFails *int `json:"max_fails,omitempty"`
3449
FailTimeout string `json:"fail_timeout,omitempty"`
3550
SlowStart string `json:"slow_start,omitempty"`
36-
Route string `json:"route"`
37-
Backup bool `json:"backup"`
38-
Down bool `json:"down"`
51+
Route string `json:"route,omitempty"`
52+
Backup *bool `json:"backup,omitempty"`
53+
Down *bool `json:"down,omitempty"`
3954
Drain bool `json:"drain,omitempty"`
4055
Weight *int `json:"weight,omitempty"`
4156
Service string `json:"service,omitempty"`
@@ -45,12 +60,12 @@ type UpstreamServer struct {
4560
type StreamUpstreamServer struct {
4661
ID int `json:"id,omitempty"`
4762
Server string `json:"server"`
48-
MaxConns int `json:"max_conns"`
63+
MaxConns *int `json:"max_conns,omitempty"`
4964
MaxFails *int `json:"max_fails,omitempty"`
5065
FailTimeout string `json:"fail_timeout,omitempty"`
5166
SlowStart string `json:"slow_start,omitempty"`
52-
Backup bool `json:"backup"`
53-
Down bool `json:"down"`
67+
Backup *bool `json:"backup,omitempty"`
68+
Down *bool `json:"down,omitempty"`
5469
Weight *int `json:"weight,omitempty"`
5570
Service string `json:"service,omitempty"`
5671
}
@@ -469,32 +484,96 @@ func (client *NginxClient) DeleteHTTPServer(upstream string, server string) erro
469484
// UpdateHTTPServers updates the servers of the upstream.
470485
// Servers that are in the slice, but don't exist in NGINX will be added to NGINX.
471486
// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX.
472-
func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) ([]UpstreamServer, []UpstreamServer, error) {
487+
// Servers that are in the slice and exist in NGINX, but have different parameters, will be updated.
488+
func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) (added []UpstreamServer, deleted []UpstreamServer, updated []UpstreamServer, err error) {
473489
serversInNginx, err := client.GetHTTPServers(upstream)
474490
if err != nil {
475-
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
491+
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
492+
}
493+
494+
// We assume port 80 if no port is set for servers.
495+
var formattedServers []UpstreamServer
496+
for _, server := range servers {
497+
server.Server = addPortToServer(server.Server)
498+
formattedServers = append(formattedServers, server)
476499
}
477500

478-
toAdd, toDelete := determineUpdates(servers, serversInNginx)
501+
toAdd, toDelete, toUpdate := determineUpdates(formattedServers, serversInNginx)
479502

480503
for _, server := range toAdd {
481504
err := client.AddHTTPServer(upstream, server)
482505
if err != nil {
483-
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
506+
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
484507
}
485508
}
486509

487510
for _, server := range toDelete {
488511
err := client.DeleteHTTPServer(upstream, server.Server)
489512
if err != nil {
490-
return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
513+
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
491514
}
492515
}
493516

494-
return toAdd, toDelete, nil
517+
for _, server := range toUpdate {
518+
err := client.UpdateHTTPServer(upstream, server)
519+
if err != nil {
520+
return nil, nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err)
521+
}
522+
}
523+
524+
return toAdd, toDelete, toUpdate, nil
495525
}
496526

497-
func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer) {
527+
// haveSameParameters checks if a given server has the same parameters as a server already present in NGINX. Order matters
528+
func haveSameParameters(newServer UpstreamServer, serverNGX UpstreamServer) bool {
529+
newServer.ID = serverNGX.ID
530+
531+
if serverNGX.MaxConns != nil && newServer.MaxConns == nil {
532+
newServer.MaxConns = &defaultMaxConns
533+
}
534+
535+
if serverNGX.MaxFails != nil && newServer.MaxFails == nil {
536+
newServer.MaxFails = &defaultMaxFails
537+
}
538+
539+
if serverNGX.FailTimeout != "" && newServer.FailTimeout == "" {
540+
newServer.FailTimeout = defaultFailTimeout
541+
}
542+
543+
if serverNGX.SlowStart != "" && newServer.SlowStart == "" {
544+
newServer.SlowStart = defaultSlowStart
545+
}
546+
547+
if serverNGX.Backup != nil && newServer.Backup == nil {
548+
newServer.Backup = &defaultBackup
549+
}
550+
551+
if serverNGX.Down != nil && newServer.Down == nil {
552+
newServer.Down = &defaultDown
553+
}
554+
555+
if serverNGX.Weight != nil && newServer.Weight == nil {
556+
newServer.Weight = &defaultWeight
557+
}
558+
559+
return reflect.DeepEqual(newServer, serverNGX)
560+
}
561+
562+
func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer, toUpdate []UpstreamServer) {
563+
for _, server := range updatedServers {
564+
updateFound := false
565+
for _, serverNGX := range nginxServers {
566+
if server.Server == serverNGX.Server && !haveSameParameters(server, serverNGX) {
567+
server.ID = serverNGX.ID
568+
updateFound = true
569+
break
570+
}
571+
}
572+
if updateFound {
573+
toUpdate = append(toUpdate, server)
574+
}
575+
}
576+
498577
for _, server := range updatedServers {
499578
found := false
500579
for _, serverNGX := range nginxServers {
@@ -608,7 +687,7 @@ func (client *NginxClient) delete(path string, expectedStatusCode int) error {
608687
return nil
609688
}
610689

611-
func (client *NginxClient) patch(path string, input interface{}) error {
690+
func (client *NginxClient) patch(path string, input interface{}, expectedStatusCode int) error {
612691
path = fmt.Sprintf("%v/%v/%v/", client.apiEndpoint, APIVersion, path)
613692

614693
jsonInput, err := json.Marshal(input)
@@ -627,10 +706,10 @@ func (client *NginxClient) patch(path string, input interface{}) error {
627706
}
628707
defer resp.Body.Close()
629708

630-
if resp.StatusCode != http.StatusNoContent {
709+
if resp.StatusCode != expectedStatusCode {
631710
return createResponseMismatchError(resp.Body).Wrap(fmt.Sprintf(
632711
"failed to complete patch request: expected %v response, got %v",
633-
http.StatusNoContent, resp.StatusCode))
712+
expectedStatusCode, resp.StatusCode))
634713
}
635714
return nil
636715
}
@@ -692,29 +771,43 @@ func (client *NginxClient) DeleteStreamServer(upstream string, server string) er
692771
// UpdateStreamServers updates the servers of the upstream.
693772
// Servers that are in the slice, but don't exist in NGINX will be added to NGINX.
694773
// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX.
695-
func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) ([]StreamUpstreamServer, []StreamUpstreamServer, error) {
774+
// Servers that are in the slice and exist in NGINX, but have different parameters, will be updated.
775+
func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) (added []StreamUpstreamServer, deleted []StreamUpstreamServer, updated []StreamUpstreamServer, err error) {
696776
serversInNginx, err := client.GetStreamServers(upstream)
697777
if err != nil {
698-
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
778+
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
699779
}
700780

701-
toAdd, toDelete := determineStreamUpdates(servers, serversInNginx)
781+
var formattedServers []StreamUpstreamServer
782+
for _, server := range servers {
783+
server.Server = addPortToServer(server.Server)
784+
formattedServers = append(formattedServers, server)
785+
}
786+
787+
toAdd, toDelete, toUpdate := determineStreamUpdates(formattedServers, serversInNginx)
702788

703789
for _, server := range toAdd {
704790
err := client.AddStreamServer(upstream, server)
705791
if err != nil {
706-
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
792+
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
707793
}
708794
}
709795

710796
for _, server := range toDelete {
711797
err := client.DeleteStreamServer(upstream, server.Server)
712798
if err != nil {
713-
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
799+
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
800+
}
801+
}
802+
803+
for _, server := range toUpdate {
804+
err := client.UpdateStreamServer(upstream, server)
805+
if err != nil {
806+
return nil, nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
714807
}
715808
}
716809

717-
return toAdd, toDelete, nil
810+
return toAdd, toDelete, toUpdate, nil
718811
}
719812

720813
func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (int, error) {
@@ -732,7 +825,55 @@ func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (in
732825
return -1, nil
733826
}
734827

735-
func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer) {
828+
// haveSameParametersForStream checks if a given server has the same parameters as a server already present in NGINX. Order matters
829+
func haveSameParametersForStream(newServer StreamUpstreamServer, serverNGX StreamUpstreamServer) bool {
830+
newServer.ID = serverNGX.ID
831+
if serverNGX.MaxConns != nil && newServer.MaxConns == nil {
832+
newServer.MaxConns = &defaultMaxConns
833+
}
834+
835+
if serverNGX.MaxFails != nil && newServer.MaxFails == nil {
836+
newServer.MaxFails = &defaultMaxFails
837+
}
838+
839+
if serverNGX.FailTimeout != "" && newServer.FailTimeout == "" {
840+
newServer.FailTimeout = defaultFailTimeout
841+
}
842+
843+
if serverNGX.SlowStart != "" && newServer.SlowStart == "" {
844+
newServer.SlowStart = defaultSlowStart
845+
}
846+
847+
if serverNGX.Backup != nil && newServer.Backup == nil {
848+
newServer.Backup = &defaultBackup
849+
}
850+
851+
if serverNGX.Down != nil && newServer.Down == nil {
852+
newServer.Down = &defaultDown
853+
}
854+
855+
if serverNGX.Weight != nil && newServer.Weight == nil {
856+
newServer.Weight = &defaultWeight
857+
}
858+
859+
return reflect.DeepEqual(newServer, serverNGX)
860+
}
861+
862+
func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer, toUpdate []StreamUpstreamServer) {
863+
for _, server := range updatedServers {
864+
updateFound := false
865+
for _, serverNGX := range nginxServers {
866+
if server.Server == serverNGX.Server && !haveSameParametersForStream(server, serverNGX) {
867+
server.ID = serverNGX.ID
868+
updateFound = true
869+
break
870+
}
871+
}
872+
if updateFound {
873+
toUpdate = append(toUpdate, server)
874+
}
875+
}
876+
736877
for _, server := range updatedServers {
737878
found := false
738879
for _, serverNGX := range nginxServers {
@@ -1059,7 +1200,7 @@ func (client *NginxClient) modifyKeyValPair(zone string, key string, val string,
10591200

10601201
path := fmt.Sprintf("%v/keyvals/%v", base, zone)
10611202
input := KeyValPairs{key: val}
1062-
err := client.patch(path, &input)
1203+
err := client.patch(path, &input, http.StatusNoContent)
10631204
if err != nil {
10641205
return fmt.Errorf("failed to update key value pair for %v/%v zone: %v", base, zone, err)
10651206
}
@@ -1092,7 +1233,7 @@ func (client *NginxClient) deleteKeyValuePair(zone string, key string, stream bo
10921233
keyval[key] = nil
10931234

10941235
path := fmt.Sprintf("%v/keyvals/%v", base, zone)
1095-
err := client.patch(path, &keyval)
1236+
err := client.patch(path, &keyval, http.StatusNoContent)
10961237
if err != nil {
10971238
return fmt.Errorf("failed to remove key values pair for %v/%v zone: %v", base, zone, err)
10981239
}
@@ -1125,3 +1266,43 @@ func (client *NginxClient) deleteKeyValPairs(zone string, stream bool) error {
11251266
}
11261267
return nil
11271268
}
1269+
1270+
// UpdateHTTPServer updates the server of the upstream.
1271+
func (client *NginxClient) UpdateHTTPServer(upstream string, server UpstreamServer) error {
1272+
path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, server.ID)
1273+
server.ID = 0
1274+
err := client.patch(path, &server, http.StatusOK)
1275+
if err != nil {
1276+
return fmt.Errorf("failed to update %v server to %v upstream: %v", server.Server, upstream, err)
1277+
}
1278+
1279+
return nil
1280+
}
1281+
1282+
// UpdateStreamServer updates the stream server of the upstream.
1283+
func (client *NginxClient) UpdateStreamServer(upstream string, server StreamUpstreamServer) error {
1284+
path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, server.ID)
1285+
server.ID = 0
1286+
err := client.patch(path, &server, http.StatusOK)
1287+
if err != nil {
1288+
return fmt.Errorf("failed to update %v stream server to %v upstream: %v", server.Server, upstream, err)
1289+
}
1290+
1291+
return nil
1292+
}
1293+
1294+
func addPortToServer(server string) string {
1295+
if len(strings.Split(server, ":")) == 2 {
1296+
return server
1297+
}
1298+
1299+
if len(strings.Split(server, "]:")) == 2 {
1300+
return server
1301+
}
1302+
1303+
if strings.HasPrefix(server, "unix:") {
1304+
return server
1305+
}
1306+
1307+
return fmt.Sprintf("%v:%v", server, defaultServerPort)
1308+
}

0 commit comments

Comments
 (0)