Skip to content

Commit 86bd163

Browse files
awelless authored and pvillard31 committed
NIFI-15586 Add demarcator processing strategy to ConsumeKinesis
This closes #10904. Signed-off-by: Pierre Villard <[email protected]>
1 parent 89c2f20 commit 86bd163

3 files changed

Lines changed: 119 additions & 9 deletions

File tree

nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.nifi.annotation.lifecycle.OnStopped;
3030
import org.apache.nifi.components.DescribedValue;
3131
import org.apache.nifi.components.PropertyDescriptor;
32+
import org.apache.nifi.components.Validator;
3233
import org.apache.nifi.controller.NodeTypeProvider;
3334
import org.apache.nifi.flowfile.FlowFile;
3435
import org.apache.nifi.logging.ComponentLog;
@@ -93,6 +94,7 @@
9394
import java.time.Instant;
9495
import java.util.Date;
9596
import java.util.List;
97+
import java.util.Map;
9698
import java.util.Optional;
9799
import java.util.Set;
98100
import java.util.UUID;
@@ -102,6 +104,7 @@
102104
import java.util.concurrent.TimeoutException;
103105
import java.util.concurrent.atomic.AtomicBoolean;
104106

107+
import static java.nio.charset.StandardCharsets.UTF_8;
105108
import static java.util.concurrent.TimeUnit.NANOSECONDS;
106109
import static java.util.concurrent.TimeUnit.SECONDS;
107110
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.APPROXIMATE_ARRIVAL_TIMESTAMP;
@@ -246,6 +249,19 @@ Ensure that the credentials provided have access to Kinesis, DynamoDB and (optio
246249
.allowableValues(OutputStrategy.class)
247250
.build();
248251

252+
static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
253+
.name("Message Demarcator")
254+
.description("""
255+
Specifies the string (interpreted as UTF-8) to use for demarcating multiple Kinesis messages
256+
within a single FlowFile. If not specified, the content of the messages will be concatenated
257+
without any delimiter.
258+
To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.
259+
""")
260+
.required(false)
261+
.addValidator(Validator.VALID)
262+
.dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR)
263+
.build();
264+
249265
static final PropertyDescriptor INITIAL_STREAM_POSITION = new PropertyDescriptor.Builder()
250266
.name("Initial Stream Position")
251267
.description("The position in the stream where the processor should start reading.")
@@ -309,6 +325,7 @@ Ensure that the credentials provided have access to Kinesis, DynamoDB and (optio
309325
RECORD_READER,
310326
RECORD_WRITER,
311327
OUTPUT_STRATEGY,
328+
MESSAGE_DEMARCATOR,
312329
INITIAL_STREAM_POSITION,
313330
STREAM_POSITION_TIMESTAMP,
314331
MAX_BYTES_TO_BUFFER,
@@ -339,6 +356,7 @@ Ensure that the credentials provided have access to Kinesis, DynamoDB and (optio
339356
private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
340357

341358
private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
359+
private volatile @Nullable byte[] demarcatorValue;
342360

343361
private volatile Future<InitializationResult> initializationResultFuture;
344362
private final AtomicBoolean initialized = new AtomicBoolean();
@@ -360,7 +378,7 @@ public void migrateProperties(final PropertyConfiguration config) {
360378
/**
 * Returns the set of relationships that matches the current {@code processingStrategy}.
 * {@code RECORD} maps to {@code RECORD_FILE_RELATIONSHIPS}; the raw FlowFile strategies map to
 * {@code RAW_FILE_RELATIONSHIPS}.
 *
 * @return the relationships for the current processing strategy
 */
@Override
361379
public Set<Relationship> getRelationships() {
362380
return switch (processingStrategy) {
363-
case FLOW_FILE -> RAW_FILE_RELATIONSHIPS;
381+
case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
364382
case RECORD -> RECORD_FILE_RELATIONSHIPS;
365383
};
366384
}
@@ -375,9 +393,16 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
375393
@OnScheduled
376394
public void setup(final ProcessContext context) {
377395
readerRecordProcessor = switch (processingStrategy) {
378-
case FLOW_FILE -> null;
396+
case FLOW_FILE, DEMARCATOR -> null;
379397
case RECORD -> createReaderRecordProcessor(context);
380398
};
399+
demarcatorValue = switch (processingStrategy) {
400+
case FLOW_FILE, RECORD -> null;
401+
case DEMARCATOR -> {
402+
final String demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
403+
yield demarcatorValue != null ? demarcatorValue.getBytes(UTF_8) : new byte[0];
404+
}
405+
};
381406

382407
final Region region = RegionUtil.getRegion(context);
383408
final AwsCredentialsProvider credentialsProvider = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
@@ -593,6 +618,7 @@ private void cleanUpState() {
593618

594619
recordBuffer = null;
595620
readerRecordProcessor = null;
621+
demarcatorValue = null;
596622
}
597623

598624
private void shutdownScheduler() {
@@ -680,11 +706,10 @@ private void processRecordsFromBuffer(final ProcessSession session, final Lease
680706
}
681707

682708
final String shardId = lease.shardId();
683-
final ReaderRecordProcessor processor = readerRecordProcessor;
684-
if (processor != null) {
685-
processRecordsWithReader(processor, session, shardId, records);
686-
} else {
687-
processRecordsAsRaw(session, shardId, records);
709+
switch (processingStrategy) {
710+
case FLOW_FILE -> processRecordsAsRaw(session, shardId, records);
711+
case RECORD -> processRecordsWithReader(session, shardId, records);
712+
case DEMARCATOR -> processRecordsAsDemarcated(session, shardId, records);
688713
}
689714

690715
session.adjustCounter("Records Processed", records.size(), false);
@@ -732,13 +757,48 @@ private void processRecordsAsRaw(final ProcessSession session, final String shar
732757
}
733758
}
734759

735-
private void processRecordsWithReader(final ReaderRecordProcessor recordProcessor, final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
760+
private void processRecordsWithReader(final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
761+
final ReaderRecordProcessor recordProcessor = readerRecordProcessor;
762+
if (recordProcessor == null) {
763+
throw new IllegalStateException("RecordProcessor has not been initialized");
764+
}
765+
736766
final ProcessingResult result = recordProcessor.processRecords(session, streamName, shardId, records);
737767

738768
session.transfer(result.successFlowFiles(), REL_SUCCESS);
739769
session.transfer(result.parseFailureFlowFiles(), REL_PARSE_FAILURE);
740770
}
741771

772+
private void processRecordsAsDemarcated(final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
773+
final byte[] demarcator = demarcatorValue;
774+
if (demarcator == null) {
775+
throw new IllegalStateException("Demarcator has not been initialized");
776+
}
777+
778+
FlowFile flowFile = session.create();
779+
780+
final Map<String, String> attributes = ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, records.getFirst(), records.getLast());
781+
attributes.put(RECORD_COUNT, String.valueOf(records.size()));
782+
flowFile = session.putAllAttributes(flowFile, attributes);
783+
784+
flowFile = session.write(flowFile, out -> {
785+
try (final WritableByteChannel channel = Channels.newChannel(out)) {
786+
boolean writtenData = false;
787+
for (final KinesisClientRecord record : records) {
788+
if (writtenData) {
789+
out.write(demarcator);
790+
}
791+
channel.write(record.data());
792+
writtenData = true;
793+
}
794+
}
795+
});
796+
797+
session.getProvenanceReporter().receive(flowFile, ProvenanceTransitUriFormat.toTransitUri(streamName, shardId));
798+
799+
session.transfer(flowFile, REL_SUCCESS);
800+
}
801+
742802
/**
743803
* An adapter between Kinesis Consumer Library and {@link RecordBuffer}.
744804
*/
@@ -859,7 +919,8 @@ public String getDescription() {
859919

860920
enum ProcessingStrategy implements DescribedValue {
861921
FLOW_FILE("Write one FlowFile for each consumed Kinesis Record"),
862-
RECORD("Write one FlowFile containing multiple consumed Kinesis Records processed with Record Reader and Record Writer");
922+
RECORD("Write one FlowFile containing multiple consumed Kinesis Records processed with Record Reader and Record Writer"),
923+
DEMARCATOR("Write one FlowFile containing multiple consumed Kinesis Records separated by a configurable demarcator");
863924

864925
private final String description;
865926

nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,35 @@ void testRecordProcessingWithSchemaChangesAndInvalidRecords() throws Initializat
442442
assertReceiveProvenanceEvents(recordRunner.getProvenanceEvents(), firstFlowFile, secondFlowFile, parseFailureFlowFile);
443443
}
444444

445+
@Test
446+
void testRecordProcessingWithDemarcator() throws InitializationException {
447+
streamClient.createStream(1);
448+
449+
final TestRunner demarcatorTestRunner = createDemarcatorTestRunner(streamName, applicationName, System.lineSeparator());
450+
451+
final List<String> testRecords = List.of(
452+
"{\"name\":\"John\",\"age\":30}", // Schema A
453+
"{\"name\":\"Jane\",\"age\":25}", // Schema A
454+
"{invalid json}",
455+
"{\"id\":\"123\",\"value\":\"test\"}" // Schema B
456+
);
457+
458+
testRecords.forEach(record -> streamClient.putRecord("key", record));
459+
460+
runProcessorWithInitAndWaitForFiles(demarcatorTestRunner, 1);
461+
462+
// All records from the same shard are put as is into the same FlowFile.
463+
demarcatorTestRunner.assertTransferCount(REL_SUCCESS, 1);
464+
final List<MockFlowFile> successFlowFiles = demarcatorTestRunner.getFlowFilesForRelationship(REL_SUCCESS);
465+
466+
final MockFlowFile flowFile = successFlowFiles.getFirst();
467+
assertEquals("4", flowFile.getAttribute(RECORD_COUNT));
468+
flowFile.assertContentEquals(String.join(System.lineSeparator(), testRecords));
469+
470+
// Verify provenance events.
471+
assertReceiveProvenanceEvents(demarcatorTestRunner.getProvenanceEvents(), flowFile);
472+
}
473+
445474
private static void assertReceiveProvenanceEvents(final List<ProvenanceEventRecord> actualEvents, final FlowFile... expectedFlowFiles) {
446475
assertReceiveProvenanceEvents(actualEvents, List.of(expectedFlowFiles));
447476
}
@@ -511,6 +540,16 @@ private TestRunner createRecordTestRunner(final String streamName, final String
511540
return runner;
512541
}
513542

543+
private TestRunner createDemarcatorTestRunner(final String streamName, final String applicationName, final String demarcator) throws InitializationException {
544+
final TestRunner runner = createTestRunner(streamName, applicationName);
545+
546+
runner.setProperty(ConsumeKinesis.PROCESSING_STRATEGY, ConsumeKinesis.ProcessingStrategy.DEMARCATOR);
547+
runner.setProperty(ConsumeKinesis.MESSAGE_DEMARCATOR, demarcator);
548+
549+
runner.assertValid();
550+
return runner;
551+
}
552+
514553
private void runProcessorWithInitAndWaitForFiles(final TestRunner runner, final int expectedFlowFileCount) {
515554
runProcessorAndWaitForFiles(runner, expectedFlowFileCount, true);
516555
}

nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Set;
2929

3030
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.PROCESSING_STRATEGY;
31+
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.DEMARCATOR;
3132
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.FLOW_FILE;
3233
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.RECORD;
3334
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_PARSE_FAILURE;
@@ -61,6 +62,15 @@ void getRelationshipsForRecordProcessingStrategy() {
6162
assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships);
6263
}
6364

65+
@Test
66+
void getRelationshipsForDemarcatorProcessingStrategy() {
67+
testRunner.setProperty(PROCESSING_STRATEGY, DEMARCATOR);
68+
69+
final Set<Relationship> relationships = testRunner.getProcessor().getRelationships();
70+
71+
assertEquals(Set.of(REL_SUCCESS), relationships);
72+
}
73+
6474
private static TestRunner createTestRunner() {
6575
final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesis.class);
6676

0 commit comments

Comments
 (0)