Skip to content

Commit b5305c4

Browse files
Jonathan S. Katzjkatz
Jonathan S. Katz
authored and committed
Modify PostgreSQL cluster test to be more efficient.
Previously, the PostgreSQL cluster test (from the command line, known as `pgo test`) would check if each individual user available to a PostgreSQL cluster could authenticate with its credentials. While this certainly determines that one can make a connection to a PostgreSQL database managed by the Operator, it also presents several issues: - If one has many users available to the Operator, the number of connections made can rapidly multiply. Imagine 10 users across 5 replicas. - This also means that the user's credentials must be stored in the Operator. Currently this is a plaintext password. If the connections are _not_ being made over SSL, this creates the risk of password leakage. The test now performs two types of checks: instance (or pod) readiness checks, and endpoint availability. Both of these are designed to check if a PostgreSQL instance can be connected to and reached from the outside world. Specifically: - The pod readiness check calls `pg_isready`, which can quickly determine if a PostgreSQL instance is accepting connections - The endpoint check determines if the service endpoints responsible for serving the PostgreSQL instances are reachable. If both of the checks pass, then a PostgreSQL cluster is reachable. This commit also does the following: - BREAKING: Modify the API endpoint that is consumed by `pgo test` to deliver the content in a structured manner - BREAKING: Remove the "TestCluster" event. This is not a lifecycle event. - Introduce the "Endpoints" Kubernetes construct to the `kubeapi` package - Some refactoring, cleanup, and improved documentation around the connectivity tests Issue: [ch5712]
1 parent c1f9490 commit b5305c4

File tree

6 files changed

+258
-150
lines changed

6 files changed

+258
-150
lines changed

apiserver/clusterservice/clusterimpl.go

Lines changed: 143 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131

3232
msgs "github.com/crunchydata/postgres-operator/apiservermsgs"
3333
"github.com/crunchydata/postgres-operator/config"
34-
"github.com/crunchydata/postgres-operator/events"
3534
"github.com/crunchydata/postgres-operator/kubeapi"
3635
"github.com/crunchydata/postgres-operator/sshutil"
3736
"github.com/crunchydata/postgres-operator/util"
@@ -268,153 +267,200 @@ func getServices(cluster *crv1.Pgcluster, ns string) ([]msgs.ShowClusterService,
268267
return output, err
269268
}
270269

