Skip to content

Commit c0918eb

Browse files
authored
fix(storage): ensure proper reference counting for ByteBuf in write operations(#2452) (#2500)
fix(storage): ensure proper reference counting for ByteBuf in write o… (#2452) * fix(storage): ensure proper reference counting for ByteBuf in write operations * feat(storage): implement fast retry mechanism and improve resource management in write operations * test(storage): add concurrency test for write operations and ensure buffer release * test(storage): add test for write permit acquisition and blocking behavior * style(test): format code for consistency in AbstractObjectStorageTest * feat(storage): add constructor for MemoryObjectStorage with concurrency support * fix(storage): ensure proper release of ByteBuf resources in write operations * chore: polish code * fix(storage): improve error handling and resource management in write operations * fix(storage): ensure proper release of resources on timeout in AbstractObjectStorage * test(storage): increase timeout duration for resource cleanup assertions
1 parent 8476ae9 commit c0918eb

3 files changed

Lines changed: 219 additions & 3 deletions

File tree

s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,23 +264,25 @@ public CompletableFuture<ByteBuf> rangeRead(ReadOptions options, String objectPa
264264
public CompletableFuture<WriteResult> write(WriteOptions options, String objectPath, ByteBuf data) {
265265
CompletableFuture<Void> cf = new CompletableFuture<>();
266266
CompletableFuture<WriteResult> retCf = acquireWritePermit(cf).thenApply(nil -> new WriteResult(bucketURI.bucketId()));
267-
retCf = retCf.whenComplete((nil, ex) -> data.release());
268267
if (retCf.isDone()) {
268+
data.release();
269269
return retCf;
270270
}
271271
TimerUtil timerUtil = new TimerUtil();
272272
networkOutboundBandwidthLimiter
273273
.consume(options.throttleStrategy(), data.readableBytes())
274274
.whenCompleteAsync((v, ex) -> {
275+
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, options.throttleStrategy())
276+
.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
275277
if (ex != null) {
276278
cf.completeExceptionally(ex);
279+
data.release();
277280
return;
278281
}
279282
if (checkTimeout(options, cf)) {
283+
data.release();
280284
return;
281285
}
282-
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, options.throttleStrategy())
283-
.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
284286
queuedWrite0(options, objectPath, data, cf);
285287
}, writeLimiterCallbackExecutor);
286288
return retCf;
@@ -312,6 +314,7 @@ private void write0(WriteOptions options, String path, ByteBuf data, Completable
312314

313315
if (checkTimeout(options, finalCf)) {
314316
attemptCf.completeExceptionally(new TimeoutException());
317+
data.release();
315318
return;
316319
}
317320

@@ -357,6 +360,7 @@ private void write0(WriteOptions options, String path, ByteBuf data, Completable
357360

358361
writeCf.thenAccept(nil -> {
359362
recordWriteStats(path, objectSize, timerUtil);
363+
data.release();
360364
if (completedFlag.compareAndSet(false, true)) {
361365
finalCf.complete(null);
362366
}
@@ -369,6 +373,7 @@ private void write0(WriteOptions options, String path, ByteBuf data, Completable
369373
if (retryStrategy == RetryStrategy.ABORT || checkS3ApiMode) {
370374
// no need to retry
371375
logger.error("PutObject for object {} fail", path, cause);
376+
data.release();
372377
if (completedFlag.compareAndSet(false, true)) {
373378
finalCf.completeExceptionally(cause);
374379
}

s3stream/src/main/java/com/automq/stream/s3/operator/MemoryObjectStorage.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ public MemoryObjectStorage(boolean manualMergeRead, short bucketId) {
4747
this.bucketId = bucketId;
4848
}
4949

50+
public MemoryObjectStorage(int concurrencyCount) {
51+
super(BucketURI.parse(0 + "@s3://b"),
52+
new RecordTestNetworkBandwidthLimiter(), new RecordTestNetworkBandwidthLimiter(),
53+
concurrencyCount, 0, true, false, false, "memory");
54+
this.bucketId = 0;
55+
}
56+
5057
public MemoryObjectStorage(short bucketId) {
5158
this(false, bucketId);
5259
}

s3stream/src/test/java/com/automq/stream/s3/operator/AbstractObjectStorageTest.java

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,28 @@
2222
import org.junit.jupiter.api.Tag;
2323
import org.junit.jupiter.api.Test;
2424

25+
import java.lang.reflect.Field;
26+
import java.util.ArrayList;
27+
import java.util.List;
2528
import java.util.concurrent.CompletableFuture;
2629
import java.util.concurrent.ExecutionException;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.TimeoutException;
33+
import java.util.concurrent.atomic.AtomicInteger;
2734

2835
import io.netty.buffer.ByteBuf;
2936

37+
import static org.awaitility.Awaitility.await;
3038
import static org.junit.jupiter.api.Assertions.assertEquals;
3139
import static org.junit.jupiter.api.Assertions.assertFalse;
40+
import static org.junit.jupiter.api.Assertions.assertThrows;
3241
import static org.junit.jupiter.api.Assertions.assertTrue;
3342
import static org.mockito.ArgumentMatchers.any;
3443
import static org.mockito.ArgumentMatchers.eq;
3544
import static org.mockito.Mockito.anyLong;
3645
import static org.mockito.Mockito.anyString;
46+
import static org.mockito.Mockito.mock;
3747
import static org.mockito.Mockito.spy;
3848
import static org.mockito.Mockito.timeout;
3949
import static org.mockito.Mockito.verify;
@@ -142,6 +152,200 @@ void testByteBufRefCnt() throws ExecutionException, InterruptedException {
142152
}).get();
143153
}
144154

155+
156+
@Test
157+
void testFastRetry() throws Throwable {
158+
// Initialize memory storage and spy to track method calls
159+
objectStorage = new MemoryObjectStorage();
160+
objectStorage = spy(objectStorage);
161+
162+
// Configure write options: enable fast retry, disable normal retry
163+
ObjectStorage.WriteOptions options = new ObjectStorage.WriteOptions()
164+
.enableFastRetry(true)
165+
.retry(false);
166+
167+
// Mock S3 latency calculator via reflection to force fast retry condition
168+
Field latencyCalculatorField = AbstractObjectStorage.class.getDeclaredField("s3LatencyCalculator");
169+
latencyCalculatorField.setAccessible(true);
170+
S3LatencyCalculator mockCalculator = mock(S3LatencyCalculator.class);
171+
when(mockCalculator.valueAtPercentile(anyLong(), anyLong())).thenReturn(100L); // Force low latency to trigger fast retry
172+
latencyCalculatorField.set(objectStorage, mockCalculator);
173+
174+
// Track doWrite() calls: first call hangs, second completes immediately
175+
AtomicInteger callCount = new AtomicInteger();
176+
CompletableFuture<Void> firstFuture = new CompletableFuture<>();
177+
when(objectStorage.doWrite(any(), anyString(), any())).thenAnswer(inv -> {
178+
int count = callCount.getAndIncrement();
179+
return (count == 0) ? firstFuture : CompletableFuture.completedFuture(null);
180+
});
181+
182+
// Execute write operation
183+
ByteBuf data = TestUtils.randomPooled(1024);
184+
assertEquals(1, data.refCnt()); // Verify initial ref count
185+
186+
CompletableFuture<ObjectStorage.WriteResult> writeFuture = objectStorage.write(options, "testKey", data);
187+
writeFuture.get(200, TimeUnit.MILLISECONDS); // Wait for write completion
188+
189+
// Verify: two calls made (initial + retry), data ref count maintained during retry
190+
assertEquals(1, data.refCnt());
191+
assertEquals(2, callCount.get());
192+
193+
// Complete initial future and verify data release
194+
firstFuture.complete(null);
195+
await().atMost(1, TimeUnit.SECONDS)
196+
.untilAsserted(() -> assertEquals(0, data.refCnt())); // Ensure buffer released
197+
}
198+
199+
@Test
200+
void testWriteRetryTimeout() throws Throwable {
201+
// Setup storage with 100ms timeout (clearer time unit)
202+
objectStorage = spy(new MemoryObjectStorage());
203+
ObjectStorage.WriteOptions options = new ObjectStorage.WriteOptions()
204+
.retry(true)
205+
.timeout(1000L);
206+
207+
// Mock hanging write operation
208+
AtomicInteger callCount = new AtomicInteger();
209+
when(objectStorage.doWrite(any(), anyString(), any())).thenAnswer(inv -> {
210+
int count = callCount.getAndIncrement();
211+
if (count < 12) {
212+
CompletableFuture<Void> future = new CompletableFuture<>();
213+
Executors.newSingleThreadScheduledExecutor().schedule(
214+
() -> future.completeExceptionally(new TimeoutException("Simulated timeout")),
215+
100, TimeUnit.MILLISECONDS
216+
);
217+
return future;
218+
}
219+
// Second call: immediate success
220+
return CompletableFuture.completedFuture(null);
221+
});
222+
223+
// Execute test
224+
ByteBuf data = TestUtils.randomPooled(1024);
225+
CompletableFuture<ObjectStorage.WriteResult> writeFuture =
226+
objectStorage.write(options, "testKey", data);
227+
// Verify timeout exception
228+
assertThrows(TimeoutException.class,
229+
() -> writeFuture.get(1, TimeUnit.SECONDS));
230+
// Verify resource cleanup
231+
await().atMost(2, TimeUnit.SECONDS)
232+
.untilAsserted(() -> assertEquals(0, data.refCnt()));
233+
// Verify: no successful calls made
234+
assertTrue(callCount.get() < 12);
235+
}
236+
237+
@Test
238+
void testWritePermit() throws Exception {
239+
final int maxConcurrency = 5;
240+
objectStorage = spy(new MemoryObjectStorage(maxConcurrency));
241+
242+
ObjectStorage.WriteOptions options = new ObjectStorage.WriteOptions()
243+
.enableFastRetry(false)
244+
.retry(false);
245+
246+
// Use completable future to block first 5 calls
247+
CompletableFuture<Void> barrierFuture = new CompletableFuture<>();
248+
AtomicInteger callCount = new AtomicInteger();
249+
250+
when(objectStorage.doWrite(any(), anyString(), any())).thenAnswer(inv -> {
251+
int count = callCount.getAndIncrement();
252+
return (count < maxConcurrency)
253+
? barrierFuture // Block first 5 calls
254+
: CompletableFuture.completedFuture(null); // Immediate success for 6th
255+
});
256+
257+
// Phase 1: Submit max concurrency requests
258+
List<ByteBuf> buffers = new ArrayList<>();
259+
for (int i = 0; i < maxConcurrency; i++) {
260+
ByteBuf data = TestUtils.randomPooled(1024);
261+
buffers.add(data);
262+
objectStorage.write(options, "testKey", data);
263+
}
264+
265+
// Verify initial calls reached max concurrency
266+
await().atMost(1, TimeUnit.SECONDS)
267+
.untilAsserted(() -> assertEquals(maxConcurrency, callCount.get()));
268+
269+
// Phase 2: Submit 6th request beyond concurrency limit
270+
CompletableFuture<ObjectStorage.WriteResult> sixthWriteFuture =
271+
CompletableFuture.supplyAsync(() ->
272+
objectStorage.write(options, "testKey", TestUtils.random(1024))
273+
).thenCompose(f -> f);
274+
275+
// Release blocked calls and verify completion
276+
barrierFuture.complete(null);
277+
await().atMost(2, TimeUnit.SECONDS)
278+
.untilAsserted(() -> {
279+
assertEquals(maxConcurrency + 1, callCount.get());
280+
assertTrue(sixthWriteFuture.isDone());
281+
282+
// Verify: all buffers released
283+
for (ByteBuf buffer : buffers) {
284+
assertEquals(0, buffer.refCnt());
285+
}
286+
});
287+
}
288+
289+
@Test
290+
void testWaitWritePermit() throws Exception {
291+
final int maxConcurrency = 1;
292+
objectStorage = spy(new MemoryObjectStorage(maxConcurrency));
293+
294+
ObjectStorage.WriteOptions options = new ObjectStorage.WriteOptions()
295+
.enableFastRetry(false)
296+
.retry(false);
297+
298+
// Block first call using completable future
299+
CompletableFuture<Void> blockingFuture = new CompletableFuture<>();
300+
AtomicInteger callCount = new AtomicInteger();
301+
302+
when(objectStorage.doWrite(any(), anyString(), any())).thenAnswer(inv -> {
303+
callCount.incrementAndGet();
304+
return blockingFuture; // Always return blocking future for first call
305+
});
306+
307+
// Phase 1: Acquire the only permit
308+
ByteBuf firstBuffer = TestUtils.randomPooled(1024);
309+
objectStorage.write(options, "testKey", firstBuffer);
310+
311+
// Verify permit acquisition
312+
await().until(() -> callCount.get() == 1);
313+
314+
// Phase 2: Verify blocking behavior with interrupt
315+
Thread blockingThread = new Thread(() -> {
316+
ByteBuf byteBuf = TestUtils.randomPooled(1024);
317+
try {
318+
CompletableFuture<ObjectStorage.WriteResult> future =
319+
objectStorage.write(options, "testKey", byteBuf);
320+
ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get());
321+
assertTrue(exception.getCause() instanceof InterruptedException);
322+
} catch (Exception e) {
323+
// Ignore
324+
} finally {
325+
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
326+
assertEquals(0, byteBuf.refCnt());
327+
});
328+
}
329+
});
330+
331+
blockingThread.start();
332+
333+
Thread.sleep(1000);
334+
335+
// Interrupt and verify
336+
blockingThread.interrupt();
337+
blockingThread.join();
338+
339+
// Verify resource cleanup
340+
assertEquals(1, firstBuffer.refCnt());
341+
342+
// Cleanup
343+
blockingFuture.complete(null);
344+
await().atMost(2, TimeUnit.SECONDS)
345+
.untilAsserted(() -> assertEquals(0, firstBuffer.refCnt()));
346+
}
347+
348+
145349
@Test
146350
void testReadToEndOfObject() throws ExecutionException, InterruptedException {
147351
objectStorage = new MemoryObjectStorage(true);

0 commit comments

Comments
 (0)