Skip to content

Commit 56fc90e

Browse files
committed
debezium/dbz#1668 Enable component descriptor generation for all sinks
Signed-off-by: Fiore Mario Vitale <mvitale@redhat.com>
1 parent 4353140 commit 56fc90e

49 files changed

Lines changed: 3145 additions & 682 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,16 @@
2626
import com.azure.messaging.eventhubs.EventHubProducerClient;
2727

2828
import io.debezium.DebeziumException;
29+
import io.debezium.Module;
30+
import io.debezium.config.Field;
2931
import io.debezium.engine.ChangeEvent;
3032
import io.debezium.engine.DebeziumEngine;
3133
import io.debezium.engine.DebeziumEngine.RecordCommitter;
34+
import io.debezium.metadata.ComponentMetadata;
35+
import io.debezium.metadata.ComponentMetadataFactory;
3236
import io.debezium.server.BaseChangeConsumer;
3337
import io.debezium.server.CustomConsumerBuilder;
38+
import io.debezium.server.DebeziumServerSink;
3439

3540
/**
3641
* This sink adapter delivers change event messages to Azure Event Hubs
@@ -40,28 +45,20 @@
4045
@Named("eventhubs")
4146
@Dependent
4247
public class EventHubsChangeConsumer extends BaseChangeConsumer
43-
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
48+
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>>, DebeziumServerSink {
4449

4550
private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsChangeConsumer.class);
4651

52+
private final ComponentMetadataFactory componentMetadataFactory = new ComponentMetadataFactory();
53+
4754
private static final String PROP_PREFIX = "debezium.sink.eventhubs.";
48-
private static final String PROP_CONNECTION_STRING_NAME = PROP_PREFIX + "connectionstring";
49-
private static final String PROP_EVENTHUB_NAME = PROP_PREFIX + "hubname";
50-
private static final String PROP_PARTITION_ID = PROP_PREFIX + "partitionid";
51-
private static final String PROP_PARTITION_KEY = PROP_PREFIX + "partitionkey";
52-
private static final String PROP_DYNAMIC_PARTITION_ROUTING_KEY = PROP_PREFIX + "dynamicpartitionrouting";
53-
// maximum size for the batch of events (bytes)
54-
private static final String PROP_MAX_BATCH_SIZE = PROP_PREFIX + "maxbatchsize";
55-
private static final String PROP_HASH_MESSAGE_KEY_FUNCTION = PROP_PREFIX + "hashmessagekeyfunction";
56-
57-
private String connectionString;
58-
private String eventHubName;
59-
private String configuredPartitionId;
60-
private String configuredPartitionKey;
55+
56+
private EventHubsChangeConsumerConfig config;
6157
private DynamicPartitionRoutingStrategy dynamicPartitionRoutingStrategy = DynamicPartitionRoutingStrategy.DEFAULT;
62-
private Integer maxBatchSize;
6358
private Integer partitionCount;
6459
private Optional<HashFunction> hashMessageFunction;
60+
private String configuredPartitionId;
61+
private String configuredPartitionKey;
6562

6663
// connection string format -
6764
// Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<ACCESS_KEY>;EntityPath=<HUB_NAME>
@@ -83,26 +80,25 @@ void connect() {
8380
return;
8481
}
8582

86-
final Config config = ConfigProvider.getConfig();
87-
connectionString = config.getValue(PROP_CONNECTION_STRING_NAME, String.class);
88-
eventHubName = config.getValue(PROP_EVENTHUB_NAME, String.class);
83+
final Config mpConfig = ConfigProvider.getConfig();
84+
85+
// Load configuration
86+
io.debezium.config.Configuration configuration = io.debezium.config.Configuration.from(getConfigSubset(mpConfig, PROP_PREFIX));
87+
this.config = new EventHubsChangeConsumerConfig(configuration);
8988

90-
// optional config
91-
maxBatchSize = config.getOptionalValue(PROP_MAX_BATCH_SIZE, Integer.class).orElse(0);
92-
configuredPartitionId = config.getOptionalValue(PROP_PARTITION_ID, String.class).orElse("");
93-
configuredPartitionKey = config.getOptionalValue(PROP_PARTITION_KEY, String.class).orElse("");
89+
configuredPartitionId = (config.getConfiguredPartitionId() != null) ? config.getConfiguredPartitionId() : "";
90+
configuredPartitionKey = (config.getConfiguredPartitionKey() != null) ? config.getConfiguredPartitionKey() : "";
9491
if (configuredPartitionId.isEmpty() && configuredPartitionKey.isEmpty()) {
95-
final var routingValue = config.getOptionalValue(PROP_DYNAMIC_PARTITION_ROUTING_KEY, String.class).orElse(DynamicPartitionRoutingStrategy.DEFAULT.name());
92+
final var routingValue = (config.getDynamicPartitionRouting() != null) ? config.getDynamicPartitionRouting() : DynamicPartitionRoutingStrategy.DEFAULT.name();
9693
dynamicPartitionRoutingStrategy = DynamicPartitionRoutingStrategy.fromString(routingValue);
9794
}
98-
hashMessageFunction = config.getOptionalValue(PROP_HASH_MESSAGE_KEY_FUNCTION, String.class)
99-
.map(HashFunction::fromString);
95+
hashMessageFunction = Optional.ofNullable(config.getHashMessageKeyFunction()).map(HashFunction::fromString);
10096

101-
String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, connectionString, eventHubName);
97+
String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, config.getConnectionString(), config.getEventHubName());
10298

10399
try {
104100
producer = new EventHubClientBuilder().connectionString(finalConnectionString).buildProducerClient();
105-
batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, maxBatchSize);
101+
batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, config.getMaxBatchSize());
106102
}
107103
catch (Exception e) {
108104
throw new DebeziumException(e);
@@ -116,12 +112,13 @@ void connect() {
116112

117113
if (!configuredPartitionId.isEmpty() && Integer.parseInt(configuredPartitionId) > partitionCount - 1) {
118114
throw new IndexOutOfBoundsException(
119-
String.format("Target partition id %s does not exist in target EventHub %s", configuredPartitionId, eventHubName));
115+
String.format("Target partition id %s does not exist in target EventHub %s", configuredPartitionId, config.getEventHubName()));
120116
}
121117
}
122118

123119
@PreDestroy
124-
void close() {
120+
@Override
121+
public void close() {
125122
try {
126123
producer.close();
127124
LOGGER.info("Closed Event Hubs producer client");
@@ -222,7 +219,7 @@ else if (!configuredPartitionKey.isEmpty()) {
222219
// Check that the target partition exists.
223220
if (targetPartitionId < BatchManager.BATCH_INDEX_FOR_NO_PARTITION_ID || targetPartitionId > partitionCount - 1) {
224221
throw new IndexOutOfBoundsException(
225-
String.format("Target partition id %d does not exist in target EventHub %s", targetPartitionId, eventHubName));
222+
String.format("Target partition id %d does not exist in target EventHub %s", targetPartitionId, config.getEventHubName()));
226223
}
227224

228225
batchManager.sendEventToPartitionId(eventData, recordIndex, targetPartitionId);
@@ -252,4 +249,21 @@ else if (!configuredPartitionKey.isEmpty()) {
252249
committer.markBatchFinished();
253250
LOGGER.trace("Batch marked finished");
254251
}
252+
253+
@Override
254+
public Field.Set getConfigFields() {
255+
return Field.setOf(
256+
EventHubsChangeConsumerConfig.CONNECTION_STRING,
257+
EventHubsChangeConsumerConfig.HUB_NAME,
258+
EventHubsChangeConsumerConfig.PARTITION_ID,
259+
EventHubsChangeConsumerConfig.PARTITION_KEY,
260+
EventHubsChangeConsumerConfig.DYNAMIC_PARTITION_ROUTING,
261+
EventHubsChangeConsumerConfig.MAX_BATCH_SIZE,
262+
EventHubsChangeConsumerConfig.HASH_MESSAGE_KEY_FUNCTION);
263+
}
264+
265+
@Override
266+
public List<ComponentMetadata> getConnectorMetadata() {
267+
return List.of(componentMetadataFactory.createComponentMetadata(this, Module.version()));
268+
}
255269
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright Debezium Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
package io.debezium.server.eventhubs;
7+
8+
import org.apache.kafka.common.config.ConfigDef;
9+
10+
import io.debezium.config.Configuration;
11+
import io.debezium.config.Field;
12+
13+
/**
14+
* Configuration fields for {@link EventHubsChangeConsumer}.
15+
*/
16+
public class EventHubsChangeConsumerConfig {
17+
18+
public static final Field CONNECTION_STRING = Field.create("connectionstring")
19+
.withDisplayName("Event Hubs Connection String")
20+
.withType(ConfigDef.Type.PASSWORD)
21+
.withWidth(ConfigDef.Width.LONG)
22+
.withImportance(ConfigDef.Importance.HIGH)
23+
.withDescription("Azure Event Hubs connection string (without EntityPath).");
24+
25+
public static final Field HUB_NAME = Field.create("hubname")
26+
.withDisplayName("Event Hub Name")
27+
.withType(ConfigDef.Type.STRING)
28+
.withWidth(ConfigDef.Width.MEDIUM)
29+
.withImportance(ConfigDef.Importance.HIGH)
30+
.withDescription("Name of the Event Hub.");
31+
32+
public static final Field PARTITION_ID = Field.create("partitionid")
33+
.withDisplayName("Partition ID")
34+
.withType(ConfigDef.Type.STRING)
35+
.withWidth(ConfigDef.Width.SHORT)
36+
.withImportance(ConfigDef.Importance.MEDIUM)
37+
.withDescription("Specific partition ID to send all events to. Leave empty for dynamic routing.");
38+
39+
public static final Field PARTITION_KEY = Field.create("partitionkey")
40+
.withDisplayName("Partition Key")
41+
.withType(ConfigDef.Type.STRING)
42+
.withWidth(ConfigDef.Width.MEDIUM)
43+
.withImportance(ConfigDef.Importance.MEDIUM)
44+
.withDescription("Partition key to use for all events. Leave empty for dynamic routing.");
45+
46+
public static final Field DYNAMIC_PARTITION_ROUTING = Field.create("dynamicpartitionrouting")
47+
.withDisplayName("Dynamic Partition Routing Strategy")
48+
.withType(ConfigDef.Type.STRING)
49+
.withDefault("DEFAULT")
50+
.withWidth(ConfigDef.Width.MEDIUM)
51+
.withImportance(ConfigDef.Importance.MEDIUM)
52+
.withDescription("Dynamic partition routing strategy when no partition ID or key is configured.");
53+
54+
public static final Field MAX_BATCH_SIZE = Field.create("maxbatchsize")
55+
.withDisplayName("Max Batch Size (bytes)")
56+
.withType(ConfigDef.Type.INT)
57+
.withDefault(0)
58+
.withWidth(ConfigDef.Width.SHORT)
59+
.withImportance(ConfigDef.Importance.MEDIUM)
60+
.withDescription("Maximum size in bytes for the batch of events. 0 means use default.");
61+
62+
public static final Field HASH_MESSAGE_KEY_FUNCTION = Field.create("hashmessagekeyfunction")
63+
.withDisplayName("Hash Message Key Function")
64+
.withType(ConfigDef.Type.STRING)
65+
.withWidth(ConfigDef.Width.MEDIUM)
66+
.withImportance(ConfigDef.Importance.LOW)
67+
.withDescription("Hash function to use for message key-based routing.");
68+
69+
// Instance fields
70+
private String connectionString;
71+
private String eventHubName;
72+
private String configuredPartitionId;
73+
private String configuredPartitionKey;
74+
private String dynamicPartitionRouting;
75+
private int maxBatchSize;
76+
private String hashMessageKeyFunction;
77+
78+
public EventHubsChangeConsumerConfig(Configuration config) {
79+
init(config);
80+
}
81+
82+
protected void init(Configuration config) {
83+
connectionString = config.getString(CONNECTION_STRING);
84+
eventHubName = config.getString(HUB_NAME);
85+
configuredPartitionId = config.getString(PARTITION_ID);
86+
configuredPartitionKey = config.getString(PARTITION_KEY);
87+
dynamicPartitionRouting = config.getString(DYNAMIC_PARTITION_ROUTING);
88+
maxBatchSize = config.getInteger(MAX_BATCH_SIZE);
89+
hashMessageKeyFunction = config.getString(HASH_MESSAGE_KEY_FUNCTION);
90+
}
91+
92+
public String getConnectionString() {
93+
return connectionString;
94+
}
95+
96+
public String getEventHubName() {
97+
return eventHubName;
98+
}
99+
100+
public String getConfiguredPartitionId() {
101+
return configuredPartitionId;
102+
}
103+
104+
public String getConfiguredPartitionKey() {
105+
return configuredPartitionKey;
106+
}
107+
108+
public String getDynamicPartitionRouting() {
109+
return dynamicPartitionRouting;
110+
}
111+
112+
public int getMaxBatchSize() {
113+
return maxBatchSize;
114+
}
115+
116+
public String getHashMessageKeyFunction() {
117+
return hashMessageKeyFunction;
118+
}
119+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
io.debezium.server.eventhubs.EventHubsChangeConsumer

debezium-server-infinispan/src/main/java/io/debezium/server/infinispan/InfinispanSinkConsumer.java

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import java.util.HashMap;
99
import java.util.List;
1010
import java.util.Map;
11-
import java.util.Optional;
1211

1312
import jakarta.annotation.PostConstruct;
1413
import jakarta.annotation.PreDestroy;
@@ -27,10 +26,15 @@
2726
import org.slf4j.LoggerFactory;
2827

2928
import io.debezium.DebeziumException;
29+
import io.debezium.Module;
30+
import io.debezium.config.Field;
3031
import io.debezium.engine.ChangeEvent;
3132
import io.debezium.engine.DebeziumEngine;
33+
import io.debezium.metadata.ComponentMetadata;
34+
import io.debezium.metadata.ComponentMetadataFactory;
3235
import io.debezium.server.BaseChangeConsumer;
3336
import io.debezium.server.CustomConsumerBuilder;
37+
import io.debezium.server.DebeziumServerSink;
3438

3539
/**
3640
* An implementation of the {@link DebeziumEngine.ChangeConsumer} interface that publishes change event messages to predefined Infinispan cache.
@@ -39,17 +43,15 @@
3943
*/
4044
@Named("infinispan")
4145
@Dependent
42-
public class InfinispanSinkConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
46+
public class InfinispanSinkConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>>, DebeziumServerSink {
4347

4448
private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanSinkConsumer.class);
4549

50+
private final ComponentMetadataFactory componentMetadataFactory = new ComponentMetadataFactory();
51+
4652
private static final String CONF_PREFIX = "debezium.sink.infinispan.";
47-
private static final String SERVER_HOST = CONF_PREFIX + "server.host";
48-
private static final String SERVER_PORT = CONF_PREFIX + "server.port";
49-
private static final String CACHE_NAME = CONF_PREFIX + "cache";
50-
private static final String USER_NAME = CONF_PREFIX + "user";
51-
private static final String PASSWORD = CONF_PREFIX + "password";
5253

54+
private InfinispanSinkConsumerConfig config;
5355
private RemoteCacheManager remoteCacheManager;
5456
private RemoteCache cache;
5557

@@ -65,17 +67,20 @@ void connect() {
6567
return;
6668
}
6769

68-
final Config config = ConfigProvider.getConfig();
69-
final String serverHost = config.getValue(SERVER_HOST, String.class);
70-
final String cacheName = config.getValue(CACHE_NAME, String.class);
71-
final Integer serverPort = config.getOptionalValue(SERVER_PORT, Integer.class).orElse(ConfigurationProperties.DEFAULT_HOTROD_PORT);
72-
final Optional<String> user = config.getOptionalValue(USER_NAME, String.class);
73-
final Optional<String> password = config.getOptionalValue(PASSWORD, String.class);
70+
final Config mpConfig = ConfigProvider.getConfig();
71+
72+
// Load configuration
73+
io.debezium.config.Configuration configuration = io.debezium.config.Configuration.from(getConfigSubset(mpConfig, CONF_PREFIX));
74+
this.config = new InfinispanSinkConsumerConfig(configuration);
75+
76+
final String serverHost = config.getServerHost();
77+
final String cacheName = config.getCacheName();
78+
final Integer serverPort = config.getServerPort() != null ? config.getServerPort() : ConfigurationProperties.DEFAULT_HOTROD_PORT;
7479

7580
ConfigurationBuilder builder = new ConfigurationBuilder();
7681
String uri;
77-
if (user.isPresent() && password.isPresent()) {
78-
uri = String.format("hotrod://%s:%s@%s:%d", user.get(), password.get(), serverHost, serverPort);
82+
if (config.getUser() != null && config.getPassword() != null) {
83+
uri = String.format("hotrod://%s:%s@%s:%d", config.getUser(), config.getPassword(), serverHost, serverPort);
7984
}
8085
else {
8186
uri = String.format("hotrod://%s:%d", serverHost, serverPort);
@@ -89,7 +94,8 @@ void connect() {
8994
}
9095

9196
@PreDestroy
92-
void close() {
97+
@Override
98+
public void close() {
9399
try {
94100
if (remoteCacheManager != null) {
95101
remoteCacheManager.close();
@@ -125,4 +131,19 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
125131

126132
committer.markBatchFinished();
127133
}
134+
135+
@Override
136+
public Field.Set getConfigFields() {
137+
return Field.setOf(
138+
InfinispanSinkConsumerConfig.SERVER_HOST,
139+
InfinispanSinkConsumerConfig.SERVER_PORT,
140+
InfinispanSinkConsumerConfig.CACHE,
141+
InfinispanSinkConsumerConfig.USER,
142+
InfinispanSinkConsumerConfig.PASSWORD);
143+
}
144+
145+
@Override
146+
public List<ComponentMetadata> getConnectorMetadata() {
147+
return List.of(componentMetadataFactory.createComponentMetadata(this, Module.version()));
148+
}
128149
}

0 commit comments

Comments
 (0)