1+ package com .playtika .testcontainer .nativekafka ;
2+
3+ import com .playtika .testcontainer .nativekafka .properties .NativeKafkaConfigurationProperties ;
4+ import com .playtika .testcontainer .nativekafka .properties .NativeKafkaConfigurationProperties .TopicConfiguration ;
5+ import lombok .Getter ;
6+ import lombok .RequiredArgsConstructor ;
7+ import lombok .extern .slf4j .Slf4j ;
8+ import org .apache .kafka .clients .admin .AdminClient ;
9+ import org .apache .kafka .clients .admin .CreateTopicsResult ;
10+ import org .apache .kafka .clients .admin .NewTopic ;
11+ import org .apache .kafka .clients .producer .ProducerConfig ;
12+ import org .springframework .beans .factory .InitializingBean ;
13+ import org .testcontainers .containers .GenericContainer ;
14+ import org .testcontainers .kafka .KafkaContainer ;
15+
16+ import java .util .Collection ;
17+ import java .util .List ;
18+ import java .util .Map ;
19+ import java .util .Properties ;
20+ import java .util .concurrent .ExecutionException ;
21+ import java .util .stream .Collectors ;
22+
23+ import static java .util .stream .Collectors .toMap ;
24+
25+ @ Slf4j
26+ @ RequiredArgsConstructor
27+ @ Getter
28+ public class NativeKafkaTopicsConfigurer implements InitializingBean {
29+ private static final int DEFAULT_PARTITION_COUNT = 1 ;
30+
31+ private final GenericContainer <?> nativeKafka ;
32+ private final NativeKafkaConfigurationProperties nativeKafkaProperties ;
33+
34+ @ Override
35+ public void afterPropertiesSet () {
36+ createTopics (this .nativeKafkaProperties .getTopicsToCreate (), this .nativeKafkaProperties .getTopicsConfiguration ());
37+ }
38+
39+ public void createTopics (Collection <String > topics , Collection <TopicConfiguration > topicsConfiguration ) {
40+ Map <String , TopicConfiguration > defaultTopicToTopicConfigurationMap =
41+ topics .stream ()
42+ .collect (toMap (topic -> topic ,
43+ topic -> new TopicConfiguration (topic , DEFAULT_PARTITION_COUNT )));
44+
45+ Map <String , TopicConfiguration > topicToTopicConfigurationMap =
46+ topicsConfiguration .stream ()
47+ .collect (toMap (TopicConfiguration ::getTopic ,
48+ topicConfiguration -> topicConfiguration ));
49+
50+ defaultTopicToTopicConfigurationMap .putAll (topicToTopicConfigurationMap );
51+
52+ Collection <TopicConfiguration > topicsConfigurationToCreate = defaultTopicToTopicConfigurationMap .values ();
53+
54+ if (!topicsConfigurationToCreate .isEmpty ()) {
55+ log .info ("Creating Native Kafka topics for configuration: {}" , topicsConfigurationToCreate );
56+ createTopicsUsingAdminClient (topicsConfigurationToCreate );
57+ log .info ("Created Native Kafka topics for configuration: {}" , topicsConfigurationToCreate );
58+ }
59+ }
60+
61+ private void createTopicsUsingAdminClient (Collection <TopicConfiguration > topicsConfigurationToCreate ) {
62+ Properties adminProps = new Properties ();
63+ adminProps .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , ((KafkaContainer ) nativeKafka ).getBootstrapServers ());
64+
65+ try (AdminClient adminClient = AdminClient .create (adminProps )) {
66+ List <NewTopic > newTopics = topicsConfigurationToCreate .stream ()
67+ .map (config -> new NewTopic (config .getTopic (), config .getPartitions (), (short ) 1 ))
68+ .collect (Collectors .toList ());
69+
70+ CreateTopicsResult result = adminClient .createTopics (newTopics );
71+
72+ // Wait for all topics to be created
73+ result .all ().get ();
74+ log .debug ("Successfully created {} topics using Admin API" , newTopics .size ());
75+ } catch (InterruptedException | ExecutionException e ) {
76+ log .error ("Failed to create topics using Admin API" , e );
77+ throw new RuntimeException ("Failed to create topics" , e );
78+ }
79+ }
80+ }
0 commit comments