Skip to content

Commit 5de4d21

Browse files
committed
fix(desktop): narrow peer-health lock scope to prevent UI freeze during ProbeNode
1 parent f8b7160 commit 5de4d21

5 files changed

Lines changed: 241 additions & 99 deletions

File tree

internal/app/desktop/window.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ type Window struct {
167167
consoleMu sync.Mutex
168168

169169
window *app.Window
170+
171+
transferInvalidateMu sync.Mutex
172+
transferInvalidatePending bool
170173
}
171174

172175
// pendingAttachMsg is the payload delivered over the pendingAttach channel
@@ -1731,11 +1734,8 @@ func (w *Window) layoutFileCard(gtx layout.Context, message service.DirectMessag
17311734
// awaiting confirmation (waiting_ack needs redraw for ack arrival).
17321735
transferInProgress := (isMine && !senderCompleted && transferState != "") ||
17331736
receiverDownloadActive || transferState == "waiting_ack"
1734-
if transferInProgress && w.window != nil {
1735-
window := w.window
1736-
time.AfterFunc(500*time.Millisecond, func() {
1737-
window.Invalidate()
1738-
})
1737+
if transferInProgress {
1738+
w.scheduleTransferInvalidate(500 * time.Millisecond)
17391739
}
17401740

17411741
macro := op.Record(gtx.Ops)
@@ -1950,6 +1950,31 @@ func (w *Window) layoutFileCard(gtx layout.Context, message service.DirectMessag
19501950
return dims
19511951
}
19521952

1953+
// scheduleTransferInvalidate coalesces redraw requests for in-progress file
1954+
// transfers. layoutFileCard runs every frame; spawning a new timer on each
1955+
// frame leads to an unbounded timer/goroutine backlog under active transfers,
1956+
// which can starve the Gio event loop and present as a frozen window.
1957+
func (w *Window) scheduleTransferInvalidate(delay time.Duration) {
1958+
w.transferInvalidateMu.Lock()
1959+
if w.transferInvalidatePending {
1960+
w.transferInvalidateMu.Unlock()
1961+
return
1962+
}
1963+
w.transferInvalidatePending = true
1964+
w.transferInvalidateMu.Unlock()
1965+
1966+
time.AfterFunc(delay, func() {
1967+
w.transferInvalidateMu.Lock()
1968+
w.transferInvalidatePending = false
1969+
window := w.window
1970+
w.transferInvalidateMu.Unlock()
1971+
1972+
if window != nil {
1973+
window.Invalidate()
1974+
}
1975+
})
1976+
}
1977+
19531978
// layoutFileProgressBar renders a progress bar for the sender side.
19541979
// percent is the current transfer progress (0–100). When percent is 0
19551980
// a minimal sliver is shown to indicate the transfer has been initiated.

internal/app/desktop/window_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,29 @@ func TestEllipsize(t *testing.T) {
7979
}
8080
}
8181

82+
func TestScheduleTransferInvalidateCoalesces(t *testing.T) {
83+
w := &Window{}
84+
85+
w.scheduleTransferInvalidate(10 * time.Millisecond)
86+
w.scheduleTransferInvalidate(10 * time.Millisecond)
87+
88+
w.transferInvalidateMu.Lock()
89+
pending := w.transferInvalidatePending
90+
w.transferInvalidateMu.Unlock()
91+
if !pending {
92+
t.Fatalf("expected transfer invalidate to be pending")
93+
}
94+
95+
time.Sleep(30 * time.Millisecond)
96+
97+
w.transferInvalidateMu.Lock()
98+
pending = w.transferInvalidatePending
99+
w.transferInvalidateMu.Unlock()
100+
if pending {
101+
t.Fatalf("expected transfer invalidate pending flag to clear after timer fires")
102+
}
103+
}
104+
82105
// TestNetworkStatusSummary verifies that the aggregate network status is based
83106
// on the number of usable peers (healthy + degraded) among currently live
84107
// peers. Stalled peers count as connected-but-not-usable, while reconnecting

internal/core/node/aggregate_status.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,19 @@ func (s *Service) AggregateStatus() domain.AggregateStatusSnapshot {
134134
}
135135

136136
// aggregateStatusFrame builds the protocol frame for the
137-
// fetch_aggregate_status local RPC command.
137+
// fetch_aggregate_status local RPC command. Both snapshots are read under
138+
// a single RLock acquisition instead of two separate ones — with
139+
// Go's writer-preferring RWMutex, each separate RLock is a window where
140+
// a queued writer can interleave and stall the caller.
138141
func (s *Service) aggregateStatusFrame() protocol.Frame {
139-
snap := s.AggregateStatus()
140-
vpSnap := s.VersionPolicySnapshot()
142+
s.mu.RLock()
143+
snap := s.aggregateStatus
144+
var vpSnap domain.VersionPolicySnapshot
145+
if s.versionPolicy != nil {
146+
vpSnap = s.versionPolicy.snapshot
147+
}
148+
s.mu.RUnlock()
149+
141150
return protocol.Frame{
142151
Type: "aggregate_status",
143152
AggregateStatus: &protocol.AggregateStatusFrame{

internal/core/node/peer_management.go

Lines changed: 156 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -2631,6 +2631,26 @@ func (s *Service) updatePeerStateLocked(health *peerHealth, next string) {
26312631
s.refreshAggregateStatusLocked()
26322632
}
26332633

2634+
// peerHealthSnap holds the minimal per-peer data extracted from s.mu-protected
// state in a single short critical section. All subsequent formatting and
// enrichment runs lock-free, preventing the long RLock hold that caused
// writer starvation (bootstrapLoop's refreshAggregateStatus needs s.mu.Lock
// every 2 s, and Go's RWMutex is writer-preferring — a long RLock blocks
// subsequent RLock callers once a writer is queued, freezing the entire
// ProbeNode RPC chain and the UI).
type peerHealthSnap struct {
	health               peerHealth          // value copy of the s.health entry; State is overwritten with computePeerStateAtLocked
	peerID               domain.PeerIdentity // from s.peerIDs, keyed by the health address
	clientVersion        string              // from s.peerVersions
	clientBuild          int                 // from s.peerBuilds
	pendingCount         int                 // len(s.pending[addr]) at snapshot time
	capabilities         []string            // from peerCapabilitiesLocked
	sessionVersion       int                 // protocol version of the matching outbound session, if any
	sessionConnID        uint64              // conn ID of that session, converted for the wire frame
	inboundConnIDs       []uint64            // from inboundConnIDsLocked; one frame row is emitted per ID
	versionLockoutActive bool                // from isPeerVersionLockedOutLocked
}
26342654
func (s *Service) peerHealthFrames() []protocol.PeerHealthFrame {
26352655
// Collect CM slot snapshots before taking s.mu to avoid nested locking
26362656
// (connManager.Slots() acquires cm.mu independently).
@@ -2671,74 +2691,141 @@ func (s *Service) peerHealthFrames() []protocol.PeerHealthFrame {
26712691
}
26722692
}
26732693

2694+
// ---------------------------------------------------------------
2695+
// Short critical section: copy all s.mu-protected data into local
2696+
// snapshots so the lock is released before any formatting work.
2697+
// This prevents writer starvation on s.mu — bootstrapLoop calls
2698+
// refreshAggregateStatus (write lock) every 2 s, and Go's RWMutex
2699+
// is writer-preferring: once a writer is queued, new RLock callers
2700+
// block. Holding RLock for the full frame-building loop made
2701+
// subsequent ProbeNode RPCs (each needing RLock) wait behind the
2702+
// queued writer, freezing the UI for the entire 3-second timeout.
2703+
// ---------------------------------------------------------------
26742704
s.mu.RLock()
2675-
defer s.mu.RUnlock()
26762705
now := time.Now().UTC()
26772706

26782707
live := s.liveTrafficLocked()
26792708

2680-
seen := make(map[domain.PeerAddress]struct{}, len(s.health))
2681-
items := make([]protocol.PeerHealthFrame, 0, len(s.health)+len(live))
2709+
healthSnaps := make([]peerHealthSnap, 0, len(s.health))
26822710
for _, health := range s.health {
2683-
seen[health.Address] = struct{}{}
2684-
sent := health.BytesSent
2685-
recv := health.BytesReceived
2686-
if lv, ok := live[health.Address]; ok {
2711+
snap := peerHealthSnap{
2712+
health: *health,
2713+
peerID: s.peerIDs[health.Address],
2714+
clientVersion: s.peerVersions[health.Address],
2715+
clientBuild: s.peerBuilds[health.Address],
2716+
pendingCount: len(s.pending[health.Address]),
2717+
capabilities: s.peerCapabilitiesLocked(health.Address),
2718+
versionLockoutActive: s.isPeerVersionLockedOutLocked(health.Address),
2719+
inboundConnIDs: s.inboundConnIDsLocked(health.Address),
2720+
}
2721+
snap.health.State = s.computePeerStateAtLocked(health, now)
2722+
for dialAddr, session := range s.sessions {
2723+
if s.resolveHealthAddress(dialAddr) == health.Address {
2724+
snap.sessionVersion = session.version
2725+
snap.sessionConnID = uint64(session.connID)
2726+
break
2727+
}
2728+
}
2729+
healthSnaps = append(healthSnaps, snap)
2730+
}
2731+
2732+
// Snapshot inbound-only peers (live traffic but no health entry).
2733+
type inboundLiveSnap struct {
2734+
address domain.PeerAddress
2735+
peerID domain.PeerIdentity
2736+
clientVersion string
2737+
clientBuild int
2738+
capabilities []string
2739+
sessionVersion int
2740+
sent int64
2741+
received int64
2742+
inboundConnIDs []uint64
2743+
}
2744+
healthAddrs := make(map[domain.PeerAddress]struct{}, len(s.health))
2745+
for _, h := range s.health {
2746+
healthAddrs[h.Address] = struct{}{}
2747+
}
2748+
var inboundSnaps []inboundLiveSnap
2749+
for addr, lv := range live {
2750+
if _, ok := healthAddrs[addr]; ok {
2751+
continue
2752+
}
2753+
ils := inboundLiveSnap{
2754+
address: addr,
2755+
peerID: s.peerIDs[addr],
2756+
clientVersion: s.peerVersions[addr],
2757+
clientBuild: s.peerBuilds[addr],
2758+
capabilities: s.peerCapabilitiesLocked(addr),
2759+
sent: lv.sent,
2760+
received: lv.received,
2761+
inboundConnIDs: s.inboundConnIDsLocked(addr),
2762+
}
2763+
if session, ok := s.sessions[addr]; ok {
2764+
ils.sessionVersion = session.version
2765+
}
2766+
inboundSnaps = append(inboundSnaps, ils)
2767+
}
2768+
2769+
s.mu.RUnlock()
2770+
// ---------------------------------------------------------------
2771+
// Lock released — all remaining work is pure computation on local
2772+
// copies, safe to run without holding any lock.
2773+
// ---------------------------------------------------------------
2774+
2775+
seen := make(map[domain.PeerAddress]struct{}, len(healthSnaps))
2776+
items := make([]protocol.PeerHealthFrame, 0, len(healthSnaps)+len(inboundSnaps))
2777+
for _, snap := range healthSnaps {
2778+
h := &snap.health
2779+
seen[h.Address] = struct{}{}
2780+
sent := h.BytesSent
2781+
recv := h.BytesReceived
2782+
if lv, ok := live[h.Address]; ok {
26872783
sent += lv.sent
26882784
recv += lv.received
26892785
}
26902786
phf := protocol.PeerHealthFrame{
2691-
Address: string(health.Address),
2692-
PeerID: string(s.peerIDs[health.Address]),
2693-
Network: classifyAddress(health.Address).String(),
2694-
Direction: string(health.Direction),
2695-
ClientVersion: s.peerVersions[health.Address],
2696-
ClientBuild: s.peerBuilds[health.Address],
2697-
State: s.computePeerStateAtLocked(health, now),
2698-
Connected: health.Connected,
2699-
PendingCount: len(s.pending[health.Address]),
2700-
LastConnectedAt: formatTime(health.LastConnectedAt),
2701-
LastDisconnectedAt: formatTime(health.LastDisconnectedAt),
2702-
LastPingAt: formatTime(health.LastPingAt),
2703-
LastPongAt: formatTime(health.LastPongAt),
2704-
LastUsefulSendAt: formatTime(health.LastUsefulSendAt),
2705-
LastUsefulReceiveAt: formatTime(health.LastUsefulReceiveAt),
2706-
ConsecutiveFailures: health.ConsecutiveFailures,
2707-
LastError: health.LastError,
2708-
Score: health.Score,
2709-
BannedUntil: formatTime(health.BannedUntil),
2787+
Address: string(h.Address),
2788+
PeerID: string(snap.peerID),
2789+
Network: classifyAddress(h.Address).String(),
2790+
Direction: string(h.Direction),
2791+
ClientVersion: snap.clientVersion,
2792+
ClientBuild: snap.clientBuild,
2793+
State: h.State,
2794+
Connected: h.Connected,
2795+
PendingCount: snap.pendingCount,
2796+
LastConnectedAt: formatTime(h.LastConnectedAt),
2797+
LastDisconnectedAt: formatTime(h.LastDisconnectedAt),
2798+
LastPingAt: formatTime(h.LastPingAt),
2799+
LastPongAt: formatTime(h.LastPongAt),
2800+
LastUsefulSendAt: formatTime(h.LastUsefulSendAt),
2801+
LastUsefulReceiveAt: formatTime(h.LastUsefulReceiveAt),
2802+
ConsecutiveFailures: h.ConsecutiveFailures,
2803+
LastError: h.LastError,
2804+
Score: h.Score,
2805+
BannedUntil: formatTime(h.BannedUntil),
27102806
BytesSent: sent,
27112807
BytesReceived: recv,
27122808
TotalTraffic: sent + recv,
2713-
Capabilities: s.peerCapabilitiesLocked(health.Address),
2809+
Capabilities: snap.capabilities,
27142810

27152811
// Machine-readable disconnect diagnostics.
2716-
LastErrorCode: health.LastErrorCode,
2717-
LastDisconnectCode: health.LastDisconnectCode,
2718-
IncompatibleVersionAttempts: int(health.IncompatibleVersionAttempts),
2719-
LastIncompatibleVersionAt: formatTime(health.LastIncompatibleVersionAt),
2720-
ObservedPeerVersion: int(health.ObservedPeerVersion),
2721-
ObservedPeerMinimumVersion: int(health.ObservedPeerMinimumVersion),
2722-
VersionLockoutActive: s.isPeerVersionLockedOutLocked(health.Address),
2723-
}
2724-
// sessions is keyed by dial address which may differ from the
2725-
// health address when a fallback port was used. Iterate to find
2726-
// the matching session by resolved health key.
2727-
for dialAddr, session := range s.sessions {
2728-
if s.resolveHealthAddress(dialAddr) == health.Address {
2729-
phf.ProtocolVersion = session.version
2730-
// PeerHealthFrame.ConnID is the wire-level uint64; convert
2731-
// from the typed session identifier before serialising.
2732-
phf.ConnID = uint64(session.connID)
2733-
break
2734-
}
2812+
LastErrorCode: h.LastErrorCode,
2813+
LastDisconnectCode: h.LastDisconnectCode,
2814+
IncompatibleVersionAttempts: int(h.IncompatibleVersionAttempts),
2815+
LastIncompatibleVersionAt: formatTime(h.LastIncompatibleVersionAt),
2816+
ObservedPeerVersion: int(h.ObservedPeerVersion),
2817+
ObservedPeerMinimumVersion: int(h.ObservedPeerMinimumVersion),
2818+
VersionLockoutActive: snap.versionLockoutActive,
2819+
2820+
ProtocolVersion: snap.sessionVersion,
2821+
ConnID: snap.sessionConnID,
27352822
}
27362823
// Enrich with CM slot lifecycle data if this peer has an outbound slot.
2737-
if snap, ok := slotByAddr[health.Address]; ok {
2738-
phf.SlotState = snap.State
2739-
phf.SlotRetryCount = snap.RetryCount
2740-
phf.SlotGeneration = snap.Generation
2741-
phf.SlotConnectedAddr = snap.ConnectedAddr
2824+
if sl, ok := slotByAddr[h.Address]; ok {
2825+
phf.SlotState = sl.State
2826+
phf.SlotRetryCount = sl.RetryCount
2827+
phf.SlotGeneration = sl.Generation
2828+
phf.SlotConnectedAddr = sl.ConnectedAddr
27422829
}
27432830
// When an outbound session exists, emit a single row with the
27442831
// outbound ConnID — even if inbound connections coexist (both
@@ -2751,9 +2838,8 @@ func (s *Service) peerHealthFrames() []protocol.PeerHealthFrame {
27512838
enrichCaptureFields(&phf, captureByConn)
27522839
items = append(items, phf)
27532840
} else {
2754-
inboundConns := s.inboundConnIDsLocked(health.Address)
2755-
if len(inboundConns) > 0 {
2756-
for _, cid := range inboundConns {
2841+
if len(snap.inboundConnIDs) > 0 {
2842+
for _, cid := range snap.inboundConnIDs {
27572843
row := phf
27582844
row.ConnID = cid
27592845
row.Direction = string(peerDirectionInbound)
@@ -2767,32 +2853,26 @@ func (s *Service) peerHealthFrames() []protocol.PeerHealthFrame {
27672853
}
27682854

27692855
// Include inbound-only peers that have live traffic but no health entry yet.
2770-
for addr, lv := range live {
2771-
if _, ok := seen[addr]; ok {
2772-
continue
2773-
}
2856+
for _, ils := range inboundSnaps {
27742857
inboundPHF := protocol.PeerHealthFrame{
2775-
Address: string(addr),
2776-
PeerID: string(s.peerIDs[addr]),
2777-
Network: classifyAddress(addr).String(),
2778-
Direction: string(peerDirectionInbound),
2779-
ClientVersion: s.peerVersions[addr],
2780-
ClientBuild: s.peerBuilds[addr],
2781-
State: peerStateHealthy,
2782-
Connected: true,
2783-
BytesSent: lv.sent,
2784-
BytesReceived: lv.received,
2785-
TotalTraffic: lv.sent + lv.received,
2786-
Capabilities: s.peerCapabilitiesLocked(addr),
2787-
}
2788-
if session, ok := s.sessions[addr]; ok {
2789-
inboundPHF.ProtocolVersion = session.version
2858+
Address: string(ils.address),
2859+
PeerID: string(ils.peerID),
2860+
Network: classifyAddress(ils.address).String(),
2861+
Direction: string(peerDirectionInbound),
2862+
ClientVersion: ils.clientVersion,
2863+
ClientBuild: ils.clientBuild,
2864+
State: peerStateHealthy,
2865+
Connected: true,
2866+
BytesSent: ils.sent,
2867+
BytesReceived: ils.received,
2868+
TotalTraffic: ils.sent + ils.received,
2869+
Capabilities: ils.capabilities,
2870+
ProtocolVersion: ils.sessionVersion,
27902871
}
27912872
// Emit one row per inbound conn_id (same pattern as health-based
27922873
// inbound peers) so capture enrichment can match by ConnID.
2793-
inboundConns := s.inboundConnIDsLocked(addr)
2794-
if len(inboundConns) > 0 {
2795-
for _, cid := range inboundConns {
2874+
if len(ils.inboundConnIDs) > 0 {
2875+
for _, cid := range ils.inboundConnIDs {
27962876
row := inboundPHF
27972877
row.ConnID = cid
27982878
enrichCaptureFields(&row, captureByConn)

0 commit comments

Comments
 (0)