Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.metrics.CommitTiming;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.credentials.provider.AwsCredentialsProviderService;
import org.apache.nifi.processors.aws.kinesis.MemoryBoundRecordBuffer.Lease;
import org.apache.nifi.processors.aws.kinesis.ReaderRecordProcessor.ProcessingResult;
import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ConsumeRecordsResult;
import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ShardBufferId;
import org.apache.nifi.processors.aws.kinesis.converter.InjectMetadataRecordConverter;
import org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverter;
Expand Down Expand Up @@ -112,6 +114,7 @@
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.FIRST_SUB_SEQUENCE_NUMBER;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.LAST_SEQUENCE_NUMBER;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.LAST_SUB_SEQUENCE_NUMBER;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.MILLIS_BEHIND_LATEST;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.MIME_TYPE;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.PARTITION_KEY;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.RECORD_COUNT;
Expand Down Expand Up @@ -146,6 +149,8 @@ Uses DynamoDB for check pointing and coordination, and (optional) CloudWatch for
description = "A SubSequence Number of the last Kinesis Record in the FlowFile. Generated by KPL when aggregating records into a single Kinesis Record"),
@WritesAttribute(attribute = APPROXIMATE_ARRIVAL_TIMESTAMP,
description = "Approximate arrival timestamp of the last Kinesis Record in the FlowFile"),
@WritesAttribute(attribute = MILLIS_BEHIND_LATEST,
description = "Milliseconds behind the latest record in the shard at the time records were consumed"),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally the issue was about writing millis behind latest in a gauge.
Do we need to write it to FlowFile attributes too?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this attribute will be needed downstream, that's fine.
But I wouldn't add it to the FlowFile attributes if it exists only to support the gauge name format.

