Skip to content

Commit 58a51ad

Browse files
committed
feat: add monitor metrics
1 parent c57be0c commit 58a51ad

2 files changed

Lines changed: 44 additions & 2 deletions

File tree

node/pkg/dal/apiv2/controller.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,33 @@ import (
1414
"bisonai.com/miko/node/pkg/dal/utils/keycache"
1515
"bisonai.com/miko/node/pkg/dal/utils/stats"
1616
errorsentinel "bisonai.com/miko/node/pkg/error"
17+
"github.com/prometheus/client_golang/prometheus"
18+
"github.com/prometheus/client_golang/prometheus/promauto"
19+
"github.com/prometheus/client_golang/prometheus/promhttp"
1720
"github.com/rs/zerolog/log"
1821
"nhooyr.io/websocket"
1922
"nhooyr.io/websocket/wsjson"
2023
)
2124

25+
var (
26+
wsActiveConnections = promauto.NewGauge(prometheus.GaugeOpts{
27+
Name: "dal_websocket_active_connections",
28+
Help: "Current number of active WebSocket connections",
29+
})
30+
wsConnectionsTotal = promauto.NewCounter(prometheus.CounterOpts{
31+
Name: "dal_websocket_connections_total",
32+
Help: "Total number of WebSocket connections",
33+
})
34+
wsSubscriptionsTotal = promauto.NewCounter(prometheus.CounterOpts{
35+
Name: "dal_websocket_subscriptions_total",
36+
Help: "Total number of WebSocket subscriptions",
37+
})
38+
restRequestsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
39+
Name: "dal_rest_requests_total",
40+
Help: "Total number of REST API requests",
41+
}, []string{"method", "path"})
42+
)
43+
2244
func Start(ctx context.Context, opts ...ServerV2Option) error {
2345
config := &ServerV2Config{
2446
Port: "8090",
@@ -82,16 +104,26 @@ func NewServer(collector *collector.Collector, keyCache *keycache.KeyCache, hub
82104
serveMux.HandleFunc("GET /latest-data-feeds/{symbols}", s.LatestFeedsHandler)
83105
serveMux.HandleFunc("GET /latest-data-feeds-unstrict/{symbols}", s.LatestFeedsHandlerUnstrict)
84106

107+
serveMux.Handle("GET /metrics", promhttp.Handler())
85108
serveMux.HandleFunc("/", s.HealthCheckHandler)
86109

87110
// Apply the RequestLoggerMiddleware to the ServeMux
88111
loggedMux := statsApp.RequestLoggerMiddleware(serveMux)
89112

90-
s.handler = loggedMux
113+
s.handler = metricsMiddleware(loggedMux)
91114

92115
return s
93116
}
94117

118+
func metricsMiddleware(next http.Handler) http.Handler {
119+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
120+
if r.URL.Path != "/ws" && r.URL.Path != "/metrics" && r.URL.Path != "/" {
121+
restRequestsTotal.WithLabelValues(r.Method, r.URL.Path).Inc()
122+
}
123+
next.ServeHTTP(w, r)
124+
})
125+
}
126+
95127
func (s *ServerV2) ServeHTTP(w http.ResponseWriter, r *http.Request) {
96128
key := r.Header.Get("X-API-Key")
97129

@@ -133,6 +165,8 @@ func (s *ServerV2) WSHandler(w http.ResponseWriter, r *http.Request) {
133165
defer c.Close(websocket.StatusInternalError, "the sky is falling")
134166

135167
s.hub.Register <- c
168+
wsConnectionsTotal.Inc()
169+
wsActiveConnections.Set(float64(s.hub.ConnectionCount()))
136170

137171
key := r.Header.Get("X-API-Key")
138172
id, err := stats.InsertWebsocketConnection(r.Context(), key)
@@ -144,6 +178,7 @@ func (s *ServerV2) WSHandler(w http.ResponseWriter, r *http.Request) {
144178

145179
defer func() {
146180
s.hub.Unregister <- c
181+
wsActiveConnections.Set(float64(s.hub.ConnectionCount()))
147182
err = stats.UpdateWebsocketConnection(r.Context(), id)
148183
if err != nil {
149184
log.Error().Err(err).Msg("failed to update websocket connection")
@@ -161,6 +196,7 @@ func (s *ServerV2) WSHandler(w http.ResponseWriter, r *http.Request) {
161196

162197
if msg.Method == "SUBSCRIBE" {
163198
s.hub.HandleSubscription(r.Context(), c, msg, id)
199+
wsSubscriptionsTotal.Add(float64(len(msg.Params)))
164200
}
165201
}
166202
}

node/pkg/dal/hub/hub.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,13 @@ func (h *Hub) HandleSubscription(ctx context.Context, client *websocket.Conn, ms
9999
}(valid)
100100
}
101101

102-
func (h *Hub) handleClientRegistration(ctx context.Context) {
102+
func (h *Hub) ConnectionCount() int {
103+
h.mu.RLock()
104+
defer h.mu.RUnlock()
105+
return len(h.Clients)
106+
}
107+
108+
func (h *Hub) handleClientRegistration() {
103109
for {
104110
select {
105111
case <-ctx.Done():

0 commit comments

Comments
 (0)