Skip to content

Commit 97f9ed3

Browse files
committed
cleanup
1 parent 10f7e1b commit 97f9ed3

2 files changed

Lines changed: 39 additions & 66 deletions

File tree

blockstm/bench_test.go

Lines changed: 3 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,16 @@ package blockstm
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
7-
"runtime"
86
"strconv"
97
"sync/atomic"
108
"testing"
119

1210
"github.com/test-go/testify/require"
13-
"golang.org/x/sync/errgroup"
1411

1512
storetypes "cosmossdk.io/store/types"
1613
)
1714

18-
func executeBlock(stores map[storetypes.StoreKey]int, storage MultiStore, worker int, block *MockBlock) error {
19-
incarnationCache := make([]atomic.Pointer[map[string]any], block.Size())
20-
for i := 0; i < block.Size(); i++ {
21-
m := make(map[string]any)
22-
incarnationCache[i].Store(&m)
23-
}
24-
return ExecuteBlock(context.Background(), block.Size(), stores, storage, worker, func(txn TxnIndex, store MultiStore) {
25-
cache := incarnationCache[txn].Swap(nil)
26-
block.ExecuteTx(txn, store, *cache)
27-
incarnationCache[txn].Store(cache)
28-
})
29-
}
30-
3115
func BenchmarkBlockSTM(b *testing.B) {
3216
stores := map[storetypes.StoreKey]int{StoreKeyAuth: 0, StoreKeyBank: 1}
3317
for i := 0; i < 26; i++ {
@@ -92,17 +76,19 @@ func BenchmarkBlockSTM(b *testing.B) {
9276
b.ResetTimer()
9377
var executedTotal, validatedTotal uint64
9478
for i := 0; i < b.N; i++ {
95-
executed, validated, err := executeBlockForBench(
79+
executed, validated, err := executeBlockWithEstimatesImpl(
9680
context.Background(),
9781
tc.block.Size(),
9882
stores,
9983
storage,
10084
worker,
85+
nil,
10186
func(txn TxnIndex, store MultiStore) {
10287
cache := incarnationCache[txn].Swap(nil)
10388
tc.block.ExecuteTx(txn, store, *cache)
10489
incarnationCache[txn].Store(cache)
10590
},
91+
false,
10692
)
10793
require.NoError(b, err)
10894
executedTotal += executed
@@ -148,47 +134,6 @@ func iterateNewKeysBlock(size int) *MockBlock {
148134
return NewMockBlock(txs)
149135
}
150136

151-
// executeBlockForBench is a benchmark-only variant of ExecuteBlockWithEstimates that
152-
// exposes scheduler counters (executed / validated) without emitting telemetry.
153-
func executeBlockForBench(
154-
ctx context.Context,
155-
blockSize int,
156-
stores map[storetypes.StoreKey]int,
157-
storage MultiStore,
158-
executors int,
159-
txExecutor TxExecutor,
160-
) (executed, validated uint64, err error) {
161-
if executors < 0 {
162-
return 0, 0, fmt.Errorf("invalid number of executors: %d", executors)
163-
}
164-
if executors == 0 {
165-
executors = min(runtime.GOMAXPROCS(0), runtime.NumCPU())
166-
}
167-
168-
scheduler := NewScheduler(blockSize)
169-
mvMemory := NewMVMemory(blockSize, stores, storage, scheduler)
170-
171-
var wg errgroup.Group
172-
wg.SetLimit(executors)
173-
for i := 0; i < executors; i++ {
174-
e := NewExecutor(ctx, scheduler, txExecutor, mvMemory, i)
175-
wg.Go(e.Run)
176-
}
177-
if err := wg.Wait(); err != nil {
178-
return 0, 0, err
179-
}
180-
181-
if !scheduler.Done() {
182-
if ctx.Err() != nil {
183-
return 0, 0, ctx.Err()
184-
}
185-
return 0, 0, errors.New("scheduler did not complete")
186-
}
187-
188-
mvMemory.WriteSnapshot(storage)
189-
return uint64(scheduler.executedTxns.Load()), uint64(scheduler.validatedTxns.Load()), nil
190-
}
191-
192137
func runSequential(storage MultiStore, block *MockBlock) {
193138
for i, tx := range block.Txs {
194139
block.Results[i] = tx(storage, nil)

blockstm/stm.go

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,31 @@ func ExecuteBlockWithEstimates(
3636
estimates []MultiLocations, // txn -> multi-locations
3737
txExecutor TxExecutor,
3838
) error {
39+
_, _, err := executeBlockWithEstimatesImpl(
40+
ctx,
41+
blockSize,
42+
stores,
43+
storage,
44+
executors,
45+
estimates,
46+
txExecutor,
47+
true,
48+
)
49+
return err
50+
}
51+
52+
func executeBlockWithEstimatesImpl(
53+
ctx context.Context,
54+
blockSize int,
55+
stores map[storetypes.StoreKey]int,
56+
storage MultiStore,
57+
executors int,
58+
estimates []MultiLocations, // txn -> multi-locations
59+
txExecutor TxExecutor,
60+
emitTelemetry bool,
61+
) (executed, validated uint64, err error) {
3962
if executors < 0 {
40-
return fmt.Errorf("invalid number of executors: %d", executors)
63+
return 0, 0, fmt.Errorf("invalid number of executors: %d", executors)
4164
}
4265
if executors == 0 {
4366
executors = maxParallelism()
@@ -68,25 +91,30 @@ func ExecuteBlockWithEstimates(
6891
}
6992
}()
7093

71-
err := wg.Wait()
94+
err = wg.Wait()
7295
close(cancelDone)
7396
if err != nil {
74-
return err
97+
return 0, 0, err
7598
}
7699

77100
if !scheduler.Done() {
78101
if ctx.Err() != nil {
79-
return ctx.Err()
102+
return 0, 0, ctx.Err()
80103
}
81-
return errors.New("scheduler did not complete")
104+
return 0, 0, errors.New("scheduler did not complete")
82105
}
83106

84-
telemetry.SetGauge(float32(scheduler.executedTxns.Load()), TelemetrySubsystem, KeyExecutedTxs) //nolint:staticcheck // TODO: switch to OpenTelemetry
85-
telemetry.SetGauge(float32(scheduler.validatedTxns.Load()), TelemetrySubsystem, KeyValidatedTxs) //nolint:staticcheck // TODO: switch to OpenTelemetry
107+
executed = uint64(scheduler.executedTxns.Load())
108+
validated = uint64(scheduler.validatedTxns.Load())
109+
110+
if emitTelemetry {
111+
telemetry.SetGauge(float32(executed), TelemetrySubsystem, KeyExecutedTxs) //nolint:staticcheck // TODO: switch to OpenTelemetry
112+
telemetry.SetGauge(float32(validated), TelemetrySubsystem, KeyValidatedTxs) //nolint:staticcheck // TODO: switch to OpenTelemetry
113+
}
86114

87115
// Write the snapshot into the storage
88116
mvMemory.WriteSnapshot(storage)
89-
return nil
117+
return executed, validated, nil
90118
}
91119

92120
func maxParallelism() int {

0 commit comments

Comments
 (0)