@WritesAttribute(attribute = MIME_TYPE,
description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer (if configured)"),
@WritesAttribute(attribute = RECORD_COUNT,
Expand Down Expand Up @@ -482,6 +487,10 @@ public void setup(final ProcessContext context) {
}
}

static String makeMillisBehindLatestGaugeName(final String streamName, final String shardId) {
return "%s[stream.name=\"%s\",shard.id=\"%s\"]".formatted(MILLIS_BEHIND_LATEST, streamName, shardId);
}

/**
* Creating Kinesis HTTP client, as per
* {@link software.amazon.kinesis.common.KinesisClientUtil#adjustKinesisClientBuilder(KinesisAsyncClientBuilder)}.
Expand Down Expand Up @@ -696,20 +705,28 @@ private void checkInitializationResult(final InitializationResult initialization
}
}



Comment thread
pvillard31 marked this conversation as resolved.
Outdated
private void processRecordsFromBuffer(final ProcessSession session, final Lease lease) {
try {
final List<KinesisClientRecord> records = recordBuffer.consumeRecords(lease);
final ConsumeRecordsResult result = recordBuffer.consumeRecords(lease);
final List<KinesisClientRecord> records = result.records();

final String shardId = lease.shardId();
final Long millisBehindLatest = result.millisBehindLatest();
if (millisBehindLatest != null) {
session.recordGauge(makeMillisBehindLatestGaugeName(streamName, shardId), millisBehindLatest, CommitTiming.SESSION_COMMITTED);
}
Comment thread
lkuchars marked this conversation as resolved.
Outdated

if (records.isEmpty()) {
recordBuffer.returnBufferLease(lease);
return;
}

final String shardId = lease.shardId();
switch (processingStrategy) {
case FLOW_FILE -> processRecordsAsRaw(session, shardId, records);
case RECORD -> processRecordsWithReader(session, shardId, records);
case DEMARCATOR -> processRecordsAsDemarcated(session, shardId, records);
case FLOW_FILE -> processRecordsAsRaw(session, shardId, result);
case RECORD -> processRecordsWithReader(session, shardId, result);
case DEMARCATOR -> processRecordsAsDemarcated(session, shardId, result);
}

session.adjustCounter("Records Processed", records.size(), false);
Expand Down Expand Up @@ -740,10 +757,11 @@ private void rollbackRecords(final Lease lease) {
}
}

private void processRecordsAsRaw(final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
private void processRecordsAsRaw(final ProcessSession session, final String shardId, final ConsumeRecordsResult consumeResult) {
final List<KinesisClientRecord> records = consumeResult.records();
for (final KinesisClientRecord record : records) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, record, record));
flowFile = session.putAllAttributes(flowFile, ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, record, record, consumeResult.millisBehindLatest()));

flowFile = session.write(flowFile, out -> {
try (final WritableByteChannel channel = Channels.newChannel(out)) {
Expand All @@ -757,27 +775,27 @@ private void processRecordsAsRaw(final ProcessSession session, final String shar
}
}

private void processRecordsWithReader(final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
private void processRecordsWithReader(final ProcessSession session, final String shardId, final ConsumeRecordsResult consumeResult) {
final ReaderRecordProcessor recordProcessor = readerRecordProcessor;
if (recordProcessor == null) {
throw new IllegalStateException("RecordProcessor has not been initialized");
}

final ProcessingResult result = recordProcessor.processRecords(session, streamName, shardId, records);
final ProcessingResult result = recordProcessor.processRecords(session, streamName, shardId, consumeResult);

session.transfer(result.successFlowFiles(), REL_SUCCESS);
session.transfer(result.parseFailureFlowFiles(), REL_PARSE_FAILURE);
}

private void processRecordsAsDemarcated(final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
private void processRecordsAsDemarcated(final ProcessSession session, final String shardId, final ConsumeRecordsResult consumeRecordsResult) {
final List<KinesisClientRecord> records = consumeRecordsResult.records();
final byte[] demarcator = demarcatorValue;
if (demarcator == null) {
throw new IllegalStateException("Demarcator has not been initialized");
}

FlowFile flowFile = session.create();

final Map<String, String> attributes = ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, records.getFirst(), records.getLast());
final Map<String, String> attributes = ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, records.getFirst(), records.getLast(), consumeRecordsResult.millisBehindLatest());
attributes.put(RECORD_COUNT, String.valueOf(records.size()));
flowFile = session.putAllAttributes(flowFile, attributes);

Expand Down Expand Up @@ -821,7 +839,8 @@ public void processRecords(final ProcessRecordsInput processRecordsInput) {
if (bufferId == null) {
throw new IllegalStateException("Buffer ID not found: Record Processor not initialized");
}
recordBuffer.addRecords(bufferId, processRecordsInput.records(), processRecordsInput.checkpointer());
recordBuffer.addRecords(bufferId, processRecordsInput.records(),
processRecordsInput.checkpointer(), processRecordsInput.millisBehindLatest());
Comment thread
lkuchars marked this conversation as resolved.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ final class ConsumeKinesisAttributes {
static final String FIRST_SUB_SEQUENCE_NUMBER = PREFIX + "first.subsequence.number";
static final String LAST_SEQUENCE_NUMBER = PREFIX + "last.sequence.number";
static final String LAST_SUB_SEQUENCE_NUMBER = PREFIX + "last.subsequence.number";
static final String MILLIS_BEHIND_LATEST = PREFIX + "millis.behind.latest";

static final String PARTITION_KEY = PREFIX + "partition.key";
static final String APPROXIMATE_ARRIVAL_TIMESTAMP = PREFIX + "approximate.arrival.timestamp.ms";
Expand All @@ -48,14 +49,16 @@ final class ConsumeKinesisAttributes {
* @param shardId the shard ID the FlowFile records came from.
* @param firstRecord the first Kinesis record in the FlowFile.
* @param lastRecord the last Kinesis record in the FlowFile.
* @param millisBehindLatest milliseconds behind the latest record in the shard, or null if not available.
* @return a <b>mutable</b> map with kinesis attributes.
*/
static Map<String, String> fromKinesisRecords(
final String streamName,
final String shardId,
final KinesisClientRecord firstRecord,
final KinesisClientRecord lastRecord) {
final Map<String, String> attributes = new HashMap<>(8);
final KinesisClientRecord lastRecord,
final Long millisBehindLatest) {
final Map<String, String> attributes = new HashMap<>(9);

attributes.put(STREAM_NAME, streamName);
attributes.put(SHARD_ID, shardId);
Expand All @@ -72,6 +75,10 @@ static Map<String, String> fromKinesisRecords(
attributes.put(APPROXIMATE_ARRIVAL_TIMESTAMP, String.valueOf(lastRecord.approximateArrivalTimestamp().toEpochMilli()));
}

if (millisBehindLatest != null) {
attributes.put(MILLIS_BEHIND_LATEST, String.valueOf(millisBehindLatest));
}

return attributes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import jakarta.annotation.Nullable;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ConsumeRecordsResult;
import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ShardBufferId;
import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ShardBufferLease;
import software.amazon.kinesis.exceptions.InvalidStateException;
Expand Down Expand Up @@ -96,7 +97,8 @@ public ShardBufferId createBuffer(final String shardId) {
}

@Override
public void addRecords(final ShardBufferId bufferId, final List<KinesisClientRecord> records, final RecordProcessorCheckpointer checkpointer) {
public void addRecords(final ShardBufferId bufferId, final List<KinesisClientRecord> records,
final RecordProcessorCheckpointer checkpointer, final Long millisBehindLatest) {
if (records.isEmpty()) {
return;
}
Expand All @@ -112,7 +114,7 @@ public void addRecords(final ShardBufferId bufferId, final List<KinesisClientRec
return;
}

final RecordBatch recordBatch = new RecordBatch(records, checkpointer, calculateMemoryUsage(records));
final RecordBatch recordBatch = new RecordBatch(records, checkpointer, calculateMemoryUsage(records), millisBehindLatest);
memoryTracker.reserveMemory(recordBatch, bufferId);
final boolean addedRecords = buffer.offer(recordBatch);

Expand Down Expand Up @@ -211,18 +213,18 @@ public Optional<Lease> acquireBufferLease() {
}

@Override
public List<KinesisClientRecord> consumeRecords(final Lease lease) {
public ConsumeRecordsResult consumeRecords(final Lease lease) {
if (lease.isReturnedToPool()) {
logger.warn("Attempting to consume records from a buffer that was already returned to the pool. Ignoring");
return emptyList();
return ConsumeRecordsResult.empty();
}

final ShardBufferId bufferId = lease.bufferId();

final ShardBuffer buffer = shardBuffers.get(bufferId);
if (buffer == null) {
logger.debug("Buffer with id {} not found. Cannot consume records", bufferId);
return emptyList();
return ConsumeRecordsResult.empty();
}

return buffer.consumeRecords();
Expand Down Expand Up @@ -398,7 +400,8 @@ void freeMemory(final Collection<RecordBatch> consumedBatches, final ShardBuffer

private record RecordBatch(List<KinesisClientRecord> records,
RecordProcessorCheckpointer checkpointer,
long batchSizeBytes) {
long batchSizeBytes,
Long millisBehindLatest) {
int size() {
return records.size();
}
Expand Down Expand Up @@ -489,9 +492,9 @@ boolean offer(final RecordBatch recordBatch) {
return true;
}

List<KinesisClientRecord> consumeRecords() {
ConsumeRecordsResult consumeRecords() {
if (invalidated.get()) {
return emptyList();
return ConsumeRecordsResult.empty();
}

RecordBatch pendingBatch;
Expand All @@ -500,11 +503,15 @@ List<KinesisClientRecord> consumeRecords() {
}

final List<KinesisClientRecord> recordsToConsume = new ArrayList<>();
Long lastMillisBehindLatest = null;
for (final RecordBatch batch : inProgressBatches) {
recordsToConsume.addAll(batch.records());
if (batch.millisBehindLatest != null) {
lastMillisBehindLatest = batch.millisBehindLatest;
}
}

return recordsToConsume;
return new ConsumeRecordsResult(recordsToConsume, lastMillisBehindLatest);
}

List<RecordBatch> commitConsumedRecords() {
Expand Down
Loading
Loading