|
1 | | -package com.eventara.analytics.kafka; |
| 1 | +package com.eventara.analytics.config; |
2 | 2 | import org.apache.kafka.common.serialization.Serdes; |
| 3 | +import org.apache.kafka.streams.StreamsBuilder; |
3 | 4 | import org.apache.kafka.streams.StreamsConfig; |
| 5 | +import org.apache.kafka.streams.kstream.KStream; |
4 | 6 | import org.springframework.beans.factory.annotation.Value; |
5 | 7 | import org.springframework.context.annotation.Bean; |
6 | 8 | import org.springframework.context.annotation.Configuration; |
7 | 9 | import org.springframework.kafka.annotation.EnableKafkaStreams; |
8 | 10 | import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; |
9 | 11 | import org.springframework.kafka.config.KafkaStreamsConfiguration; |
| 12 | + |
10 | 13 | import java.util.HashMap; |
11 | 14 | import java.util.Map; |
12 | 15 |
|
13 | 16 | @Configuration |
14 | | -@EnableKafkaStreams |
| 17 | +//@EnableKafkaStreams |
15 | 18 | public class KafkaStreamsConfig { |
16 | 19 |
|
17 | 20 | @Value("${spring.kafka.bootstrap-servers}") |
18 | 21 | private String bootstrapServers; |
19 | 22 |
|
20 | 23 | @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) |
21 | | - public KafkaStreamsConfiguration kStreamsConfig(){ |
| 24 | + public KafkaStreamsConfiguration kStreamsConfig() { |
22 | 25 | Map<String, Object> props = new HashMap<>(); |
23 | | - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eventara-streams"); |
| 26 | + |
| 27 | + // Basic configuration |
| 28 | + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eventara-analytics"); |
24 | 29 | props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
| 30 | + |
| 31 | + // Serialization |
25 | 32 | props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
26 | 33 | props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
27 | 34 |
|
| 35 | + // Performance tuning |
| 36 | + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); |
| 37 | + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); |
| 38 | + |
28 | 39 | return new KafkaStreamsConfiguration(props); |
29 | 40 | } |
| 41 | + |
| 42 | + |
| 43 | + |
| 44 | +// @Bean |
| 45 | +// public KStream<String, String> analyticsStream(StreamsBuilder builder) { |
| 46 | +// KStream<String, String> stream = builder.stream("eventara.events.raw"); |
| 47 | +// stream.peek((key, value) -> System.out.println("Received raw event: " + value)); |
| 48 | +//// stream.peek((key, value) -> System.out.println("Received: " + value)) |
| 49 | +//// .to("analytics-output"); |
| 50 | +// |
| 51 | +// return stream; |
| 52 | +// } |
30 | 53 | } |
0 commit comments