Skip to content

Commit e7266e3

Browse files
committed
Add stream support
1 parent 84168a5 commit e7266e3

File tree

5 files changed

+572
-55
lines changed

5 files changed

+572
-55
lines changed

client/nginx_client.go

Lines changed: 221 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type NginxClient struct {
2020

2121
type versions []int
2222

23+
// UpstreamServer lets you configure HTTP upstreams
2324
type UpstreamServer struct {
2425
ID int64 `json:"id,omitempty"`
2526
Server string `json:"server"`
@@ -28,6 +29,15 @@ type UpstreamServer struct {
2829
SlowStart string `json:"slow_start,omitempty"`
2930
}
3031

32+
// StreamUpstreamServer lets you configure Stream upstreams
33+
type StreamUpstreamServer struct {
34+
ID int64 `json:"id,omitempty"`
35+
Server string `json:"server"`
36+
MaxFails int64 `json:"max_fails"`
37+
FailTimeout string `json:"fail_timeout,omitempty"`
38+
SlowStart string `json:"slow_start,omitempty"`
39+
}
40+
3141
type apiErrorResponse struct {
3242
Path string
3343
Method string
@@ -128,30 +138,15 @@ func (client *NginxClient) CheckIfUpstreamExists(upstream string) error {
128138
return err
129139
}
130140

131-
// GetHTTPServers returns the servers of the upsteam from NGINX.
141+
// GetHTTPServers returns the servers of the upstream from NGINX.
132142
func (client *NginxClient) GetHTTPServers(upstream string) ([]UpstreamServer, error) {
133-
url := fmt.Sprintf("%v/%v/http/upstreams/%v/servers", client.apiEndpoint, APIVersion, upstream)
134-
135-
resp, err := client.httpClient.Get(url)
136-
if err != nil {
137-
return nil, fmt.Errorf("failed to connect to the API to get upstream servers of upstream %v: %v", upstream, err)
138-
}
139-
defer resp.Body.Close()
140-
141-
if resp.StatusCode != http.StatusOK {
142-
mainErr := fmt.Errorf("upstream %v is invalid: expected %v response, got %v", upstream, http.StatusOK, resp.StatusCode)
143-
return nil, createResponseMismatchError(resp.Body, mainErr)
144-
}
145-
146-
body, err := ioutil.ReadAll(resp.Body)
147-
if err != nil {
148-
return nil, fmt.Errorf("failed to read the response body with upstream servers of upstream %v: %v", upstream, err)
149-
}
143+
path := fmt.Sprintf("http/upstreams/%v/servers", upstream)
150144

151145
var servers []UpstreamServer
152-
err = json.Unmarshal(body, &servers)
146+
err := client.get(path, &servers)
147+
153148
if err != nil {
154-
return nil, fmt.Errorf("error unmarshalling upstream servers of upstream %v: got %q response: %v", upstream, string(body), err)
149+
return nil, fmt.Errorf("failed to get the HTTP servers of upstream %v: %v", upstream, err)
155150
}
156151

157152
return servers, nil
@@ -168,25 +163,11 @@ func (client *NginxClient) AddHTTPServer(upstream string, server UpstreamServer)
168163
return fmt.Errorf("failed to add %v server to %v upstream: server already exists", server.Server, upstream)
169164
}
170165

171-
jsonServer, err := json.Marshal(server)
172-
if err != nil {
173-
return fmt.Errorf("error marshalling upstream server %v: %v", server, err)
174-
}
175-
176-
url := fmt.Sprintf("%v/%v/http/upstreams/%v/servers/", client.apiEndpoint, APIVersion, upstream)
177-
178-
resp, err := client.httpClient.Post(url, "application/json", bytes.NewBuffer(jsonServer))
179-
166+
path := fmt.Sprintf("http/upstreams/%v/servers/", upstream)
167+
err = client.post(path, &server)
180168
if err != nil {
181169
return fmt.Errorf("failed to add %v server to %v upstream: %v", server.Server, upstream, err)
182170
}
183-
defer resp.Body.Close()
184-
185-
if resp.StatusCode != http.StatusCreated {
186-
mainErr := fmt.Errorf("failed to add %v server to %v upstream: expected %v response, got %v",
187-
server.Server, upstream, http.StatusCreated, resp.StatusCode)
188-
return createResponseMismatchError(resp.Body, mainErr)
189-
}
190171

191172
return nil
192173
}
@@ -198,27 +179,15 @@ func (client *NginxClient) DeleteHTTPServer(upstream string, server string) erro
198179
return fmt.Errorf("failed to remove %v server from %v upstream: %v", server, upstream, err)
199180
}
200181
if id == -1 {
201-
return fmt.Errorf("failed to remove %v server from %v upstream: server doesn't exists", server, upstream)
182+
return fmt.Errorf("failed to remove %v server from %v upstream: server doesn't exist", server, upstream)
202183
}
203184

204-
url := fmt.Sprintf("%v/%v/http/upstreams/%v/servers/%v", client.apiEndpoint, APIVersion, upstream, id)
205-
206-
req, err := http.NewRequest(http.MethodDelete, url, nil)
207-
if err != nil {
208-
return fmt.Errorf("failed to create a request: %v", err)
209-
}
185+
path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, id)
186+
err = client.delete(path)
210187

