Skip to content

Commit f0ef3cd

Browse files
Oskar Bondevjuranek
authored andcommitted
DBZ-8893 Fix KinesisChangeConsumer so that it retries failed records.
Use batchRequest.get(index) instead of putRecordsRequestEntryList.get(index) when creating the list of failed records for retry. Add test for successive retries
1 parent 3f5eb5f commit f0ef3cd

2 files changed

Lines changed: 92 additions & 5 deletions

File tree

debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
188188
for (int index = 0; index < putRecordsResults.size(); index++) {
189189
PutRecordsResultEntry entryResult = putRecordsResults.get(index);
190190
if (entryResult.errorCode() != null) {
191-
failedRecordsList.add(putRecordsRequestEntryList.get(index));
191+
failedRecordsList.add(batchRequest.get(index));
192192
}
193193
}
194194
batchRequest = failedRecordsList;

debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,13 @@ public class KinesisUnitTest {
5353
private AtomicBoolean threwException;
5454
List<ChangeEvent<Object, Object>> changeEvents;
5555
RecordCommitter<ChangeEvent<Object, Object>> committer;
56+
private static final Integer NUMBER_OF_CHANGE_EVENTS = 500;
5657

5758
@BeforeEach
5859
public void setup() {
5960
counter = new AtomicInteger(0);
6061
threwException = new AtomicBoolean(false);
61-
changeEvents = createChangeEvents(500, "key", "destination");
62+
changeEvents = createChangeEvents(NUMBER_OF_CHANGE_EVENTS, "key", "destination");
6263
committer = RecordCommitter();
6364
spyClient = spy(KinesisClient.builder().region(Region.of(KinesisTestConfigSource.KINESIS_REGION))
6465
.credentialsProvider(ProfileCredentialsProvider.create("default")).build());
@@ -283,7 +284,7 @@ public void testEmptyRecords() throws Exception {
283284
assertFalse(threwException.get());
284285
}
285286

286-
// 6. Test that a batch of 1000 records is correctly split into 2 batches of 500 records
287+
// 6. Test that a batch of 1000 records is correctly split into 2 batches of NUMBER_OF_CHANGE_EVENTS records
287288
@Test
288289
public void testBatchSplitting() throws Exception {
289290
// Arrange
@@ -326,8 +327,94 @@ public void testBatchSplitting() throws Exception {
326327
// Assert
327328
assertFalse(threwException.get());
328329
assertEquals(2, numBatches.get());
329-
assertEquals(500, numRecordsBatchOne.get());
330-
assertEquals(500, numRecordsBatchTwo.get());
330+
assertEquals(NUMBER_OF_CHANGE_EVENTS, numRecordsBatchOne.get());
331+
assertEquals(NUMBER_OF_CHANGE_EVENTS, numRecordsBatchTwo.get());
331332
}
332333

334+
// 7. Test that only failed records are re-sent after successive retry
335+
@Test
336+
public void testResendFailedRecordsSuccessive() throws Exception {
337+
// Arrange
338+
AtomicBoolean firstCall = new AtomicBoolean(true);
339+
AtomicBoolean secondCall = new AtomicBoolean(false);
340+
List<PutRecordsRequestEntry> failedRecordsFromFirstCall = new ArrayList<>();
341+
List<PutRecordsRequestEntry> failedRecordsFromSecondCall = new ArrayList<>();
342+
List<PutRecordsRequestEntry> recordsFromSecondCall = new ArrayList<>();
343+
List<PutRecordsRequestEntry> recordsFromThirdCall = new ArrayList<>();
344+
doAnswer(invocation -> {
345+
List<PutRecordsResultEntry> response = new ArrayList<>();
346+
PutRecordsRequest request = invocation.getArgument(0);
347+
List<PutRecordsRequestEntry> records = request.records();
348+
counter.incrementAndGet();
349+
350+
if (firstCall.get()) {
351+
int failedEntries = 100;
352+
for (int i = 0; i < records.size(); i++) {
353+
PutRecordsResultEntry recordResult;
354+
//
355+
if (i >= NUMBER_OF_CHANGE_EVENTS - failedEntries) {
356+
recordResult = PutRecordsResultEntry.builder().errorCode("ProvisionedThroughputExceededException")
357+
.errorMessage("The request rate for the stream is too high").build();
358+
359+
failedRecordsFromFirstCall.add(records.get(i));
360+
}
361+
else {
362+
recordResult = PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build();
363+
}
364+
response.add(recordResult);
365+
}
366+
firstCall.getAndSet(false);
367+
secondCall.getAndSet(true);
368+
return PutRecordsResponse.builder().failedRecordCount(failedEntries).records(response).build();
369+
}
370+
if (secondCall.get()) {
371+
int failedEntries = 20;
372+
for (int i = 0; i < failedRecordsFromFirstCall.size(); i++) {
373+
PutRecordsResultEntry recordResult;
374+
if (i >= failedRecordsFromFirstCall.size() - failedEntries) {
375+
recordResult = PutRecordsResultEntry.builder().errorCode("ProvisionedThroughputExceededException")
376+
.errorMessage("The request rate for the stream is too high").build();
377+
failedRecordsFromSecondCall.add(records.get(i));
378+
}
379+
else {
380+
recordResult = PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build();
381+
}
382+
recordsFromSecondCall.add(records.get(i));
383+
response.add(recordResult);
384+
}
385+
secondCall.getAndSet(false);
386+
return PutRecordsResponse.builder().failedRecordCount(failedEntries).records(response).build();
387+
}
388+
else {
389+
for (PutRecordsRequestEntry record : records) {
390+
recordsFromThirdCall.add(record);
391+
PutRecordsResultEntry recordResult = PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build();
392+
response.add(recordResult);
393+
}
394+
return PutRecordsResponse.builder().failedRecordCount(0).records(response).build();
395+
}
396+
}).when(spyClient).putRecords(any(PutRecordsRequest.class));
397+
398+
// Act
399+
try {
400+
kinesisChangeConsumer.connect();
401+
kinesisChangeConsumer.handleBatch(changeEvents, committer);
402+
}
403+
catch (Exception e) {
404+
threwException.getAndSet(true);
405+
}
406+
407+
// Assert
408+
assertFalse(threwException.get());
409+
assertEquals(3, counter.get());
410+
411+
assertEquals(recordsFromSecondCall.size(), failedRecordsFromFirstCall.size());
412+
assertEquals(recordsFromThirdCall.size(), failedRecordsFromSecondCall.size());
413+
for (int i = 0; i < recordsFromSecondCall.size(); i++) {
414+
assertEquals(failedRecordsFromFirstCall.get(i).data(), recordsFromSecondCall.get(i).data());
415+
}
416+
for (int i = 0; i < recordsFromThirdCall.size(); i++) {
417+
assertEquals(failedRecordsFromSecondCall.get(i).data(), recordsFromThirdCall.get(i).data());
418+
}
419+
}
333420
}

0 commit comments

Comments
 (0)