Skip to content

Commit ffc83f5

Browse files
committed
feature: flightrecorder to enable Go trace
config: allow configuration for Go runtime/trace.FlightRecorder pass default period to define what means skipper to be slow config: set defaults to 100ms is slow and max 16MB trace file size Signed-off-by: Sandor Szücs <[email protected]>
1 parent 92c3d5b commit ffc83f5

File tree

5 files changed

+303
-9
lines changed

5 files changed

+303
-9
lines changed

config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ type Config struct {
8989
BlockProfileRate int `yaml:"block-profile-rate"`
9090
MutexProfileFraction int `yaml:"mutex-profile-fraction"`
9191
MemProfileRate int `yaml:"memory-profile-rate"`
92+
FlightRecorderTargetURL string `yaml:"flight-recorder-target-url"`
9293
DebugGcMetrics bool `yaml:"debug-gc-metrics"`
9394
RuntimeMetrics bool `yaml:"runtime-metrics"`
9495
ServeRouteMetrics bool `yaml:"serve-route-metrics"`
@@ -439,6 +440,7 @@ func NewConfig() *Config {
439440

440441
// logging, metrics, tracing:
441442
flag.BoolVar(&cfg.EnablePrometheusMetrics, "enable-prometheus-metrics", false, "*Deprecated*: use metrics-flavour. Switch to Prometheus metrics format to expose metrics")
443+
flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds")
442444
flag.StringVar(&cfg.OpenTracing, "opentracing", "noop", "list of arguments for opentracing (space separated), first argument is the tracer implementation")
443445
flag.StringVar(&cfg.OpenTracingInitialSpan, "opentracing-initial-span", "ingress", "set the name of the initial, pre-routing, tracing span")
444446
flag.StringVar(&cfg.OpenTracingExcludedProxyTags, "opentracing-excluded-proxy-tags", "", "set tags that should be excluded from spans created for proxy operation. must be a comma-separated list of strings.")
@@ -454,6 +456,7 @@ func NewConfig() *Config {
454456
flag.IntVar(&cfg.MutexProfileFraction, "mutex-profile-fraction", 0, "mutex profile fraction rate, see runtime.SetMutexProfileFraction")
455457
flag.IntVar(&cfg.MemProfileRate, "memory-profile-rate", 0, "memory profile rate, see runtime.MemProfileRate, keeps default 512 kB")
456458
flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds")
459+
flag.StringVar(&cfg.FlightRecorderTargetURL, "flight-recorder-target-url", "", "sets the flight recorder target URL that is used to write out the trace to.")
457460
flag.BoolVar(&cfg.DebugGcMetrics, "debug-gc-metrics", false, "enables reporting of the Go garbage collector statistics exported in debug.GCStats")
458461
flag.BoolVar(&cfg.RuntimeMetrics, "runtime-metrics", true, "enables reporting of the Go runtime statistics exported in runtime and specifically runtime.MemStats")
459462
flag.BoolVar(&cfg.ServeRouteMetrics, "serve-route-metrics", false, "enables reporting total serve time metrics for each route")
@@ -919,6 +922,7 @@ func (c *Config) ToOptions() skipper.Options {
919922
EnableProfile: c.EnableProfile,
920923
BlockProfileRate: c.BlockProfileRate,
921924
MutexProfileFraction: c.MutexProfileFraction,
925+
FlightRecorderTargetURL: c.FlightRecorderTargetURL,
922926
EnableDebugGcMetrics: c.DebugGcMetrics,
923927
EnableRuntimeMetrics: c.RuntimeMetrics,
924928
EnableServeRouteMetrics: c.ServeRouteMetrics,

proxy/breaker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ func TestBreakerMultipleHosts(t *testing.T) {
543543
checkBackendHostCounter("foo", testRateWindow),
544544
checkBackendHostCounter("bar", testRateWindow),
545545
setBackendFail,
546-
trace("setting fail"),
546+
traceBreakerTest("setting fail"),
547547
setBackendHostFail("foo"),
548548
setBackendHostFail("bar"),
549549
times(testRateFailures,

proxy/flightrecorder_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package proxy_test
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"net/http/httptest"
9+
"runtime/trace"
10+
"testing"
11+
"time"
12+
13+
"github.com/zalando/skipper/eskip"
14+
"github.com/zalando/skipper/filters"
15+
"github.com/zalando/skipper/filters/diag"
16+
"github.com/zalando/skipper/proxy"
17+
"github.com/zalando/skipper/proxy/proxytest"
18+
)
19+
20+
func TestFlightRecorder(t *testing.T) {
21+
ch := make(chan int)
22+
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
23+
if r.Method != "PUT" {
24+
w.WriteHeader(http.StatusMethodNotAllowed)
25+
w.Write([]byte(http.StatusText(http.StatusMethodNotAllowed)))
26+
ch <- http.StatusMethodNotAllowed
27+
return
28+
}
29+
30+
var buf bytes.Buffer
31+
n, err := io.Copy(&buf, r.Body)
32+
if err != nil {
33+
t.Fatalf("Failed to copy data: %v", err)
34+
}
35+
if n < 100 {
36+
t.Fatalf("Failed to write enough data: %d bytes", n)
37+
}
38+
w.WriteHeader(http.StatusCreated)
39+
w.Write([]byte(http.StatusText(http.StatusCreated)))
40+
ch <- http.StatusCreated
41+
}))
42+
defer service.Close()
43+
44+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
45+
w.WriteHeader(http.StatusOK)
46+
w.Write([]byte(http.StatusText(http.StatusOK)))
47+
}))
48+
defer backend.Close()
49+
50+
flightRecorder := trace.NewFlightRecorder(trace.FlightRecorderConfig{
51+
MinAge: time.Second,
52+
})
53+
flightRecorder.Start()
54+
55+
spec := diag.NewLatency()
56+
fr := make(filters.Registry)
57+
fr.Register(spec)
58+
59+
doc := fmt.Sprintf(`r: * -> latency("100ms") -> "%s"`, backend.URL)
60+
rr := eskip.MustParse(doc)
61+
62+
pr := proxytest.WithParams(fr, proxy.Params{
63+
FlightRecorder: flightRecorder,
64+
FlightRecorderTargetURL: service.URL,
65+
FlightRecorderPeriod: 90 * time.Millisecond,
66+
}, rr...)
67+
defer pr.Close()
68+
69+
rsp, err := pr.Client().Get(pr.URL)
70+
if err != nil {
71+
t.Fatalf("Failed to GET %q: %v", pr.URL, err)
72+
}
73+
defer rsp.Body.Close()
74+
_, err = io.ReadAll(rsp.Body)
75+
if err != nil {
76+
t.Fatalf("Failed to read body: %v", err)
77+
}
78+
79+
switch rsp.StatusCode {
80+
case http.StatusOK:
81+
// ok
82+
default:
83+
t.Fatalf("Failed to get status OK: %d", rsp.StatusCode)
84+
}
85+
86+
statusCode := <-ch
87+
switch statusCode {
88+
case http.StatusCreated:
89+
// ok
90+
default:
91+
t.Fatalf("Failed to get status OK: %d", rsp.StatusCode)
92+
}
93+
}

