Skip to content

Commit 2f7e3ee

Browse files
authored
fix stream duplication on operator restart (#2733)
* fix stream duplication on operator restart * add try except to streams e2e test
1 parent c7ee34e commit 2f7e3ee

File tree

3 files changed

+226
-153
lines changed

3 files changed

+226
-153
lines changed

e2e/tests/test_e2e.py

Lines changed: 113 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -2131,130 +2131,136 @@ def test_stream_resources(self):
21312131
verbs=["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"]
21322132
)
21332133
cluster_role.rules.append(fes_cluster_role_rule)
2134-
k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role)
21352134

2136-
# create a table in one of the database of acid-minimal-cluster
2137-
create_stream_table = """
2138-
CREATE TABLE test_table (id int, payload jsonb);
2139-
"""
2140-
self.query_database(leader.metadata.name, "foo", create_stream_table)
2135+
try:
2136+
k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role)
21412137

2142-
# update the manifest with the streams section
2143-
patch_streaming_config = {
2144-
"spec": {
2145-
"patroni": {
2146-
"slots": {
2147-
"manual_slot": {
2148-
"type": "physical"
2149-
}
2150-
}
2151-
},
2152-
"streams": [
2153-
{
2154-
"applicationId": "test-app",
2155-
"batchSize": 100,
2156-
"database": "foo",
2157-
"enableRecovery": True,
2158-
"tables": {
2159-
"test_table": {
2160-
"eventType": "test-event",
2161-
"idColumn": "id",
2162-
"payloadColumn": "payload",
2163-
"recoveryEventType": "test-event-dlq"
2138+
# create a table in one of the database of acid-minimal-cluster
2139+
create_stream_table = """
2140+
CREATE TABLE test_table (id int, payload jsonb);
2141+
"""
2142+
self.query_database(leader.metadata.name, "foo", create_stream_table)
2143+
2144+
# update the manifest with the streams section
2145+
patch_streaming_config = {
2146+
"spec": {
2147+
"patroni": {
2148+
"slots": {
2149+
"manual_slot": {
2150+
"type": "physical"
21642151
}
21652152
}
21662153
},
2167-
{
2168-
"applicationId": "test-app2",
2169-
"batchSize": 100,
2170-
"database": "foo",
2171-
"enableRecovery": True,
2172-
"tables": {
2173-
"test_non_exist_table": {
2174-
"eventType": "test-event",
2175-
"idColumn": "id",
2176-
"payloadColumn": "payload",
2177-
"recoveryEventType": "test-event-dlq"
2154+
"streams": [
2155+
{
2156+
"applicationId": "test-app",
2157+
"batchSize": 100,
2158+
"database": "foo",
2159+
"enableRecovery": True,
2160+
"tables": {
2161+
"test_table": {
2162+
"eventType": "test-event",
2163+
"idColumn": "id",
2164+
"payloadColumn": "payload",
2165+
"recoveryEventType": "test-event-dlq"
2166+
}
2167+
}
2168+
},
2169+
{
2170+
"applicationId": "test-app2",
2171+
"batchSize": 100,
2172+
"database": "foo",
2173+
"enableRecovery": True,
2174+
"tables": {
2175+
"test_non_exist_table": {
2176+
"eventType": "test-event",
2177+
"idColumn": "id",
2178+
"payloadColumn": "payload",
2179+
"recoveryEventType": "test-event-dlq"
2180+
}
21782181
}
21792182
}
2180-
}
2181-
]
2183+
]
2184+
}
21822185
}
2183-
}
2184-
k8s.api.custom_objects_api.patch_namespaced_custom_object(
2185-
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config)
2186-
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
2186+
k8s.api.custom_objects_api.patch_namespaced_custom_object(
2187+
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config)
2188+
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
21872189

2188-
# check if publication, slot, and fes resource are created
2189-
get_publication_query = """
2190-
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app';
2191-
"""
2192-
get_slot_query = """
2193-
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app';
2194-
"""
2195-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1,
2196-
"Publication is not created", 10, 5)
2197-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1,
2198-
"Replication slot is not created", 10, 5)
2199-
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
2190+
# check if publication, slot, and fes resource are created
2191+
get_publication_query = """
2192+
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app';
2193+
"""
2194+
get_slot_query = """
2195+
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app';
2196+
"""
2197+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1,
2198+
"Publication is not created", 10, 5)
2199+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1,
2200+
"Replication slot is not created", 10, 5)
2201+
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
22002202
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1,
22012203
"Could not find Fabric Event Stream resource", 10, 5)
22022204

2203-
# check if the non-existing table in the stream section does not create a publication and slot
2204-
get_publication_query_not_exist_table = """
2205-
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2';
2206-
"""
2207-
get_slot_query_not_exist_table = """
2208-
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2';
2209-
"""
2210-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0,
2211-
"Publication is created for non-existing tables", 10, 5)
2212-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0,
2213-
"Replication slot is created for non-existing tables", 10, 5)
2214-
2215-
# grant create and ownership of test_table to foo_user, reset search path to default
2216-
grant_permission_foo_user = """
2217-
GRANT CREATE ON DATABASE foo TO foo_user;
2218-
ALTER TABLE test_table OWNER TO foo_user;
2219-
ALTER ROLE foo_user RESET search_path;
2220-
"""
2221-
self.query_database(leader.metadata.name, "foo", grant_permission_foo_user)
2222-
# non-postgres user creates a publication
2223-
create_nonstream_publication = """
2224-
CREATE PUBLICATION mypublication FOR TABLE test_table;
2225-
"""
2226-
self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user")
2205+
# check if the non-existing table in the stream section does not create a publication and slot
2206+
get_publication_query_not_exist_table = """
2207+
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2';
2208+
"""
2209+
get_slot_query_not_exist_table = """
2210+
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2';
2211+
"""
2212+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0,
2213+
"Publication is created for non-existing tables", 10, 5)
2214+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0,
2215+
"Replication slot is created for non-existing tables", 10, 5)
2216+
2217+
# grant create and ownership of test_table to foo_user, reset search path to default
2218+
grant_permission_foo_user = """
2219+
GRANT CREATE ON DATABASE foo TO foo_user;
2220+
ALTER TABLE test_table OWNER TO foo_user;
2221+
ALTER ROLE foo_user RESET search_path;
2222+
"""
2223+
self.query_database(leader.metadata.name, "foo", grant_permission_foo_user)
2224+
# non-postgres user creates a publication
2225+
create_nonstream_publication = """
2226+
CREATE PUBLICATION mypublication FOR TABLE test_table;
2227+
"""
2228+
self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user")
22272229

