Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Bug Fixes

* (blockstm) [#25789](https://github.com/cosmos/cosmos-sdk/issues/25789) Wake up suspended executors when scheduler doesn't complete to prevent goroutine leaks.
* (grpc) [#25647](https://github.com/cosmos/cosmos-sdk/pull/25647) Return actual `earliest_store_height` in `node.Status` gRPC endpoint instead of hardcoded `0`.
* (types/query) [#25665](https://github.com/cosmos/cosmos-sdk/issues/25665) Fix pagination offset when querying a collection with predicate function.
* (x/staking) [#25649](https://github.com/cosmos/cosmos-sdk/pull/25649) Add missing `defer iterator.Close()` calls in `IterateDelegatorRedelegations` and `GetRedelegations` to prevent resource leaks.
Expand Down
9 changes: 9 additions & 0 deletions blockstm/mvmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ func (mv *MVMemory) ConvertWritesToEstimates(txn TxnIndex) {
}
}

// ClearEstimates removes estimate marks for canceled transactions.
func (mv *MVMemory) ClearEstimates(txn TxnIndex) {
for i, locations := range mv.readLastWrittenLocations(txn) {
for _, key := range locations {
mv.data[i].Delete(key, txn)
}
}
}

func (mv *MVMemory) ValidateReadSet(txn TxnIndex) bool {
// Invariant: at least one `Record` call has been made for `txn`
rs := *mv.lastReadSet[txn].Load()
Expand Down
13 changes: 13 additions & 0 deletions blockstm/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,16 @@ func (s *Scheduler) Stats() string {
return fmt.Sprintf("executed: %d, validated: %d",
s.executedTxns.Load(), s.validatedTxns.Load())
}

// CancelAll wakes up all suspended executors.
// Called during context cancellation to prevent hanging.
func (s *Scheduler) CancelAll(preCancel func(i TxnIndex)) {
for i := range s.txnStatus {
s.txnStatus[i].TryCancel(func() {
if preCancel != nil {
idx := TxnIndex(i)
preCancel(idx)
}
})
}
}
29 changes: 27 additions & 2 deletions blockstm/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,15 @@ func (s *StatusEntry) setStatus(status, preStatus Status) {
}

func (s *StatusEntry) Resume() {
// Resume is normally called for a txn that is currently suspended.
// With cancellation, a suspended txn may already have been woken and had its
// condition cleared; in that case this becomes a no-op.
s.Lock()
defer s.Unlock()

if s.status != StatusSuspended || s.cond == nil {
return
}

// status must be SUSPENDED and cond != nil
if s.status != StatusSuspended || s.cond == nil {
Expand All @@ -89,8 +97,6 @@ func (s *StatusEntry) Resume() {
s.status = StatusExecuting
s.cond.Notify()
s.cond = nil

s.Unlock()
}

func (s *StatusEntry) SetExecuted() {
Expand Down Expand Up @@ -138,3 +144,22 @@ func (s *StatusEntry) Suspend(cond *Condvar) {

s.Unlock()
}

// TryCancel wakes up a suspended executor if it's suspended.
// Called during context cancellation to prevent hanging.
func (s *StatusEntry) TryCancel(preCancel func()) {
s.Lock()
defer s.Unlock()

if s.status == StatusSuspended {
if preCancel != nil {
preCancel()
}

if s.cond != nil {
s.status = StatusExecuting
s.cond.Notify()
s.cond = nil
}
}
}
20 changes: 17 additions & 3 deletions blockstm/stm.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,30 @@ func ExecuteBlockWithEstimates(
e := NewExecutor(ctx, scheduler, txExecutor, mvMemory, i)
wg.Go(e.Run)
}
if err := wg.Wait(); err != nil {

// wake up suspended executors when context is canceled to prevent hanging
cancelDone := make(chan struct{})
go func() {
select {
case <-ctx.Done():
scheduler.CancelAll(func(i TxnIndex) {
// clear estimates before waking up so they don't suspend again
mvMemory.ClearEstimates(i)
})
case <-cancelDone:
}
}()
Comment on lines +60 to +69

Check notice

Code scanning / CodeQL

Spawning a Go routine

Spawning a Go routine may be a possible source of non-determinism

err := wg.Wait()
close(cancelDone)
if err != nil {
return err
}

if !scheduler.Done() {
if ctx.Err() != nil {
// canceled
return ctx.Err()
}

return errors.New("scheduler did not complete")
}

Expand Down
31 changes: 31 additions & 0 deletions blockstm/stm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,46 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"testing"
"time"

"github.com/test-go/testify/require"

storetypes "cosmossdk.io/store/types"
)

func TestExecuteBlock_CancelWakesSuspendedExecutors(t *testing.T) {
stores := map[storetypes.StoreKey]int{StoreKeyAuth: 0}
storage := NewMultiMemDB(stores)

// Mark key "k" as ESTIMATE for txn 0.
estimates := make([]MultiLocations, 2)
estimates[0] = MultiLocations{0: Locations{Key([]byte("k"))}}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
time.Sleep(20 * time.Millisecond)
cancel()
}()

err := ExecuteBlockWithEstimates(ctx, 2, stores, storage, 2, estimates,
func(txn TxnIndex, store MultiStore) {
if txn == 0 {
time.Sleep(250 * time.Millisecond)
return
}
// Txn 1 suspends on ESTIMATE.
store.GetKVStore(StoreKeyAuth).Get([]byte("k"))
},
)
require.True(t, errors.Is(err, context.Canceled))
}

func accountName(i int64) string {
return fmt.Sprintf("account%05d", i)
}
Expand Down
Loading