211-
resp, err := client.httpClient.Do(req)
212188
if err != nil {
213189
return fmt.Errorf("failed to remove %v server from %v upstream: %v", server, upstream, err)
214190
}
215-
defer resp.Body.Close()
216-
217-
if resp.StatusCode != http.StatusOK {
218-
mainErr := fmt.Errorf("failed to remove %v server from %v upstream: expected %v response, got %v",
219-
server, upstream, http.StatusOK, resp.StatusCode)
220-
return createResponseMismatchError(resp.Body, mainErr)
221-
}
222191

223192
return nil
224193
}
@@ -295,3 +264,204 @@ func (client *NginxClient) getIDOfHTTPServer(upstream string, name string) (int6
295264

296265
return -1, nil
297266
}
267+
268+
func (client *NginxClient) get(path string, data interface{}) error {
269+
url := fmt.Sprintf("%v/%v/%v", client.apiEndpoint, APIVersion, path)
270+
resp, err := client.httpClient.Get(url)
271+
if err != nil {
272+
return fmt.Errorf("failed to get %v: %v", path, err)
273+
}
274+
defer resp.Body.Close()
275+
if resp.StatusCode != http.StatusOK {
276+
mainErr := fmt.Errorf("expected %v response, got %v", http.StatusOK, resp.StatusCode)
277+
return createResponseMismatchError(resp.Body, mainErr)
278+
}
279+
280+
body, err := ioutil.ReadAll(resp.Body)
281+
if err != nil {
282+
return fmt.Errorf("failed to read the response body: %v", err)
283+
}
284+
285+
err = json.Unmarshal(body, data)
286+
if err != nil {
287+
return fmt.Errorf("error unmarshaling response %q: %v", string(body), err)
288+
}
289+
return nil
290+
}
291+
292+
func (client *NginxClient) post(path string, input interface{}) error {
293+
url := fmt.Sprintf("%v/%v/%v", client.apiEndpoint, APIVersion, path)
294+
295+
jsonInput, err := json.Marshal(input)
296+
if err != nil {
297+
return fmt.Errorf("failed to marshall input: %v", err)
298+
}
299+
300+
resp, err := client.httpClient.Post(url, "application/json", bytes.NewBuffer(jsonInput))
301+
if err != nil {
302+
return fmt.Errorf("failed to post %v: %v", path, err)
303+
}
304+
defer resp.Body.Close()
305+
if resp.StatusCode != http.StatusCreated {
306+
mainErr := fmt.Errorf("expected %v response, got %v", http.StatusCreated, resp.StatusCode)
307+
return createResponseMismatchError(resp.Body, mainErr)
308+
}
309+
310+
return nil
311+
}
312+
313+
func (client *NginxClient) delete(path string) error {
314+
path = fmt.Sprintf("%v/%v/%v/", client.apiEndpoint, APIVersion, path)
315+
316+
req, err := http.NewRequest(http.MethodDelete, path, nil)
317+
if err != nil {
318+
return fmt.Errorf("failed to create a delete request: %v", err)
319+
}
320+
321+
resp, err := client.httpClient.Do(req)
322+
if err != nil {
323+
return fmt.Errorf("failed to create delete request: %v", err)
324+
}
325+
defer resp.Body.Close()
326+
327+
if resp.StatusCode != http.StatusOK {
328+
mainErr := fmt.Errorf("failed to complete delete request: expected %v response, got %v",
329+
http.StatusOK, resp.StatusCode)
330+
return createResponseMismatchError(resp.Body, mainErr)
331+
}
332+
return nil
333+
}
334+
335+
// CheckIfStreamUpstreamExists checks if the stream upstream exists in NGINX. If the upstream doesn't exist, it returns the error.
336+
func (client *NginxClient) CheckIfStreamUpstreamExists(upstream string) error {
337+
_, err := client.GetStreamServers(upstream)
338+
return err
339+
}
340+
341+
// GetStreamServers returns the stream servers of the upstream from NGINX.
342+
func (client *NginxClient) GetStreamServers(upstream string) ([]StreamUpstreamServer, error) {
343+
path := fmt.Sprintf("stream/upstreams/%v/servers", upstream)
344+
345+
var servers []StreamUpstreamServer
346+
err := client.get(path, &servers)
347+
348+
if err != nil {
349+
return nil, fmt.Errorf("failed to get stream servers of upstream server %v: %v", upstream, err)
350+
}
351+
352+
return servers, nil
353+
}
354+
355+
// AddStreamServer adds the server to the upstream.
356+
func (client *NginxClient) AddStreamServer(upstream string, server StreamUpstreamServer) error {
357+
id, err := client.getIDOfStreamServer(upstream, server.Server)
358+
359+
if err != nil {
360+
return fmt.Errorf("failed to add %v stream server to %v upstream: %v", server.Server, upstream, err)
361+
}
362+
if id != -1 {
363+
return fmt.Errorf("failed to add %v stream server to %v upstream: server already exists", server.Server, upstream)
364+
}
365+
366+
path := fmt.Sprintf("stream/upstreams/%v/servers/", upstream)
367+
err = client.post(path, &server)
368+
369+
if err != nil {
370+
return fmt.Errorf("failed to add %v stream server to %v upstream: %v", server.Server, upstream, err)
371+
}
372+
373+
return nil
374+
}
375+
376+
// DeleteStreamServer the server from the upstream.
377+
func (client *NginxClient) DeleteStreamServer(upstream string, server string) error {
378+
id, err := client.getIDOfStreamServer(upstream, server)
379+
if err != nil {
380+
return fmt.Errorf("failed to remove %v stream server from %v upstream: %v", server, upstream, err)
381+
}
382+
if id == -1 {
383+
return fmt.Errorf("failed to remove %v stream server from %v upstream: server doesn't exist", server, upstream)
384+
}
385+
386+
path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, id)
387+
err = client.delete(path)
388+
389+
if err != nil {
390+
return fmt.Errorf("failed to remove %v stream server from %v upstream: %v", server, upstream, err)
391+
}
392+
393+
return nil
394+
}
395+
396+
// UpdateStreamServers updates the servers of the upstream.
397+
// Servers that are in the slice, but don't exist in NGINX will be added to NGINX.
398+
// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX.
399+
func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) ([]StreamUpstreamServer, []StreamUpstreamServer, error) {
400+
serversInNginx, err := client.GetStreamServers(upstream)
401+
if err != nil {
402+
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
403+
}
404+
405+
toAdd, toDelete := determineStreamUpdates(servers, serversInNginx)
406+
407+
for _, server := range toAdd {
408+
err := client.AddStreamServer(upstream, server)
409+
if err != nil {
410+
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
411+
}
412+
}
413+
414+
for _, server := range toDelete {
415+
err := client.DeleteStreamServer(upstream, server.Server)
416+
if err != nil {
417+
return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err)
418+
}
419+
}
420+
421+
return toAdd, toDelete, nil
422+
}
423+
424+
func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (int64, error) {
425+
servers, err := client.GetStreamServers(upstream)
426+
if err != nil {
427+
return -1, fmt.Errorf("error getting id of stream server %v of upstream %v: %v", name, upstream, err)
428+
}
429+
430+
for _, s := range servers {
431+
if s.Server == name {
432+
return s.ID, nil
433+
}
434+
}
435+
436+
return -1, nil
437+
}
438+
439+
func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer) {
440+
for _, server := range updatedServers {
441+
found := false
442+
for _, serverNGX := range nginxServers {
443+
if server.Server == serverNGX.Server {
444+
found = true
445+
break
446+
}
447+
}
448+
if !found {
449+
toAdd = append(toAdd, server)
450+
}
451+
}
452+
453+
for _, serverNGX := range nginxServers {
454+
found := false
455+
for _, server := range updatedServers {
456+
if serverNGX.Server == server.Server {
457+
found = true
458+
break
459+
}
460+
}
461+
if !found {
462+
toRemove = append(toRemove, serverNGX)
463+
}
464+
}
465+
466+
return
467+
}

0 commit comments

Comments
 (0)