Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.metrics.CommitTiming;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.credentials.provider.AwsCredentialsProviderService;
import org.apache.nifi.processors.aws.kinesis.MemoryBoundRecordBuffer.Lease;
import org.apache.nifi.processors.aws.kinesis.ReaderRecordProcessor.ProcessingResult;
import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ConsumeRecordsResult;
import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ShardBufferId;
import org.apache.nifi.processors.aws.kinesis.converter.InjectMetadataRecordConverter;
import org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverter;
Expand Down Expand Up @@ -108,6 +110,7 @@
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.APPROXIMATE_ARRIVAL_TIMESTAMP;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.CURRENT_LAG;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.FIRST_SEQUENCE_NUMBER;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.FIRST_SUB_SEQUENCE_NUMBER;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.LAST_SEQUENCE_NUMBER;
Expand Down Expand Up @@ -146,6 +149,8 @@ Uses DynamoDB for check pointing and coordination, and (optional) CloudWatch for
description = "A SubSequence Number of the last Kinesis Record in the FlowFile. Generated by KPL when aggregating records into a single Kinesis Record"),
@WritesAttribute(attribute = APPROXIMATE_ARRIVAL_TIMESTAMP,
description = "Approximate arrival timestamp of the last Kinesis Record in the FlowFile"),
@WritesAttribute(attribute = CURRENT_LAG,
description = "Milliseconds behind the latest record in the shard at the time records were consumed"),
@WritesAttribute(attribute = MIME_TYPE,
description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer (if configured)"),
@WritesAttribute(attribute = RECORD_COUNT,
Expand Down Expand Up @@ -482,6 +487,10 @@ public void setup(final ProcessContext context) {
}
}

private static String makeCurrentLagGaugeName(final String streamName, final String shardId) {
    // Builds a per-shard gauge name by tagging the lag attribute with the stream and shard,
    // so each shard's "current lag" metric is reported independently.
    final StringBuilder gaugeName = new StringBuilder(CURRENT_LAG);
    gaugeName.append("[stream.name=\"").append(streamName)
            .append("\",shard.id=\"").append(shardId)
            .append("\"]");
    return gaugeName.toString();
}

/**
* Creating Kinesis HTTP client, as per
* {@link software.amazon.kinesis.common.KinesisClientUtil#adjustKinesisClientBuilder(KinesisAsyncClientBuilder)}.
Expand Down Expand Up @@ -698,20 +707,25 @@ private void checkInitializationResult(final InitializationResult initialization

private void processRecordsFromBuffer(final ProcessSession session, final Lease lease) {
try {
final List<KinesisClientRecord> records = recordBuffer.consumeRecords(lease);
final ConsumeRecordsResult result = recordBuffer.consumeRecords(lease);
final List<KinesisClientRecord> records = result.records();

final String shardId = lease.shardId();
final Long currentLag = result.currentLag();
if (records.isEmpty()) {
recordBuffer.returnBufferLease(lease);
return;
}

final String shardId = lease.shardId();
switch (processingStrategy) {
case FLOW_FILE -> processRecordsAsRaw(session, shardId, records);
case RECORD -> processRecordsWithReader(session, shardId, records);
case DEMARCATOR -> processRecordsAsDemarcated(session, shardId, records);
case FLOW_FILE -> processRecordsAsRaw(session, shardId, result);
case RECORD -> processRecordsWithReader(session, shardId, result);
case DEMARCATOR -> processRecordsAsDemarcated(session, shardId, result);
}

if (currentLag != null) {
session.recordGauge(makeCurrentLagGaugeName(streamName, shardId), currentLag, CommitTiming.SESSION_COMMITTED);
}
session.adjustCounter("Records Processed", records.size(), false);

session.commitAsync(
Expand Down Expand Up @@ -740,10 +754,11 @@ private void rollbackRecords(final Lease lease) {
}
}

private void processRecordsAsRaw(final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
private void processRecordsAsRaw(final ProcessSession session, final String shardId, final ConsumeRecordsResult consumeResult) {
final List<KinesisClientRecord> records = consumeResult.records();
for (final KinesisClientRecord record : records) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, record, record));
flowFile = session.putAllAttributes(flowFile, ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, record, record, consumeResult.currentLag()));

flowFile = session.write(flowFile, out -> {
try (final WritableByteChannel channel = Channels.newChannel(out)) {
Expand All @@ -757,27 +772,27 @@ private void processRecordsAsRaw(final ProcessSession session, final String shar
}
}

private void processRecordsWithReader(final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
private void processRecordsWithReader(final ProcessSession session, final String shardId, final ConsumeRecordsResult consumeResult) {
final ReaderRecordProcessor recordProcessor = readerRecordProcessor;
if (recordProcessor == null) {
throw new IllegalStateException("RecordProcessor has not been initialized");
}

final ProcessingResult result = recordProcessor.processRecords(session, streamName, shardId, records);
final ProcessingResult result = recordProcessor.processRecords(session, streamName, shardId, consumeResult);

session.transfer(result.successFlowFiles(), REL_SUCCESS);
session.transfer(result.parseFailureFlowFiles(), REL_PARSE_FAILURE);
}

private void processRecordsAsDemarcated(final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
private void processRecordsAsDemarcated(final ProcessSession session, final String shardId, final ConsumeRecordsResult consumeRecordsResult) {
final List<KinesisClientRecord> records = consumeRecordsResult.records();
final byte[] demarcator = demarcatorValue;
if (demarcator == null) {
throw new IllegalStateException("Demarcator has not been initialized");
}

FlowFile flowFile = session.create();

final Map<String, String> attributes = ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, records.getFirst(), records.getLast());
final Map<String, String> attributes = ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, records.getFirst(), records.getLast(), consumeRecordsResult.currentLag());
attributes.put(RECORD_COUNT, String.valueOf(records.size()));
flowFile = session.putAllAttributes(flowFile, attributes);

Expand Down Expand Up @@ -821,7 +836,8 @@ public void processRecords(final ProcessRecordsInput processRecordsInput) {
if (bufferId == null) {
throw new IllegalStateException("Buffer ID not found: Record Processor not initialized");
}
recordBuffer.addRecords(bufferId, processRecordsInput.records(), processRecordsInput.checkpointer());
recordBuffer.addRecords(bufferId, processRecordsInput.records(),
processRecordsInput.checkpointer(), processRecordsInput.millisBehindLatest());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ final class ConsumeKinesisAttributes {
static final String FIRST_SUB_SEQUENCE_NUMBER = PREFIX + "first.subsequence.number";
static final String LAST_SEQUENCE_NUMBER = PREFIX + "last.sequence.number";
static final String LAST_SUB_SEQUENCE_NUMBER = PREFIX + "last.subsequence.number";
static final String CURRENT_LAG = PREFIX + "current.lag";

static final String PARTITION_KEY = PREFIX + "partition.key";
static final String APPROXIMATE_ARRIVAL_TIMESTAMP = PREFIX + "approximate.arrival.timestamp.ms";
Expand All @@ -48,14 +49,16 @@ final class ConsumeKinesisAttributes {
* @param shardId the shard ID the FlowFile records came from.
* @param firstRecord the first Kinesis record in the FlowFile.
* @param lastRecord the last Kinesis record in the FlowFile.
* @param currentLag milliseconds behind the latest record in the shard, or null if not available.
* @return a <b>mutable</b> map with kinesis attributes.
*/
static Map<String, String> fromKinesisRecords(
final String streamName,
final String shardId,
final KinesisClientRecord firstRecord,
final KinesisClientRecord lastRecord) {
final Map<String, String> attributes = new HashMap<>(8);
final KinesisClientRecord lastRecord,
final Long currentLag) {
final Map<String, String> attributes = new HashMap<>(9);

attributes.put(STREAM_NAME, streamName);
attributes.put(SHARD_ID, shardId);
Expand All @@ -72,6 +75,10 @@ static Map<String, String> fromKinesisRecords(
attributes.put(APPROXIMATE_ARRIVAL_TIMESTAMP, String.valueOf(lastRecord.approximateArrivalTimestamp().toEpochMilli()));
}

if (currentLag != null) {
attributes.put(CURRENT_LAG, String.valueOf(currentLag));
}

return attributes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import jakarta.annotation.Nullable;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ConsumeRecordsResult;
import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ShardBufferId;
import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ShardBufferLease;
import software.amazon.kinesis.exceptions.InvalidStateException;
Expand Down Expand Up @@ -96,7 +97,8 @@ public ShardBufferId createBuffer(final String shardId) {
}

@Override
public void addRecords(final ShardBufferId bufferId, final List<KinesisClientRecord> records, final RecordProcessorCheckpointer checkpointer) {
public void addRecords(final ShardBufferId bufferId, final List<KinesisClientRecord> records,
final RecordProcessorCheckpointer checkpointer, final Long currentLag) {
if (records.isEmpty()) {
return;
}
Expand All @@ -112,7 +114,7 @@ public void addRecords(final ShardBufferId bufferId, final List<KinesisClientRec
return;
}

final RecordBatch recordBatch = new RecordBatch(records, checkpointer, calculateMemoryUsage(records));
final RecordBatch recordBatch = new RecordBatch(records, checkpointer, calculateMemoryUsage(records), currentLag);
memoryTracker.reserveMemory(recordBatch, bufferId);
final boolean addedRecords = buffer.offer(recordBatch);

Expand Down Expand Up @@ -211,18 +213,18 @@ public Optional<Lease> acquireBufferLease() {
}

@Override
public List<KinesisClientRecord> consumeRecords(final Lease lease) {
public ConsumeRecordsResult consumeRecords(final Lease lease) {
if (lease.isReturnedToPool()) {
logger.warn("Attempting to consume records from a buffer that was already returned to the pool. Ignoring");
return emptyList();
return ConsumeRecordsResult.empty();
}

final ShardBufferId bufferId = lease.bufferId();

final ShardBuffer buffer = shardBuffers.get(bufferId);
if (buffer == null) {
logger.debug("Buffer with id {} not found. Cannot consume records", bufferId);
return emptyList();
return ConsumeRecordsResult.empty();
}

return buffer.consumeRecords();
Expand Down Expand Up @@ -398,7 +400,8 @@ void freeMemory(final Collection<RecordBatch> consumedBatches, final ShardBuffer

private record RecordBatch(List<KinesisClientRecord> records,
RecordProcessorCheckpointer checkpointer,
long batchSizeBytes) {
long batchSizeBytes,
Long currentLag) {
int size() {
return records.size();
}
Expand Down Expand Up @@ -489,9 +492,9 @@ boolean offer(final RecordBatch recordBatch) {
return true;
}

List<KinesisClientRecord> consumeRecords() {
ConsumeRecordsResult consumeRecords() {
if (invalidated.get()) {
return emptyList();
return ConsumeRecordsResult.empty();
}

RecordBatch pendingBatch;
Expand All @@ -500,11 +503,15 @@ List<KinesisClientRecord> consumeRecords() {
}

final List<KinesisClientRecord> recordsToConsume = new ArrayList<>();
Long currentLag = null;
for (final RecordBatch batch : inProgressBatches) {
recordsToConsume.addAll(batch.records());
if (batch.currentLag != null) {
currentLag = batch.currentLag;
}
}

return recordsToConsume;
return new ConsumeRecordsResult(recordsToConsume, currentLag);
}

List<RecordBatch> commitConsumedRecords() {
Expand Down
Loading
Loading