Skip to content

Commit 71ff205

Browse files
fix(blockstm): wake up suspended executors on cancel (cosmos#25793)
2 parents bb98cd6 + c6909d5 commit 71ff205

6 files changed

Lines changed: 98 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
9393

9494
### Bug Fixes
9595

96+
* (blockstm) [#25789](https://github.com/cosmos/cosmos-sdk/issues/25789) Wake up suspended executors when scheduler doesn't complete to prevent goroutine leaks.
9697
* (grpc) [#25647](https://github.com/cosmos/cosmos-sdk/pull/25647) Return actual `earliest_store_height` in `node.Status` gRPC endpoint instead of hardcoded `0`.
9798
* (types/query) [#25665](https://github.com/cosmos/cosmos-sdk/issues/25665) Fix pagination offset when querying a collection with predicate function.
9899
* (x/staking) [#25649](https://github.com/cosmos/cosmos-sdk/pull/25649) Add missing `defer iterator.Close()` calls in `IterateDelegatorRedelegations` and `GetRedelegations` to prevent resource leaks.

blockstm/mvmemory.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,15 @@ func (mv *MVMemory) ConvertWritesToEstimates(txn TxnIndex) {
109109
}
110110
}
111111

112+
// ClearEstimates removes estimate marks for canceled transactions.
113+
func (mv *MVMemory) ClearEstimates(txn TxnIndex) {
114+
for i, locations := range mv.readLastWrittenLocations(txn) {
115+
for _, key := range locations {
116+
mv.data[i].Delete(key, txn)
117+
}
118+
}
119+
}
120+
112121
func (mv *MVMemory) ValidateReadSet(txn TxnIndex) bool {
113122
// Invariant: at least one `Record` call has been made for `txn`
114123
rs := *mv.lastReadSet[txn].Load()

blockstm/scheduler.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,3 +213,16 @@ func (s *Scheduler) Stats() string {
213213
return fmt.Sprintf("executed: %d, validated: %d",
214214
s.executedTxns.Load(), s.validatedTxns.Load())
215215
}
216+
217+
// CancelAll wakes up all suspended executors.
218+
// Called during context cancellation to prevent hanging.
219+
func (s *Scheduler) CancelAll(preCancel func(i TxnIndex)) {
220+
for i := range s.txnStatus {
221+
s.txnStatus[i].TryCancel(func() {
222+
if preCancel != nil {
223+
idx := TxnIndex(i)
224+
preCancel(idx)
225+
}
226+
})
227+
}
228+
}

blockstm/status.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,15 @@ func (s *StatusEntry) setStatus(status, preStatus Status) {
7878
}
7979

8080
func (s *StatusEntry) Resume() {
81+
// Resume is normally called for a txn that is currently suspended.
82+
// With cancellation, a suspended txn may already have been woken and had its
83+
// condition cleared; in that case this becomes a no-op.
8184
s.Lock()
85+
defer s.Unlock()
86+
87+
if s.status != StatusSuspended || s.cond == nil {
88+
return
89+
}
8290

8391
// status must be SUSPENDED and cond != nil
8492
if s.status != StatusSuspended || s.cond == nil {
@@ -89,8 +97,6 @@ func (s *StatusEntry) Resume() {
8997
s.status = StatusExecuting
9098
s.cond.Notify()
9199
s.cond = nil
92-
93-
s.Unlock()
94100
}
95101

96102
func (s *StatusEntry) SetExecuted() {
@@ -138,3 +144,22 @@ func (s *StatusEntry) Suspend(cond *Condvar) {
138144

139145
s.Unlock()
140146
}
147+
148+
// TryCancel wakes up a suspended executor if it's suspended.
149+
// Called during context cancellation to prevent hanging.
150+
func (s *StatusEntry) TryCancel(preCancel func()) {
151+
s.Lock()
152+
defer s.Unlock()
153+
154+
if s.status == StatusSuspended {
155+
if preCancel != nil {
156+
preCancel()
157+
}
158+
159+
if s.cond != nil {
160+
s.status = StatusExecuting
161+
s.cond.Notify()
162+
s.cond = nil
163+
}
164+
}
165+
}

blockstm/stm.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,30 @@ func ExecuteBlockWithEstimates(
5454
e := NewExecutor(ctx, scheduler, txExecutor, mvMemory, i)
5555
wg.Go(e.Run)
5656
}
57-
if err := wg.Wait(); err != nil {
57+
58+
// wake up suspended executors when context is canceled to prevent hanging
59+
cancelDone := make(chan struct{})
60+
go func() {
61+
select {
62+
case <-ctx.Done():
63+
scheduler.CancelAll(func(i TxnIndex) {
64+
// clear estimates before waking up so they don't suspend again
65+
mvMemory.ClearEstimates(i)
66+
})
67+
case <-cancelDone:
68+
}
69+
}()
70+
71+
err := wg.Wait()
72+
close(cancelDone)
73+
if err != nil {
5874
return err
5975
}
6076

6177
if !scheduler.Done() {
6278
if ctx.Err() != nil {
63-
// canceled
6479
return ctx.Err()
6580
}
66-
6781
return errors.New("scheduler did not complete")
6882
}
6983

blockstm/stm_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,46 @@ import (
44
"bytes"
55
"context"
66
"encoding/binary"
7+
"errors"
78
"fmt"
89
"math/rand"
910
"testing"
11+
"time"
1012

1113
"github.com/test-go/testify/require"
1214

1315
storetypes "cosmossdk.io/store/types"
1416
)
1517

18+
func TestExecuteBlock_CancelWakesSuspendedExecutors(t *testing.T) {
19+
stores := map[storetypes.StoreKey]int{StoreKeyAuth: 0}
20+
storage := NewMultiMemDB(stores)
21+
22+
// Mark key "k" as ESTIMATE for txn 0.
23+
estimates := make([]MultiLocations, 2)
24+
estimates[0] = MultiLocations{0: Locations{Key([]byte("k"))}}
25+
26+
ctx, cancel := context.WithCancel(context.Background())
27+
defer cancel()
28+
29+
go func() {
30+
time.Sleep(20 * time.Millisecond)
31+
cancel()
32+
}()
33+
34+
err := ExecuteBlockWithEstimates(ctx, 2, stores, storage, 2, estimates,
35+
func(txn TxnIndex, store MultiStore) {
36+
if txn == 0 {
37+
time.Sleep(250 * time.Millisecond)
38+
return
39+
}
40+
// Txn 1 suspends on ESTIMATE.
41+
store.GetKVStore(StoreKeyAuth).Get([]byte("k"))
42+
},
43+
)
44+
require.True(t, errors.Is(err, context.Canceled))
45+
}
46+
1647
func accountName(i int64) string {
1748
return fmt.Sprintf("account%05d", i)
1849
}

0 commit comments

Comments
 (0)