2228-
# remove the streams section from the manifest
2229-
patch_streaming_config_removal = {
2230-
"spec": {
2231-
"streams": []
2230+
# remove the streams section from the manifest
2231+
patch_streaming_config_removal = {
2232+
"spec": {
2233+
"streams": []
2234+
}
22322235
}
2233-
}
2234-
k8s.api.custom_objects_api.patch_namespaced_custom_object(
2235-
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal)
2236-
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
2236+
k8s.api.custom_objects_api.patch_namespaced_custom_object(
2237+
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal)
2238+
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
22372239

2238-
# check if publication, slot, and fes resource are removed
2239-
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
2240+
# check if publication, slot, and fes resource are removed
2241+
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
22402242
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0,
22412243
'Could not delete Fabric Event Stream resource', 10, 5)
2242-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0,
2243-
"Publication is not deleted", 10, 5)
2244-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0,
2245-
"Replication slot is not deleted", 10, 5)
2246-
2247-
# check the manual_slot and mypublication should not get deleted
2248-
get_manual_slot_query = """
2249-
SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot';
2250-
"""
2251-
get_nonstream_publication_query = """
2252-
SELECT * FROM pg_publication WHERE pubname = 'mypublication';
2253-
"""
2254-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1,
2255-
"Slot defined in patroni config is deleted", 10, 5)
2256-
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1,
2257-
"Publication defined not in stream section is deleted", 10, 5)
2244+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0,
2245+
"Publication is not deleted", 10, 5)
2246+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0,
2247+
"Replication slot is not deleted", 10, 5)
2248+
2249+
# check the manual_slot and mypublication should not get deleted
2250+
get_manual_slot_query = """
2251+
SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot';
2252+
"""
2253+
get_nonstream_publication_query = """
2254+
SELECT * FROM pg_publication WHERE pubname = 'mypublication';
2255+
"""
2256+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1,
2257+
"Slot defined in patroni config is deleted", 10, 5)
2258+
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1,
2259+
"Publication defined not in stream section is deleted", 10, 5)
2260+
2261+
except timeout_decorator.TimeoutError:
2262+
print('Operator log: {}'.format(k8s.get_operator_log()))
2263+
raise
22582264

