3434import java .util .LinkedList ;
3535import java .util .List ;
3636import java .util .Locale ;
37+ import java .util .Map ;
38+ import java .util .Objects ;
3739import java .util .PriorityQueue ;
3840import java .util .concurrent .CompletableFuture ;
3941import java .util .concurrent .ConcurrentHashMap ;
4042import java .util .concurrent .ConcurrentLinkedDeque ;
4143import java .util .concurrent .ConcurrentMap ;
4244import java .util .concurrent .ConcurrentNavigableMap ;
4345import java .util .concurrent .ConcurrentSkipListMap ;
46+ import java .util .concurrent .ExecutionException ;
4447import java .util .concurrent .ExecutorService ;
4548import java .util .concurrent .ScheduledExecutorService ;
4649import java .util .concurrent .TimeUnit ;
50+ import java .util .concurrent .TimeoutException ;
4751import java .util .concurrent .atomic .AtomicLong ;
4852import java .util .concurrent .locks .ReentrantReadWriteLock ;
4953import java .util .function .Function ;
@@ -56,11 +60,13 @@ public class RecordAccumulator implements Closeable {
5660
5761 private static final long DEFAULT_LOCK_WARNING_TIMEOUT = TimeUnit .MILLISECONDS .toNanos (5 );
5862 private static final long DEFAULT_UPLOAD_WARNING_TIMEOUT = TimeUnit .SECONDS .toNanos (5 );
63+ private static final String OBJECT_PATH_OFFSET_DELIMITER = "-" ;
64+ private static final String OBJECT_PATH_FORMAT = "%s%d" + OBJECT_PATH_OFFSET_DELIMITER + "%d" ; // {objectPrefix}/{startOffset}-{endOffset}
5965 protected final ObjectWALConfig config ;
6066 protected final Time time ;
6167 protected final ObjectStorage objectStorage ;
6268 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock ();
63- private final ConcurrentNavigableMap <Long /* inclusive first offset */ , List < Record > > uploadMap = new ConcurrentSkipListMap <>();
69+ private final ConcurrentNavigableMap <Long /* inclusive first offset */ , UploadTask > uploadMap = new ConcurrentSkipListMap <>();
6470 private final ConcurrentNavigableMap <Pair <Long /* epoch */ , Long /* exclusive end offset */ >, WALObject > previousObjectMap = new ConcurrentSkipListMap <>();
6571 private final ConcurrentNavigableMap <Long /* exclusive end offset */ , WALObject > objectMap = new ConcurrentSkipListMap <>();
6672 private final String nodePrefix ;
@@ -77,6 +83,7 @@ public class RecordAccumulator implements Closeable {
7783 private volatile long lastUploadTimestamp = System .currentTimeMillis ();
7884 private final AtomicLong nextOffset = new AtomicLong ();
7985 private final AtomicLong flushedOffset = new AtomicLong ();
86+ private final AtomicLong trimOffset = new AtomicLong (-1 );
8087
8188 public RecordAccumulator (Time time , ObjectStorage objectStorage ,
8289 ObjectWALConfig config ) {
@@ -100,21 +107,32 @@ public void start() {
100107 String path = object .key ();
101108 String [] parts = path .split ("/" );
102109 try {
103- long firstOffset = Long .parseLong (parts [parts .length - 1 ]);
104- long epoch = Long .parseLong (parts [parts .length - 3 ]);
110+ WALObject walObject ;
105111
112+ long epoch = Long .parseLong (parts [parts .length - 3 ]);
106113 // Skip the object if it belongs to a later epoch.
107114 if (epoch > config .epoch ()) {
108115 return ;
109116 }
110117
111118 long length = object .size ();
112- long endOffset = firstOffset + length - WALObjectHeader .WAL_HEADER_SIZE ;
119+
120+ String rawOffset = parts [parts .length - 1 ];
121+ if (rawOffset .contains (OBJECT_PATH_OFFSET_DELIMITER )) {
122+ // new format: {startOffset}-{endOffset}
123+ long startOffset = Long .parseLong (rawOffset .substring (0 , rawOffset .indexOf (OBJECT_PATH_OFFSET_DELIMITER )));
124+ long endOffset = Long .parseLong (rawOffset .substring (rawOffset .indexOf (OBJECT_PATH_OFFSET_DELIMITER ) + 1 ));
125+ walObject = new WALObject (object .bucketId (), path , startOffset , endOffset , length );
126+ } else {
127+ // old format: {startOffset}
128+ long startOffset = Long .parseLong (rawOffset );
129+ walObject = new WALObject (object .bucketId (), path , startOffset , length );
130+ }
113131
114132 if (epoch != config .epoch ()) {
115- previousObjectMap .put (Pair .of (epoch , endOffset ), new WALObject ( object . bucketId (), path , firstOffset , length ) );
133+ previousObjectMap .put (Pair .of (epoch , walObject . endOffset ()), walObject );
116134 } else {
117- objectMap .put (endOffset , new WALObject ( object . bucketId ( ), path , firstOffset , length ) );
135+ objectMap .put (walObject . endOffset ( ), walObject );
118136 }
119137 objectDataBytes .addAndGet (length );
120138 } catch (NumberFormatException e ) {
@@ -196,13 +214,17 @@ public void close() {
196214 // Wait for all upload tasks to complete.
197215 if (!pendingFutureMap .isEmpty ()) {
198216 log .info ("Wait for {} pending upload tasks to complete." , pendingFutureMap .size ());
199- CompletableFuture .allOf (pendingFutureMap .keySet ().toArray (new CompletableFuture [0 ])).join ();
217+ try {
218+ CompletableFuture .allOf (pendingFutureMap .keySet ().toArray (new CompletableFuture [0 ])).get (30 , TimeUnit .SECONDS );
219+ } catch (InterruptedException | ExecutionException | TimeoutException e ) {
220+ log .error ("Failed to wait for pending upload tasks to complete." , e );
221+ }
200222 }
201223
202224 if (utilityService != null && !utilityService .isShutdown ()) {
203225 utilityService .shutdown ();
204226 try {
205- if (!utilityService .awaitTermination (30 , TimeUnit .SECONDS )) {
227+ if (!utilityService .awaitTermination (1 , TimeUnit .SECONDS )) {
206228 log .error ("Monitor executor {} did not terminate in time" , executorService );
207229 utilityService .shutdownNow ();
208230 }
@@ -246,7 +268,12 @@ public List<WALObject> objectList() throws WALFencedException {
246268 }
247269
248270 // Visible for testing
249- public ScheduledExecutorService executorService () {
271+ String objectPrefix () {
272+ return objectPrefix ;
273+ }
274+
275+ // Visible for testing
276+ ScheduledExecutorService executorService () {
250277 return executorService ;
251278 }
252279
@@ -374,6 +401,7 @@ public CompletableFuture<Void> trim(long offset) throws WALFencedException {
374401 return CompletableFuture .completedFuture (null );
375402 }
376403
404+ trimOffset .set (offset );
377405 long startTime = time .nanoseconds ();
378406
379407 List <ObjectStorage .ObjectPath > deleteObjectList = new ArrayList <>();
@@ -409,7 +437,7 @@ public CompletableFuture<Void> trim(long offset) throws WALFencedException {
409437
410438 // Not thread safe, caller should hold lock.
411439 // Visible for testing.
412- public void unsafeUpload (boolean force ) throws WALFencedException {
440+ void unsafeUpload (boolean force ) throws WALFencedException {
413441 if (!force ) {
414442 checkWriteStatus ();
415443 }
@@ -452,7 +480,7 @@ private void unsafeUpload(PriorityQueue<Record> recordQueue) {
452480
453481 while (!recordQueue .isEmpty ()) {
454482 // The retained bytes in the batch must larger than record header size.
455- long retainedBytesInBatch = config .maxBytesInBatch () - dataBuffer .readableBytes () - WALObjectHeader .WAL_HEADER_SIZE ;
483+ long retainedBytesInBatch = config .maxBytesInBatch () - dataBuffer .readableBytes () - WALObjectHeader .DEFAULT_WAL_HEADER_SIZE ;
456484 if (config .strictBatchLimit () && retainedBytesInBatch <= RecordHeader .RECORD_HEADER_SIZE ) {
457485 break ;
458486 }
@@ -461,7 +489,7 @@ private void unsafeUpload(PriorityQueue<Record> recordQueue) {
461489
462490 // Records larger than the batch size will be uploaded immediately.
463491 assert record != null ;
464- if (config .strictBatchLimit () && record .record .readableBytes () >= config .maxBytesInBatch () - WALObjectHeader .WAL_HEADER_SIZE ) {
492+ if (config .strictBatchLimit () && record .record .readableBytes () >= config .maxBytesInBatch () - WALObjectHeader .DEFAULT_WAL_HEADER_SIZE ) {
465493 dataBuffer .addComponent (true , record .record );
466494 recordList .add (record );
467495 break ;
@@ -490,17 +518,17 @@ assert record != null;
490518 long endOffset = firstOffset + dataLength ;
491519
492520 CompositeByteBuf objectBuffer = ByteBufAlloc .compositeByteBuffer ();
493- WALObjectHeader header = new WALObjectHeader (firstOffset , dataLength , stickyRecordLength , config .nodeId (), config .epoch ());
521+ WALObjectHeader header = new WALObjectHeader (firstOffset , dataLength , stickyRecordLength , config .nodeId (), config .epoch (), trimOffset . get () );
494522 objectBuffer .addComponent (true , header .marshal ());
495523 objectBuffer .addComponent (true , dataBuffer );
496524
497525 // Trigger upload.
498526 int objectLength = objectBuffer .readableBytes ();
499- uploadMap .put (firstOffset , recordList );
527+ uploadMap .put (firstOffset , new UploadTask ( recordList ) );
500528
501529 // Enable fast retry.
502530 ObjectStorage .WriteOptions writeOptions = new ObjectStorage .WriteOptions ().enableFastRetry (true );
503- String path = objectPrefix + firstOffset ;
531+ String path = String . format ( OBJECT_PATH_FORMAT , objectPrefix , firstOffset , endOffset ) ;
504532 CompletableFuture <ObjectStorage .WriteResult > uploadFuture = objectStorage .write (writeOptions , path , objectBuffer );
505533
506534 CompletableFuture <Void > finalFuture = recordUploadMetrics (uploadFuture , startTime , objectLength )
@@ -512,10 +540,16 @@ assert record != null;
512540 log .warn ("Failed to acquire lock in {}ms, cost: {}ms, operation: upload" , TimeUnit .NANOSECONDS .toMillis (DEFAULT_LOCK_WARNING_TIMEOUT ), TimeUnit .NANOSECONDS .toMillis (time .nanoseconds () - lockStartTime ));
513541 }
514542
515- objectMap .put (endOffset , new WALObject (result .bucket (), path , firstOffset , objectLength ));
543+ objectMap .put (endOffset , new WALObject (result .bucket (), path , firstOffset , endOffset , objectLength ));
516544 objectDataBytes .addAndGet (objectLength );
517545
518- List <Record > uploadedRecords = uploadMap .remove (firstOffset );
546+ uploadMap .get (firstOffset ).markFinished ();
547+ List <UploadTask > finishedTasks = new ArrayList <>();
548+ // Remove consecutive completed tasks from head.
549+ Map .Entry <Long , UploadTask > entry ;
550+ while ((entry = uploadMap .firstEntry ()) != null && entry .getValue ().isFinished ()) {
551+ finishedTasks .add (uploadMap .remove (entry .getKey ()));
552+ }
519553
520554 // Update flushed offset
521555 if (!uploadMap .isEmpty ()) {
@@ -527,7 +561,11 @@ assert record != null;
527561 }
528562
529563 // Release lock and complete future in callback thread.
530- callbackService .submit (() -> uploadedRecords .forEach (record -> record .future .complete (flushedOffset ::get )));
564+ for (UploadTask task : finishedTasks ) {
565+ callbackService .submit (() ->
566+ task .records ().forEach (record -> record .future .complete (flushedOffset ::get ))
567+ );
568+ }
531569 } finally {
532570 lock .writeLock ().unlock ();
533571 }
@@ -536,7 +574,7 @@ assert record != null;
536574 bufferedDataBytes .addAndGet (-dataLength );
537575 throwable = ExceptionUtils .getRootCause (throwable );
538576 if (throwable instanceof WALFencedException ) {
539- List <Record > uploadedRecords = uploadMap .remove (firstOffset );
577+ List <Record > uploadedRecords = uploadMap .remove (firstOffset ). records () ;
540578 Throwable finalThrowable = throwable ;
541579 // Release lock and complete future in callback thread.
542580 callbackService .submit (() -> uploadedRecords .forEach (record -> record .future .completeExceptionally (finalThrowable )));
@@ -560,6 +598,27 @@ protected CompletableFuture<ObjectStorage.WriteResult> recordUploadMetrics(
560598 });
561599 }
562600
601+ private static class UploadTask {
602+ private final List <Record > records ;
603+ private boolean finished = false ;
604+
605+ public UploadTask (List <Record > records ) {
606+ this .records = records ;
607+ }
608+
609+ public List <Record > records () {
610+ return records ;
611+ }
612+
613+ public void markFinished () {
614+ this .finished = true ;
615+ }
616+
617+ public boolean isFinished () {
618+ return finished ;
619+ }
620+ }
621+
563622 protected static class Record {
564623 public final ByteBuf record ;
565624 public final CompletableFuture <AppendResult .CallbackResult > future ;
@@ -577,12 +636,22 @@ public static class WALObject implements Comparable<WALObject> {
577636 private final short bucketId ;
578637 private final String path ;
579638 private final long startOffset ;
639+ private final long endOffset ;
580640 private final long length ;
581641
582642 public WALObject (short bucketId , String path , long startOffset , long length ) {
583643 this .bucketId = bucketId ;
584644 this .path = path ;
585645 this .startOffset = startOffset ;
646+ this .endOffset = WALObjectHeader .calculateEndOffsetV0 (startOffset , length );
647+ this .length = length ;
648+ }
649+
650+ public WALObject (short bucketId , String path , long startOffset , long endOffset , long length ) {
651+ this .bucketId = bucketId ;
652+ this .path = path ;
653+ this .startOffset = startOffset ;
654+ this .endOffset = endOffset ;
586655 this .length = length ;
587656 }
588657
@@ -606,5 +675,33 @@ public long startOffset() {
606675 public long length () {
607676 return length ;
608677 }
678+
679+ public long endOffset () {
680+ return endOffset ;
681+ }
682+
683+ @ Override
684+ public String toString () {
685+ return "WALObject{" +
686+ "bucketId=" + bucketId +
687+ ", path='" + path + '\'' +
688+ ", startOffset=" + startOffset +
689+ ", endOffset=" + endOffset +
690+ ", length=" + length +
691+ '}' ;
692+ }
693+
694+ @ Override
695+ public boolean equals (Object o ) {
696+ if (!(o instanceof WALObject ))
697+ return false ;
698+ WALObject object = (WALObject ) o ;
699+ return bucketId == object .bucketId && startOffset == object .startOffset && endOffset == object .endOffset && length == object .length && Objects .equals (path , object .path );
700+ }
701+
702+ @ Override
703+ public int hashCode () {
704+ return Objects .hash (bucketId , path , startOffset , endOffset , length );
705+ }
609706 }
610707}
0 commit comments