Skip to content

Commit 2ce6d5b

Browse files
authored
Merge pull request #1760 from alexliahushau/main
ISSUE-240 OpenTelemetry propagate flow-id via HTTP client headers and…
2 parents 3e29747 + 8cdeadd commit 2ce6d5b

File tree

7 files changed

+194
-33
lines changed

7 files changed

+194
-33
lines changed

riptide-opentelemetry/src/main/java/org/zalando/riptide/opentelemetry/span/ExtensionAttributes.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.opentelemetry.api.common.AttributeKey;
44

55
public class ExtensionAttributes {
6+
public static final AttributeKey<String> FLOW_ID = AttributeKey.stringKey("flow_id");
67
public static final AttributeKey<String> HTTP_PATH = AttributeKey.stringKey("http.path");
78
public static final AttributeKey<Boolean> RETRY = AttributeKey.booleanKey("retry");
89
public static final AttributeKey<String> PEER_HOST = AttributeKey.stringKey("peer.hostname");
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.zalando.riptide.opentelemetry.span;
2+
3+
import io.opentelemetry.api.baggage.Baggage;
4+
import io.opentelemetry.api.trace.Span;
5+
import org.zalando.riptide.RequestArguments;
6+
7+
import java.util.Optional;
8+
9+
import lombok.RequiredArgsConstructor;
10+
11+
import static org.zalando.riptide.opentelemetry.span.ExtensionAttributes.FLOW_ID;
12+
13+
@RequiredArgsConstructor
14+
public class FlowIdSpanDecorator implements SpanDecorator {
15+
16+
public static final String FLOW_ID_HEADER = "X-Flow-ID";
17+
18+
private final boolean propagateFlowId;
19+
20+
@Override
21+
public void onRequest(Span span, RequestArguments arguments) {
22+
Optional.ofNullable(Baggage.current().getEntryValue(FLOW_ID.getKey()))
23+
.ifPresent(flowId -> {
24+
span.setAttribute(FLOW_ID, flowId);
25+
26+
if (propagateFlowId) {
27+
arguments.withHeader(FLOW_ID_HEADER, flowId);
28+
}
29+
});
30+
}
31+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package org.zalando.riptide.opentelemetry.span;
2+
3+
import io.opentelemetry.api.baggage.Baggage;
4+
import io.opentelemetry.api.trace.Span;
5+
import org.junit.jupiter.api.BeforeEach;
6+
import org.junit.jupiter.api.Test;
7+
import org.mockito.MockedStatic;
8+
import org.zalando.riptide.RequestArguments;
9+
10+
import static org.mockito.Mockito.*;
11+
12+
class FlowIdSpanDecoratorTest {
13+
14+
private static final String FLOW_ID = "flow-id-value";
15+
private static final String FLOW_ID_HEADER = "X-Flow-ID";
16+
17+
private Span span;
18+
private RequestArguments arguments;
19+
20+
@BeforeEach
21+
void setUp() {
22+
span = mock(Span.class);
23+
arguments = mock(RequestArguments.class);
24+
}
25+
26+
@Test
27+
void shouldSetFlowIdAttributeAndPropagateHeaderWhenEnabled() {
28+
final FlowIdSpanDecorator decorator = new FlowIdSpanDecorator(true);
29+
final Baggage baggage = mock(Baggage.class);
30+
when(baggage.getEntryValue(ExtensionAttributes.FLOW_ID.getKey())).thenReturn(FLOW_ID);
31+
32+
try (MockedStatic<Baggage> mockedBaggage = mockStatic(Baggage.class)) {
33+
mockedBaggage.when(Baggage::current).thenReturn(baggage);
34+
35+
decorator.onRequest(span, arguments);
36+
37+
verify(span).setAttribute(ExtensionAttributes.FLOW_ID, FLOW_ID);
38+
verify(arguments).withHeader(FLOW_ID_HEADER, FLOW_ID);
39+
}
40+
}
41+
42+
@Test
43+
void shouldSetFlowIdAttributeWithoutPropagatingHeaderWhenDisabled() {
44+
final FlowIdSpanDecorator decorator = new FlowIdSpanDecorator(false);
45+
final Baggage baggage = mock(Baggage.class);
46+
when(baggage.getEntryValue(ExtensionAttributes.FLOW_ID.getKey())).thenReturn(FLOW_ID);
47+
48+
try (MockedStatic<Baggage> mockedBaggage = mockStatic(Baggage.class)) {
49+
mockedBaggage.when(Baggage::current).thenReturn(baggage);
50+
51+
decorator.onRequest(span, arguments);
52+
53+
verify(span).setAttribute(ExtensionAttributes.FLOW_ID, FLOW_ID);
54+
verify(arguments, never()).withHeader(FLOW_ID_HEADER, FLOW_ID);
55+
}
56+
}
57+
58+
@Test
59+
void shouldDoNothingWhenFlowIdIsAbsent() {
60+
final FlowIdSpanDecorator decorator = new FlowIdSpanDecorator(true);
61+
final Baggage baggage = mock(Baggage.class);
62+
when(baggage.getEntryValue(ExtensionAttributes.FLOW_ID.getKey())).thenReturn(null);
63+
64+
try (MockedStatic<Baggage> mockedBaggage = mockStatic(Baggage.class)) {
65+
mockedBaggage.when(Baggage::current).thenReturn(baggage);
66+
67+
decorator.onRequest(span, arguments);
68+
69+
verify(span, never()).setAttribute(anyString(), anyString());
70+
verify(arguments, never()).withHeader(anyString(), anyString());
71+
}
72+
}
73+
}

riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/Defaulting.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ private static <K, V> Map<K, V> merge(final Map<K, V> base, final Map<K, V> defa
306306
private static Telemetry merge(final Telemetry base, final Telemetry defaults) {
307307
return new Telemetry(
308308
either(base.getEnabled(), defaults.getEnabled()),
309-
merge(base.getAttributes(), defaults.getAttributes(), Defaulting::merge)
309+
merge(base.getAttributes(), defaults.getAttributes(), Defaulting::merge),
310+
either(base.getPropagateFlowId(), defaults.getPropagateFlowId())
310311
);
311312
}
312313

riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/OpenTelemetryPluginFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.zalando.riptide.Plugin;
44
import org.zalando.riptide.opentelemetry.OpenTelemetryPlugin;
5+
import org.zalando.riptide.opentelemetry.span.FlowIdSpanDecorator;
56
import org.zalando.riptide.opentelemetry.span.RetrySpanDecorator;
67
import org.zalando.riptide.opentelemetry.span.SpanDecorator;
78
import org.zalando.riptide.opentelemetry.span.StaticSpanDecorator;
@@ -16,10 +17,12 @@ private OpenTelemetryPluginFactory() {
1617

1718
public static Plugin create(final RiptideProperties.Client client) {
1819
final List<SpanDecorator> decorators = new ArrayList<>();
19-
decorators.add(new StaticSpanDecorator(client.getTelemetry().getAttributes()));
20+
final var clientTelemetryConfig = client.getTelemetry();
21+
decorators.add(new StaticSpanDecorator(clientTelemetryConfig.getAttributes()));
2022
if (client.getRetry().getEnabled()) {
2123
decorators.add(new RetrySpanDecorator());
2224
}
25+
decorators.add(new FlowIdSpanDecorator(clientTelemetryConfig.getPropagateFlowId()));
2326
return new OpenTelemetryPlugin(decorators.toArray(new SpanDecorator[0]));
2427
}
2528
}

riptide-spring-boot-autoconfigure/src/main/java/org/zalando/riptide/autoconfigure/RiptideProperties.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public static final class Defaults {
132132
private Tracing tracing = new Tracing(false, emptyMap(), false);
133133

134134
@NestedConfigurationProperty
135-
private Telemetry telemetry = new Telemetry(false, emptyMap());
135+
private Telemetry telemetry = new Telemetry(false, emptyMap(), false);
136136

137137
@NestedConfigurationProperty
138138
private Chaos chaos = new Chaos(
@@ -419,6 +419,7 @@ public static final class Tracing {
419419
public static final class Telemetry {
420420
private Boolean enabled;
421421
private Map<String, String> attributes;
422+
private Boolean propagateFlowId;
422423
}
423424

424425
@Getter

riptide-spring-boot-autoconfigure/src/test/java/org/zalando/riptide/autoconfigure/OpenTelemetryPluginFactoryTest.java

Lines changed: 81 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package org.zalando.riptide.autoconfigure;
22

3-
import org.junit.jupiter.api.Test;
43
import org.junit.jupiter.params.ParameterizedTest;
54
import org.junit.jupiter.params.provider.CsvSource;
65
import org.springframework.util.ReflectionUtils;
76
import org.zalando.riptide.Plugin;
87
import org.zalando.riptide.opentelemetry.OpenTelemetryPlugin;
98
import org.zalando.riptide.opentelemetry.span.CompositeSpanDecorator;
9+
import org.zalando.riptide.opentelemetry.span.FlowIdSpanDecorator;
1010
import org.zalando.riptide.opentelemetry.span.RetrySpanDecorator;
1111
import org.zalando.riptide.opentelemetry.span.SpanDecorator;
1212

@@ -16,68 +16,119 @@
1616
import java.util.stream.StreamSupport;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19-
import static org.junit.jupiter.api.Assertions.*;
2019

2120
public class OpenTelemetryPluginFactoryTest {
2221

2322
private static final Field SPAN_DECORATOR_FIELD = ReflectionUtils.findField(OpenTelemetryPlugin.class, "spanDecorator");
2423
private static final Field DECORATORS_FIELD = ReflectionUtils.findField(CompositeSpanDecorator.class, "decorators");
24+
private static final Field PROPAGATE_FLOW_ID_FIELD = ReflectionUtils.findField(FlowIdSpanDecorator.class, "propagateFlowId");
2525

2626
static {
2727
SPAN_DECORATOR_FIELD.setAccessible(true);
2828
DECORATORS_FIELD.setAccessible(true);
29+
PROPAGATE_FLOW_ID_FIELD.setAccessible(true);
2930
}
3031

3132
@ParameterizedTest
3233
@CsvSource({
3334
"true, RetrySpanDecorator should be added when retry is enabled",
3435
"false, RetrySpanDecorator should not be added when retry is disabled"
3536
})
36-
void shouldCreatePluginWithRetrySpanDecoratorWhenClientRetryEnabled(
37-
final boolean isRetryEnabled,
38-
final String message
37+
void shouldAddRetrySpanDecoratorWhenEnabled(
38+
final boolean isRetryEnabled,
39+
final String message
3940
) throws IllegalAccessException {
40-
final Plugin plugin = OpenTelemetryPluginFactory.create(createTestClient(isRetryEnabled));
41+
final Plugin plugin = OpenTelemetryPluginFactory.create(createTestClient(isRetryEnabled, false));
4142

42-
assertNotNull(plugin);
43-
assertInstanceOf(OpenTelemetryPlugin.class, plugin);
43+
assertThat(plugin)
44+
.isNotNull()
45+
.isInstanceOf(OpenTelemetryPlugin.class);
4446

45-
final Optional<SpanDecorator> innerCompositeDecorator = StreamSupport.stream(
46-
((Iterable<SpanDecorator>) DECORATORS_FIELD.get(SPAN_DECORATOR_FIELD.get(plugin))).spliterator(), false
47-
).filter(decorator -> decorator instanceof CompositeSpanDecorator).findFirst();
47+
final Optional<CompositeSpanDecorator> innerCompositeDecorator = getDecorator(
48+
(SpanDecorator) SPAN_DECORATOR_FIELD.get(plugin),
49+
CompositeSpanDecorator.class
50+
);
4851

4952
assertThat(innerCompositeDecorator).isPresent();
50-
assertThat(
51-
StreamSupport.stream(
52-
((Iterable<SpanDecorator>) DECORATORS_FIELD.get(innerCompositeDecorator.get())).spliterator(), false
53-
).anyMatch(decorator -> decorator instanceof RetrySpanDecorator)
54-
).withFailMessage(message).isEqualTo(isRetryEnabled);
53+
assertThat(getDecorator(innerCompositeDecorator.get(), RetrySpanDecorator.class).isPresent())
54+
.withFailMessage(message)
55+
.isEqualTo(isRetryEnabled);
5556
}
5657

57-
@Test
58-
void shouldCreatePluginWithoutRetrySpanDecorator() throws IllegalAccessException {
59-
final Plugin plugin = OpenTelemetryPluginFactory.create(createTestClient(false));
6058

61-
assertNotNull(plugin);
62-
assertInstanceOf(OpenTelemetryPlugin.class, plugin);
59+
@ParameterizedTest
60+
@CsvSource({
61+
"true, FlowIdSpanDecorator should be added when client telemetry is enabled",
62+
"false, FlowIdSpanDecorator should be added when client telemetry is disabled"
63+
})
64+
void shouldAlwaysAddFlowIdSpanDecorator(
65+
final boolean isFlowIdPropagationEnabled,
66+
final String message
67+
) throws IllegalAccessException {
68+
final Plugin plugin = OpenTelemetryPluginFactory.create(createTestClient(false, isFlowIdPropagationEnabled));
6369

64-
final Optional<SpanDecorator> innerCompositeDecorator = StreamSupport.stream(
65-
((Iterable<SpanDecorator>) DECORATORS_FIELD.get(SPAN_DECORATOR_FIELD.get(plugin))).spliterator(), false
66-
).filter(decorator -> decorator instanceof CompositeSpanDecorator).findFirst();
70+
assertThat(plugin)
71+
.isNotNull()
72+
.isInstanceOf(OpenTelemetryPlugin.class);
73+
74+
final Optional<CompositeSpanDecorator> innerCompositeDecorator = getDecorator(
75+
(SpanDecorator) SPAN_DECORATOR_FIELD.get(plugin),
76+
CompositeSpanDecorator.class
77+
);
6778

6879
assertThat(innerCompositeDecorator).isPresent();
69-
assertTrue(
70-
StreamSupport.stream(
71-
((Iterable<SpanDecorator>) DECORATORS_FIELD.get(innerCompositeDecorator.get())).spliterator(), false
72-
).noneMatch(decorator -> decorator instanceof RetrySpanDecorator),
73-
"RetrySpanDecorator should not be added when retry is disabled"
80+
assertThat(getDecorator(innerCompositeDecorator.get(), FlowIdSpanDecorator.class))
81+
.withFailMessage(message)
82+
.isPresent();
83+
}
84+
85+
@ParameterizedTest
86+
@CsvSource({
87+
"true, flow id propagation should be enabled",
88+
"false, flow id propagation should be enabled be disabled"
89+
})
90+
void shouldAddFlowIdHeaderWhenPropagateFlowIdEnabled(
91+
final boolean isFlowIdPropagationEnabled,
92+
final String message
93+
) throws IllegalAccessException {
94+
final Plugin plugin = OpenTelemetryPluginFactory.create(createTestClient(false, isFlowIdPropagationEnabled));
95+
96+
assertThat(plugin)
97+
.isNotNull()
98+
.isInstanceOf(OpenTelemetryPlugin.class);
99+
100+
final Optional<CompositeSpanDecorator> innerCompositeDecorator = getDecorator(
101+
(SpanDecorator) SPAN_DECORATOR_FIELD.get(plugin),
102+
CompositeSpanDecorator.class
74103
);
104+
105+
assertThat(innerCompositeDecorator).isPresent();
106+
107+
final Optional<FlowIdSpanDecorator> decoratorCandidate = getDecorator(innerCompositeDecorator.get(), FlowIdSpanDecorator.class);
108+
assertThat(decoratorCandidate).isNotEmpty();
109+
110+
final FlowIdSpanDecorator decorator = decoratorCandidate.get();
111+
assertThat(PROPAGATE_FLOW_ID_FIELD.getBoolean(decorator))
112+
.withFailMessage(message)
113+
.isEqualTo(isFlowIdPropagationEnabled);
114+
}
115+
116+
private static <T extends SpanDecorator> Optional<T> getDecorator(
117+
final SpanDecorator decorator,
118+
final Class<T> decoratorType
119+
) throws IllegalAccessException {
120+
return StreamSupport.stream(((Iterable<SpanDecorator>) DECORATORS_FIELD.get(decorator)).spliterator(), false)
121+
.filter(decoratorType::isInstance)
122+
.map(decoratorType::cast)
123+
.findFirst();
75124
}
76125

77-
private static RiptideProperties.Client createTestClient(final boolean retryEnabled) {
126+
private static RiptideProperties.Client createTestClient(final boolean retryEnabled,
127+
final boolean propagateFlowId) {
78128
final RiptideProperties.Client client = new RiptideProperties.Client();
79129
final RiptideProperties.Telemetry telemetry = new RiptideProperties.Telemetry();
80130
telemetry.setAttributes(Map.of("service.name", "test-service"));
131+
telemetry.setPropagateFlowId(propagateFlowId);
81132
client.setTelemetry(telemetry);
82133

83134
final RiptideProperties.Retry retry = new RiptideProperties.Retry();

0 commit comments

Comments
 (0)