Skip to content

Commit 3c69ffe

Browse files
committed
feat(queuev2): wire CachedScheduledQueue into the timer queue factory
Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
1 parent 1a76e6c commit 3c69ffe

10 files changed

Lines changed: 520 additions & 1 deletion
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// The MIT License (MIT)
2+
3+
// Copyright (c) 2017-2020 Uber Technologies Inc.
4+
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
package queuev2
24+
25+
import (
26+
"context"
27+
28+
hcommon "github.com/uber/cadence/service/history/common"
29+
)
30+
31+
type cachedScheduledQueue struct {
32+
*scheduledQueue
33+
reader CachedQueueReader
34+
}
35+
36+
func newCachedScheduledQueue(inner *scheduledQueue, reader CachedQueueReader) Queue {
37+
// Wrap the queue state update so the reader evicts tasks below the current
38+
// ack level whenever the queue persists its progress. GetMinReadLevel returns
39+
// MaximumHistoryTaskKey when no virtual queues exist yet; UpdateReadLevel
40+
// normalises that sentinel so it is treated as a no-op advance.
41+
originalUpdateFn := inner.base.updateQueueStateFn
42+
inner.base.updateQueueStateFn = func(ctx context.Context) {
43+
originalUpdateFn(ctx)
44+
reader.UpdateReadLevel(inner.base.virtualQueueManager.GetMinReadLevel())
45+
}
46+
47+
return &cachedScheduledQueue{scheduledQueue: inner, reader: reader}
48+
}
49+
50+
func (q *cachedScheduledQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
51+
q.reader.Inject(info.Tasks)
52+
q.scheduledQueue.NotifyNewTask(clusterName, info)
53+
}
54+
55+
func (q *cachedScheduledQueue) Start() {
56+
q.reader.Start()
57+
q.scheduledQueue.Start()
58+
}
59+
60+
func (q *cachedScheduledQueue) Stop() {
61+
q.scheduledQueue.Stop()
62+
q.reader.Stop()
63+
}
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
// The MIT License (MIT)
2+
3+
// Copyright (c) 2017-2020 Uber Technologies Inc.
4+
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
package queuev2
24+
25+
import (
26+
"context"
27+
"testing"
28+
"time"
29+
30+
"github.com/stretchr/testify/assert"
31+
"github.com/stretchr/testify/require"
32+
"go.uber.org/goleak"
33+
"go.uber.org/mock/gomock"
34+
35+
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
36+
"github.com/uber/cadence/common/metrics"
37+
"github.com/uber/cadence/common/persistence"
38+
hcommon "github.com/uber/cadence/service/history/common"
39+
"github.com/uber/cadence/service/history/config"
40+
"github.com/uber/cadence/service/history/shard"
41+
"github.com/uber/cadence/service/history/task"
42+
)
43+
44+
func TestCachedScheduledQueue_Construction(t *testing.T) {
45+
defer goleak.VerifyNone(t)
46+
ctrl := gomock.NewController(t)
47+
48+
mockShard := shard.NewTestContext(
49+
t, ctrl,
50+
&persistence.ShardInfo{ShardID: 10, RangeID: 1, TransferAckLevel: 0},
51+
config.NewForTest(),
52+
)
53+
54+
options := testScheduledQueueOptions()
55+
mockReader := NewMockCachedQueueReader(ctrl)
56+
57+
inner := newScheduledQueue(mockShard, persistence.HistoryTaskCategoryTimer,
58+
task.NewMockProcessor(ctrl), task.NewMockExecutor(ctrl),
59+
mockShard.GetLogger(), metrics.NoopClient, metrics.NoopScope,
60+
options, mockReader)
61+
62+
q := newCachedScheduledQueue(inner, mockReader)
63+
64+
require.NotNil(t, q)
65+
csq, ok := q.(*cachedScheduledQueue)
66+
require.True(t, ok, "expected *cachedScheduledQueue")
67+
assert.NotNil(t, csq.reader)
68+
assert.NotNil(t, csq.scheduledQueue)
69+
}
70+
71+
func TestCachedScheduledQueue_NotifyNewTask_Empty(t *testing.T) {
72+
ctrl := gomock.NewController(t)
73+
mockReader := NewMockCachedQueueReader(ctrl)
74+
75+
// Inject is always called even with nil tasks; the real implementation handles it.
76+
mockReader.EXPECT().Inject([]persistence.Task(nil)).Times(1)
77+
78+
csq := &cachedScheduledQueue{
79+
scheduledQueue: &scheduledQueue{
80+
base: &queueBase{
81+
metricsScope: metrics.NoopScope,
82+
},
83+
newTimerCh: make(chan struct{}, 1),
84+
},
85+
reader: mockReader,
86+
}
87+
88+
csq.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{
89+
Tasks: nil,
90+
})
91+
}
92+
93+
func TestCachedScheduledQueue_NotifyNewTask_WithTasks(t *testing.T) {
94+
ctrl := gomock.NewController(t)
95+
mockReader := NewMockCachedQueueReader(ctrl)
96+
97+
tasks := []persistence.Task{
98+
&persistence.DecisionTimeoutTask{
99+
TaskData: persistence.TaskData{
100+
VisibilityTimestamp: time.Now(),
101+
},
102+
},
103+
}
104+
105+
mockReader.EXPECT().Inject(tasks).Times(1)
106+
107+
csq := &cachedScheduledQueue{
108+
scheduledQueue: &scheduledQueue{
109+
base: &queueBase{
110+
metricsScope: metrics.NoopScope,
111+
},
112+
newTimerCh: make(chan struct{}, 1),
113+
},
114+
reader: mockReader,
115+
}
116+
117+
csq.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{
118+
Tasks: tasks,
119+
})
120+
}
121+
122+
// TestCachedScheduledQueue_StartStop verifies that Start and Stop delegate to both
123+
// the reader and the inner scheduledQueue.
124+
func TestCachedScheduledQueue_StartStop(t *testing.T) {
125+
defer goleak.VerifyNone(t)
126+
ctrl := gomock.NewController(t)
127+
128+
mockShard := shard.NewTestContext(
129+
t, ctrl,
130+
&persistence.ShardInfo{ShardID: 10, RangeID: 1, TransferAckLevel: 0},
131+
config.NewForTest(),
132+
)
133+
134+
options := testScheduledQueueOptions()
135+
mockReader := NewMockCachedQueueReader(ctrl)
136+
137+
// processEventLoop calls LookAHead after the timer gate fires, and GetTask
138+
// when processing new tasks. Both can fire multiple times.
139+
mockReader.EXPECT().LookAHead(gomock.Any(), gomock.Any()).Return(&LookAHeadResponse{}, nil).AnyTimes()
140+
mockReader.EXPECT().GetTask(gomock.Any(), gomock.Any()).DoAndReturn(
141+
func(_ context.Context, req *GetTaskRequest) (*GetTaskResponse, error) {
142+
return &GetTaskResponse{
143+
Progress: &GetTaskProgress{
144+
Range: req.Progress.Range,
145+
NextTaskKey: req.Progress.ExclusiveMaxTaskKey,
146+
},
147+
}, nil
148+
},
149+
).AnyTimes()
150+
mockReader.EXPECT().Start().Times(1)
151+
mockReader.EXPECT().Stop().Times(1)
152+
153+
inner := newScheduledQueue(mockShard, persistence.HistoryTaskCategoryTimer,
154+
task.NewMockProcessor(ctrl), task.NewMockExecutor(ctrl),
155+
mockShard.GetLogger(), metrics.NoopClient, metrics.NoopScope,
156+
options, mockReader)
157+
158+
q := newCachedScheduledQueue(inner, mockReader)
159+
160+
q.Start()
161+
q.Stop()
162+
}
163+
164+
// TestCachedScheduledQueue_StartStopDelegation verifies that cachedScheduledQueue.Start
165+
// and Stop delegate to the embedded reader.
166+
func TestCachedScheduledQueue_StartStopDelegation(t *testing.T) {
167+
ctrl := gomock.NewController(t)
168+
mockReader := NewMockCachedQueueReader(ctrl)
169+
mockReader.EXPECT().Start().Times(1)
170+
mockReader.EXPECT().Stop().Times(1)
171+
172+
csq := &cachedScheduledQueue{
173+
reader: mockReader,
174+
}
175+
176+
csq.reader.Start()
177+
csq.reader.Stop()
178+
}
179+
180+
func TestCachedScheduledQueue_EvictionHookWired(t *testing.T) {
181+
defer goleak.VerifyNone(t)
182+
ctrl := gomock.NewController(t)
183+
184+
mockShard := shard.NewTestContext(
185+
t, ctrl,
186+
&persistence.ShardInfo{ShardID: 10, RangeID: 1, TransferAckLevel: 0},
187+
config.NewForTest(),
188+
)
189+
190+
options := testScheduledQueueOptions()
191+
mockReader := NewMockCachedQueueReader(ctrl)
192+
193+
inner := newScheduledQueue(mockShard, persistence.HistoryTaskCategoryTimer,
194+
task.NewMockProcessor(ctrl), task.NewMockExecutor(ctrl),
195+
mockShard.GetLogger(), metrics.NoopClient, metrics.NoopScope,
196+
options, mockReader)
197+
198+
// Replace the original updateQueueStateFn with a no-op so the hook closure
199+
// can be called without triggering real shard persistence.
200+
inner.base.updateQueueStateFn = func(ctx context.Context) {}
201+
202+
q := newCachedScheduledQueue(inner, mockReader)
203+
csq := q.(*cachedScheduledQueue)
204+
205+
// The eviction hook should be wired into updateQueueStateFn.
206+
assert.NotNil(t, csq.scheduledQueue.base.updateQueueStateFn)
207+
208+
// Calling the hooked function exercises the closure (covers the inner body).
209+
mockReader.EXPECT().UpdateReadLevel(gomock.Any()).Times(1)
210+
csq.scheduledQueue.base.updateQueueStateFn(context.Background())
211+
}
212+
213+
func testScheduledQueueOptions() *Options {
214+
return &Options{
215+
DeleteBatchSize: dynamicproperties.GetIntPropertyFn(100),
216+
RedispatchInterval: dynamicproperties.GetDurationPropertyFn(10 * time.Second),
217+
PageSize: dynamicproperties.GetIntPropertyFn(100),
218+
PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(10 * time.Second),
219+
MaxPollInterval: dynamicproperties.GetDurationPropertyFn(10 * time.Second),
220+
MaxPollIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1),
221+
UpdateAckInterval: dynamicproperties.GetDurationPropertyFn(10 * time.Second),
222+
UpdateAckIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1),
223+
MaxPollRPS: dynamicproperties.GetIntPropertyFn(100),
224+
MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100),
225+
PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0),
226+
VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(10 * time.Second),
227+
CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90),
228+
EnablePendingTaskCountAlert: func() bool { return true },
229+
MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2),
230+
}
231+
}

