Skip to content

Commit 02581bc

Browse files
fix(ingress-controller): Add headers for tracing in requests to Konnect to upload configuration (#4062)
Co-authored-by: Jintao Zhang <[email protected]>
1 parent cdc86f8 commit 02581bc

6 files changed

Lines changed: 132 additions & 7 deletions

File tree

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@
4646

4747
### Added
4848

49+
- Add the following headers in requests of `ingress-controller` sent to Konnect
50+
for uploading configuration for tracing:
51+
- `X-Kic-Konnect-Sync-Instance-Id` for instance ID of Konnect config synchronizer.
52+
It is set to use the `ControlPlane`'s instance ID.
53+
- `X-Kic-Konnect-Sync-Serial-Number` for serial number of config sync round.
54+
- `X-Kic-Konnect-Sync-Start-Timestamp` for the timestamp of starting the config sync round.
55+
- `X-Kic-Konnect-Sync-Round-Id` for the ID to mark the config sync round.
56+
[#4062](https://github.com/Kong/kong-operator/pull/4062)
4957
- Add `spec.konnect.configUploadConcurrency` to set the concurrency of uploading
5058
configuration to Konnect in the on-prem gateway integration wit Konnect and
5159
decrease the default concurrency to `4` to reduce the possibility of triggering

ingress-controller/internal/konnect/config_synchronizer.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55
"errors"
66
"fmt"
77
"sync"
8+
"sync/atomic"
9+
"time"
810

911
"github.com/go-logr/logr"
12+
"github.com/google/uuid"
1013
"github.com/kong/go-database-reconciler/pkg/file"
1114
"github.com/kong/go-kong/kong"
1215
"github.com/samber/mo"
@@ -18,6 +21,7 @@ import (
1821
"github.com/kong/kong-operator/v2/ingress-controller/internal/dataplane/deckgen"
1922
"github.com/kong/kong-operator/v2/ingress-controller/internal/dataplane/kongstate"
2023
"github.com/kong/kong-operator/v2/ingress-controller/internal/dataplane/sendconfig"
24+
"github.com/kong/kong-operator/v2/ingress-controller/internal/konnect/tracing"
2125
"github.com/kong/kong-operator/v2/ingress-controller/internal/logging"
2226
"github.com/kong/kong-operator/v2/ingress-controller/internal/metrics"
2327
"github.com/kong/kong-operator/v2/ingress-controller/internal/util"
@@ -45,6 +49,11 @@ type ConfigSynchronizer struct {
4549

4650
targetKongState mo.Option[TargetKongState]
4751
configLock sync.RWMutex
52+
53+
// synchronizerID is the identifier to mark the ConfigSynchronizer instance.
54+
synchronizerID string
55+
// serialNumber is the serial number to mark the loop round of config synchronization.
56+
serialNumber atomic.Uint32
4857
}
4958

5059
// TargetKongState wraps the Kong state to be uploaded to Konnect and indicates whether the configuration is a fallback
@@ -74,18 +83,23 @@ type ConfigSynchronizerParams struct {
7483
ConfigChangeDetector sendconfig.ConfigurationChangeDetector
7584
ConfigStatusNotifier clients.ConfigStatusNotifier
7685
MetricsRecorder metrics.Recorder
86+
SynchronizerID string
7787
}
7888

7989
func NewConfigSynchronizer(p ConfigSynchronizerParams) *ConfigSynchronizer {
90+
if p.SynchronizerID == "" {
91+
p.SynchronizerID = uuid.NewString()
92+
}
8093
return &ConfigSynchronizer{
81-
logger: p.Logger,
94+
logger: p.Logger.WithValues("synchronizerID", p.SynchronizerID),
8295
kongConfig: p.KongConfig,
8396
syncTicker: p.ConfigUploadTicker,
8497
konnectClientFactory: p.KonnectClientFactory,
8598
updateStrategyResolver: p.UpdateStrategyResolver,
8699
configChangeDetector: p.ConfigChangeDetector,
87100
configStatusNotifier: p.ConfigStatusNotifier,
88101
metricsRecorder: p.MetricsRecorder,
102+
synchronizerID: p.SynchronizerID,
89103
}
90104
}
91105

@@ -94,6 +108,7 @@ var _ manager.LeaderElectionRunnable = &ConfigSynchronizer{}
94108
// Start starts the loop to receive configuration and upload configuration to Konnect.
95109
func (s *ConfigSynchronizer) Start(ctx context.Context) error {
96110
s.logger.Info("Starting Konnect configuration synchronizer")
111+
ctx = context.WithValue(ctx, tracing.SynchronizerIDKey, s.synchronizerID)
97112

98113
konnectAdminClient, err := s.konnectClientFactory.NewKonnectClient(ctx)
99114
if err != nil {
@@ -199,26 +214,36 @@ func (s *ConfigSynchronizer) run(ctx context.Context) {
199214
}
200215

201216
func (s *ConfigSynchronizer) handleConfigSynchronizationTick(ctx context.Context) {
202-
s.logger.V(logging.DebugLevel).Info("Start uploading configuration to Konnect")
217+
// Add values about the sync round in the context.
218+
serialNumber := s.serialNumber.Add(1)
219+
startTimestamp := time.Now().Unix()
220+
syncRoundID := uuid.NewSHA1(tracing.SyncRoundIDNamespace, fmt.Appendf([]byte{}, "%s:%d:%d", s.synchronizerID, serialNumber, startTimestamp))
221+
ctx = context.WithValue(ctx, tracing.SyncSerialNumberKey, serialNumber)
222+
ctx = context.WithValue(ctx, tracing.SyncStartTimestampKey, startTimestamp)
223+
ctx = context.WithValue(ctx, tracing.SyncRoundIDKey, syncRoundID.String())
224+
logger := s.logger.WithValues("syncRoundID", syncRoundID, "serialNumber", serialNumber)
225+
226+
logger.V(logging.DebugLevel).Info("Start uploading configuration to Konnect")
203227

204228
// Get the latest configuration copy to upload to Konnect. We don't want to hold the lock for a long time to prevent
205229
// blocking the update of the configuration.
206230
targetCfg, ok := s.currentContent(ctx)
207231
if !ok {
208-
s.logger.Info("No configuration received yet, skipping Konnect configuration synchronization")
232+
logger.Info("No configuration received yet, skipping Konnect configuration synchronization")
209233
return
210234
}
211235

212236
// Upload the configuration to Konnect.
213-
if err := s.uploadConfig(ctx, s.konnectAdminClient, targetCfg); err != nil {
214-
s.logger.Error(err, "Failed to upload configuration to Konnect")
215-
logKonnectErrors(s.logger, err)
237+
if err := s.uploadConfig(ctx, logger, s.konnectAdminClient, targetCfg); err != nil {
238+
logger.Error(err, "Failed to upload configuration to Konnect")
239+
logKonnectErrors(logger, err)
216240
}
217241
}
218242

219243
// uploadConfig sends the given configuration to Konnect.
220244
func (s *ConfigSynchronizer) uploadConfig(
221245
ctx context.Context,
246+
logger logr.Logger,
222247
client *adminapi.KonnectClient,
223248
targetContent TargetContent,
224249
) error {
@@ -229,7 +254,7 @@ func (s *ConfigSynchronizer) uploadConfig(
229254

230255
newSHA, err := sendconfig.PerformUpdate(
231256
ctx,
232-
s.logger,
257+
logger,
233258
client,
234259
s.kongConfig,
235260
targetContent.Content,

ingress-controller/internal/konnect/tracing/datadog.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,43 @@ package tracing
33
import (
44
"context"
55
"net/http"
6+
"strconv"
67
"strings"
78

89
"github.com/go-logr/logr"
10+
"github.com/google/uuid"
911
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
1012

1113
"github.com/kong/kong-operator/v2/ingress-controller/internal/logging"
1214
)
1315

16+
// ContextKey is the key to carry values to trace Konnect configuration sync in the context.
17+
type ContextKey string
18+
1419
const (
20+
// SynchronizerIDKey is the context key for the ID of KIC's Konnect configuration synchronizer instance.
21+
SynchronizerIDKey ContextKey = "KonnectSynchronizerID"
22+
// SyncSerialNumberKey is the context key for serial number to mark a round of configuration sync to Konnect.
23+
SyncSerialNumberKey ContextKey = "KonnectSyncSerialNumber"
24+
// SyncRoundIDKey is the context key to mark the ID of a round of configuration sync.
25+
SyncRoundIDKey ContextKey = "KonnectSyncRoundID"
26+
// SyncStartTimestampKey is the context key for timestamp (in seconds) of starting a round of configuration sync.
27+
SyncStartTimestampKey ContextKey = "KonnectSyncStartTimestamp"
28+
)
29+
30+
// SyncRoundIDNamespace is the UUID namespace used to generate deterministic IDs for Konnect sync rounds.
31+
var SyncRoundIDNamespace = uuid.NameSpaceDNS
32+
33+
const (
34+
// InstanceIDHeader is the header to mark the ID of KIC's Konnect configuration synchronizer instance.
35+
InstanceIDHeader = "X-Kic-Konnect-Sync-Instance-Id"
36+
// SyncSerialNumberHeader is the header for serial number to mark a round of configuration sync to Konnect.
37+
SyncSerialNumberHeader = "X-Kic-Konnect-Sync-Serial-Number"
38+
// SyncStartTimestampHeader is the header for timestamp (in seconds) of starting a round of configuration sync.
39+
SyncStartTimestampHeader = "X-Kic-Konnect-Sync-Start-Timestamp"
40+
// SyncRoundIDHeader is the header to mark the ID of a round of configuration sync.
41+
SyncRoundIDHeader = "X-Kic-Konnect-Sync-Round-Id"
42+
1543
// B3TraceIDHeader is the header used by the B3 propagation format to pass the trace ID.
1644
B3TraceIDHeader = "X-B3-TraceId"
1745
// B3SpanIDHeader is the header used by the B3 propagation format to pass the span ID.
@@ -25,6 +53,8 @@ const (
2553

2654
// DoRequest is a helper function that sends an HTTP request and logs the result with DataDog trace ID.
2755
func DoRequest(ctx context.Context, httpClient *http.Client, req *http.Request) (*http.Response, error) {
56+
req = addHeaderFromContext(ctx, req)
57+
2858
httpResp, err := httpClient.Do(req)
2959
if err != nil {
3060
return nil, err
@@ -52,6 +82,25 @@ func DoRequest(ctx context.Context, httpClient *http.Client, req *http.Request)
5282
return httpResp, nil
5383
}
5484

85+
// addHeaderFromContext extracts the values to mark a configuration sync round in context
86+
// and sets the headers in the request for tracing.
87+
func addHeaderFromContext(ctx context.Context, req *http.Request) *http.Request {
88+
if instanceID, ok := ctx.Value(SynchronizerIDKey).(string); ok {
89+
req.Header.Add(InstanceIDHeader, instanceID)
90+
}
91+
if syncRoundID, ok := ctx.Value(SyncRoundIDKey).(string); ok {
92+
req.Header.Add(SyncRoundIDHeader, syncRoundID)
93+
}
94+
if serialNumber, ok := ctx.Value(SyncSerialNumberKey).(uint32); ok {
95+
req.Header.Add(SyncSerialNumberHeader, strconv.FormatUint(uint64(serialNumber), 10))
96+
}
97+
if startTimestamp, ok := ctx.Value(SyncStartTimestampKey).(int64); ok {
98+
req.Header.Add(SyncStartTimestampHeader, strconv.FormatInt(startTimestamp, 10))
99+
}
100+
101+
return req
102+
}
103+
55104
// loggerWithDataDogTraceID creates a new logger with the DataDog tracing information extracted from the HTTP response's
56105
// headers. This data is useful for correlating logs with traces and logs in DataDog.
57106
func loggerWithDataDogTraceID(logger logr.Logger, resp *http.Response) logr.Logger {

ingress-controller/internal/konnect/tracing/datadog_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package tracing_test
22

33
import (
44
"bytes"
5+
"context"
56
"net/http"
67
"net/http/httptest"
78
"testing"
@@ -97,3 +98,42 @@ func TestDoRequest(t *testing.T) {
9798
})
9899
}
99100
}
101+
102+
func TestDoRequest_SyncHeaders(t *testing.T) {
103+
var (
104+
testInstanceID = "test-instance-id"
105+
testSerialNumber = uint32(42)
106+
testStartTimestamp = int64(1700000000)
107+
testSyncRoundID = "test-sync-round-id"
108+
)
109+
110+
var receivedHeaders http.Header
111+
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
112+
receivedHeaders = r.Header.Clone()
113+
w.WriteHeader(http.StatusOK)
114+
}))
115+
t.Cleanup(testServer.Close)
116+
117+
client := testServer.Client()
118+
request, err := http.NewRequest(http.MethodGet, testServer.URL, nil)
119+
require.NoError(t, err)
120+
121+
loggerBuf := &bytes.Buffer{}
122+
logger := buflogr.NewWithBuffer(loggerBuf)
123+
ctx := ctrllog.IntoContext(t.Context(), logger)
124+
ctx = context.WithValue(ctx, tracing.SynchronizerIDKey, testInstanceID)
125+
ctx = context.WithValue(ctx, tracing.SyncSerialNumberKey, testSerialNumber)
126+
ctx = context.WithValue(ctx, tracing.SyncStartTimestampKey, testStartTimestamp)
127+
ctx = context.WithValue(ctx, tracing.SyncRoundIDKey, testSyncRoundID)
128+
129+
resp, err := tracing.DoRequest(ctx, client, request)
130+
require.NoError(t, err)
131+
t.Cleanup(func() {
132+
_ = resp.Body.Close()
133+
})
134+
135+
require.Equal(t, testInstanceID, receivedHeaders.Get(tracing.InstanceIDHeader))
136+
require.Equal(t, "42", receivedHeaders.Get(tracing.SyncSerialNumberHeader))
137+
require.Equal(t, "1700000000", receivedHeaders.Get(tracing.SyncStartTimestampHeader))
138+
require.Equal(t, testSyncRoundID, receivedHeaders.Get(tracing.SyncRoundIDHeader))
139+
}

ingress-controller/internal/manager/run.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ func New(
354354
updateStrategyResolver,
355355
configStatusNotifier,
356356
metricsRecorder,
357+
instanceID,
357358
)
358359
if err != nil {
359360
setupLog.Error(err, "Failed to setup Konnect configuration synchronizer with manager, skipping")

ingress-controller/internal/manager/setup.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ func setupKonnectConfigSynchronizerWithMgr(
499499
updateStrategyResolver sendconfig.UpdateStrategyResolver,
500500
configStatusNotifier clients.ConfigStatusNotifier,
501501
metricsRecorder metrics.Recorder,
502+
instanceID InstanceID,
502503
) (*konnect.ConfigSynchronizer, error) {
503504
s := konnect.NewConfigSynchronizer(
504505
konnect.ConfigSynchronizerParams{
@@ -510,6 +511,7 @@ func setupKonnectConfigSynchronizerWithMgr(
510511
ConfigChangeDetector: sendconfig.NewKonnectConfigurationChangeDetector(),
511512
ConfigStatusNotifier: configStatusNotifier,
512513
MetricsRecorder: metricsRecorder,
514+
SynchronizerID: instanceID.String(),
513515
},
514516
)
515517
err := mgr.Add(s)

0 commit comments

Comments
 (0)