Skip to content

Commit f5d3190

Browse files
committed
DBZ-8911 Add configuration to skip heartbeat messages in Redis Stream consumer
1 parent f9a2fd4 commit f5d3190

12 files changed

Lines changed: 330 additions & 7 deletions

File tree

debezium-server-milvus/src/main/java/io/debezium/server/milvus/MilvusChangeConsumer.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import jakarta.inject.Inject;
1515
import jakarta.inject.Named;
1616

17+
import org.apache.kafka.connect.data.Field;
1718
import org.apache.kafka.connect.data.Struct;
1819
import org.apache.kafka.connect.source.SourceRecord;
1920
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -26,6 +27,7 @@
2627
import io.debezium.DebeziumException;
2728
import io.debezium.data.Envelope;
2829
import io.debezium.data.Envelope.Operation;
30+
import io.debezium.data.Json;
2931
import io.debezium.embedded.EmbeddedEngineChangeEvent;
3032
import io.debezium.engine.ChangeEvent;
3133
import io.debezium.engine.DebeziumEngine;
@@ -62,6 +64,9 @@ public class MilvusChangeConsumer extends BaseChangeConsumer implements Debezium
6264
@ConfigProperty(name = PROP_PREFIX + "database", defaultValue = "default")
6365
String databaseName;
6466