service/history/queuev2/timer_queue_factory.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,14 +151,19 @@ func (f *timerQueueFactory) createQueuev2(
151151
MaxVirtualQueueCount: config.QueueMaxVirtualQueueCount,
152152
}
153153

154+
var cachedReader CachedQueueReader
154155
reader := NewQueueReader(
155156
shard,
156157
persistence.HistoryTaskCategoryTimer,
157158
options.MaxPollInterval,
158159
options.MaxPollIntervalJitterCoefficient,
159160
)
161+
if config.TimerProcessorEnableCachedScheduledQueue() {
162+
cachedReader = newCachedQueueReader(reader, newInMemQueue(), shard, metricsScope)
163+
reader = cachedReader
164+
}
160165

161-
return newScheduledQueue(
166+
base := newScheduledQueue(
162167
shard,
163168
persistence.HistoryTaskCategoryTimer,
164169
f.taskProcessor,
@@ -169,4 +174,10 @@ func (f *timerQueueFactory) createQueuev2(
169174
options,
170175
reader,
171176
)
177+
178+
if cachedReader != nil {
179+
return newCachedScheduledQueue(base, cachedReader)
180+
}
181+
182+
return base
172183
}

service/history/queuev2/timer_queue_factory_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"go.uber.org/goleak"
88
"go.uber.org/mock/gomock"
99

10+
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
1011
"github.com/uber/cadence/common/persistence"
1112
"github.com/uber/cadence/common/reconciliation/invariant"
1213
"github.com/uber/cadence/service/history/config"
@@ -50,6 +51,34 @@ func TestTimerQueueFactory_Category(t *testing.T) {
5051
assert.Equal(t, persistence.HistoryTaskCategoryTimer, category)
5152
}
5253

54+
func TestTimerQueueFactory_CreateQueueV2_Cached(t *testing.T) {
55+
defer goleak.VerifyNone(t)
56+
ctrl := gomock.NewController(t)
57+
58+
cfg := config.NewForTest()
59+
cfg.TimerProcessorEnableCachedScheduledQueue = dynamicproperties.GetBoolPropertyFn(true)
60+
61+
mockShard := shard.NewTestContext(
62+
t, ctrl, &persistence.ShardInfo{
63+
ShardID: 10,
64+
RangeID: 1,
65+
TransferAckLevel: 0,
66+
},
67+
cfg)
68+
69+
factory := &timerQueueFactory{
70+
taskProcessor: task.NewMockProcessor(ctrl),
71+
archivalClient: archiver.NewMockClient(ctrl),
72+
}
73+
74+
processor := factory.createQueuev2(mockShard, execution.NewMockCache(ctrl), invariant.NewMockInvariant(ctrl))
75+
76+
assert.NotNil(t, processor)
77+
assert.Implements(t, (*queue.Processor)(nil), processor)
78+
_, ok := processor.(*cachedScheduledQueue)
79+
assert.True(t, ok, "expected *cachedScheduledQueue when config is enabled")
80+
}
81+
5382
func TestTimerQueueFactory_IsQueueV2Enabled(t *testing.T) {
5483
defer goleak.VerifyNone(t)
5584
ctrl := gomock.NewController(t)

0 commit comments

Comments
 (0)