Skip to content

jms synchronous call to RabbitMQ (with spring boot context) - setting amqp = true for tmp queues #1208

@adamlukaszewski

Description

@adamlukaszewski

Citrus Version

4.1.0

Question

How can I enable amqp for temporary created RMQDestinations (jms-temp-queues) during a synchronous citrus invocation?

What I've tried so far

Some notes before:

  • The architecture of the application is based on messaging with RabbitMQ as the broker
  • The logic is implemented with the in-out pattern (synchronous call): The consumer is putting a message into the queue and waits for a response. That's what I want to test: Ship in a message --> let the application do whatever it has to do --> receive the finished response)
  • JSON will be used as message type.

The use case seems to be very simple. I have this test case:

@CitrusSpringSupport
@ContextConfiguration(classes = {CitrusSpringConfig.class, EndpointConfig.class})
public class TodoListIT {

    @CitrusEndpoint
    private JmsEndpoint personsQueueCreate;


    @Autowired
    private CitrusSpringConfig citrusSpringConfig;

    @Test
    @CitrusTest
    void testPost(@CitrusResource TestCaseRunner test) {
        test.variable("todoName", "citrus:concat('todo_', citrus:randomNumber(4))");
        test.variable("todoDescription", "Description: ${todoName}");

        // Use the endpoint configured without a specific destination, set headers in the send action
        test.$(SendMessageAction.Builder.send()
                        .endpoint(personsQueueCreate)
//                .fork(false)
                        .message()
                        .header(MP.MIDSA_PARAM_PERSON_ID.s(), "46b85fef-02a0-4f3f-bdf1-aaaa")
                        .header(MP.MIDSA_PARAM_BIOMETRIC_INCLUDED.s(), "false")
                        .header(MP.MIDSA_PARAM_MAX_RECENT_MOVEMENTS.s(), 1)
                        .body("{ \"title\": \"${todoName}\", \"description\": \"${todoDescription}\" }")
        );

        test.$(ReceiveMessageAction.Builder.receive()
                .endpoint(personsQueueCreate)
                .message()
                .body("\"Message received\""));
    }
}

Currently I am getting some when the temporary jms consumer (waiting for the response message) is getting the message. The stack shows:

com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227575
org.citrusframework.exceptions.TestCaseFailedException: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227575
	at org.citrusframework.DefaultTestCase.executeAction(DefaultTestCase.java:146)
	at org.citrusframework.DefaultTestCaseRunner.run(DefaultTestCaseRunner.java:129)
	at org.citrusframework.TestActionRunner.$(TestActionRunner.java:35)
	at com.midsa.border.guard.citrus.TodoListIT.testPost(TodoListIT.java:39)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.citrusframework.junit.jupiter.CitrusExtension.lambda$interceptTestMethod$3(CitrusExtension.java:148)
	at org.citrusframework.common.DefaultTestLoader.lambda$doLoad$1(DefaultTestLoader.java:118)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.citrusframework.common.DefaultTestLoader.doLoad(DefaultTestLoader.java:118)
	at org.citrusframework.common.DefaultTestLoader.load(DefaultTestLoader.java:95)
	at org.citrusframework.junit.jupiter.CitrusExtension.interceptTestMethod(CitrusExtension.java:156)
	at org.citrusframework.junit.jupiter.spring.CitrusSpringExtension.interceptTestMethod(CitrusSpringExtension.java:87)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:119)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:94)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:89)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:62)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at jdk.proxy1/jdk.proxy1.$Proxy2.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: org.citrusframework.exceptions.CitrusRuntimeException: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227575
	at org.citrusframework.jms.endpoint.JmsSyncProducer.send(JmsSyncProducer.java:149)
	at org.citrusframework.actions.SendMessageAction.doExecute(SendMessageAction.java:175)
	at org.citrusframework.actions.AbstractTestAction.execute(AbstractTestAction.java:59)
	at org.citrusframework.DefaultTestCase.executeAction(DefaultTestCase.java:139)
	... 97 more
Caused by: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B227575
	at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1087)
	at com.rabbitmq.jms.client.RMQMessage.convertJmsMessage(RMQMessage.java:841)
	at com.rabbitmq.jms.client.RMQMessage.convertMessage(RMQMessage.java:835)
	at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:358)
	at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:272)
	at org.citrusframework.jms.endpoint.JmsSyncProducer.send(JmsSyncProducer.java:135)
	... 100 more

Some findings:

  • After some debugging I could narrow down that the implicitly created jms queue (by citrus) waiting on the response message is not able to handle correctly the response object.

  • The application is working, also the correct json message was generated. In the debugger I was also able to convert the byte[] into a string and I have seen the json string which I have expected (I debugged the receive, when the response message was processed). --> This let me assume that the backend and calling logic is working as expected. I could also verify this in my logs and traces of the application.

  • I have also seen in the debugger, that for the temporary created queue amqp = false was set:

    image

    • Could it be that this is the issue? How can I set the amqp option for the temporary created queues to true?

Additional information

  • Since, I am working in a spring context: This is the endpoint configuration I am using:
package com.midsa.border.guard;

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import jakarta.jms.ConnectionFactory;
import org.citrusframework.dsl.endpoint.CitrusEndpoints;
import org.citrusframework.jms.endpoint.JmsEndpoint;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Import(TodoAppAutoConfiguration.class)
@Configuration
public class EndpointConfig {


    @Bean
    public ConnectionFactory connectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("xxxxx");
        connectionFactory.setPassword("xxxxx");
        connectionFactory.setVirtualHost("xxxxxx");
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672); // Replace with appropriate port
        return connectionFactory;
    }


    @Bean
    public RMQDestination jmsDestination() {
        RMQDestination jmsDestination = new RMQDestination();
        jmsDestination.setAmqp(true);
        return jmsDestination;
    }


    @Bean
    public JmsEndpoint personsQueueCreate(ConnectionFactory connectionFactory,
                                          RMQDestination jmsDestination) {
        jmsDestination.setDestinationName("midsa-service-border-guard.persons.get");
        jmsDestination.setAmqpExchangeName("midsa.persons.get");
        jmsDestination.setAmqpRoutingKey("");
        jmsDestination.setAmqpQueueName("midsa-service-border-guard.persons.get");

        return CitrusEndpoints
                .jms()
//                .asynchronous()
                .synchronous()
                .connectionFactory(connectionFactory)
                .destination(jmsDestination)
                .build();
    }
}
  • Is there maybe some springboot resolver logic that allows me to set for all destinations the correct values?

Thanks a lot and I am looking forward to hear from you :).

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions