diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 4d16cd539..98f7c1195 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -72,6 +72,7 @@ CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG, CONFIG_SOURCE_STATIC_BROKER_CONFIG, CONFIG_SOURCE_DEFAULT_CONFIG, + CONFIG_SOURCE_GROUP_CONFIG, RESOURCE_UNKNOWN, RESOURCE_ANY, RESOURCE_TOPIC, diff --git a/src/confluent_kafka/admin/_config.py b/src/confluent_kafka/admin/_config.py index 2c96bfa98..5f4f4680e 100644 --- a/src/confluent_kafka/admin/_config.py +++ b/src/confluent_kafka/admin/_config.py @@ -56,6 +56,7 @@ class ConfigSource(Enum): DYNAMIC_DEFAULT_BROKER_CONFIG = _cimpl.CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG #: Dynamic Default Broker STATIC_BROKER_CONFIG = _cimpl.CONFIG_SOURCE_STATIC_BROKER_CONFIG #: Static Broker DEFAULT_CONFIG = _cimpl.CONFIG_SOURCE_DEFAULT_CONFIG #: Default + GROUP_CONFIG = _cimpl.CONFIG_SOURCE_GROUP_CONFIG #: Group class ConfigEntry(object): diff --git a/src/confluent_kafka/src/AdminTypes.c b/src/confluent_kafka/src/AdminTypes.c index 5deeded2e..61adeb279 100644 --- a/src/confluent_kafka/src/AdminTypes.c +++ b/src/confluent_kafka/src/AdminTypes.c @@ -514,6 +514,8 @@ static void AdminTypes_AddObjectsConfigSource (PyObject *m) { RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG); PyModule_AddIntConstant(m, "CONFIG_SOURCE_DEFAULT_CONFIG", RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG); + PyModule_AddIntConstant(m, "CONFIG_SOURCE_GROUP_CONFIG", + RD_KAFKA_CONFIG_SOURCE_GROUP_CONFIG); } diff --git a/tests/integration/admin/test_incremental_alter_configs.py b/tests/integration/admin/test_incremental_alter_configs.py index 24428e6f1..43e43ebee 100644 --- a/tests/integration/admin/test_incremental_alter_configs.py +++ b/tests/integration/admin/test_incremental_alter_configs.py @@ -19,6 +19,8 @@ ConfigEntry, ResourceType, \ AlterConfigOpType +from tests.common import TestUtils + def assert_expected_config_entries(fs, num_fs, expected): """ @@ -147,3 +149,36 @@ def test_incremental_alter_configs(kafka_cluster): # Assert expected config entries. assert_expected_config_entries(fs, 1, expected) + + # TODO: enable this test for the classic run too, when + # Confluent Platform test cluster is upgraded to 8.0.0 + if TestUtils.use_group_protocol_consumer(): + group_id = "test-group" + + res_group = ConfigResource( + ResourceType.GROUP, + group_id, + incremental_configs=[ + ConfigEntry("consumer.session.timeout.ms", "50000", + incremental_operation=AlterConfigOpType.SET) + ] + ) + + expected[res_group] = ['consumer.session.timeout.ms="50000"'] + + # + # Incrementally alter some configuration values + # + fs = admin_client.incremental_alter_configs([res_group]) + + assert_operation_succeeded(fs, 1) + + time.sleep(1) + + # + # Get current group config + # + fs = admin_client.describe_configs([res_group]) + + # Assert expected config entries. + assert_expected_config_entries(fs, 1, expected)