270+
// TestCluster performs a variety of readiness checks against one or more
271+
// clusters within a namespace. It leverages the following two Kubernetes
272+
// constructs in order to determine the availability of PostgreSQL clusters:
273+
// - Pod readiness checks. The Pod readiness checks leverage "pg_isready" to
274+
// determine if the PostgreSQL cluster is able to accept connections
275+
// - Endpoint checks. The check sees if the services in front of the
276+
// PostgreSQL instances are able to route connections from the "outside" into
277+
// the instances
271278
func TestCluster(name, selector, ns, pgouser string, allFlag bool) msgs.ClusterTestResponse {
272279
var err error
273280

281+
log.Debugf("TestCluster(%s,%s,%s,%s,%s): Called",
282+
name, selector, ns, pgouser, allFlag)
283+
274284
response := msgs.ClusterTestResponse{}
275285
response.Results = make([]msgs.ClusterTestResult, 0)
276286
response.Status = msgs.Status{Code: msgs.Ok, Msg: ""}
277287

278-
log.Debugf("selector is %s", selector)
279-
if selector == "" && allFlag {
280-
log.Debug("selector is empty and --all is specified")
281-
} else {
282-
if selector == "" {
288+
log.Debugf("selector is: %s", selector)
289+
290+
// if the selector is empty, determine if it's because the flag for
291+
// "all clusters" in a namespace is set
292+
//
293+
// otherwise, a cluster name must be passed in, and said name should
294+
// be used
295+
if selector == "" {
296+
if allFlag {
297+
log.Debug("selector is : all clusters in %s", ns)
298+
} else {
283299
selector = "name=" + name
300+
log.Debugf("selector is: %s", selector)
284301
}
285302
}
286303

287-
//get a list of matching clusters
304+
// Find a list of a clusters that match the given selector
288305
clusterList := crv1.PgclusterList{}
289306
err = kubeapi.GetpgclustersBySelector(apiserver.RESTClient, &clusterList, selector, ns)
290307

308+
// If the response errors, return here, as we won't be able to return any
309+
// useful information in the test
291310
if err != nil {
311+
log.Errorf("Cluster lookup failed: %s", err.Error())
292312
response.Status.Code = msgs.Error
293313
response.Status.Msg = err.Error()
294314
return response
295315
}
296316

297-
//loop thru each cluster
298-
299-
log.Debugf("clusters found len is %d", len(clusterList.Items))
317+
log.Debugf("Total clusters found: %d", len(clusterList.Items))
300318

319+
// Iterate through each cluster and perform the various tests against them
301320
for _, c := range clusterList.Items {
302-
result := msgs.ClusterTestResult{}
303-
result.ClusterName = c.Name
321+
// Set up the object that will be appended to the response that
322+
// indicates the availability of the endpoints / instances for this
323+
// cluster
324+
result := msgs.ClusterTestResult{
325+
ClusterName: c.Name,
326+
Endpoints: make([]msgs.ClusterTestDetail, 0),
327+
Instances: make([]msgs.ClusterTestDetail, 0),
328+
}
304329

305330
detail := msgs.ShowClusterDetail{}
306331
detail.Cluster = c
307332

333+
// Get the PostgreSQL instances!
334+
log.Debugf("Looking up instance pods for cluster: %s", c.Name)
308335
pods, err := GetPrimaryAndReplicaPods(&c, ns)
336+
337+
// if there is an error with returning the primary/replica instances,
338+
// then error and continue
309339
if err != nil {
310-
response.Status.Code = msgs.Error
311-
response.Status.Msg = err.Error()
312-
return response
340+
log.Errorf("Instance pod lookup failed: %s", err.Error())
341+
instance := msgs.ClusterTestDetail{
342+
Available: false,
343+
InstanceType: msgs.ClusterTestInstanceTypePrimary,
344+
}
345+
result.Instances = append(result.Instances, instance)
346+
response.Results = append(response.Results, result)
347+
continue
313348
}
349+
314350
log.Debugf("pods found %d", len(pods))
315-
//loop thru the pods, make sure they are all ready
316-
primaryReady := true
317-
replicaReady := true
318-
for _, pod := range pods {
319-
if pod.Type == msgs.PodTypePrimary {
320-
if !pod.Ready {
321-
primaryReady = false
322-
}
323-
} else if pod.Type == msgs.PodTypeReplica {
324-
if !pod.Ready {
325-
replicaReady = false
326-
}
327-
}
328351

352+
// if there are no pods found, then the cluster is not ready at all, and
353+
// we can make an early exit on checking the availability of this cluster
354+
if len(pods) == 0 {
355+
log.Infof("Cluster has no instances available: %s", c.Name)
356+
instance := msgs.ClusterTestDetail{
357+
Available: false,
358+
InstanceType: msgs.ClusterTestInstanceTypePrimary,
359+
}
360+
result.Instances = append(result.Instances, instance)
361+
response.Results = append(response.Results, result)
362+
continue
329363
}
330-
if !primaryReady {
331-
response.Status.Code = msgs.Error
332-
response.Status.Msg = "cluster not ready yet, try later"
333-
return response
364+
365+
// Check each instance (i.e. pod) to see if its readiness check passes.
366+
//
367+
// (We are assuming that the readiness check is performing the
368+
// equivalent to a "pg_isready" which denotes that a PostgreSQL instance
369+
// is connectable. If you have any doubts about this, check the
370+
// readiness check code)
371+
//
372+
// Also denotes the type of PostgreSQL instance this is. All of the pods
373+
// returned are either primaries or replicas
374+
for _, pod := range pods {
375+
// set up the object with the instance status
376+
instance := msgs.ClusterTestDetail{
377+
Available: pod.Ready,
378+
Message: pod.Name,
379+
}
380+
switch pod.Type {
381+
case msgs.PodTypePrimary:
382+
instance.InstanceType = msgs.ClusterTestInstanceTypePrimary
383+
case msgs.PodTypeReplica:
384+
instance.InstanceType = msgs.ClusterTestInstanceTypeReplica
385+
}
386+
log.Debugf("Instance found with attributes: (%s, %s, %s)",
387+
instance.InstanceType, instance.Message, instance.Available)
388+
// Add the report on the pods to this set
389+
result.Instances = append(result.Instances, instance)
334390
}
335391

336-
//get the services for this cluster
392+
// Time to check the endpoints. We will check the available endpoints
393+
// vis-a-vis the services
337394
detail.Services, err = getServices(&c, ns)
338-
if err != nil {
339-
response.Status.Code = msgs.Error
340-
response.Status.Msg = err.Error()
341-
return response
342-
}
343395

344-
//get the secrets for this cluster
345-
secrets, err := apiserver.GetSecrets(&c, ns)
396+
// if the services are unavailable, report an error and continue
397+
// iterating
346398
if err != nil {
347-
response.Status.Code = msgs.Error
348-
response.Status.Msg = err.Error()
349-
return response
399+
log.Errorf("Service lookup failed: %s", err.Error())
400+
endpoint := msgs.ClusterTestDetail{
401+
Available: false,
402+
InstanceType: msgs.ClusterTestInstanceTypePrimary,
403+
}
404+
result.Endpoints = append(result.Endpoints, endpoint)
405+
response.Results = append(response.Results, result)
406+
continue
350407
}
351408

352-
result.Items = make([]msgs.ClusterTestDetail, 0)
353-
354-
//for each service run a test and add results to output
409+
// Iterate through the services and determine if they are reachable via
410+
// their endpoints
355411
for _, service := range detail.Services {
356-
357-
databases := make([]string, 0)
358-
if service.BackrestRepo {
359-
//dont include backrest repo service
360-
} else if service.Pgbouncer {
361-
databases = append(databases, service.ClusterName)
362-
} else {
363-
databases = append(databases, "postgres")
364-
databases = append(databases, c.Spec.Database)
412+
// prepare the endpoint request
413+
endpointRequest := &kubeapi.GetEndpointRequest{
414+
Clientset: apiserver.Clientset, // current clientset
415+
Name: service.Name, // name of the service, used to find the endpoint
416+
Namespace: ns, // namespace the service / endpoint resides in
417+
}
418+
// prepare the end result, add the endpoint connection information
419+
endpoint := msgs.ClusterTestDetail{
420+
Message: fmt.Sprintf("%s:%s", service.ClusterIP, c.Spec.Port),
365421
}
366-
for _, s := range secrets {
367-
for _, db := range databases {
368422

369-
// skip postgres user for pgbouncer testing
370-
if s.Username == "postgres" && service.Pgbouncer {
371-
continue
372-
}
423+
// determine the type of endpoint that is being checked based on
424+
// the information available in the service
425+
switch {
426+
default:
427+
endpoint.InstanceType = msgs.ClusterTestInstanceTypePrimary
428+
case strings.Contains(service.Name, msgs.PodTypeReplica):
429+
endpoint.InstanceType = msgs.ClusterTestInstanceTypeReplica
430+
case service.Pgbouncer:
431+
endpoint.InstanceType = msgs.ClusterTestInstanceTypePGBouncer
432+
case service.BackrestRepo:
433+
endpoint.InstanceType = msgs.ClusterTestInstanceTypeBackups
434+
}
373435

374-
item := msgs.ClusterTestDetail{}
375-
username := s.Username
376-
password := s.Password
377-
database := db
378-
item.PsqlString = "psql -p " + c.Spec.Port + " -h " + service.ClusterIP + " -U " + username + " " + database
379-
log.Debug(item.PsqlString)
380-
if (service.Name != c.ObjectMeta.Name) && replicaReady == false {
381-
item.Working = false
382-
} else {
383-
status := query(username, service.ClusterIP, c.Spec.Port, database, password)
384-
item.Working = false
385-
if status {
386-
item.Working = true
387-
}
436+
// make a call to the Kubernetes API to see if the endpoint exists
437+
// if there is an error, indicate that this endpoint is inaccessible
438+
// otherwise inspect the endpoint response to see if the Pods that
439+
// comprise the Service are in the "NotReadyAddresses"
440+
endpoint.Available = true
441+
if endpointResponse, err := kubeapi.GetEndpoint(endpointRequest); err != nil {
442+
endpoint.Available = false
443+
} else {
444+
for _, subset := range endpointResponse.Endpoint.Subsets {
445+
// if any of the addresses are not ready in the endpoint,
446+
// or there are no address ready, then the endpoint is not
447+
// ready
448+
if len(subset.NotReadyAddresses) > 0 && len(subset.Addresses) == 0 {
449+
endpoint.Available = false
388450
}
389-
result.Items = append(result.Items, item)
390451
}
391452
}
392453

393-
}
394-
response.Results = append(response.Results, result)
454+
log.Debugf("Endpoint found with attributes: (%s, %s, %s)",
455+
endpoint.InstanceType, endpoint.Message, endpoint.Available)
395456

396-
//publish event for cluster test
397-
topics := make([]string, 1)
398-
topics[0] = events.EventTopicCluster
399-
400-
f := events.EventTestClusterFormat{
401-
EventHeader: events.EventHeader{
402-
Namespace: ns,
403-
Username: pgouser,
404-
Topic: topics,
405-
Timestamp: time.Now(),
406-
EventType: events.EventTestCluster,
407-
},
408-
Clustername: c.Name,
409-
}
457+
// append the endpoint to the list
458+
result.Endpoints = append(result.Endpoints, endpoint)
410459

411-
err = events.Publish(f)
412-
if err != nil {
413-
response.Status.Code = msgs.Error
414-
response.Status.Msg = err.Error()
415-
return response
416460
}
417461

462+
// concatenate to the results and continue
463+
response.Results = append(response.Results, result)
418464
}
419465

420466
return response

apiservermsgs/clustermsgs.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,16 +168,31 @@ type ClusterTestRequest struct {
168168
AllFlag bool
169169
}
170170

171-
// ClusterTestDetail ...
171+
// a collection of constants used to enumerate the output for
172+
// ClusterTestDetail => InstanceType
173+
const (
174+
ClusterTestInstanceTypePrimary = "primary"
175+
ClusterTestInstanceTypeReplica = "replica"
176+
ClusterTestInstanceTypePGBouncer = "pgbouncer"
177+
ClusterTestInstanceTypeBackups = "backups"
178+
)
179+
180+
// ClusterTestDetail provides the output of an individual test that is performed
181+
// on either a PostgreSQL instance (i.e. pod) or a service endpoint that is used
182+
// to connect to the instances
172183
type ClusterTestDetail struct {
173-
PsqlString string
174-
Working bool
184+
Available bool // true if the object being tested is available (ready)
185+
Message string // a descriptive message that can be displayed with
186+
InstanceType string // an enumerated set of what this instance can be, e.g. "primary"
175187
}
176188

177-
// ClusterTestResult ...
189+
// ClusterTestResult contains the output for a test on a single PostgreSQL
190+
// cluster. This includes the endpoints (i.e. how to connect to instances
191+
// in a cluster) and the instances themselves (which are pods)
178192
type ClusterTestResult struct {
179193
ClusterName string
180-
Items []ClusterTestDetail
194+
Endpoints []ClusterTestDetail // a list of endpoints
195+
Instances []ClusterTestDetail // a list of instances (pods)
181196
}
182197

183198
// ClusterTestResponse ...

events/eventtype.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ const (
5050
EventUpgradeClusterCompleted = "UpgradeClusterCompleted"
5151
EventDeleteCluster = "DeleteCluster"
5252
EventDeleteClusterCompleted = "DeleteClusterCompleted"
53-
EventTestCluster = "TestCluster"
5453
EventCreateLabel = "CreateLabel"
5554
EventLoad = "Load"
5655
EventLoadCompleted = "LoadCompleted"
@@ -308,20 +307,6 @@ func (lvl EventDeleteClusterCompletedFormat) String() string {
308307
return msg
309308
}
310309

311-
//----------------------------
312-
type EventTestClusterFormat struct {
313-
EventHeader `json:"eventheader"`
314-
Clustername string `json:"clustername"`
315-
}
316-
317-
func (p EventTestClusterFormat) GetHeader() EventHeader {
318-
return p.EventHeader
319-
}
320-
func (lvl EventTestClusterFormat) String() string {
321-
msg := fmt.Sprintf("Event %s (test) - clustername %s", lvl.EventHeader, lvl.Clustername)
322-
return msg
323-
}
324-
325310
//----------------------------
326311
type EventCreateBackupFormat struct {
327312
EventHeader `json:"eventheader"`

0 commit comments

Comments
 (0)