Skip to content

Commit 05ea8b9

Browse files
authored
NIFI-15792 Added background message consumption in ConsumeKinesis (#11102)
Trigger background consumption of messages in ConsumeKinesis in such a way that data is pulled immediately instead of waiting on session commit to complete; when processor is stopped we also quickly bail out of our data gathering loop. Also found in Stateless that ConsumeKinesis errors when needing to create the DynamoDB table because stateless defaults to a 10 second timeout for @OnScheduled while standard defaults to a 1 minute timeout so updated to 1 min for stateless in order to make them consistent Signed-off-by: David Handermann <[email protected]>
1 parent 5cbafbc commit 05ea8b9

File tree

5 files changed

+38
-3
lines changed

5 files changed

+38
-3
lines changed

nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ private List<ShardFetchResult> consumeRecords(final Set<String> claimedShards) {
708708
final long startNanos = System.nanoTime();
709709
long estimatedBytes = 0;
710710

711-
while (System.nanoTime() < startNanos + maxBatchNanos && estimatedBytes < maxBatchBytes) {
711+
while (isScheduled() && System.nanoTime() < startNanos + maxBatchNanos && estimatedBytes < maxBatchBytes) {
712712
final List<String> readyShards = consumerClient.getShardIdsWithResults();
713713
if (readyShards.isEmpty()) {
714714
if (!consumerClient.hasPendingFetches()) {

nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/EnhancedFanOutClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,11 @@ long drainDeduplicatedEventCount() {
146146
return total;
147147
}
148148

149+
@Override
150+
protected void onResultPolled() {
151+
resumePausedConsumers();
152+
}
153+
149154
@Override
150155
void acknowledgeResults(final List<ShardFetchResult> results) {
151156
resumePausedConsumers();
@@ -355,6 +360,7 @@ static final class ShardConsumer {
355360
private volatile long lastSubscribeAttemptNanos;
356361
private volatile BigInteger lastQueuedSequenceNumber;
357362
private volatile boolean shardNotFound;
363+
358364
ShardConsumer(final String shardId, final Consumer<ShardFetchResult> resultSink,
359365
final Queue<ShardConsumer> pausedConsumers, final ComponentLog consumerLogger) {
360366
this.shardId = shardId;

nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/KinesisConsumerClientTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,35 @@ void testSubscriptionRenewalAfterPollBeforeAcknowledgeUsesMaxSequence() throws E
204204
verify(mockShardManager, times(2)).readCheckpoint("shardId-000000000001");
205205
}
206206

207+
/**
208+
* Verifies that polling a result from the per-shard queue resumes a paused EFO consumer.
209+
* Without this, single-threaded environments (such as Stateless) starve: the consumer
210+
* pauses after delivering one event, and no other thread calls startFetches or
211+
* acknowledgeResults to resume it during the consumeRecords loop.
212+
*/
213+
@Test
214+
void testPollingResultResumesPausedEfoConsumer() throws Exception {
215+
final KinesisShardManager mockShardManager = mock(KinesisShardManager.class);
216+
when(mockShardManager.readCheckpoint("shardId-000000000001")).thenReturn("50000");
217+
218+
final List<SubscribeToShardRequest> capturedRequests = new ArrayList<>();
219+
final EnhancedFanOutClient client = createEfoClient(capturedRequests);
220+
final List<Shard> shards = List.of(Shard.builder().shardId("shardId-000000000001").build());
221+
client.startFetches(shards, "test-stream", 100, "TRIM_HORIZON", mockShardManager);
222+
223+
final EnhancedFanOutClient.ShardConsumer consumer = client.getShardConsumer("shardId-000000000001");
224+
final Subscription subscription = mock(Subscription.class);
225+
consumer.setSubscription(subscription);
226+
consumer.pause();
227+
228+
client.enqueueResult(shardFetchResult("shardId-000000000001", "60000"));
229+
230+
final ShardFetchResult polled = client.pollShardResult("shardId-000000000001");
231+
assertNotNull(polled, "Expected queued result to be polled");
232+
233+
verify(subscription, times(1)).request(1);
234+
}
235+
207236
/**
208237
* Verifies that acknowledging multiple fetched results from the same shard requests only one
209238
* additional EFO event for that shard.

nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public StatelessEngineConfiguration parseEngineConfiguration(final File properti
114114

115115
final String statusTaskInterval = properties.getProperty(STATUS_TASK_INTERVAL, "1 min");
116116

117-
final String processorStartTimeout = properties.getProperty(PROCESSOR_START_TIMEOUT, "10 secs");
117+
final String processorStartTimeout = properties.getProperty(PROCESSOR_START_TIMEOUT, "1 min");
118118
final String componentEnableTimeout = properties.getProperty(COMPONENT_ENABLE_TIMEOUT, "10 secs");
119119

120120
return new StatelessEngineConfiguration() {

nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ default boolean isLogExtensionDiscovery() {
9595
* Defaults to "10 secs"
9696
*/
9797
default String getProcessorStartTimeout() {
98-
return "10 secs";
98+
return "1 min";
9999
}
100100

101101
/**

0 commit comments

Comments
 (0)