//go:build integration

package comet

import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"
)
// TestOffsetMergeLogic tests the offset merge logic that might cause message loss
func TestOffsetMergeLogic(t *testing.T) {
	dir := t.TempDir()
	config := DeprecatedMultiProcessConfig(0, 2)
	stream := "merge:v1:shard:0000"

	// Enable debug
	SetDebug(true)
	defer SetDebug(false)

	// Write 20 messages
	t.Logf("=== Writing 20 messages ===")
	client, err := NewClient(dir, config)
	if err != nil {
		t.Fatal(err)
	}

	var messages [][]byte
	for i := 0; i < 20; i++ {
		messages = append(messages, []byte(fmt.Sprintf("msg-%02d", i)))
	}

	ctx := context.Background()
	_, err = client.Append(ctx, stream, messages)
	if err != nil {
		t.Fatal(err)
	}
	client.Close()
	// Simulate the problematic scenario:
	//   1. Consumer 1 reads messages 0-9 and ACKs offset 10.
	//   2. Consumer 2 starts reading from 0 (before Consumer 1's ACK persists).
	//   3. Consumer 2 reads messages 0-4 and ACKs offset 5.
	//   4. Consumer 1's offset 10 persists.
	//   5. Consumer 2's offset 5 is ignored (lower than 10).
	// Result: messages 5-9 are never processed by Consumer 2.
	// (The "highest offset wins" merge behind this is sketched in mergeOffsetMax
	// at the bottom of this file.)
t.Logf("\n=== Simulating offset race condition ===")
// Consumer 1: Read and ACK first 10 messages
client1, _ := NewClient(dir, config)
consumer1 := NewConsumer(client1, ConsumerOptions{Group: "test-group"})
msgs1, err := consumer1.Read(ctx, []uint32{0}, 10)
if err != nil {
t.Fatal(err)
}
t.Logf("Consumer 1: Read %d messages", len(msgs1))
// ACK all messages
for _, msg := range msgs1 {
if err := consumer1.Ack(ctx, msg.ID); err != nil {
t.Fatal(err)
}
}
// Get offset before closing
shard1, _ := client1.getOrCreateShard(0)
shard1.mu.RLock()
offset1 := shard1.index.ConsumerOffsets["test-group"]
shard1.mu.RUnlock()
t.Logf("Consumer 1: Offset before close = %d", offset1)
// Start Consumer 2 BEFORE Consumer 1 closes (simulating concurrent access)
client2, _ := NewClient(dir, config)
consumer2 := NewConsumer(client2, ConsumerOptions{Group: "test-group"})
// Check what offset Consumer 2 sees
shard2, _ := client2.getOrCreateShard(0)
shard2.mu.RLock()
offset2Start := shard2.index.ConsumerOffsets["test-group"]
shard2.mu.RUnlock()
t.Logf("Consumer 2: Starting offset = %d", offset2Start)
// Now close Consumer 1 (this triggers persist)
consumer1.Close()
client1.Close()
// Consumer 2 reads 5 messages
msgs2, err := consumer2.Read(ctx, []uint32{0}, 5)
if err != nil {
t.Fatal(err)
}
if len(msgs2) > 0 {
t.Logf("Consumer 2: Read %d messages starting from entry %d", len(msgs2), msgs2[0].ID.EntryNumber)
} else {
t.Logf("Consumer 2: Read 0 messages")
}
// ACK messages
for _, msg := range msgs2 {
if err := consumer2.Ack(ctx, msg.ID); err != nil {
t.Fatal(err)
}
}
// Check offset before close
shard2.mu.RLock()
offset2End := shard2.index.ConsumerOffsets["test-group"]
shard2.mu.RUnlock()
t.Logf("Consumer 2: Offset before close = %d", offset2End)
// Close Consumer 2
consumer2.Close()
client2.Close()
// Wait for persistence
time.Sleep(100 * time.Millisecond)
// Check final persisted offset
client3, _ := NewClient(dir, config)
shard3, _ := client3.getOrCreateShard(0)
shard3.mu.RLock()
finalOffset := shard3.index.ConsumerOffsets["test-group"]
shard3.mu.RUnlock()
client3.Close()
t.Logf("\n=== RESULTS ===")
t.Logf("Consumer 1 processed: messages 0-9, set offset to %d", offset1)
t.Logf("Consumer 2 started at offset %d, processed 5 messages, set offset to %d", offset2Start, offset2End)
t.Logf("Final persisted offset: %d", finalOffset)
// The issue: if final offset is 10 but Consumer 2 only processed up to 5,
// then messages 5-9 are lost for Consumer 2's processing
if finalOffset == 10 && offset2End < 10 {
t.Errorf("CRITICAL: Offset merge caused message loss! Consumer 2 only processed up to %d but offset jumped to %d", offset2End, finalOffset)
}
	// Now exercise two consumers reading the same group concurrently.
	t.Logf("\n=== Testing concurrent offset updates ===")
	var wg sync.WaitGroup
	processed := make(map[int][]string)
	var mu sync.Mutex

	// Two consumers reading concurrently
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(consumerID int) {
			defer wg.Done()

			client, _ := NewClient(dir, config)
			consumer := NewConsumer(client, ConsumerOptions{Group: "concurrent-group"})

			// Read 5 messages
			msgs, err := consumer.Read(ctx, []uint32{0}, 5)
			if err != nil {
				return
			}

			mu.Lock()
			for _, msg := range msgs {
				processed[consumerID] = append(processed[consumerID], string(msg.Data))
			}
			mu.Unlock()

			// ACK messages
			for _, msg := range msgs {
				consumer.Ack(ctx, msg.ID)
			}

			consumer.Close()
			client.Close()
		}(i)
	}
	wg.Wait()

	t.Logf("\n=== Concurrent processing results ===")
	for id, msgs := range processed {
		t.Logf("Consumer %d processed: %v", id, msgs)
	}

	// Check for duplicate deliveries across the two consumers
	// (gaps show up in the per-consumer logs above).
	seen := make(map[string]int)
	for _, msgs := range processed {
		for _, msg := range msgs {
			seen[msg]++
		}
	}
	duplicates := 0
	for msg, count := range seen {
		if count > 1 {
			t.Logf("Message %s processed %d times", msg, count)
			duplicates++
		}
	}
	if duplicates > 0 {
		t.Logf("Found %d duplicate messages in concurrent processing", duplicates)
	}
}
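
// mergeOffsetMax is NOT part of the comet API; it is a minimal, illustrative
// sketch of the "highest offset wins" merge policy that the race scenario above
// exercises. When two consumers in the same group persist offsets concurrently,
// the lower value is silently discarded, which is how Consumer 2's ACK at
// offset 5 can be shadowed by Consumer 1's ACK at offset 10. The parameter
// names and the map[string]int64 type are assumptions for illustration only.
func mergeOffsetMax(offsets map[string]int64, group string, candidate int64) {
	// Keep the larger of the stored offset and the candidate being persisted;
	// any progress implied by a lower candidate is dropped.
	if current, ok := offsets[group]; !ok || candidate > current {
		offsets[group] = candidate
	}
}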