67+
@ConfigProperty(name = PROP_PREFIX + "unwind.json", defaultValue = "false")
68+
boolean unwindJson;
69+
6570
@Inject
6671
@CustomConsumerBuilder
6772
Instance<MilvusClientV2> customClient;
@@ -185,6 +190,8 @@ private void deleteRecord(String collectionName, ChangeEvent<Object, Object> rec
185190

186191
private JsonObject getValue(ChangeEvent<Object, Object> record, SourceRecord sourceRecord) {
187192
final var value = getString(record.value());
193+
var valueSchema = sourceRecord.valueSchema();
194+
188195
var json = gson.fromJson(value, JsonObject.class);
189196

190197
if ((json.has("schema") || json.has("schemaId")) && json.has("payload")) {
@@ -195,8 +202,18 @@ private JsonObject getValue(ChangeEvent<Object, Object> record, SourceRecord sou
195202
if (Envelope.isEnvelopeSchema(sourceRecord.valueSchema())) {
196203
// Message is envelope, so only after part is used
197204
json = json.getAsJsonObject(Envelope.FieldName.AFTER);
205+
valueSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
198206
}
199207

208+
if (unwindJson) {
209+
for (Field field : valueSchema.fields()) {
210+
if (Json.LOGICAL_NAME.equals(field.schema().name()) && json.has(field.name())) {
211+
final var stringValue = json.get(field.name()).getAsString();
212+
final var jsonValue = gson.fromJson(stringValue, JsonObject.class);
213+
json.add(field.name(), jsonValue);
214+
}
215+
}
216+
}
200217
return json;
201218
}
202219

debezium-server-milvus/src/test/java/io/debezium/server/milvus/MilvusIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import org.junit.jupiter.api.BeforeEach;
1919
import org.junit.jupiter.api.Test;
2020

21+
import com.google.gson.JsonObject;
22+
2123
import io.debezium.connector.postgresql.connection.PostgresConnection;
2224
import io.debezium.jdbc.JdbcConfiguration;
2325
import io.debezium.server.TestConfigSource;
@@ -104,10 +106,12 @@ public void testMilvus() throws Exception {
104106
assertThat(dataRead1.get("pk")).isEqualTo(1l);
105107
assertThat(dataRead1.get("value")).isEqualTo("one");
106108
assertThat(dataRead1.get("f_vector")).isEqualTo(List.of(1.1f, 1.2f, 1.3f));
109+
assertThat(dataRead1.get("f_json")).isInstanceOf(JsonObject.class);
107110

108111
assertThat(dataRead2.get("pk")).isEqualTo(2l);
109112
assertThat(dataRead2.get("value")).isEqualTo("two");
110113
assertThat(dataRead2.get("f_vector")).isEqualTo(List.of(2.1f, 2.2f, 2.3f));
114+
assertThat(dataRead2.get("f_json")).isInstanceOf(JsonObject.class);
111115

112116
final JdbcConfiguration config = JdbcConfiguration.create()
113117
.with("hostname", dbHostname)

debezium-server-milvus/src/test/java/io/debezium/server/milvus/MilvusTestConfigSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public MilvusTestConfigSource() {
2525
milvusTest.put("debezium.source.topic.prefix", "testc");
2626
milvusTest.put("debezium.source.schema.include.list", "inventory");
2727
milvusTest.put("debezium.source.table.include.list", "inventory.t_vector");
28+
milvusTest.put("debezium.sink.milvus.unwind.json", "true");
2829

2930
config = milvusTest;
3031
}

debezium-server-milvus/src/test/java/io/debezium/server/milvus/MilvusTestResourceLifecycleManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,12 @@ private void createMilvusCollections() {
8484
.dataType(DataType.FloatVector)
8585
.dimension(3)
8686
.build();
87+
final var jsonField = CreateCollectionReq.FieldSchema.builder()
88+
.name("f_json")
89+
.dataType(DataType.JSON)
90+
.build();
8791
final var collectionSchema = CollectionSchema.builder()
88-
.fieldSchemaList(List.of(pkField, valueField, vectorField))
92+
.fieldSchemaList(List.of(pkField, valueField, vectorField, jsonField))
8993
.build();
9094
final var index = IndexParam.builder()
9195
.fieldName("f_vector")

debezium-server-milvus/src/test/java/io/debezium/server/milvus/VectorPostgresTestResourceLifecycleManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ public Map<String, String> start() {
3333
connection.execute(
3434
"CREATE SCHEMA IF NOT EXISTS pgvector",
3535
"CREATE EXTENSION IF NOT EXISTS vector SCHEMA pgvector",
36-
"CREATE TABLE inventory.t_vector (pk INT8 PRIMARY KEY, value VARCHAR(32), f_vector pgvector.vector(3));",
37-
"INSERT INTO inventory.t_vector VALUES (1, 'one', '[1.1, 1.2, 1.3]')",
38-
"INSERT INTO inventory.t_vector VALUES (2, 'two', '[2.1, 2.2, 2.3]')");
36+
"CREATE TABLE inventory.t_vector (pk INT8 PRIMARY KEY, value VARCHAR(32), f_vector pgvector.vector(3), f_json JSON);",
37+
"INSERT INTO inventory.t_vector VALUES (1, 'one', '[1.1, 1.2, 1.3]', '{}'::JSON)",
38+
"INSERT INTO inventory.t_vector VALUES (2, 'two', '[2.1, 2.2, 2.3]', '{}'::JSON)");
3939
}
4040
catch (Exception e) {
4141
throw new DebeziumException(e);

debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import jakarta.enterprise.context.Dependent;
2626
import jakarta.inject.Named;
2727

28+
import org.eclipse.microprofile.config.Config;
2829
import org.eclipse.microprofile.config.ConfigProvider;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
@@ -56,6 +57,9 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
5657

5758
private static final String DEBEZIUM_REDIS_SINK_CLIENT_NAME = "debezium:redis:sink";
5859

60+
private static final String HEARTBEAT_PREFIX_CONFIG = "topic.heartbeat.prefix";
61+
private static final String DEFAULT_HEARTBEAT_PREFIX = "__debezium-heartbeat";
62+
5963
private static final String EXTENDED_MESSAGE_KEY_KEY = "key";
6064
private static final String EXTENDED_MESSAGE_VALUE_KEY = "value";
6165
private RedisClient client;
@@ -66,11 +70,22 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
6670

6771
private RedisStreamChangeConsumerConfig config;
6872

73+
private String heartbeatPrefix;
74+
6975
@PostConstruct
7076
void connect() {
71-
Configuration configuration = Configuration.from(getConfigSubset(ConfigProvider.getConfig(), ""));
77+
// Get configuration from ConfigProvider
78+
Config mpConfig = ConfigProvider.getConfig();
79+
Map<String, Object> sourceConfig = getConfigSubset(mpConfig, "debezium.source.");
80+
81+
// Get Redis sink configuration
82+
Configuration configuration = Configuration.from(getConfigSubset(mpConfig, ""));
7283
config = new RedisStreamChangeConsumerConfig(configuration);
7384

85+
// Get the heartbeat prefix from the configuration
86+
heartbeatPrefix = (String) sourceConfig.getOrDefault(HEARTBEAT_PREFIX_CONFIG, DEFAULT_HEARTBEAT_PREFIX);
87+
LOGGER.info("Using heartbeat prefix: {}", heartbeatPrefix);
88+
7489
String messageFormat = config.getMessageFormat();
7590
if (MESSAGE_FORMAT_EXTENDED.equals(messageFormat)) {
7691
recordMapFunction = record -> {
@@ -173,6 +188,14 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
173188
List<SimpleEntry<String, Map<String, String>>> recordsMap = new ArrayList<>(clonedBatch.size());
174189
for (ChangeEvent<Object, Object> record : clonedBatch) {
175190
String destination = streamNameMapper.map(record.destination());
191+
192+
// Check if this is a heartbeat message that should be skipped
193+
if (config.isSkipHeartbeatMessages() && destination.startsWith(heartbeatPrefix)) {
194+
// Mark as processed but don't add to Redis
195+
committer.markProcessed(record);
196+
continue;
197+
}
198+
176199
Map<String, String> recordMap = recordMapFunction.apply(record);
177200
recordsMap.add(new SimpleEntry<>(destination, recordMap));
178201
}

debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumerConfig.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public class RedisStreamChangeConsumerConfig extends RedisCommonConfig {
4545
.withDefault(DEFAULT_BUFFER_FILL_RATE)
4646
.withValidation(RangeValidator.atLeast(0));
4747

48+
private static final boolean DEFAULT_SKIP_HEARTBEAT_MESSAGES = true;
49+
private static final Field PROP_SKIP_HEARTBEAT_MESSAGES = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "skip.heartbeat.messages")
50+
.withDefault(DEFAULT_SKIP_HEARTBEAT_MESSAGES);
51+
4852
private int batchSize;
4953
private String nullKey;
5054
private String nullValue;
@@ -53,6 +57,7 @@ public class RedisStreamChangeConsumerConfig extends RedisCommonConfig {
5357
private int memoryLimitMb;
5458
private int batchDelay;
5559
private int bufferFillRate;
60+
private boolean skipHeartbeatMessages;
5661

5762
public RedisStreamChangeConsumerConfig(Configuration config) {
5863
super(config, PROP_PREFIX);
@@ -67,11 +72,13 @@ protected void init(Configuration config) {
6772
messageFormat = config.getString(PROP_MESSAGE_FORMAT);
6873
memoryLimitMb = config.getInteger(PROP_MEMORY_LIMIT_MB);
6974
bufferFillRate = config.getInteger(PROP_BUFFER_FILL_RATE);
75+
skipHeartbeatMessages = config.getBoolean(PROP_SKIP_HEARTBEAT_MESSAGES);
7076
}
7177

7278
@Override
7379
protected List<Field> getAllConfigurationFields() {
74-
List<Field> fields = Collect.arrayListOf(PROP_BATCH_SIZE, PROP_NULL_KEY, PROP_NULL_VALUE, PROP_MESSAGE_FORMAT);
80+
List<Field> fields = Collect.arrayListOf(PROP_BATCH_SIZE, PROP_NULL_KEY, PROP_NULL_VALUE, PROP_MESSAGE_FORMAT,
81+
PROP_SKIP_HEARTBEAT_MESSAGES);
7582
fields.addAll(super.getAllConfigurationFields());
7683
return fields;
7784
}
@@ -108,4 +115,7 @@ public int getMemoryLimitMb() {
108115
return memoryLimitMb;
109116
}
110117

111-
}
118+
public boolean isSkipHeartbeatMessages() {
119+
return skipHeartbeatMessages;
120+
}
121+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright Debezium Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
package io.debezium.server.redis;
7+
8+
import static org.junit.Assert.assertTrue;
9+
10+
import java.util.List;
11+
import java.util.concurrent.TimeUnit;
12+
13+
import org.awaitility.Awaitility;
14+
import org.junit.jupiter.api.Test;
15+
16+
import io.debezium.connector.postgresql.connection.PostgresConnection;
17+
import io.debezium.util.Testing;
18+
import io.quarkus.test.common.QuarkusTestResource;
19+
import io.quarkus.test.junit.QuarkusIntegrationTest;
20+
import io.quarkus.test.junit.TestProfile;
21+
22+
import redis.clients.jedis.HostAndPort;
23+
import redis.clients.jedis.Jedis;
24+
25+
/**
26+
* Integration test that verifies heartbeat messages are stored in Redis Stream when skip.heartbeat.messages=false
27+
*
28+
* @author Yossi Shirizli
29+
*/
30+
@QuarkusIntegrationTest
31+
@TestProfile(RedisStreamHeartbeatDisabledTestProfile.class)
32+
@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
33+
public class RedisStreamHeartbeatDisabledIT {
34+
35+
private static final String HEARTBEAT_PREFIX = "__debezium-heartbeat";
36+
37+
/**
38+
* Test that heartbeat messages are stored when skip.heartbeat.messages=false
39+
*/
40+
@Test
41+
public void testHeartbeatMessagesStored() throws Exception {
42+
Testing.Print.enable();
43+
44+
// Get a connection to Redis
45+
Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
46+
47+
// Create a table and insert data to trigger the CDC process
48+
final PostgresConnection connection = TestUtils.getPostgresConnection();
49+
Testing.print("Creating new redis_test table and inserting 2 records to it");
50+
connection.execute(
51+
"CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)",
52+
"INSERT INTO inventory.redis_test VALUES (1)",
53+
"INSERT INTO inventory.redis_test VALUES (2)");
54+
connection.close();
55+
56+
// Wait specifically for heartbeat streams to appear and have entries
57+
Testing.print("Waiting for heartbeats to be generated...");
58+
59+
Awaitility.await()
60+
.atMost(3, TimeUnit.SECONDS)
61+
.pollInterval(500, TimeUnit.MILLISECONDS)
62+
.until(() -> {
63+
List<String> heartbeatStreams = jedis.keys(HEARTBEAT_PREFIX + "*").stream()
64+
.filter(key -> {
65+
try {
66+
// Check if it's a stream and has entries
67+
return jedis.xlen(key) > 0;
68+
}
69+
catch (Exception e) {
70+
return false;
71+
}
72+
})
73+
.toList();
74+
75+
return !heartbeatStreams.isEmpty();
76+
});
77+
78+
// After successful waiting, verify data streams
79+
List<String> dataStreams = jedis.keys("testc.inventory.*").stream()
80+
.filter(key -> {
81+
try {
82+
return jedis.xlen(key) > 0;
83+
}
84+
catch (Exception e) {
85+
return false;
86+
}
87+
})
88+
.toList();
89+
90+
Testing.print("Data streams found: " + dataStreams);
91+
92+
// Verify that we have data streams
93+
assertTrue("Expected to find at least one data stream", !dataStreams.isEmpty());
94+
95+
jedis.close();
96+
}
97+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright Debezium Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
package io.debezium.server.redis;
7+
8+
import java.util.Map;
9+
10+
public class RedisStreamHeartbeatDisabledTestProfile extends RedisStreamTestProfile {
11+
@Override
12+
public Map<String, String> getConfigOverrides() {
13+
Map<String, String> config = super.getConfigOverrides();
14+
15+
// Enable heartbeats with a short interval
16+
config.put("debezium.source.heartbeat.interval.ms", "50");
17+
18+
// Set skip.heartbeat.messages to false to allow heartbeat messages to be stored
19+
config.put("debezium.sink.redis.skip.heartbeat.messages", "false");
20+
21+
return config;
22+
}
23+
}

0 commit comments

Comments
 (0)