Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.concurrent.locks.Lock;
Expand All @@ -44,16 +45,18 @@
*/
public class LazyIndex<T extends AbstractIndex> implements Closeable {

private enum IndexType {
OFFSET, TIME
@FunctionalInterface
private interface IndexLoader<T extends AbstractIndex> {
T load(File file) throws IOException;
}

private interface IndexWrapper extends Closeable {
File file();
void updateParentDir(File file);
void renameTo(File file) throws IOException;
boolean deleteIfExists() throws IOException;
void close() throws IOException;
void renameTo(File file);
boolean deleteIfExists();
@Override
void close();
void closeHandler();
}

Expand All @@ -76,20 +79,30 @@ public void updateParentDir(File parentDir) {
}

@Override
public void renameTo(File f) throws IOException {
public void renameTo(File f) {
try {
Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
} catch (NoSuchFileException e) {
if (file.exists())
throw e;
if (file.exists()) {
throw new UncheckedIOException(
String.format("Error renaming index file %s to %s", file, f), e);
}
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error renaming index file %s to %s", file, f), e);
} finally {
file = f;
}
}

@Override
public boolean deleteIfExists() throws IOException {
return Files.deleteIfExists(file.toPath());
public boolean deleteIfExists() {
try {
return Files.deleteIfExists(file.toPath());
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error deleting index file %s", file), e);
}
}

@Override
Expand Down Expand Up @@ -119,18 +132,33 @@ public void updateParentDir(File parentDir) {
}

@Override
public void renameTo(File f) throws IOException {
index.renameTo(f);
public void renameTo(File f) {
try {
index.renameTo(f);
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error renaming index file %s to %s", index.file(), f), e);
}
}

@Override
public boolean deleteIfExists() throws IOException {
return index.deleteIfExists();
public boolean deleteIfExists() {
try {
return index.deleteIfExists();
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error deleting index file %s", index.file()), e);
}
}

@Override
public void close() throws IOException {
index.close();
public void close() {
try {
index.close();
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error closing index file %s", index.file()), e);
}
}

@Override
Expand All @@ -140,50 +168,55 @@ public void closeHandler() {
}

private final Lock lock = new ReentrantLock();
private final long baseOffset;
private final int maxIndexSize;
private final IndexType indexType;
private final IndexLoader<T> indexLoader;

private volatile IndexWrapper indexWrapper;

private LazyIndex(IndexWrapper indexWrapper, long baseOffset, int maxIndexSize, IndexType indexType) {
private LazyIndex(IndexWrapper indexWrapper, IndexLoader<T> indexLoader) {
this.indexWrapper = indexWrapper;
this.baseOffset = baseOffset;
this.maxIndexSize = maxIndexSize;
this.indexType = indexType;
this.indexLoader = indexLoader;
}

public static LazyIndex<OffsetIndex> forOffset(File file, long baseOffset, int maxIndexSize) {
return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, IndexType.OFFSET);
return new LazyIndex<>(new IndexFile(file),
f -> new OffsetIndex(f, baseOffset, maxIndexSize, true));
}

public static LazyIndex<TimeIndex> forTime(File file, long baseOffset, int maxIndexSize) {
return new LazyIndex<>(new IndexFile(file), baseOffset, maxIndexSize, IndexType.TIME);
return new LazyIndex<>(new IndexFile(file),
f -> new TimeIndex(f, baseOffset, maxIndexSize, true));
}

public File file() {
return indexWrapper.file();
}

@SuppressWarnings("unchecked")
public T get() throws IOException {
public T get() {
IndexWrapper wrapper = indexWrapper;
if (wrapper instanceof IndexValue<?>)
if (wrapper instanceof IndexValue<?>) {
return ((IndexValue<T>) wrapper).index;
else {
lock.lock();
try {
if (indexWrapper instanceof IndexValue<?>)
return ((IndexValue<T>) indexWrapper).index;
else if (indexWrapper instanceof IndexFile indexFile) {
IndexValue<T> indexValue = new IndexValue<>(loadIndex(indexFile.file));
indexWrapper = indexValue;
return indexValue.index;
} else
throw new IllegalStateException("Unexpected type for indexWrapper " + indexWrapper.getClass());
} finally {
lock.unlock();
}
lock.lock();
try {
if (indexWrapper instanceof IndexValue<?>) {
return ((IndexValue<T>) indexWrapper).index;
} else if (indexWrapper instanceof IndexFile indexFile) {
T loaded;
try {
loaded = indexLoader.load(indexFile.file);
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error loading index file %s", indexFile.file), e);
}
IndexValue<T> indexValue = new IndexValue<>(loaded);
indexWrapper = indexValue;
return indexValue.index;
} else {
throw new IllegalStateException("Unexpected type for indexWrapper " + indexWrapper.getClass());
}
} finally {
lock.unlock();
}
}

Expand All @@ -196,7 +229,7 @@ public void updateParentDir(File parentDir) {
}
}

public void renameTo(File f) throws IOException {
public void renameTo(File f) {
lock.lock();
try {
indexWrapper.renameTo(f);
Expand All @@ -205,7 +238,7 @@ public void renameTo(File f) throws IOException {
}
}

public boolean deleteIfExists() throws IOException {
public boolean deleteIfExists() {
lock.lock();
try {
return indexWrapper.deleteIfExists();
Expand All @@ -215,7 +248,7 @@ public boolean deleteIfExists() throws IOException {
}

@Override
public void close() throws IOException {
public void close() {
lock.lock();
try {
indexWrapper.close();
Expand All @@ -233,12 +266,4 @@ public void closeHandler() {
}
}

@SuppressWarnings("unchecked")
private T loadIndex(File file) throws IOException {
return switch (indexType) {
case OFFSET -> (T) new OffsetIndex(file, baseOffset, maxIndexSize, true);
case TIME -> (T) new TimeIndex(file, baseOffset, maxIndexSize, true);
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class LazyIndexTest {

@Test
public void forOffsetReturnsOffsetIndexOnGet() throws IOException {
File file = nonExistentTempFile();
try (LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(file, 0L, 1000)) {
assertEquals(file, idx.file());
OffsetIndex loaded = idx.get();
assertNotNull(loaded);
assertEquals(0L, loaded.baseOffset());
}
}

@Test
public void forTimeReturnsTimeIndexOnGet() throws IOException {
File file = nonExistentTempFile();
try (LazyIndex<TimeIndex> idx = LazyIndex.forTime(file, 0L, 1500)) {
assertEquals(file, idx.file());
TimeIndex loaded = idx.get();
assertNotNull(loaded);
assertEquals(0L, loaded.baseOffset());
}
}

@Test
public void getReturnsSameInstanceOnSubsequentCalls() throws IOException {
File file = nonExistentTempFile();
try (LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(file, 0L, 1000)) {
OffsetIndex first = idx.get();
OffsetIndex second = idx.get();
assertSame(first, second);
}
}

@Test
public void getDoesNotLoadUntilCalled() throws IOException {
File file = nonExistentTempFile();
try (LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(file, 0L, 1000)) {
// file() and updateParentDir do not trigger load; the underlying file is not yet created
assertEquals(file, idx.file());
assertFalse(file.exists(), "Index file should not exist before get() triggers load");
idx.get();
assertTrue(file.exists(), "Index file should exist after get() triggers load");
}
}

@Test
public void getWrapsIOExceptionAsUncheckedWhenParentDirMissing() throws IOException {
File badFile = new File(nonExistentTempDir(), "0.index");
LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(badFile, 0L, 1000);
UncheckedIOException ex = assertThrows(UncheckedIOException.class, idx::get);
assertTrue(ex.getMessage().contains("Error loading index file"),
"Message should describe the failure, got: " + ex.getMessage());
assertNotNull(ex.getCause(), "Cause should be the underlying IOException");
assertInstanceOf(IOException.class, ex.getCause());
}

@Test
public void renameToBeforeLoadMovesFileAndUpdatesPath() throws IOException {
File initial = nonExistentTempFile();
File renamed = nonExistentTempFile();
try (LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(initial, 0L, 1000)) {
idx.get(); // create the on-disk file
idx.renameTo(renamed);
assertEquals(renamed, idx.file());
assertTrue(renamed.exists());
}
}

@Test
public void renameToToleratesAlreadyDeletedSource() throws IOException {
File initial = nonExistentTempFile();
LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(initial, 0L, 1000);
// Pre-load state is IndexFile; do NOT call get() so the on-disk file is never created.
File renamed = nonExistentTempFile();
// Source does not exist; renameTo should swallow NoSuchFileException and still update the path.
assertDoesNotThrow(() -> idx.renameTo(renamed));
assertEquals(renamed, idx.file());
}

@Test
public void deleteIfExistsReturnsFalseWhenFileAbsent() throws IOException {
File file = nonExistentTempFile();
try (LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(file, 0L, 1000)) {
assertFalse(idx.deleteIfExists());
}
}

@Test
public void deleteIfExistsRemovesFileAfterLoad() throws IOException {
File file = nonExistentTempFile();
LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(file, 0L, 1000);
idx.get();
assertTrue(file.exists());
assertTrue(idx.deleteIfExists());
assertFalse(file.exists());
}

@Test
public void updateParentDirAdjustsFilePath() throws IOException {
File initialFile = nonExistentTempFile();
File newParent = TestUtils.tempDirectory();
try (LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(initialFile, 0L, 1000)) {
idx.updateParentDir(newParent);
assertEquals(new File(newParent, initialFile.getName()), idx.file());
}
}

@Test
public void closeIsIdempotentBeforeAndAfterLoad() throws IOException {
File file = nonExistentTempFile();
LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(file, 0L, 1000);
assertDoesNotThrow(idx::close); // close before any get()
File file2 = nonExistentTempFile();
LazyIndex<OffsetIndex> idx2 = LazyIndex.forOffset(file2, 0L, 1000);
idx2.get();
assertDoesNotThrow(idx2::close); // close after get()
}

private static File nonExistentTempFile() throws IOException {
File file = TestUtils.tempFile();
Files.deleteIfExists(file.toPath());
return file;
}

private static File nonExistentTempDir() {
File dir = TestUtils.tempDirectory();
return new File(dir, "this-subdir-does-not-exist");
}
}
Loading