22592265
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
22602266
def test_taint_based_eviction(self):

pkg/cluster/streams.go

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -433,34 +433,55 @@ func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.
433433
}
434434

435435
func (c *Cluster) syncStream(appId string) error {
436+
var (
437+
streams *zalandov1.FabricEventStreamList
438+
err error
439+
)
440+
c.setProcessName("syncing stream with applicationId %s", appId)
441+
c.logger.Debugf("syncing stream with applicationId %s", appId)
442+
443+
listOptions := metav1.ListOptions{LabelSelector: c.labelsSet(true).String()}
444+
streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions)
445+
if err != nil {
446+
return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err)
447+
}
448+
436449
streamExists := false
437-
// update stream when it exists and EventStreams array differs
438-
for _, stream := range c.Streams {
439-
if appId == stream.Spec.ApplicationId {
440-
streamExists = true
441-
desiredStreams := c.generateFabricEventStream(appId)
442-
if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
443-
c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId)
444-
stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences
445-
c.setProcessName("updating event streams with applicationId %s", appId)
446-
stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), stream, metav1.UpdateOptions{})
447-
if err != nil {
448-
return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
449-
}
450-
c.Streams[appId] = stream
451-
}
452-
if match, reason := c.compareStreams(stream, desiredStreams); !match {
453-
c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason)
454-
desiredStreams.ObjectMeta = stream.ObjectMeta
455-
updatedStream, err := c.updateStreams(desiredStreams)
456-
if err != nil {
457-
return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err)
458-
}
459-
c.Streams[appId] = updatedStream
460-
c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId)
450+
for _, stream := range streams.Items {
451+
if stream.Spec.ApplicationId != appId {
452+
continue
453+
}
454+
if streamExists {
455+
c.logger.Warningf("more than one event stream with applicationId %s found, delete it", appId)
456+
if err = c.KubeClient.FabricEventStreams(stream.ObjectMeta.Namespace).Delete(context.TODO(), stream.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
457+
c.logger.Errorf("could not delete event stream %q with applicationId %s: %v", stream.ObjectMeta.Name, appId, err)
458+
} else {
459+
c.logger.Infof("redundant event stream %q with applicationId %s has been successfully deleted", stream.ObjectMeta.Name, appId)
461460
}
462461
continue
463462
}
463+
streamExists = true
464+
desiredStreams := c.generateFabricEventStream(appId)
465+
if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
466+
c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId)
467+
stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences
468+
c.setProcessName("updating event streams with applicationId %s", appId)
469+
stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{})
470+
if err != nil {
471+
return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
472+
}
473+
c.Streams[appId] = stream
474+
}
475+
if match, reason := c.compareStreams(&stream, desiredStreams); !match {
476+
c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason)
477+
desiredStreams.ObjectMeta = stream.ObjectMeta
478+
updatedStream, err := c.updateStreams(desiredStreams)
479+
if err != nil {
480+
return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err)
481+
}
482+
c.Streams[appId] = updatedStream
483+
c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId)
484+
}
464485
}
465486

466487
if !streamExists {

0 commit comments

Comments
 (0)