Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ public File getTsFile() {
return tsFile;
}

public String getDatabaseName() {
return Objects.isNull(resource) ? null : resource.getDatabaseName();
}

public File getModFile() {
return modFile;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
Expand Down Expand Up @@ -459,45 +460,70 @@ protected String getSenderPort() {
protected TSStatus loadFileV1(final PipeTransferFileSealReqV1 req, final String fileAbsolutePath)
throws IOException {
return isUsingAsyncLoadTsFileStrategy.get()
? loadTsFileAsync(Collections.singletonList(fileAbsolutePath))
: loadTsFileSync(fileAbsolutePath);
? loadTsFileAsync(null, Collections.singletonList(fileAbsolutePath))
: loadTsFileSync(null, fileAbsolutePath);
}

@Override
protected TSStatus loadFileV2(
final PipeTransferFileSealReqV2 req, final List<String> fileAbsolutePaths)
throws IOException, IllegalPathException {
return req instanceof PipeTransferTsFileSealWithModReq
// TsFile's absolute path will be the second element
? (isUsingAsyncLoadTsFileStrategy.get()
? loadTsFileAsync(fileAbsolutePaths)
: loadTsFileSync(fileAbsolutePaths.get(1)))
: loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
if (req instanceof PipeTransferTsFileSealWithModReq) {
final String dataBaseName =
((PipeTransferTsFileSealWithModReq) req).getDatabaseNameByTsFileName();
return isUsingAsyncLoadTsFileStrategy.get()
? loadTsFileAsync(dataBaseName, fileAbsolutePaths)
: loadTsFileSync(dataBaseName, fileAbsolutePaths.get(req.getFileNames().size() - 1));
}
return loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
}

private TSStatus loadTsFileAsync(final List<String> absolutePaths) throws IOException {
private TSStatus loadTsFileAsync(final String dataBaseName, final List<String> absolutePaths)
throws IOException {
final Map<String, String> loadAttributes =
ActiveLoadPathHelper.buildAttributes(
null,
buildLoadTsFileAttributesForAsync(
dataBaseName,
shouldConvertDataTypeOnTypeMismatch,
validateTsFile.get(),
null,
shouldMarkAsPipeRequest.get());
if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) {
throw new PipeException("Load active listening pipe dir is not set.");
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

private TSStatus loadTsFileSync(final String fileAbsolutePath) throws FileNotFoundException {
static Map<String, String> buildLoadTsFileAttributesForAsync(
final String dataBaseName,
final boolean shouldConvertDataTypeOnTypeMismatch,
final boolean validateTsFile,
final boolean shouldMarkAsPipeRequest) {
return ActiveLoadPathHelper.buildAttributes(
dataBaseName,
LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName),
shouldConvertDataTypeOnTypeMismatch,
validateTsFile,
null,
shouldMarkAsPipeRequest);
}

private TSStatus loadTsFileSync(final String dataBaseName, final String fileAbsolutePath)
throws FileNotFoundException {
return executeStatementAndClassifyExceptions(
buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath, validateTsFile.get()));
}

static LoadTsFileStatement buildLoadTsFileStatementForSync(
final String dataBaseName, final String fileAbsolutePath, final boolean validateTsFile)
throws FileNotFoundException {
final LoadTsFileStatement statement = LoadTsFileStatement.createUnchecked(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
statement.setVerifySchema(validateTsFile.get());
statement.setVerifySchema(validateTsFile);
statement.setAutoCreateDatabase(
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());

return executeStatementAndClassifyExceptions(statement);
statement.setDatabase(dataBaseName);
statement.updateDatabaseLevelByTreeDatabase();
return statement;
}

private TSStatus loadSchemaSnapShot(
Expand Down Expand Up @@ -704,12 +730,7 @@ private TSStatus executeStatementAndClassifyExceptions(
return STATEMENT_STATUS_VISITOR.process(statement, result);
}
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Exception encountered while executing statement %s: ",
receiverId.get(),
statement.getPipeLoggingString());
logStatementExceptionIfNecessary(statement, e);
return STATEMENT_EXCEPTION_VISITOR.process(statement, e);
} finally {
if (Objects.nonNull(allocatedMemoryBlock)) {
Expand All @@ -719,6 +740,29 @@ private TSStatus executeStatementAndClassifyExceptions(
}
}

private void logStatementExceptionIfNecessary(final Statement statement, final Exception e) {
if (shouldLogStatementException(receiverId.get(), statement, e)) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Exception encountered while executing statement %s: ",
receiverId.get(),
Objects.isNull(statement) ? null : statement.getPipeLoggingString());
}
}

static boolean shouldLogStatementException(
final long receiverId, final Statement statement, final Exception e) {
// Use the reducer cache as a gate. The actual stack trace is logged only when it passes.
return PipePeriodicalLogReducer.log(
message -> {},
"Receiver id = %s, statement = %s, exception = %s, message = %s",
receiverId,
Objects.isNull(statement) ? null : statement.getPipeLoggingString(),
e.getClass().getName(),
e.getMessage());
}

private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement statement) {
if (statement == null) {
return RpcUtils.getStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.exception.sql.SemanticException;
Expand Down Expand Up @@ -48,6 +49,13 @@
public class PipeStatementExceptionVisitor extends StatementVisitor<TSStatus, Exception> {
@Override
public TSStatus visitNode(final StatementNode node, final Exception context) {
if (context instanceof IoTDBRuntimeException
&& ((IoTDBRuntimeException) context).getErrorCode()
== TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
.setMessage(context.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class PipeTransferTsFileSealWithModReq extends PipeTransferFileSealReqV2 {

Expand All @@ -38,17 +40,59 @@ protected PipeRequestType getPlanType() {
return PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD;
}

private static final String DATABASE_NAME_KEY_PREFIX = "DATABASE_NAME_";

public String getDatabaseNameByTsFileName() {
return getParameters() == null
? null
: getParameters()
.get(
generateDatabaseNameWithFileNameKey(getFileNames().get(getFileNames().size() - 1)));
}

private static String generateDatabaseNameWithFileNameKey(final String fileName) {
return DATABASE_NAME_KEY_PREFIX + fileName;
}

private static Map<String, String> generateDatabaseNameParameter(
final String tsFileName, final String dataBaseName) {
return dataBaseName == null
? new HashMap<>()
: Collections.singletonMap(generateDatabaseNameWithFileNameKey(tsFileName), dataBaseName);
}

/////////////////////////////// Thrift ///////////////////////////////

public static PipeTransferTsFileSealWithModReq toTPipeTransferReq(
String modFileName, long modFileLength, String tsFileName, long tsFileLength)
throws IOException {
return toTPipeTransferReq(modFileName, modFileLength, tsFileName, tsFileLength, null);
}

public static PipeTransferTsFileSealWithModReq toTPipeTransferReq(
final String modFileName,
final long modFileLength,
final String tsFileName,
final long tsFileLength,
final String dataBaseName)
throws IOException {
return (PipeTransferTsFileSealWithModReq)
new PipeTransferTsFileSealWithModReq()
.convertToTPipeTransferReq(
Arrays.asList(modFileName, tsFileName),
Arrays.asList(modFileLength, tsFileLength),
new HashMap<>());
generateDatabaseNameParameter(tsFileName, dataBaseName));
}

public static PipeTransferTsFileSealWithModReq toTPipeTransferReq(
final String tsFileName, final long tsFileLength, final String dataBaseName)
throws IOException {
return (PipeTransferTsFileSealWithModReq)
new PipeTransferTsFileSealWithModReq()
.convertToTPipeTransferReq(
Collections.singletonList(tsFileName),
Collections.singletonList(tsFileLength),
generateDatabaseNameParameter(tsFileName, dataBaseName));
}

public static PipeTransferTsFileSealWithModReq fromTPipeTransferReq(TPipeTransferReq req) {
Expand All @@ -61,11 +105,31 @@ public static PipeTransferTsFileSealWithModReq fromTPipeTransferReq(TPipeTransfe
public static byte[] toTPipeTransferBytes(
String modFileName, long modFileLength, String tsFileName, long tsFileLength)
throws IOException {
return toTPipeTransferBytes(modFileName, modFileLength, tsFileName, tsFileLength, null);
}

public static byte[] toTPipeTransferBytes(
final String modFileName,
final long modFileLength,
final String tsFileName,
final long tsFileLength,
final String dataBaseName)
throws IOException {
return new PipeTransferTsFileSealWithModReq()
.convertToTPipeTransferSnapshotSealBytes(
Arrays.asList(modFileName, tsFileName),
Arrays.asList(modFileLength, tsFileLength),
new HashMap<>());
generateDatabaseNameParameter(tsFileName, dataBaseName));
}

public static byte[] toTPipeTransferBytes(
final String tsFileName, final long tsFileLength, final String dataBaseName)
throws IOException {
return new PipeTransferTsFileSealWithModReq()
.convertToTPipeTransferSnapshotSealBytes(
Collections.singletonList(tsFileName),
Collections.singletonList(tsFileLength),
generateDatabaseNameParameter(tsFileName, dataBaseName));
}

/////////////////////////////// Object ///////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ private void doTransfer(
final Map<Pair<String, Long>, Double> pipe2WeightMap = batchToTransfer.deepCopyPipe2WeightMap();

for (final File tsFile : sealedFiles) {
doTransfer(pipe2WeightMap, socket, tsFile, null, tsFile.getName());
doTransfer(pipe2WeightMap, socket, tsFile, null, null, tsFile.getName());
try {
RetryUtils.retryOnException(
() -> {
Expand Down Expand Up @@ -379,6 +379,7 @@ private void doTransfer(
pipeTsFileInsertionEvent.isWithMod() && supportModsIfIsDataNodeReceiver
? pipeTsFileInsertionEvent.getModFile()
: null,
pipeTsFileInsertionEvent.getDatabaseName(),
pipeTsFileInsertionEvent.toString());
}

Expand All @@ -387,6 +388,7 @@ private void doTransfer(
final AirGapSocket socket,
final File tsFile,
final File modFile,
final String dataBaseName,
final String receiverStatusContext)
throws PipeException, IOException {
final String errorMessage = String.format("Seal file %s error. Socket %s.", tsFile, socket);
Expand All @@ -397,7 +399,7 @@ private void doTransfer(
if (!sendWeighted(
socket,
PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()),
modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length(), dataBaseName),
pipe2WeightMap)) {
receiverStatusHandler.handle(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
Expand All @@ -411,7 +413,10 @@ private void doTransfer(
transferFilePieces(pipe2WeightMap, tsFile, socket, false);
if (!sendWeighted(
socket,
PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), tsFile.length()),
dataBaseName == null
? PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), tsFile.length())
: PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
tsFile.getName(), tsFile.length(), dataBaseName),
pipe2WeightMap)) {
receiverStatusHandler.handle(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ private void transferInBatchWithoutCheck(
eventsHadBeenAddedToRetryQueue,
sealedFile,
null,
false));
false,
null));
}
} catch (final Exception e) {
PipeLogger.log(LOGGER::warn, e, "Failed to transfer tsfile batch (%s).", sealedFiles);
Expand Down Expand Up @@ -400,7 +401,8 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE
pipeTsFileInsertionEvent.getTsFile(),
pipeTsFileInsertionEvent.getModFile(),
pipeTsFileInsertionEvent.isWithMod()
&& clientManager.supportModsIfIsDataNodeReceiver());
&& clientManager.supportModsIfIsDataNodeReceiver(),
pipeTsFileInsertionEvent.getDatabaseName());

transfer(pipeTransferTsFileHandler);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
private File currentFile;

private final boolean transferMod;
private final String dataBaseName;

private final int readFileBufferSize;
private PipeTsFileMemoryBlock memoryBlock;
Expand All @@ -98,7 +99,8 @@ public PipeTransferTsFileHandler(
final AtomicBoolean eventsHadBeenAddedToRetryQueue,
final File tsFile,
final File modFile,
final boolean transferMod)
final boolean transferMod,
final String dataBaseName)
throws InterruptedException {
super(connector);

Expand All @@ -111,6 +113,7 @@ public PipeTransferTsFileHandler(
this.tsFile = tsFile;
this.modFile = modFile;
this.transferMod = transferMod;
this.dataBaseName = dataBaseName;
currentFile = transferMod ? modFile : tsFile;

// NOTE: Waiting for resource enough for slicing here may cause deadlock!
Expand Down Expand Up @@ -191,8 +194,16 @@ public void transfer(
final TPipeTransferReq uncompressedReq =
transferMod
? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length())
: PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length());
modFile.getName(),
modFile.length(),
tsFile.getName(),
tsFile.length(),
dataBaseName)
: dataBaseName == null
? PipeTransferTsFileSealReq.toTPipeTransferReq(
tsFile.getName(), tsFile.length())
: PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
tsFile.getName(), tsFile.length(), dataBaseName);
final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq);

pipeName2WeightMap.forEach(
Expand Down
Loading
Loading