Skip to content

Commit 6c415a8

Browse files
ianmugejpechane
authored andcommitted
debezium/dbz#1581 Add configurable stream name for NATS JetStream sink
Signed-off-by: Ian Muge <[email protected]>
1 parent 3ff22b3 commit 6c415a8

2 files changed

Lines changed: 6 additions & 1 deletion

File tree

debezium-server-nats-jetstream/src/main/java/io/debezium/server/nats/jetstream/NatsJetStreamChangeConsumer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class NatsJetStreamChangeConsumer extends BaseChangeConsumer
6969

7070
private static final String PROP_PREFIX = "debezium.sink.nats-jetstream.";
7171
private static final String PROP_URL = PROP_PREFIX + "url";
72+
private static final String PROP_STREAM_NAME = PROP_PREFIX + "stream-name";
7273
private static final String PROP_CREATE_STREAM = PROP_PREFIX + "create-stream";
7374
private static final String PROP_SUBJECTS = PROP_PREFIX + "subjects";
7475
private static final String PROP_STORAGE = PROP_PREFIX + "storage";
@@ -93,6 +94,9 @@ public class NatsJetStreamChangeConsumer extends BaseChangeConsumer
9394
private JetStream js;
9495
private RetryExecutor retryExecutor;
9596

97+
@ConfigProperty(name = PROP_STREAM_NAME, defaultValue = "DebeziumStream")
98+
String streamName;
99+
96100
@ConfigProperty(name = PROP_CREATE_STREAM, defaultValue = "false")
97101
boolean createStream;
98102

@@ -179,7 +183,7 @@ else if (tlsKeyStore.isPresent() && tlsKeyStorePassword.isPresent() && tlsPasswo
179183
StorageType storageType = storage.equals("file") ? StorageType.File : StorageType.Memory;
180184

181185
StreamConfiguration streamConfig = StreamConfiguration.builder()
182-
.name("DebeziumStream")
186+
.name(streamName)
183187
.description("The debezium stream, contains messages which are coming from debezium")
184188
.subjects(subjects.split(","))
185189
.storageType(storageType)

debezium-server-nats-jetstream/src/test/java/io/debezium/server/nats/jetstream/NatsJetStreamTestConfigSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public NatsJetStreamTestConfigSource() {
1919
natsJetStreamTest.put("debezium.sink.type", "nats-jetstream");
2020
natsJetStreamTest.put("debezium.sink.nats-jetstream.url",
2121
NatsJetStreamTestResourceLifecycleManager.getNatsContainerUrl());
22+
natsJetStreamTest.put("debezium.sink.nats-jetstream.stream-name", "DebeziumStream");
2223
natsJetStreamTest.put("debezium.sink.nats-jetstream.create-stream", "true");
2324
natsJetStreamTest.put("debezium.sink.nats-jetstream.subjects", "testc.inventory.customers");
2425
natsJetStreamTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");

0 commit comments

Comments
 (0)