Skip to content

Commit 497dbfa

Browse files
authored
Add loader metrics of number of logs generated (#41)
1 parent 01ffc38 commit 497dbfa

8 files changed

Lines changed: 189 additions & 26 deletions

File tree

go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ require (
88
github.com/cenkalti/backoff/v4 v4.3.0
99
github.com/elastic/go-elasticsearch/v6 v6.8.10
1010
github.com/go-kit/log v0.2.1
11+
github.com/gorilla/mux v1.8.0
1112
github.com/grafana/dskit v0.0.0-20240712071108-b834d6b908f5
1213
github.com/grafana/loki v1.6.2-0.20231114151751-3a7b5d246b01
14+
github.com/prometheus/client_golang v1.20.5
1315
github.com/prometheus/common v0.55.0
1416
github.com/sirupsen/logrus v1.9.3
1517
github.com/spf13/pflag v1.0.5
@@ -33,7 +35,7 @@ require (
3335
github.com/buger/jsonparser v1.1.1 // indirect
3436
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b // indirect
3537
github.com/cespare/xxhash v1.1.0 // indirect
36-
github.com/cespare/xxhash/v2 v2.2.0 // indirect
38+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
3739
github.com/coreos/go-semver v0.3.0 // indirect
3840
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
3941
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
@@ -66,7 +68,6 @@ require (
6668
github.com/golang/snappy v0.0.4 // indirect
6769
github.com/google/btree v1.1.2 // indirect
6870
github.com/google/uuid v1.6.0 // indirect
69-
github.com/gorilla/mux v1.8.0 // indirect
7071
github.com/gorilla/websocket v1.5.0 // indirect
7172
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // indirect
7273
github.com/grafana/loki/pkg/push v0.0.0-20231023154132-0a7737e7c7eb // indirect
@@ -92,7 +93,7 @@ require (
9293
github.com/jpillora/backoff v1.0.0 // indirect
9394
github.com/json-iterator/go v1.1.12 // indirect
9495
github.com/julienschmidt/httprouter v1.3.0 // indirect
95-
github.com/klauspost/compress v1.17.8 // indirect
96+
github.com/klauspost/compress v1.17.9 // indirect
9697
github.com/kylelemons/godebug v1.1.0 // indirect
9798
github.com/mailru/easyjson v0.7.7 // indirect
9899
github.com/mattn/go-colorable v0.1.13 // indirect
@@ -116,7 +117,6 @@ require (
116117
github.com/pkg/errors v0.9.1 // indirect
117118
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
118119
github.com/prometheus/alertmanager v0.26.0 // indirect
119-
github.com/prometheus/client_golang v1.19.1 // indirect
120120
github.com/prometheus/client_model v0.6.1 // indirect
121121
github.com/prometheus/common/sigv4 v0.1.0 // indirect
122122
github.com/prometheus/exporter-toolkit v0.10.1-0.20230714054209-2f4150c63f97 // indirect

go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
121121
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
122122
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
123123
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
124-
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
125-
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
124+
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
125+
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
126126
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
127127
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
128128
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -473,8 +473,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
473473
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
474474
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
475475
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
476-
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
477-
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
476+
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
477+
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
478478
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
479479
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
480480
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -598,8 +598,8 @@ github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3O
598598
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
599599
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
600600
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
601-
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
602-
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
601+
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
602+
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
603603
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
604604
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
605605
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=

internal/component.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"sync"
6+
)
7+
8+
type Component interface {
9+
Start(ctx context.Context, wg *sync.WaitGroup, errCh chan<- error)
10+
}

internal/generator/generator.go

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55
"fmt"
66
"math/rand"
77
"os"
8+
"sync"
89
"time"
910

11+
"github.com/prometheus/client_golang/prometheus"
12+
1013
"github.com/ViaQ/cluster-logging-load-client/internal/clients"
1114

1215
"github.com/elastic/go-elasticsearch/v6"
@@ -29,7 +32,7 @@ const (
2932
ElasticsearchClientType ClientType = "elasticsearch"
3033
)
3134

32-
// Options describes the settings that can be modified for the querier
35+
// Options describes the settings that can be modified for the log generator
3336
type Options struct {
3437
// Client describes the client to use for forwarding
3538
Client ClientType
@@ -43,6 +46,12 @@ type Options struct {
4346
DisableSecurityCheck bool
4447
// LogsPerSecond is the number of logs to write per second
4548
LogsPerSecond int
49+
50+
LogType string
51+
LogFormat string
52+
LabelType string
53+
SyntheticPayloadSize int
54+
UseRandomHostname bool
4655
}
4756

4857
// LogGenerator describes an object which generates logs
@@ -54,12 +63,22 @@ type LogGenerator struct {
5463
rate int
5564
writeToDestination func(string, string, LabelSetOptions) error
5665
deferClose func()
66+
logCount prometheus.Counter
67+
opts Options
5768
}
5869

59-
func NewLogGenerator(opts Options) (*LogGenerator, error) {
70+
func NewLogGenerator(opts Options, registry *prometheus.Registry) (*LogGenerator, error) {
6071
generator := LogGenerator{
72+
opts: opts,
6173
rate: opts.LogsPerSecond,
74+
logCount: prometheus.NewCounter(prometheus.CounterOpts{
75+
Name: "log_generator_messages_produced_total",
76+
Help: "Total number of messages produced by the log generator",
77+
}),
6278
}
79+
registry.MustRegister(
80+
generator.logCount,
81+
)
6382

6483
switch opts.Client {
6584
case "file":
@@ -113,40 +132,53 @@ func NewLogGenerator(opts Options) (*LogGenerator, error) {
113132
return &generator, nil
114133
}
115134

116-
func (g *LogGenerator) GenerateLogs(logType LogType, logFormat Format, logSize int, labelOpts LabelSetOptions, randomizeHostname bool) {
135+
func (g *LogGenerator) Start(ctx context.Context, wg *sync.WaitGroup, errCh chan<- error) {
136+
wg.Add(1)
137+
go func() {
138+
<-ctx.Done()
139+
log.Debug("Shutting down log generator...")
140+
g.deferClose()
141+
}()
142+
go func() {
143+
defer wg.Done()
144+
g.GenerateLogs()
145+
}()
146+
}
147+
148+
func (g *LogGenerator) GenerateLogs() {
117149
host, err := os.Hostname()
118150
if err != nil {
119151
log.Fatalf("error getting hostname: %s", err)
120152
}
121-
122153
defer g.deferClose()
123154

124155
var lineCount int64 = 0
125156

126157
logHostname := host
127-
if randomizeHostname {
158+
if g.opts.UseRandomHostname {
128159
logHostname = fmt.Sprintf("%s.%032X", host, rand.Uint64())
129160
}
130161

131162
for {
132163
next := time.Now().UTC().Add(1 * time.Second)
133164

134165
for i := 0; i < g.rate; i++ {
135-
logLine, err := RandomLog(logType, logSize)
166+
logLine, err := RandomLog(LogType(g.opts.LogType), g.opts.SyntheticPayloadSize)
136167
if err != nil {
137168
log.Fatalf("error creating log: %s", err)
138169
}
139170

140-
formattedLogLine, err := FormatLog(logFormat, logHostname, lineCount, logLine)
171+
formattedLogLine, err := FormatLog(Format(g.opts.LogFormat), logHostname, lineCount, logLine)
141172
if err != nil {
142173
log.Fatalf("error formating log: %s", err)
143174
}
144175

145-
err = g.writeToDestination(host, formattedLogLine, labelOpts)
176+
err = g.writeToDestination(host, formattedLogLine, LabelSetOptions(g.opts.LabelType))
146177
if err != nil {
147178
log.Fatalf("error writing log: %s", err)
148179
}
149180

181+
g.logCount.Inc()
150182
lineCount++
151183
}
152184

internal/web/config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package web
2+
3+
type ServerConfig struct {
4+
ListenAddress string `yaml:"listenAddress"`
5+
TLS *TLSConfig `yaml:"tls"`
6+
}
7+
8+
type TLSConfig struct {
9+
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
10+
CertificateFile string `yaml:"certificateFile"`
11+
KeyFile string `yaml:"keyFile"`
12+
}

internal/web/doc.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package web
2+
3+
// Shamelessly inspired (stolen?) from https://github.com/xperimental/logging-roundtrip/blob/main/internal/web

internal/web/server.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package web
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"sync"
8+
9+
"github.com/gorilla/mux"
10+
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/prometheus/client_golang/prometheus/promhttp"
12+
"github.com/sirupsen/logrus"
13+
)
14+
15+
var errNeedCertificateAndKey = errors.New("need both certificate and key to start TLS server")
16+
17+
type Server struct {
18+
cfg ServerConfig
19+
log logrus.FieldLogger
20+
listenAddress string
21+
server *http.Server
22+
}
23+
24+
func NewServer(cfg ServerConfig, log logrus.FieldLogger, registry prometheus.Gatherer) *Server {
25+
m := mux.NewRouter()
26+
m.Path("/metrics").Methods(http.MethodGet).Handler(promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
27+
28+
s := &Server{
29+
cfg: cfg,
30+
log: log.WithField("component", "server"),
31+
listenAddress: cfg.ListenAddress,
32+
server: &http.Server{
33+
Addr: cfg.ListenAddress,
34+
Handler: m,
35+
},
36+
}
37+
38+
return s
39+
}
40+
41+
func (s *Server) Start(ctx context.Context, wg *sync.WaitGroup, errCh chan<- error) {
42+
wg.Add(1)
43+
go func() {
44+
<-ctx.Done()
45+
s.log.Debug("Shutting down server...")
46+
if err := s.server.Shutdown(context.Background()); err != nil {
47+
errCh <- err
48+
}
49+
}()
50+
51+
go func() {
52+
defer wg.Done()
53+
54+
if s.cfg.TLS != nil {
55+
tls := s.cfg.TLS
56+
if tls.CertificateFile == "" || tls.KeyFile == "" {
57+
errCh <- errNeedCertificateAndKey
58+
return
59+
}
60+
61+
s.log.Infof("Starting TLS server on %s", s.listenAddress)
62+
if err := s.server.ListenAndServeTLS(tls.CertificateFile, tls.KeyFile); err != nil && !errors.Is(err, http.ErrServerClosed) {
63+
errCh <- err
64+
}
65+
}
66+
67+
s.log.Infof("Starting server on %s", s.listenAddress)
68+
if err := s.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
69+
errCh <- err
70+
}
71+
}()
72+
}

main.go

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
package main
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
7+
"os"
8+
"os/signal"
9+
"sync"
10+
"syscall"
11+
12+
"github.com/ViaQ/cluster-logging-load-client/internal/web"
13+
"github.com/prometheus/client_golang/prometheus"
614

715
log "github.com/sirupsen/logrus"
816
"github.com/spf13/pflag"
@@ -55,25 +63,51 @@ func main() {
5563

5664
switch opts.Command {
5765
case "generate":
66+
registry := prometheus.NewRegistry()
5867
generatorOpts := generator.Options{
5968
Client: generator.ClientType(opts.Destination),
6069
ClientURL: opts.ClientURL,
6170
FileName: opts.OutputFile,
6271
Tenant: opts.Tenant,
6372
DisableSecurityCheck: opts.DisableSecurityCheck,
6473
LogsPerSecond: opts.LogsPerSecond,
74+
LogType: opts.LogType,
75+
LogFormat: opts.LogFormat,
76+
LabelType: opts.LabelType,
77+
SyntheticPayloadSize: opts.SyntheticPayloadSize,
78+
UseRandomHostname: opts.UseRandomHostname,
6579
}
66-
logGenerator, err := generator.NewLogGenerator(generatorOpts)
80+
logGenerator, err := generator.NewLogGenerator(generatorOpts, registry)
6781
if err != nil {
6882
panic(err)
6983
}
70-
logGenerator.GenerateLogs(
71-
generator.LogType(opts.LogType),
72-
generator.Format(opts.LogFormat),
73-
opts.SyntheticPayloadSize,
74-
generator.LabelSetOptions(opts.LabelType),
75-
opts.UseRandomHostname,
76-
)
84+
components := []internal.Component{
85+
logGenerator,
86+
web.NewServer(web.ServerConfig{
87+
ListenAddress: ":8081",
88+
}, log.StandardLogger(), registry),
89+
}
90+
91+
wg := &sync.WaitGroup{}
92+
errCh := make(chan error, 1)
93+
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM|syscall.SIGINT)
94+
defer cancel()
95+
96+
go func() {
97+
for err := range errCh {
98+
log.Errorf("Fatal error: %v", err)
99+
cancel()
100+
}
101+
}()
102+
103+
for _, c := range components {
104+
c.Start(ctx, wg, errCh)
105+
}
106+
107+
log.Debug("All components running.")
108+
wg.Wait()
109+
close(errCh)
110+
log.Debug("All components stopped.")
77111
case "query":
78112
querierOpts := querier.Options{
79113
Client: querier.ClientType(opts.Destination),

0 commit comments

Comments
 (0)