@@ -267,8 +267,9 @@ mod tests {
267
267
builder:: meta:: ObjectMetaBuilder ,
268
268
commons:: {
269
269
authentication:: {
270
- tls:: AuthenticationProvider , AuthenticationClass , AuthenticationClassProvider ,
271
- AuthenticationClassSpec ,
270
+ kerberos,
271
+ tls:: { self } ,
272
+ AuthenticationClass , AuthenticationClassProvider , AuthenticationClassSpec ,
272
273
} ,
273
274
networking:: DomainName ,
274
275
} ,
@@ -307,7 +308,7 @@ mod tests {
307
308
ResolvedAuthenticationClasses :: new ( vec ! [ AuthenticationClass {
308
309
metadata: ObjectMetaBuilder :: new( ) . name( "auth-class" ) . build( ) ,
309
310
spec: AuthenticationClassSpec {
310
- provider: AuthenticationClassProvider :: Tls ( AuthenticationProvider {
311
+ provider: AuthenticationClassProvider :: Tls ( tls :: AuthenticationProvider {
311
312
client_cert_secret_class: Some ( "client-auth-secret-class" . to_string( ) ) ,
312
313
} ) ,
313
314
} ,
@@ -456,4 +457,97 @@ mod tests {
456
457
)
457
458
) ;
458
459
}
460
+
461
+ #[ test]
462
+ fn test_get_kafka_kerberos_listeners_config ( ) {
463
+ let object_name = "simple-kafka-broker-default" ;
464
+ let cluster_info = default_cluster_info ( ) ;
465
+
466
+ let kafka_cluster = r#"
467
+ apiVersion: kafka.stackable.tech/v1alpha1
468
+ kind: KafkaCluster
469
+ metadata:
470
+ name: simple-kafka
471
+ namespace: default
472
+ spec:
473
+ image:
474
+ productVersion: 3.7.1
475
+ clusterConfig:
476
+ authentication:
477
+ - authenticationClass: kafka-kerberos
478
+ tls:
479
+ serverSecretClass: tls
480
+ zookeeperConfigMapName: xyz
481
+ "# ;
482
+ let kafka: KafkaCluster = serde_yaml:: from_str ( kafka_cluster) . expect ( "illegal test input" ) ;
483
+ let kafka_security = KafkaTlsSecurity :: new (
484
+ ResolvedAuthenticationClasses :: new ( vec ! [ AuthenticationClass {
485
+ metadata: ObjectMetaBuilder :: new( ) . name( "auth-class" ) . build( ) ,
486
+ spec: AuthenticationClassSpec {
487
+ provider: AuthenticationClassProvider :: Kerberos (
488
+ kerberos:: AuthenticationProvider {
489
+ kerberos_secret_class: "kerberos-secret-class" . to_string( ) ,
490
+ } ,
491
+ ) ,
492
+ } ,
493
+ } ] ) ,
494
+ "tls" . to_string ( ) ,
495
+ Some ( "tls" . to_string ( ) ) ,
496
+ ) ;
497
+
498
+ let config =
499
+ get_kafka_listener_config ( & kafka, & kafka_security, object_name, & cluster_info) . unwrap ( ) ;
500
+
501
+ assert_eq ! (
502
+ config. listeners( ) ,
503
+ format!(
504
+ "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}" ,
505
+ name = KafkaListenerName :: Client ,
506
+ host = LISTENER_LOCAL_ADDRESS ,
507
+ port = kafka_security. client_port( ) ,
508
+ internal_name = KafkaListenerName :: Internal ,
509
+ internal_host = LISTENER_LOCAL_ADDRESS ,
510
+ internal_port = kafka_security. internal_port( ) ,
511
+ bootstrap_name = KafkaListenerName :: Bootstrap ,
512
+ bootstrap_host = LISTENER_LOCAL_ADDRESS ,
513
+ bootstrap_port = kafka_security. bootstrap_port( ) ,
514
+ )
515
+ ) ;
516
+
517
+ assert_eq ! (
518
+ config. advertised_listeners( ) ,
519
+ format!(
520
+ "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}" ,
521
+ name = KafkaListenerName :: Client ,
522
+ host = node_address_cmd( STACKABLE_LISTENER_BROKER_DIR ) ,
523
+ port = node_port_cmd(
524
+ STACKABLE_LISTENER_BROKER_DIR ,
525
+ kafka_security. client_port_name( )
526
+ ) ,
527
+ internal_name = KafkaListenerName :: Internal ,
528
+ internal_host = pod_fqdn( & kafka, object_name, & cluster_info) . unwrap( ) ,
529
+ internal_port = kafka_security. internal_port( ) ,
530
+ bootstrap_name = KafkaListenerName :: Bootstrap ,
531
+ bootstrap_host = node_address_cmd( STACKABLE_LISTENER_BROKER_DIR ) ,
532
+ bootstrap_port = node_port_cmd(
533
+ STACKABLE_LISTENER_BROKER_DIR ,
534
+ kafka_security. client_port_name( )
535
+ ) ,
536
+ )
537
+ ) ;
538
+
539
+ assert_eq ! (
540
+ config. listener_security_protocol_map( ) ,
541
+ format!(
542
+ "{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol}" ,
543
+ name = KafkaListenerName :: Client ,
544
+ protocol = KafkaListenerProtocol :: SaslSsl ,
545
+ internal_name = KafkaListenerName :: Internal ,
546
+ internal_protocol = KafkaListenerProtocol :: Ssl ,
547
+ bootstrap_name = KafkaListenerName :: Bootstrap ,
548
+ bootstrap_protocol = KafkaListenerProtocol :: SaslSsl ,
549
+
550
+ )
551
+ ) ;
552
+ }
459
553
}
0 commit comments