Skip to content

Commit 1d2c25f

Browse files
NIFI-15795 Fixed Email Processors Executor Service handling (#11105)
- Relocated Scheduled Executor Service creation to instance level with creation and shutdown in Processor lifecycle methods - Replaced SLF4J Logger references with standard Component Log
1 parent 05ea8b9 commit 1d2c25f

File tree

1 file changed

+76
-68
lines changed
  • nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email

1 file changed

+76
-68
lines changed

nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java

Lines changed: 76 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
import org.apache.nifi.processor.Relationship;
3737
import org.apache.nifi.processor.exception.ProcessException;
3838
import org.apache.nifi.processor.util.StandardValidators;
39-
import org.slf4j.Logger;
40-
import org.slf4j.LoggerFactory;
4139
import org.springframework.beans.factory.support.StaticListableBeanFactory;
4240
import org.springframework.context.expression.BeanFactoryResolver;
4341
import org.springframework.expression.spel.support.StandardEvaluationContext;
@@ -51,13 +49,13 @@
5149
import java.util.Arrays;
5250
import java.util.List;
5351
import java.util.Map.Entry;
54-
import java.util.Optional;
5552
import java.util.Properties;
5653
import java.util.Set;
5754
import java.util.concurrent.ArrayBlockingQueue;
5855
import java.util.concurrent.BlockingQueue;
5956
import java.util.concurrent.Executors;
6057
import java.util.concurrent.ScheduledExecutorService;
58+
import java.util.concurrent.ThreadFactory;
6159
import java.util.concurrent.TimeUnit;
6260

6361
/**
@@ -181,7 +179,7 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
181179
REL_SUCCESS
182180
);
183181

184-
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
182+
private ScheduledExecutorService scheduledExecutorService;
185183

186184
protected volatile T messageReceiver;
187185

@@ -191,34 +189,44 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
191189

192190
private volatile ProcessSession processSession;
193191

194-
protected volatile Optional<OAuth2AccessTokenProvider> oauth2AccessTokenProviderOptional;
195-
protected volatile AccessToken oauth2AccessDetails;
192+
private volatile OAuth2AccessTokenProvider accessTokenProvider;
193+
private volatile AccessToken currentAccessToken;
196194

197195
protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
198196
return PROPERTY_DESCRIPTORS;
199197
}
200198

201199
@OnScheduled
202200
public void onScheduled(final ProcessContext context) {
201+
final ThreadFactory threadFactory = Thread.ofPlatform()
202+
.name("%s[%s]-MailReceiver".formatted(getClass().getSimpleName(), getIdentifier()))
203+
.factory();
204+
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
205+
203206
if (context.getProperty(AUTHORIZATION_MODE).getValue().equals(OAUTH_AUTHORIZATION_MODE.getValue())) {
204207
OAuth2AccessTokenProvider oauth2AccessTokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
205208

206-
oauth2AccessTokenProviderOptional = Optional.of(oauth2AccessTokenProvider);
207-
oauth2AccessDetails = oauth2AccessTokenProvider.getAccessDetails();
209+
accessTokenProvider = oauth2AccessTokenProvider;
210+
currentAccessToken = oauth2AccessTokenProvider.getAccessDetails();
208211
} else {
209-
oauth2AccessTokenProviderOptional = Optional.empty();
210-
oauth2AccessDetails = null;
212+
accessTokenProvider = null;
213+
currentAccessToken = null;
211214
}
212215
}
213216

214217
@OnStopped
215-
public void stop(ProcessContext processContext) {
216-
this.flushRemainingMessages(processContext);
218+
public void onStopped() {
219+
flushRemainingMessages();
217220
try {
218-
this.messageReceiver.destroy();
219-
this.messageReceiver = null;
220-
} catch (Exception e) {
221-
this.logger.warn("Failure while closing processor", e);
221+
messageReceiver.destroy();
222+
messageReceiver = null;
223+
} catch (final Exception e) {
224+
getLogger().warn("Failed to close Mail Receiver", e);
225+
}
226+
227+
if (scheduledExecutorService != null) {
228+
scheduledExecutorService.shutdownNow();
229+
scheduledExecutorService = null;
222230
}
223231
}
224232

@@ -228,12 +236,12 @@ public Set<Relationship> getRelationships() {
228236
}
229237

230238
@Override
231-
public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
232-
this.initializeIfNecessary(context, processSession);
239+
public void onTrigger(final ProcessContext context, final ProcessSession session) {
240+
initializeIfNecessary(context, session);
233241

234-
Message emailMessage = this.receiveMessage();
242+
final Message emailMessage = receiveMessage();
235243
if (emailMessage != null) {
236-
this.transfer(emailMessage, context, processSession);
244+
transfer(emailMessage, session);
237245
}
238246
}
239247

@@ -273,17 +281,18 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
273281
*/
274282
protected abstract String getProtocol(ProcessContext processContext);
275283

276-
/**
277-
* Builds the url used to connect to the email server.
278-
*/
279-
String buildUrl(ProcessContext processContext) {
284+
String buildUrl(final ProcessContext processContext) {
280285
String host = processContext.getProperty(HOST).evaluateAttributeExpressions().getValue();
281286
String port = processContext.getProperty(PORT).evaluateAttributeExpressions().getValue();
282287
String user = processContext.getProperty(USER).evaluateAttributeExpressions().getValue();
283288

284-
String password = oauth2AccessTokenProviderOptional.map(oauth2AccessTokenProvider ->
285-
oauth2AccessTokenProvider.getAccessDetails().getAccessToken()
286-
).orElse(processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue());
289+
final String password;
290+
if (accessTokenProvider == null) {
291+
password = processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
292+
} else {
293+
final AccessToken accessDetails = accessTokenProvider.getAccessDetails();
294+
password = accessDetails.getAccessToken();
295+
}
287296

288297
String folder = processContext.getProperty(FOLDER).evaluateAttributeExpressions().getValue();
289298

@@ -307,7 +316,7 @@ String buildUrl(ProcessContext processContext) {
307316
int passwordEndIndex = urlBuilder.indexOf("@");
308317
urlBuilder.replace(passwordStartIndex, passwordEndIndex, "[password]");
309318
this.displayUrl = protocol + "://" + urlBuilder;
310-
this.logger.info("Connecting to server [{}]", this.displayUrl);
319+
getLogger().info("Connecting to server [{}]", this.displayUrl);
311320

312321
return finalUrl;
313322
}
@@ -318,7 +327,7 @@ String buildUrl(ProcessContext processContext) {
318327
* and is ready to receive messages.
319328
*/
320329
private synchronized void initializeIfNecessary(ProcessContext context, ProcessSession processSession) {
321-
if (this.messageReceiver == null || isOauth2AccessDetailsRefreshed()) {
330+
if (this.messageReceiver == null || isAccessTokenRefreshRequired()) {
322331
this.processSession = processSession;
323332
this.messageReceiver = this.buildMessageReceiver(context);
324333

@@ -329,23 +338,26 @@ private synchronized void initializeIfNecessary(ProcessContext context, ProcessS
329338
// Spring Integration 7 expects an evaluation context bean; register a lightweight one for the receiver
330339
final StaticListableBeanFactory beanFactory = new StaticListableBeanFactory();
331340
final StandardEvaluationContext evaluationContext = new StandardEvaluationContext();
332-
final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
333341
evaluationContext.setBeanResolver(new BeanFactoryResolver(beanFactory));
334342
beanFactory.addBean(IntegrationContextUtils.INTEGRATION_EVALUATION_CONTEXT_BEAN_NAME, evaluationContext);
335-
beanFactory.addBean(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, new ConcurrentTaskScheduler(scheduledExecutor));
343+
beanFactory.addBean(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, new ConcurrentTaskScheduler(scheduledExecutorService));
336344
this.messageReceiver.setBeanFactory(beanFactory);
337345
this.messageReceiver.afterPropertiesSet();
338346

339347
this.messageQueue = new ArrayBlockingQueue<>(fetchSize);
340348
}
341349
}
342350

343-
private boolean isOauth2AccessDetailsRefreshed() {
344-
boolean oauthDetailsRefreshed = this.oauth2AccessTokenProviderOptional.isPresent()
345-
&&
346-
(this.oauth2AccessDetails == null || !oauth2AccessDetails.equals(this.oauth2AccessTokenProviderOptional.get().getAccessDetails()));
351+
private boolean isAccessTokenRefreshRequired() {
352+
final boolean refreshRequired;
353+
354+
if (accessTokenProvider == null) {
355+
refreshRequired = false;
356+
} else {
357+
refreshRequired = currentAccessToken == null || !currentAccessToken.equals(accessTokenProvider.getAccessDetails());
358+
}
347359

348-
return oauthDetailsRefreshed;
360+
return refreshRequired;
349361
}
350362

351363
/**
@@ -369,7 +381,10 @@ private Properties buildJavaMailProperties(ProcessContext context) {
369381
final String timeoutInMillis = String.valueOf(context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS));
370382
javaMailProperties.setProperty(propertyName, timeoutInMillis);
371383

372-
oauth2AccessTokenProviderOptional.ifPresent(oauth2AccessTokenProvider -> javaMailProperties.put("mail." + protocol + ".auth.mechanisms", "XOAUTH2"));
384+
if (accessTokenProvider != null) {
385+
final String authMechanismsProperty = "mail.%s.auth.mechanisms".formatted(protocol);
386+
javaMailProperties.put(authMechanismsProperty, "XOAUTH2");
387+
}
373388

374389
return javaMailProperties;
375390
}
@@ -415,37 +430,33 @@ private boolean isClosedException(final MessagingException exception) {
415430
return closedException;
416431
}
417432

418-
/**
419-
* Disposes the message by converting it to a {@link FlowFile} transferring
420-
* it to the REL_SUCCESS relationship.
421-
*/
422-
private void transfer(Message emailMessage, ProcessContext context, ProcessSession processSession) {
423-
long start = System.nanoTime();
424-
FlowFile flowFile = processSession.create();
433+
private void transfer(final Message message, final ProcessSession session) {
434+
final long started = System.nanoTime();
435+
FlowFile flowFile = session.create();
425436

426-
flowFile = processSession.append(flowFile, out -> {
437+
flowFile = session.write(flowFile, out -> {
427438
try {
428-
emailMessage.writeTo(out);
429-
} catch (MessagingException e) {
430-
throw new IOException(e);
439+
message.writeTo(out);
440+
} catch (final MessagingException e) {
441+
throw new IOException("Message [%d] serialization failed".formatted(message.getMessageNumber()), e);
431442
}
432443
});
433444

434-
long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
445+
final long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - started);
435446

436-
String fromAddressesString = "";
447+
String fromAddresses = "";
437448
try {
438-
Address[] fromAddresses = emailMessage.getFrom();
439-
if (fromAddresses != null) {
440-
fromAddressesString = Arrays.asList(fromAddresses).toString();
449+
final Address[] from = message.getFrom();
450+
if (from != null) {
451+
fromAddresses = Arrays.asList(from).toString();
441452
}
442-
} catch (MessagingException e) {
443-
this.logger.warn("Failed to retrieve 'From' attribute from Message.");
453+
} catch (final MessagingException e) {
454+
getLogger().warn("Failed to retrieve [From] address from Message [{}]", message.getMessageNumber());
444455
}
445456

446-
processSession.getProvenanceReporter().receive(flowFile, this.displayUrl, "Received message from " + fromAddressesString, executionDuration);
447-
this.getLogger().info("Successfully received {} from {} in {} millis", flowFile, fromAddressesString, executionDuration);
448-
processSession.transfer(flowFile, REL_SUCCESS);
457+
session.getProvenanceReporter().receive(flowFile, displayUrl, "Received message from " + fromAddresses, executionDuration);
458+
getLogger().info("Received {} from {} in {} millis", flowFile, fromAddresses, executionDuration);
459+
session.transfer(flowFile, REL_SUCCESS);
449460

450461
}
451462

@@ -458,26 +469,23 @@ private Message receiveMessage() {
458469
try {
459470
this.fillMessageQueueIfNecessary();
460471
emailMessage = this.messageQueue.poll(1, TimeUnit.MILLISECONDS);
461-
} catch (InterruptedException e) {
472+
} catch (final InterruptedException e) {
462473
Thread.currentThread().interrupt();
463-
this.logger.debug("Current thread is interrupted");
474+
getLogger().debug("Interrupted while receiving messages");
464475
}
465476
return emailMessage;
466477
}
467478

468-
/**
469-
* Will flush the remaining messages when this processor is stopped.
470-
*/
471-
private void flushRemainingMessages(ProcessContext processContext) {
479+
private void flushRemainingMessages() {
472480
Message emailMessage;
473481
try {
474482
while ((emailMessage = this.messageQueue.poll(1, TimeUnit.MILLISECONDS)) != null) {
475-
this.transfer(emailMessage, processContext, this.processSession);
476-
this.processSession.commitAsync();
483+
transfer(emailMessage, processSession);
484+
processSession.commitAsync();
477485
}
478-
} catch (InterruptedException e) {
486+
} catch (final InterruptedException e) {
479487
Thread.currentThread().interrupt();
480-
this.logger.debug("Current thread is interrupted");
488+
getLogger().debug("Interrupted while processing remaining messages");
481489
}
482490
}
483491
}

0 commit comments

Comments
 (0)