From b7d4228650522251149edf49e2d4f7bf04a1db35 Mon Sep 17 00:00:00 2001 From: "Jonathan S. Katz" Date: Thu, 31 Oct 2019 17:51:40 -0400 Subject: [PATCH 1/2] Run gofmt on the cluster service implementation file. --- apiserver/clusterservice/clusterimpl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apiserver/clusterservice/clusterimpl.go b/apiserver/clusterservice/clusterimpl.go index ed119487ec..4740e0b7c3 100644 --- a/apiserver/clusterservice/clusterimpl.go +++ b/apiserver/clusterservice/clusterimpl.go @@ -405,7 +405,7 @@ func TestCluster(name, selector, ns, pgouser string, allFlag bool) msgs.ClusterT Timestamp: time.Now(), EventType: events.EventTestCluster, }, - Clustername: c.Name, + Clustername: c.Name, } err = events.Publish(f) From c860a178cb1e68c23cbb844527d12eddfb0b51d8 Mon Sep 17 00:00:00 2001 From: "Jonathan S. Katz" Date: Sat, 2 Nov 2019 12:02:45 -0400 Subject: [PATCH 2/2] Modify PostgreSQL cluster test to be more efficient. Previously, the PostgreSQL cluster test (from the command line, known as `pgo test`) would check if each individual user available to a PostgreSQL cluster could authenticate with its credentials. While this certainly determines that one can make a connection to a PostgreSQL database managed by the Operator, it also presents several issues: - If one has many users available to the Operator, the number of connections made can rapidly multiply. Imagine 10 users across 5 replicas - This also means that the user's credentials must be stored in the Operator. Currently this is a plaintext password. If the connections are _not_ being made over SSL, this creates the risk of password leakage. The test now performs two types of checks: instance (or pod) readiness checks, and endpoint availability. Both of these are designed to check if a PostgreSQL instance an be connected to and reached from the outside world. Specifically: - The pod readiness check calls `pg_isready` which can quickly determine if a PostgreSQL is accepting connections - Check if the service endpoints responsible for serving the PostgreSQL are reachable. If both of the checks pass, then a PostgreSQL cluster is reachable. This commit also does the following: - BREAKING: Modify the API endpoint that is consumed by `pgo test` to deliver the content in a structure manner - BREAKING: Remove the "TestCluster" event. This is not a lifecycle event. - Introduce the "Endpoints" Kubernetes construct to the `kubeapi` package - Some refactoring, cleanup, and improved documentation around the connectivity tests Issue: [ch5712] --- apiserver/clusterservice/clusterimpl.go | 240 ++++++++++++++---------- apiservermsgs/clustermsgs.go | 25 ++- events/eventtype.go | 15 -- kubeapi/endpoints.go | 69 +++++++ pgo/cmd/test.go | 37 ++-- testing/events/event_test.go | 22 --- 6 files changed, 258 insertions(+), 150 deletions(-) create mode 100644 kubeapi/endpoints.go diff --git a/apiserver/clusterservice/clusterimpl.go b/apiserver/clusterservice/clusterimpl.go index 4740e0b7c3..cc56b98046 100644 --- a/apiserver/clusterservice/clusterimpl.go +++ b/apiserver/clusterservice/clusterimpl.go @@ -31,7 +31,6 @@ import ( msgs "github.com/crunchydata/postgres-operator/apiservermsgs" "github.com/crunchydata/postgres-operator/config" - "github.com/crunchydata/postgres-operator/events" "github.com/crunchydata/postgres-operator/kubeapi" "github.com/crunchydata/postgres-operator/sshutil" "github.com/crunchydata/postgres-operator/util" @@ -268,153 +267,200 @@ func getServices(cluster *crv1.Pgcluster, ns string) ([]msgs.ShowClusterService, return output, err } +// TestCluster performs a variety of readiness checks against one or more +// clusters within a namespace. It leverages the following two Kuberentes +// constructs in order to determine the availability of PostgreSQL clusters: +// - Pod readiness checks. The Pod readiness checks leverage "pg_isready" to +// determine if the PostgreSQL cluster is able to accept connecions +// - Endpoint checks. The check sees if the services in front of the the +// PostgreSQL instances are able to route connections from the "outside" into +// the instances func TestCluster(name, selector, ns, pgouser string, allFlag bool) msgs.ClusterTestResponse { var err error + log.Debugf("TestCluster(%s,%s,%s,%s,%s): Called", + name, selector, ns, pgouser, allFlag) + response := msgs.ClusterTestResponse{} response.Results = make([]msgs.ClusterTestResult, 0) response.Status = msgs.Status{Code: msgs.Ok, Msg: ""} - log.Debugf("selector is %s", selector) - if selector == "" && allFlag { - log.Debug("selector is empty and --all is specified") - } else { - if selector == "" { + log.Debugf("selector is: %s", selector) + + // if the select is empty, determine if its because the flag for + // "all clusters" in a namespace is set + // + // otherwise, a name cluster name must be passed in, and said name should + // be used + if selector == "" { + if allFlag { + log.Debug("selector is : all clusters in %s", ns) + } else { selector = "name=" + name + log.Debugf("selector is: %s", selector) } } - //get a list of matching clusters + // Find a list of a clusters that match the given selector clusterList := crv1.PgclusterList{} err = kubeapi.GetpgclustersBySelector(apiserver.RESTClient, &clusterList, selector, ns) + // If the response errors, return here, as we won't be able to return any + // useful information in the test if err != nil { + log.Errorf("Cluster lookup failed: %s", err.Error()) response.Status.Code = msgs.Error response.Status.Msg = err.Error() return response } - //loop thru each cluster - - log.Debugf("clusters found len is %d", len(clusterList.Items)) + log.Debugf("Total clusters found: %d", len(clusterList.Items)) + // Iterate through each cluster and perform the various tests against them for _, c := range clusterList.Items { - result := msgs.ClusterTestResult{} - result.ClusterName = c.Name + // Set up the object that will be appended to the response that + // indicates the availability of the endpoints / instances for this + // cluster + result := msgs.ClusterTestResult{ + ClusterName: c.Name, + Endpoints: make([]msgs.ClusterTestDetail, 0), + Instances: make([]msgs.ClusterTestDetail, 0), + } detail := msgs.ShowClusterDetail{} detail.Cluster = c + // Get the PostgreSQL instances! + log.Debugf("Looking up instance pods for cluster: %s", c.Name) pods, err := GetPrimaryAndReplicaPods(&c, ns) + + // if there is an error with returning the primary/replica instances, + // then error and continue if err != nil { - response.Status.Code = msgs.Error - response.Status.Msg = err.Error() - return response + log.Errorf("Instance pod lookup failed: %s", err.Error()) + instance := msgs.ClusterTestDetail{ + Available: false, + InstanceType: msgs.ClusterTestInstanceTypePrimary, + } + result.Instances = append(result.Instances, instance) + response.Results = append(response.Results, result) + continue } + log.Debugf("pods found %d", len(pods)) - //loop thru the pods, make sure they are all ready - primaryReady := true - replicaReady := true - for _, pod := range pods { - if pod.Type == msgs.PodTypePrimary { - if !pod.Ready { - primaryReady = false - } - } else if pod.Type == msgs.PodTypeReplica { - if !pod.Ready { - replicaReady = false - } - } + // if there are no pods found, then the cluster is not ready at all, and + // we can make an early on checking the availability of this cluster + if len(pods) == 0 { + log.Infof("Cluster has no instances available: %s", c.Name) + instance := msgs.ClusterTestDetail{ + Available: false, + InstanceType: msgs.ClusterTestInstanceTypePrimary, + } + result.Instances = append(result.Instances, instance) + response.Results = append(response.Results, result) + continue } - if !primaryReady { - response.Status.Code = msgs.Error - response.Status.Msg = "cluster not ready yet, try later" - return response + + // Check each instance (i.e. pod) to see if its readiness check passes. + // + // (We are assuming that the readiness check is performing the + // equivalent to a "pg_isready" which denotes that a PostgreSQL instance + // is connectable. If you have any doubts about this, check the + // readiness check code) + // + // Also denotes the type of PostgreSQL instance this is. All of the pods + // returned are either primaries or replicas + for _, pod := range pods { + // set up the object with the instance status + instance := msgs.ClusterTestDetail{ + Available: pod.Ready, + Message: pod.Name, + } + switch pod.Type { + case msgs.PodTypePrimary: + instance.InstanceType = msgs.ClusterTestInstanceTypePrimary + case msgs.PodTypeReplica: + instance.InstanceType = msgs.ClusterTestInstanceTypeReplica + } + log.Debugf("Instance found with attributes: (%s, %s, %s)", + instance.InstanceType, instance.Message, instance.Available) + // Add the report on the pods to this set + result.Instances = append(result.Instances, instance) } - //get the services for this cluster + // Time to check the endpoints. We will check the available endpoints + // vis-a-vis the services detail.Services, err = getServices(&c, ns) - if err != nil { - response.Status.Code = msgs.Error - response.Status.Msg = err.Error() - return response - } - //get the secrets for this cluster - secrets, err := apiserver.GetSecrets(&c, ns) + // if the services are unavailable, report an error and continue + // iterating if err != nil { - response.Status.Code = msgs.Error - response.Status.Msg = err.Error() - return response + log.Errorf("Service lookup failed: %s", err.Error()) + endpoint := msgs.ClusterTestDetail{ + Available: false, + InstanceType: msgs.ClusterTestInstanceTypePrimary, + } + result.Endpoints = append(result.Endpoints, endpoint) + response.Results = append(response.Results, result) + continue } - result.Items = make([]msgs.ClusterTestDetail, 0) - - //for each service run a test and add results to output + // Iterate through the services and determine if they are reachable via + // their endpionts for _, service := range detail.Services { - - databases := make([]string, 0) - if service.BackrestRepo { - //dont include backrest repo service - } else if service.Pgbouncer { - databases = append(databases, service.ClusterName) - } else { - databases = append(databases, "postgres") - databases = append(databases, c.Spec.Database) + // prepare the endpoint request + endpointRequest := &kubeapi.GetEndpointRequest{ + Clientset: apiserver.Clientset, // current clientset + Name: service.Name, // name of the service, used to find the endpoint + Namespace: ns, // namespace the service / endpoint resides in + } + // prepare the end result, add the endpoint connection information + endpoint := msgs.ClusterTestDetail{ + Message: fmt.Sprintf("%s:%s", service.ClusterIP, c.Spec.Port), } - for _, s := range secrets { - for _, db := range databases { - // skip postgres user for pgbouncer testing - if s.Username == "postgres" && service.Pgbouncer { - continue - } + // determine the type of endpoint that is being checked based on + // the information available in the service + switch { + default: + endpoint.InstanceType = msgs.ClusterTestInstanceTypePrimary + case strings.Contains(service.Name, msgs.PodTypeReplica): + endpoint.InstanceType = msgs.ClusterTestInstanceTypeReplica + case service.Pgbouncer: + endpoint.InstanceType = msgs.ClusterTestInstanceTypePGBouncer + case service.BackrestRepo: + endpoint.InstanceType = msgs.ClusterTestInstanceTypeBackups + } - item := msgs.ClusterTestDetail{} - username := s.Username - password := s.Password - database := db - item.PsqlString = "psql -p " + c.Spec.Port + " -h " + service.ClusterIP + " -U " + username + " " + database - log.Debug(item.PsqlString) - if (service.Name != c.ObjectMeta.Name) && replicaReady == false { - item.Working = false - } else { - status := query(username, service.ClusterIP, c.Spec.Port, database, password) - item.Working = false - if status { - item.Working = true - } + // make a call to the Kubernetes API to see if the endpoint exists + // if there is an error, indicate that this endpoint is inaccessible + // otherwise inspect the endpoint response to see if the Pods that + // comprise the Service are in the "NotReadyAddresses" + endpoint.Available = true + if endpointResponse, err := kubeapi.GetEndpoint(endpointRequest); err != nil { + endpoint.Available = false + } else { + for _, subset := range endpointResponse.Endpoint.Subsets { + // if any of the addresses are not ready in the endpoint, + // or there are no address ready, then the endpoint is not + // ready + if len(subset.NotReadyAddresses) > 0 && len(subset.Addresses) == 0 { + endpoint.Available = false } - result.Items = append(result.Items, item) } } - } - response.Results = append(response.Results, result) + log.Debugf("Endpoint found with attributes: (%s, %s, %s)", + endpoint.InstanceType, endpoint.Message, endpoint.Available) - //publish event for cluster test - topics := make([]string, 1) - topics[0] = events.EventTopicCluster - - f := events.EventTestClusterFormat{ - EventHeader: events.EventHeader{ - Namespace: ns, - Username: pgouser, - Topic: topics, - Timestamp: time.Now(), - EventType: events.EventTestCluster, - }, - Clustername: c.Name, - } + // append the endpoint to the list + result.Endpoints = append(result.Endpoints, endpoint) - err = events.Publish(f) - if err != nil { - response.Status.Code = msgs.Error - response.Status.Msg = err.Error() - return response } + // concaentate to the results and continue + response.Results = append(response.Results, result) } return response diff --git a/apiservermsgs/clustermsgs.go b/apiservermsgs/clustermsgs.go index 132008ef64..57f2956550 100644 --- a/apiservermsgs/clustermsgs.go +++ b/apiservermsgs/clustermsgs.go @@ -168,16 +168,31 @@ type ClusterTestRequest struct { AllFlag bool } -// ClusterTestDetail ... +// a collection of constants used to enumerate the output for +// ClusterTestDetail => InstanceType +const ( + ClusterTestInstanceTypePrimary = "primary" + ClusterTestInstanceTypeReplica = "replica" + ClusterTestInstanceTypePGBouncer = "pgbouncer" + ClusterTestInstanceTypeBackups = "backups" +) + +// ClusterTestDetail provides the output of an individual test that is performed +// on either a PostgreSQL instance (i.e. pod) or a service endpoint that is used +// to connect to the instances type ClusterTestDetail struct { - PsqlString string - Working bool + Available bool // true if the object being tested is available (ready) + Message string // a descriptive message that can be displayed with + InstanceType string // an enumerated set of what this instance can be, e.g. "primary" } -// ClusterTestResult ... +// ClusterTestResult contains the output for a test on a single PostgreSQL +// cluster. This includes the endpoints (i.e. how to connect to instances +// in a cluster) and the instances themselves (which are pods) type ClusterTestResult struct { ClusterName string - Items []ClusterTestDetail + Endpoints []ClusterTestDetail // a list of endpoints + Instances []ClusterTestDetail // a list of instances (pods) } // ClusterTestResponse ... diff --git a/events/eventtype.go b/events/eventtype.go index e15b7923e8..9b3e185ae3 100644 --- a/events/eventtype.go +++ b/events/eventtype.go @@ -50,7 +50,6 @@ const ( EventUpgradeClusterCompleted = "UpgradeClusterCompleted" EventDeleteCluster = "DeleteCluster" EventDeleteClusterCompleted = "DeleteClusterCompleted" - EventTestCluster = "TestCluster" EventCreateLabel = "CreateLabel" EventLoad = "Load" EventLoadCompleted = "LoadCompleted" @@ -308,20 +307,6 @@ func (lvl EventDeleteClusterCompletedFormat) String() string { return msg } -//---------------------------- -type EventTestClusterFormat struct { - EventHeader `json:"eventheader"` - Clustername string `json:"clustername"` -} - -func (p EventTestClusterFormat) GetHeader() EventHeader { - return p.EventHeader -} -func (lvl EventTestClusterFormat) String() string { - msg := fmt.Sprintf("Event %s (test) - clustername %s", lvl.EventHeader, lvl.Clustername) - return msg -} - //---------------------------- type EventCreateBackupFormat struct { EventHeader `json:"eventheader"` diff --git a/kubeapi/endpoints.go b/kubeapi/endpoints.go new file mode 100644 index 0000000000..3e57c1a904 --- /dev/null +++ b/kubeapi/endpoints.go @@ -0,0 +1,69 @@ +package kubeapi + +/* + Copyright 2019 Crunchy Data Solutions, Inc. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import ( + log "github.com/sirupsen/logrus" + "k8s.io/api/core/v1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +// GetEndpointRequest is used for the GetEndpoint function, which includes the +// current Kubernetes request context, as well as the namespace / endpoint name +// being requested +type GetEndpointRequest struct { + Clientset *kubernetes.Clientset // Kubernetes Clientset that interfaces with the Kubernetes cluster + Name string // Name of the endpoint that is being queried + Namespace string // Namespace the endpoint being queried resides in +} + +// GetEndpointResponse contains the results from a successful request to the +// endpoint API, including the Kubernetes Endpoint as well as the original +// request data +type GetEndpointResponse struct { + Endpoint *v1.Endpoints // Kubernetes Endpoint object that specifics about the endpoint + Name string // Name of the endpoint + Namespace string // Namespace that the endpoint is in +} + +// GetEndpoint tries to find an individual endpoint in a namespace. Returns the +// endpoint object if it can be IsNotFound +// If no endpoint can be found, then an error is returned +func GetEndpoint(request *GetEndpointRequest) (*GetEndpointResponse, error) { + log.Debugf("GetEndpointResponse Called: (%s,%s,%s)", request.Clientset, request.Name, request.Namespace) + // set the endpoints interfaces that will be used to make the query + endpointsInterface := request.Clientset.CoreV1().Endpoints(request.Namespace) + // make the query to Kubernetes to see if the specific endpoint exists + endpoint, err := endpointsInterface.Get(request.Name, meta_v1.GetOptions{}) + // return at this point if there is an error + if err != nil { + log.Errorf("GetEndpointResponse(%s,%s): Endpoint Not Found: %s", + request.Name, request.Namespace, err.Error()) + return nil, err + } + // create a response and return + response := &GetEndpointResponse{ + Endpoint: endpoint, + Name: request.Name, + Namespace: request.Namespace, + } + + log.Debugf("GetEndpointResponse Response: (%s,%s,%s)", + response.Namespace, response.Name, response.Endpoint) + + return response, nil +} diff --git a/pgo/cmd/test.go b/pgo/cmd/test.go index e087f865d5..cba66c323b 100644 --- a/pgo/cmd/test.go +++ b/pgo/cmd/test.go @@ -18,17 +18,19 @@ package cmd import ( "encoding/json" "fmt" + "os" + msgs "github.com/crunchydata/postgres-operator/apiservermsgs" "github.com/crunchydata/postgres-operator/pgo/api" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "os" ) var testCmd = &cobra.Command{ Use: "test", Short: "Test cluster connectivity", - Long: `TEST allows you to test the connectivity for a cluster. For example: + Long: `TEST allows you to test the availability of a PostgreSQL cluster. For example: pgo test mycluster pgo test --selector=env=research @@ -108,16 +110,29 @@ func showTest(args []string, ns string) { for _, result := range response.Results { fmt.Println("") - fmt.Printf("cluster : %s \n", result.ClusterName) - for _, v := range result.Items { - fmt.Printf("%s%s is ", TreeBranch, v.PsqlString) - if v.Working { - fmt.Printf("%s\n", GREEN("Working")) - } else { - fmt.Printf("%s\n", RED("NOT working")) - } - } + fmt.Println(fmt.Sprintf("cluster : %s", result.ClusterName)) + + // first, print the test results for the endpoints, which make up + // the services + printTestResults("Services", result.Endpoints) + // first, print the test results for the instances + printTestResults("Instances", result.Instances) } + } +} +// prints out a set of test results +func printTestResults(testName string, results []msgs.ClusterTestDetail) { + // print out the header for this group of tests + fmt.Println(fmt.Sprintf("%s%s", TreeBranch, testName)) + // iterate though the results and print them! + for _, v := range results { + fmt.Printf("%s%s%s (%s): ", + TreeBranch, TreeBranch, v.InstanceType, v.Message) + if v.Available { + fmt.Println(fmt.Sprintf("%s", GREEN("UP"))) + } else { + fmt.Println(fmt.Sprintf("%s", RED("DOWN"))) + } } } diff --git a/testing/events/event_test.go b/testing/events/event_test.go index b0cca483f4..dbe0388677 100644 --- a/testing/events/event_test.go +++ b/testing/events/event_test.go @@ -23,7 +23,6 @@ func TestEventCreate(t *testing.T) { tryEventFailoverCluster(t) tryEventFailoverClusterCompleted(t) tryEventDeleteCluster(t) - tryEventTestCluster(t) tryEventCreateLabel(t) tryEventLoad(t) tryEventLoadCompleted(t) @@ -248,27 +247,6 @@ func tryEventDeleteCluster(t *testing.T) { } t.Log(f.String()) } -func tryEventTestCluster(t *testing.T) { - - topics := make([]string, 1) - topics[0] = events.EventTopicCluster - - f := events.EventTestClusterFormat{ - EventHeader: events.EventHeader{ - Namespace: Namespace, - Username: TestUsername, - Topic: topics, - EventType: events.EventTestCluster, - }, - Clustername: TestClusterName, - } - - err := events.Publish(f) - if err != nil { - t.Fatal(err.Error()) - } - t.Log(f.String()) -} func tryEventCreateBackup(t *testing.T) { topics := make([]string, 1)