Skip to content

Update pgo test to check connectivity on instances & endpoints ([ch5712]) #1076

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 143 additions & 97 deletions apiserver/clusterservice/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

msgs "github.com/crunchydata/postgres-operator/apiservermsgs"
"github.com/crunchydata/postgres-operator/config"
"github.com/crunchydata/postgres-operator/events"
"github.com/crunchydata/postgres-operator/kubeapi"
"github.com/crunchydata/postgres-operator/sshutil"
"github.com/crunchydata/postgres-operator/util"
Expand Down Expand Up @@ -268,153 +267,200 @@ func getServices(cluster *crv1.Pgcluster, ns string) ([]msgs.ShowClusterService,
return output, err
}

// TestCluster performs a variety of readiness checks against one or more
// clusters within a namespace. It leverages the following two Kuberentes
// constructs in order to determine the availability of PostgreSQL clusters:
// - Pod readiness checks. The Pod readiness checks leverage "pg_isready" to
// determine if the PostgreSQL cluster is able to accept connecions
// - Endpoint checks. The check sees if the services in front of the the
// PostgreSQL instances are able to route connections from the "outside" into
// the instances
func TestCluster(name, selector, ns, pgouser string, allFlag bool) msgs.ClusterTestResponse {
var err error

log.Debugf("TestCluster(%s,%s,%s,%s,%s): Called",
name, selector, ns, pgouser, allFlag)

response := msgs.ClusterTestResponse{}
response.Results = make([]msgs.ClusterTestResult, 0)
response.Status = msgs.Status{Code: msgs.Ok, Msg: ""}

log.Debugf("selector is %s", selector)
if selector == "" && allFlag {
log.Debug("selector is empty and --all is specified")
} else {
if selector == "" {
log.Debugf("selector is: %s", selector)

// if the select is empty, determine if its because the flag for
// "all clusters" in a namespace is set
//
// otherwise, a name cluster name must be passed in, and said name should
// be used
if selector == "" {
if allFlag {
log.Debug("selector is : all clusters in %s", ns)
} else {
selector = "name=" + name
log.Debugf("selector is: %s", selector)
}
}

//get a list of matching clusters
// Find a list of a clusters that match the given selector
clusterList := crv1.PgclusterList{}
err = kubeapi.GetpgclustersBySelector(apiserver.RESTClient, &clusterList, selector, ns)

// If the response errors, return here, as we won't be able to return any
// useful information in the test
if err != nil {
log.Errorf("Cluster lookup failed: %s", err.Error())
response.Status.Code = msgs.Error
response.Status.Msg = err.Error()
return response
}

//loop thru each cluster

log.Debugf("clusters found len is %d", len(clusterList.Items))
log.Debugf("Total clusters found: %d", len(clusterList.Items))

// Iterate through each cluster and perform the various tests against them
for _, c := range clusterList.Items {
result := msgs.ClusterTestResult{}
result.ClusterName = c.Name
// Set up the object that will be appended to the response that
// indicates the availability of the endpoints / instances for this
// cluster
result := msgs.ClusterTestResult{
ClusterName: c.Name,
Endpoints: make([]msgs.ClusterTestDetail, 0),
Instances: make([]msgs.ClusterTestDetail, 0),
}

detail := msgs.ShowClusterDetail{}
detail.Cluster = c

// Get the PostgreSQL instances!
log.Debugf("Looking up instance pods for cluster: %s", c.Name)
pods, err := GetPrimaryAndReplicaPods(&c, ns)

// if there is an error with returning the primary/replica instances,
// then error and continue
if err != nil {
response.Status.Code = msgs.Error
response.Status.Msg = err.Error()
return response
log.Errorf("Instance pod lookup failed: %s", err.Error())
instance := msgs.ClusterTestDetail{
Available: false,
InstanceType: msgs.ClusterTestInstanceTypePrimary,
}
result.Instances = append(result.Instances, instance)
response.Results = append(response.Results, result)
continue
}

log.Debugf("pods found %d", len(pods))
//loop thru the pods, make sure they are all ready
primaryReady := true
replicaReady := true
for _, pod := range pods {
if pod.Type == msgs.PodTypePrimary {
if !pod.Ready {
primaryReady = false
}
} else if pod.Type == msgs.PodTypeReplica {
if !pod.Ready {
replicaReady = false
}
}

// if there are no pods found, then the cluster is not ready at all, and
// we can make an early on checking the availability of this cluster
if len(pods) == 0 {
log.Infof("Cluster has no instances available: %s", c.Name)
instance := msgs.ClusterTestDetail{
Available: false,
InstanceType: msgs.ClusterTestInstanceTypePrimary,
}
result.Instances = append(result.Instances, instance)
response.Results = append(response.Results, result)
continue
}
if !primaryReady {
response.Status.Code = msgs.Error
response.Status.Msg = "cluster not ready yet, try later"
return response

// Check each instance (i.e. pod) to see if its readiness check passes.
//
// (We are assuming that the readiness check is performing the
// equivalent to a "pg_isready" which denotes that a PostgreSQL instance
// is connectable. If you have any doubts about this, check the
// readiness check code)
//
// Also denotes the type of PostgreSQL instance this is. All of the pods
// returned are either primaries or replicas
for _, pod := range pods {
// set up the object with the instance status
instance := msgs.ClusterTestDetail{
Available: pod.Ready,
Message: pod.Name,
}
switch pod.Type {
case msgs.PodTypePrimary:
instance.InstanceType = msgs.ClusterTestInstanceTypePrimary
case msgs.PodTypeReplica:
instance.InstanceType = msgs.ClusterTestInstanceTypeReplica
}
log.Debugf("Instance found with attributes: (%s, %s, %s)",
instance.InstanceType, instance.Message, instance.Available)
// Add the report on the pods to this set
result.Instances = append(result.Instances, instance)
}

//get the services for this cluster
// Time to check the endpoints. We will check the available endpoints
// vis-a-vis the services
detail.Services, err = getServices(&c, ns)
if err != nil {
response.Status.Code = msgs.Error
response.Status.Msg = err.Error()
return response
}

//get the secrets for this cluster
secrets, err := apiserver.GetSecrets(&c, ns)
// if the services are unavailable, report an error and continue
// iterating
if err != nil {
response.Status.Code = msgs.Error
response.Status.Msg = err.Error()
return response
log.Errorf("Service lookup failed: %s", err.Error())
endpoint := msgs.ClusterTestDetail{
Available: false,
InstanceType: msgs.ClusterTestInstanceTypePrimary,
}
result.Endpoints = append(result.Endpoints, endpoint)
response.Results = append(response.Results, result)
continue
}

result.Items = make([]msgs.ClusterTestDetail, 0)

//for each service run a test and add results to output
// Iterate through the services and determine if they are reachable via
// their endpionts
for _, service := range detail.Services {

databases := make([]string, 0)
if service.BackrestRepo {
//dont include backrest repo service
} else if service.Pgbouncer {
databases = append(databases, service.ClusterName)
} else {
databases = append(databases, "postgres")
databases = append(databases, c.Spec.Database)
// prepare the endpoint request
endpointRequest := &kubeapi.GetEndpointRequest{
Clientset: apiserver.Clientset, // current clientset
Name: service.Name, // name of the service, used to find the endpoint
Namespace: ns, // namespace the service / endpoint resides in
}
// prepare the end result, add the endpoint connection information
endpoint := msgs.ClusterTestDetail{
Message: fmt.Sprintf("%s:%s", service.ClusterIP, c.Spec.Port),
}
for _, s := range secrets {
for _, db := range databases {

// skip postgres user for pgbouncer testing
if s.Username == "postgres" && service.Pgbouncer {
continue
}
// determine the type of endpoint that is being checked based on
// the information available in the service
switch {
default:
endpoint.InstanceType = msgs.ClusterTestInstanceTypePrimary
case strings.Contains(service.Name, msgs.PodTypeReplica):
endpoint.InstanceType = msgs.ClusterTestInstanceTypeReplica
case service.Pgbouncer:
endpoint.InstanceType = msgs.ClusterTestInstanceTypePGBouncer
case service.BackrestRepo:
endpoint.InstanceType = msgs.ClusterTestInstanceTypeBackups
}

item := msgs.ClusterTestDetail{}
username := s.Username
password := s.Password
database := db
item.PsqlString = "psql -p " + c.Spec.Port + " -h " + service.ClusterIP + " -U " + username + " " + database
log.Debug(item.PsqlString)
if (service.Name != c.ObjectMeta.Name) && replicaReady == false {
item.Working = false
} else {
status := query(username, service.ClusterIP, c.Spec.Port, database, password)
item.Working = false
if status {
item.Working = true
}
// make a call to the Kubernetes API to see if the endpoint exists
// if there is an error, indicate that this endpoint is inaccessible
// otherwise inspect the endpoint response to see if the Pods that
// comprise the Service are in the "NotReadyAddresses"
endpoint.Available = true
if endpointResponse, err := kubeapi.GetEndpoint(endpointRequest); err != nil {
endpoint.Available = false
} else {
for _, subset := range endpointResponse.Endpoint.Subsets {
// if any of the addresses are not ready in the endpoint,
// or there are no address ready, then the endpoint is not
// ready
if len(subset.NotReadyAddresses) > 0 && len(subset.Addresses) == 0 {
endpoint.Available = false
}
result.Items = append(result.Items, item)
}
}

}
response.Results = append(response.Results, result)
log.Debugf("Endpoint found with attributes: (%s, %s, %s)",
endpoint.InstanceType, endpoint.Message, endpoint.Available)

//publish event for cluster test
topics := make([]string, 1)
topics[0] = events.EventTopicCluster

f := events.EventTestClusterFormat{
EventHeader: events.EventHeader{
Namespace: ns,
Username: pgouser,
Topic: topics,
Timestamp: time.Now(),
EventType: events.EventTestCluster,
},
Clustername: c.Name,
}
// append the endpoint to the list
result.Endpoints = append(result.Endpoints, endpoint)

err = events.Publish(f)
if err != nil {
response.Status.Code = msgs.Error
response.Status.Msg = err.Error()
return response
}

// concaentate to the results and continue
response.Results = append(response.Results, result)
}

return response
Expand Down
25 changes: 20 additions & 5 deletions apiservermsgs/clustermsgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,16 +168,31 @@ type ClusterTestRequest struct {
AllFlag bool
}

// ClusterTestDetail ...
// a collection of constants used to enumerate the output for
// ClusterTestDetail => InstanceType
const (
ClusterTestInstanceTypePrimary = "primary"
ClusterTestInstanceTypeReplica = "replica"
ClusterTestInstanceTypePGBouncer = "pgbouncer"
ClusterTestInstanceTypeBackups = "backups"
)

// ClusterTestDetail provides the output of an individual test that is performed
// on either a PostgreSQL instance (i.e. pod) or a service endpoint that is used
// to connect to the instances
type ClusterTestDetail struct {
PsqlString string
Working bool
Available bool // true if the object being tested is available (ready)
Message string // a descriptive message that can be displayed with
InstanceType string // an enumerated set of what this instance can be, e.g. "primary"
}

// ClusterTestResult ...
// ClusterTestResult contains the output for a test on a single PostgreSQL
// cluster. This includes the endpoints (i.e. how to connect to instances
// in a cluster) and the instances themselves (which are pods)
type ClusterTestResult struct {
ClusterName string
Items []ClusterTestDetail
Endpoints []ClusterTestDetail // a list of endpoints
Instances []ClusterTestDetail // a list of instances (pods)
}

// ClusterTestResponse ...
Expand Down
15 changes: 0 additions & 15 deletions events/eventtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ const (
EventUpgradeClusterCompleted = "UpgradeClusterCompleted"
EventDeleteCluster = "DeleteCluster"
EventDeleteClusterCompleted = "DeleteClusterCompleted"
EventTestCluster = "TestCluster"
EventCreateLabel = "CreateLabel"
EventLoad = "Load"
EventLoadCompleted = "LoadCompleted"
Expand Down Expand Up @@ -308,20 +307,6 @@ func (lvl EventDeleteClusterCompletedFormat) String() string {
return msg
}

//----------------------------
type EventTestClusterFormat struct {
EventHeader `json:"eventheader"`
Clustername string `json:"clustername"`
}

func (p EventTestClusterFormat) GetHeader() EventHeader {
return p.EventHeader
}
func (lvl EventTestClusterFormat) String() string {
msg := fmt.Sprintf("Event %s (test) - clustername %s", lvl.EventHeader, lvl.Clustername)
return msg
}

//----------------------------
type EventCreateBackupFormat struct {
EventHeader `json:"eventheader"`
Expand Down
Loading