-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer_offsets_mmap_test.go
More file actions
326 lines (280 loc) · 7.8 KB
/
consumer_offsets_mmap_test.go
File metadata and controls
326 lines (280 loc) · 7.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
package comet
import (
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
)
func TestConsumerOffsetMmap(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "offset-mmap-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
// Create a new mmap offset store
store, err := NewConsumerOffsetMmap(tmpDir, 0)
if err != nil {
t.Fatal(err)
}
defer store.Close()
// Test initial state
offset, exists := store.Get("test-group")
if exists || offset != 0 {
t.Errorf("Expected no offset for new group, got %d (exists=%v)", offset, exists)
}
// Set an offset
if err := store.Set("test-group", 100); err != nil {
t.Fatalf("Failed to set offset: %v", err)
}
// Verify it's immediately visible
offset, exists = store.Get("test-group")
if !exists || offset != 100 {
t.Errorf("Expected offset 100, got %d (exists=%v)", offset, exists)
}
// Close and reopen to verify persistence
store.Close()
store2, err := NewConsumerOffsetMmap(tmpDir, 0)
if err != nil {
t.Fatal(err)
}
defer store2.Close()
// Verify loaded offset
offset, exists = store2.Get("test-group")
if !exists || offset != 100 {
t.Errorf("Expected loaded offset 100, got %d (exists=%v)", offset, exists)
}
// Test multiple groups
if err := store2.Set("group-1", 200); err != nil {
t.Fatal(err)
}
if err := store2.Set("group-2", 300); err != nil {
t.Fatal(err)
}
allOffsets := store2.GetAll()
if len(allOffsets) != 3 {
t.Errorf("Expected 3 offsets, got %d", len(allOffsets))
}
if allOffsets["test-group"] != 100 {
t.Errorf("Expected test-group=100, got %d", allOffsets["test-group"])
}
if allOffsets["group-1"] != 200 {
t.Errorf("Expected group-1=200, got %d", allOffsets["group-1"])
}
if allOffsets["group-2"] != 300 {
t.Errorf("Expected group-2=300, got %d", allOffsets["group-2"])
}
}
func TestConsumerOffsetMmapConcurrency(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "offset-mmap-concurrent")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
store, err := NewConsumerOffsetMmap(tmpDir, 0)
if err != nil {
t.Fatal(err)
}
defer store.Close()
// Test concurrent writes to different groups
var wg sync.WaitGroup
numGroups := 100
numUpdates := 1000
for i := 0; i < numGroups; i++ {
wg.Add(1)
go func(groupID int) {
defer wg.Done()
group := fmt.Sprintf("group-%d", groupID)
for j := 0; j < numUpdates; j++ {
if err := store.Set(group, int64(j)); err != nil {
t.Errorf("Failed to set offset for %s: %v", group, err)
return
}
}
}(i)
}
wg.Wait()
// Verify all groups have the correct final offset
for i := 0; i < numGroups; i++ {
group := fmt.Sprintf("group-%d", i)
offset, exists := store.Get(group)
if !exists {
t.Errorf("Group %s missing", group)
} else if offset != int64(numUpdates-1) {
t.Errorf("Group %s has offset %d, expected %d", group, offset, numUpdates-1)
}
}
}
func TestConsumerOffsetMmapMultiProcess(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "offset-mmap-multiproc")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
// Create first "process" (store instance)
store1, err := NewConsumerOffsetMmap(tmpDir, 0)
if err != nil {
t.Fatal(err)
}
defer store1.Close()
// Create second "process" (another store instance)
store2, err := NewConsumerOffsetMmap(tmpDir, 0)
if err != nil {
t.Fatal(err)
}
defer store2.Close()
// Write from process 1
if err := store1.Set("shared-group", 100); err != nil {
t.Fatal(err)
}
// Should be immediately visible in process 2
offset, exists := store2.Get("shared-group")
if !exists || offset != 100 {
t.Errorf("Process 2 should see offset 100, got %d (exists=%v)", offset, exists)
}
// Update from process 2
if err := store2.Set("shared-group", 200); err != nil {
t.Fatal(err)
}
// Should be immediately visible in process 1
offset, exists = store1.Get("shared-group")
if !exists || offset != 200 {
t.Errorf("Process 1 should see offset 200, got %d (exists=%v)", offset, exists)
}
}
func TestConsumerOffsetMmapMigration(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "offset-mmap-migration")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
// Create old-style offset file manually (simulating the old format)
// The old format was: version(1 byte), count(4 bytes), then entries
oldPath := filepath.Join(tmpDir, "offsets.bin")
oldData := []byte{
1, // version
2, 0, 0, 0, // count = 2
14, 'm', 'i', 'g', 'r', 'a', 't', 'e', 'd', '-', 'g', 'r', 'o', 'u', 'p', // group name length + name
244, 1, 0, 0, 0, 0, 0, 0, // offset = 500 (little endian)
13, 'a', 'n', 'o', 't', 'h', 'e', 'r', '-', 'g', 'r', 'o', 'u', 'p', // group name length + name
88, 2, 0, 0, 0, 0, 0, 0, // offset = 600 (little endian)
}
if err := os.WriteFile(oldPath, oldData, 0644); err != nil {
t.Fatal(err)
}
// Verify old file exists
if _, err := os.Stat(oldPath); err != nil {
t.Fatal("Old offset file should exist")
}
// Create mmap store - should migrate
mmapStore, err := NewConsumerOffsetMmap(tmpDir, 0)
if err != nil {
t.Fatal(err)
}
defer mmapStore.Close()
// Verify migrated offsets
offset, exists := mmapStore.Get("migrated-group")
if !exists || offset != 500 {
t.Errorf("Expected migrated offset 500, got %d (exists=%v)", offset, exists)
}
offset, exists = mmapStore.Get("another-group")
if !exists || offset != 600 {
t.Errorf("Expected migrated offset 600, got %d (exists=%v)", offset, exists)
}
// Old file should be deleted after migration
if _, err := os.Stat(oldPath); !os.IsNotExist(err) {
t.Error("Old offset file should be deleted after migration")
}
}
func TestConsumerOffsetMmapTableFull(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "offset-mmap-full")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
store, err := NewConsumerOffsetMmap(tmpDir, 0)
if err != nil {
t.Fatal(err)
}
defer store.Close()
// Try to fill the table
for i := 0; i < MaxConsumerGroups+10; i++ {
group := fmt.Sprintf("group-%d", i)
err := store.Set(group, int64(i))
if i < MaxConsumerGroups {
if err != nil {
t.Errorf("Should be able to set offset for group %d: %v", i, err)
}
} else {
if err == nil {
t.Errorf("Should fail when table is full at group %d", i)
}
}
}
// Verify we have exactly MaxConsumerGroups entries
used, total := store.GetStats()
if used != MaxConsumerGroups {
t.Errorf("Expected %d used slots, got %d", MaxConsumerGroups, used)
}
if total != MaxConsumerGroups {
t.Errorf("Expected %d total slots, got %d", MaxConsumerGroups, total)
}
}
func BenchmarkConsumerOffsetMmapGet(b *testing.B) {
tmpDir, err := os.MkdirTemp("", "offset-mmap-bench")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll(tmpDir)
store, err := NewConsumerOffsetMmap(tmpDir, 0)
if err != nil {
b.Fatal(err)
}
defer store.Close()
// Pre-populate some groups
for i := 0; i < 100; i++ {
group := fmt.Sprintf("group-%d", i)
store.Set(group, int64(i*1000))
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
group := fmt.Sprintf("group-%d", i%100)
offset, _ := store.Get(group)
if offset != int64((i%100)*1000) {
b.Errorf("Wrong offset for %s: %d", group, offset)
}
i++
}
})
}
func BenchmarkConsumerOffsetMmapSet(b *testing.B) {
tmpDir, err := os.MkdirTemp("", "offset-mmap-bench-set")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll(tmpDir)
store, err := NewConsumerOffsetMmap(tmpDir, 0)
if err != nil {
b.Fatal(err)
}
defer store.Close()
// Pre-populate groups to avoid allocation during benchmark
for i := 0; i < 100; i++ {
group := fmt.Sprintf("group-%d", i)
store.Set(group, 0)
}
var counter int64
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
offset := atomic.AddInt64(&counter, 1)
group := fmt.Sprintf("group-%d", offset%100)
if err := store.Set(group, offset); err != nil {
b.Error(err)
}
}
})
}