proxy/proxy.go

Lines changed: 148 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"os"
1818
"runtime"
1919
"runtime/pprof"
20+
"runtime/trace"
2021
"strconv"
2122
"strings"
2223
"sync"
@@ -377,6 +378,18 @@ type Params struct {
377378

378379
// PassiveHealthCheck defines the parameters for the healthy endpoints checker.
379380
PassiveHealthCheck *PassiveHealthCheck
381+
382+
// FlightRecorder is a started instance of https://pkg.go.dev/runtime/trace#FlightRecorder
383+
FlightRecorder *trace.FlightRecorder
384+
385+
// FlightRecorderTargetURL is the target to write the trace
386+
// to. Supported targets are http URL and file URL.
387+
FlightRecorderTargetURL string
388+
389+
// FlightRecorderPeriod is the time.Duration that is used to detect
390+
// a slow skipper. If skipper is detected to be slow it tries
391+
// to write out a trace as configured by the FlightRecorderTargetURL.
392+
FlightRecorderPeriod time.Duration
380393
}
381394

382395
type (
@@ -472,6 +485,10 @@ type Proxy struct {
472485
clientTLS *tls.Config
473486
hostname string
474487
onPanicSometimes rate.Sometimes
488+
flightRecorder *trace.FlightRecorder
489+
flightRecorderURL *url.URL
490+
flightRecorderPeriod time.Duration
491+
flightRecorderCH chan struct{}
475492
}
476493

477494
// proxyError is used to wrap errors during proxying and to indicate
@@ -874,6 +891,52 @@ func WithParams(p Params) *Proxy {
874891
maxUnhealthyEndpointsRatio: p.PassiveHealthCheck.MaxUnhealthyEndpointsRatio,
875892
}
876893
}
894+
895+
log := &logging.DefaultLog{}
896+
897+
var (
898+
frURL *url.URL
899+
// buffered channel size 10k to allow unblocked requests
900+
frChannel = make(chan struct{}, 10240)
901+
)
902+
if p.FlightRecorder != nil {
903+
var err error
904+
frURL, err = url.Parse(p.FlightRecorderTargetURL)
905+
if err != nil {
906+
p.FlightRecorder.Stop()
907+
p.FlightRecorder = nil
908+
} else {
909+
// decouple writing a trace from data-plane work
910+
go func() {
911+
foreverHang := 365 * 24 * time.Hour
912+
timer := time.NewTimer(foreverHang)
913+
defer timer.Stop()
914+
915+
last := time.Now().Add(-time.Hour)
916+
917+
for {
918+
select {
919+
case <-frChannel:
920+
// range through all notifications until 1ms there is no notification
921+
// reset timer to write trace after handling all the notifications
922+
timer.Reset(time.Millisecond)
923+
continue
924+
case <-quit:
925+
p.FlightRecorder.Stop()
926+
return
927+
case <-timer.C:
928+
if time.Since(last) >= time.Hour {
929+
writeGoTrace(p.FlightRecorder, frURL, log, tr)
930+
}
931+
last = time.Now()
932+
933+
timer.Reset(foreverHang)
934+
}
935+
}
936+
}()
937+
}
938+
}
939+
877940
return &Proxy{
878941
routing: p.Routing,
879942
registry: p.EndpointRegistry,
@@ -892,7 +955,7 @@ func WithParams(p Params) *Proxy {
892955
maxLoops: p.MaxLoopbacks,
893956
breakers: p.CircuitBreakers,
894957
limiters: p.RateLimiters,
895-
log: &logging.DefaultLog{},
958+
log: log,
896959
defaultHTTPStatus: defaultHTTPStatus,
897960
tracing: newProxyTracing(p.OpenTracing),
898961
copyStreamPoolEnabled: p.EnableCopyStreamPoolExperimental,
@@ -902,6 +965,87 @@ func WithParams(p Params) *Proxy {
902965
clientTLS: tr.TLSClientConfig,
903966
hostname: hostname,
904967
onPanicSometimes: rate.Sometimes{First: 3, Interval: 1 * time.Minute},
968+
flightRecorder: p.FlightRecorder,
969+
flightRecorderURL: frURL,
970+
flightRecorderPeriod: p.FlightRecorderPeriod,
971+
flightRecorderCH: frChannel,
972+
}
973+
}
974+
975+
func (p *Proxy) writeTraceIfTooSlow(took time.Duration, span ot.Span) {
976+
span.SetTag("proxy.took", took)
977+
978+
if p.flightRecorder == nil {
979+
return
980+
}
981+
982+
if p.flightRecorderPeriod < 1*time.Millisecond && p.flightRecorderPeriod > took {
983+
return
984+
}
985+
986+
// signal too slow
987+
p.flightRecorderCH <- struct{}{}
988+
}
989+
990+
func writeGoTraceTo(log logging.Logger, flightRecorder *trace.FlightRecorder, w io.Writer) (int, error) {
991+
n, err := flightRecorder.WriteTo(w)
992+
if err != nil {
993+
return 0, fmt.Errorf("failed to write FlightRecorder data: %w", err)
994+
} else {
995+
log.Infof("FlightRecorder wrote %d bytes", n)
996+
}
997+
998+
return int(n), err
999+
}
1000+
1001+
func writeGoTrace(flightRecorder *trace.FlightRecorder, flightRecorderURL *url.URL, log logging.Logger, roundTripper http.RoundTripper) {
1002+
if flightRecorder == nil || flightRecorderURL == nil {
1003+
return
1004+
}
1005+
1006+
switch flightRecorderURL.Scheme {
1007+
case "file":
1008+
fd, err := os.Create(flightRecorderURL.Path)
1009+
if err != nil {
1010+
log.Errorf("Failed to create file %q: %v", err, flightRecorderURL.Path)
1011+
return
1012+
}
1013+
defer fd.Close()
1014+
1015+
_, err = writeGoTraceTo(log, flightRecorder, fd)
1016+
if err != nil {
1017+
log.Errorf("Failed to write trace file %q: %v", flightRecorderURL.Path, err)
1018+
}
1019+
1020+
case "http", "https":
1021+
var b bytes.Buffer
1022+
_, err := writeGoTraceTo(log, flightRecorder, &b)
1023+
if err != nil {
1024+
log.Errorf("Failed to write trace into in-memory buffer: %v", err)
1025+
return
1026+
}
1027+
1028+
req, err := http.NewRequest("PUT", flightRecorderURL.String(), &b)
1029+
if err != nil {
1030+
log.Errorf("Failed to create request to %q to send a trace: %v", flightRecorderURL.String(), err)
1031+
}
1032+
1033+
rsp, err := roundTripper.RoundTrip(req)
1034+
if err != nil {
1035+
log.Errorf("Failed to write trace to %q: %v", flightRecorderURL.String(), err)
1036+
return
1037+
}
1038+
1039+
rsp.Body.Close()
1040+
1041+
switch rsp.StatusCode {
1042+
case 200, 201, 204:
1043+
log.Infof("Successful send of a trace to %q", flightRecorderURL.String())
1044+
default:
1045+
log.Errorf("Failed to get successful response from %s: (%d) %s", flightRecorderURL.String(), rsp.StatusCode, rsp.Status)
1046+
}
1047+
default:
1048+
log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", flightRecorderURL.Scheme)
9051049
}
9061050
}
9071051

@@ -1055,7 +1199,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
10551199
proxySpanOpts := []ot.StartSpanOption{ot.Tags{
10561200
SpanKindTag: SpanKindClient,
10571201
}}
1058-
if parentSpan := ot.SpanFromContext(req.Context()); parentSpan != nil {
1202+
parentSpan := ot.SpanFromContext(req.Context())
1203+
if parentSpan != nil {
10591204
proxySpanOpts = append(proxySpanOpts, ot.ChildOf(parentSpan.Context()))
10601205
}
10611206
ctx.proxySpan = p.tracing.tracer.StartSpan(spanName, proxySpanOpts...)
@@ -1082,12 +1227,11 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
10821227
req = injectClientTraceByEvent(req, ctx.proxySpan)
10831228
}
10841229

1230+
p.writeTraceIfTooSlow(requestStopWatch.elapsed, parentSpan)
10851231
p.metrics.MeasureBackendRequestHeader(ctx.metricsHost(), snet.SizeOfRequestHeader(req))
10861232

10871233
requestStopWatch.Stop()
1088-
10891234
response, err := roundTripper.RoundTrip(req)
1090-
10911235
responseStopWatch.Start()
10921236

10931237
if endpointMetrics != nil {

0 commit comments

Comments
 (0)