diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 8ffbc9f2f9bde..c9cd2d44af04b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -265,6 +265,10 @@ public File getTsFile() { return tsFile; } + public String getDatabaseName() { + return Objects.isNull(resource) ? null : resource.getDatabaseName(); + } + public File getModFile() { return modFile; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 58d4c29eddc57..1ec4d6d53b7ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -52,6 +52,7 @@ import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req; @@ -459,29 +460,31 @@ protected String getSenderPort() { protected TSStatus loadFileV1(final PipeTransferFileSealReqV1 req, final String fileAbsolutePath) throws IOException { return isUsingAsyncLoadTsFileStrategy.get() - ? loadTsFileAsync(Collections.singletonList(fileAbsolutePath)) - : loadTsFileSync(fileAbsolutePath); + ? loadTsFileAsync(null, Collections.singletonList(fileAbsolutePath)) + : loadTsFileSync(null, fileAbsolutePath); } @Override protected TSStatus loadFileV2( final PipeTransferFileSealReqV2 req, final List fileAbsolutePaths) throws IOException, IllegalPathException { - return req instanceof PipeTransferTsFileSealWithModReq - // TsFile's absolute path will be the second element - ? (isUsingAsyncLoadTsFileStrategy.get() - ? loadTsFileAsync(fileAbsolutePaths) - : loadTsFileSync(fileAbsolutePaths.get(1))) - : loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths); + if (req instanceof PipeTransferTsFileSealWithModReq) { + final String dataBaseName = + ((PipeTransferTsFileSealWithModReq) req).getDatabaseNameByTsFileName(); + return isUsingAsyncLoadTsFileStrategy.get() + ? loadTsFileAsync(dataBaseName, fileAbsolutePaths) + : loadTsFileSync(dataBaseName, fileAbsolutePaths.get(req.getFileNames().size() - 1)); + } + return loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths); } - private TSStatus loadTsFileAsync(final List absolutePaths) throws IOException { + private TSStatus loadTsFileAsync(final String dataBaseName, final List absolutePaths) + throws IOException { final Map loadAttributes = - ActiveLoadPathHelper.buildAttributes( - null, + buildLoadTsFileAttributesForAsync( + dataBaseName, shouldConvertDataTypeOnTypeMismatch, validateTsFile.get(), - null, shouldMarkAsPipeRequest.get()); if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) { throw new PipeException("Load active listening pipe dir is not set."); @@ -489,15 +492,38 @@ private TSStatus loadTsFileAsync(final List absolutePaths) throws IOExce return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } - private TSStatus loadTsFileSync(final String fileAbsolutePath) throws FileNotFoundException { + static Map buildLoadTsFileAttributesForAsync( + final String dataBaseName, + final boolean shouldConvertDataTypeOnTypeMismatch, + final boolean validateTsFile, + final boolean shouldMarkAsPipeRequest) { + return ActiveLoadPathHelper.buildAttributes( + dataBaseName, + LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName), + shouldConvertDataTypeOnTypeMismatch, + validateTsFile, + null, + shouldMarkAsPipeRequest); + } + + private TSStatus loadTsFileSync(final String dataBaseName, final String fileAbsolutePath) + throws FileNotFoundException { + return executeStatementAndClassifyExceptions( + buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath, validateTsFile.get())); + } + + static LoadTsFileStatement buildLoadTsFileStatementForSync( + final String dataBaseName, final String fileAbsolutePath, final boolean validateTsFile) + throws FileNotFoundException { final LoadTsFileStatement statement = LoadTsFileStatement.createUnchecked(fileAbsolutePath); statement.setDeleteAfterLoad(true); statement.setConvertOnTypeMismatch(true); - statement.setVerifySchema(validateTsFile.get()); + statement.setVerifySchema(validateTsFile); statement.setAutoCreateDatabase( IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()); - - return executeStatementAndClassifyExceptions(statement); + statement.setDatabase(dataBaseName); + statement.updateDatabaseLevelByTreeDatabase(); + return statement; } private TSStatus loadSchemaSnapShot( @@ -704,12 +730,7 @@ private TSStatus executeStatementAndClassifyExceptions( return STATEMENT_STATUS_VISITOR.process(statement, result); } } catch (final Exception e) { - PipeLogger.log( - LOGGER::warn, - e, - "Receiver id = %s: Exception encountered while executing statement %s: ", - receiverId.get(), - statement.getPipeLoggingString()); + logStatementExceptionIfNecessary(statement, e); return STATEMENT_EXCEPTION_VISITOR.process(statement, e); } finally { if (Objects.nonNull(allocatedMemoryBlock)) { @@ -719,6 +740,29 @@ private TSStatus executeStatementAndClassifyExceptions( } } + private void logStatementExceptionIfNecessary(final Statement statement, final Exception e) { + if (shouldLogStatementException(receiverId.get(), statement, e)) { + PipeLogger.log( + LOGGER::warn, + e, + "Receiver id = %s: Exception encountered while executing statement %s: ", + receiverId.get(), + Objects.isNull(statement) ? null : statement.getPipeLoggingString()); + } + } + + static boolean shouldLogStatementException( + final long receiverId, final Statement statement, final Exception e) { + // Use the reducer cache as a gate. The actual stack trace is logged only when it passes. + return PipePeriodicalLogReducer.log( + message -> {}, + "Receiver id = %s, statement = %s, exception = %s, message = %s", + receiverId, + Objects.isNull(statement) ? null : statement.getPipeLoggingString(), + e.getClass().getName(), + e.getMessage()); + } + private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement statement) { if (statement == null) { return RpcUtils.getStatus( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java index 098c983977a65..7b8246e3dab7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.exception.sql.SemanticException; @@ -48,6 +49,13 @@ public class PipeStatementExceptionVisitor extends StatementVisitor { @Override public TSStatus visitNode(final StatementNode node, final Exception context) { + if (context instanceof IoTDBRuntimeException + && ((IoTDBRuntimeException) context).getErrorCode() + == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) { + return new TSStatus( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(context.getMessage()); + } return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) .setMessage(context.getMessage()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java index 28959a1a0903c..7d0aa99cb13ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java @@ -25,7 +25,9 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.Map; public class PipeTransferTsFileSealWithModReq extends PipeTransferFileSealReqV2 { @@ -38,17 +40,59 @@ protected PipeRequestType getPlanType() { return PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD; } + private static final String DATABASE_NAME_KEY_PREFIX = "DATABASE_NAME_"; + + public String getDatabaseNameByTsFileName() { + return getParameters() == null + ? null + : getParameters() + .get( + generateDatabaseNameWithFileNameKey(getFileNames().get(getFileNames().size() - 1))); + } + + private static String generateDatabaseNameWithFileNameKey(final String fileName) { + return DATABASE_NAME_KEY_PREFIX + fileName; + } + + private static Map generateDatabaseNameParameter( + final String tsFileName, final String dataBaseName) { + return dataBaseName == null + ? new HashMap<>() + : Collections.singletonMap(generateDatabaseNameWithFileNameKey(tsFileName), dataBaseName); + } + /////////////////////////////// Thrift /////////////////////////////// public static PipeTransferTsFileSealWithModReq toTPipeTransferReq( String modFileName, long modFileLength, String tsFileName, long tsFileLength) throws IOException { + return toTPipeTransferReq(modFileName, modFileLength, tsFileName, tsFileLength, null); + } + + public static PipeTransferTsFileSealWithModReq toTPipeTransferReq( + final String modFileName, + final long modFileLength, + final String tsFileName, + final long tsFileLength, + final String dataBaseName) + throws IOException { return (PipeTransferTsFileSealWithModReq) new PipeTransferTsFileSealWithModReq() .convertToTPipeTransferReq( Arrays.asList(modFileName, tsFileName), Arrays.asList(modFileLength, tsFileLength), - new HashMap<>()); + generateDatabaseNameParameter(tsFileName, dataBaseName)); + } + + public static PipeTransferTsFileSealWithModReq toTPipeTransferReq( + final String tsFileName, final long tsFileLength, final String dataBaseName) + throws IOException { + return (PipeTransferTsFileSealWithModReq) + new PipeTransferTsFileSealWithModReq() + .convertToTPipeTransferReq( + Collections.singletonList(tsFileName), + Collections.singletonList(tsFileLength), + generateDatabaseNameParameter(tsFileName, dataBaseName)); } public static PipeTransferTsFileSealWithModReq fromTPipeTransferReq(TPipeTransferReq req) { @@ -61,11 +105,31 @@ public static PipeTransferTsFileSealWithModReq fromTPipeTransferReq(TPipeTransfe public static byte[] toTPipeTransferBytes( String modFileName, long modFileLength, String tsFileName, long tsFileLength) throws IOException { + return toTPipeTransferBytes(modFileName, modFileLength, tsFileName, tsFileLength, null); + } + + public static byte[] toTPipeTransferBytes( + final String modFileName, + final long modFileLength, + final String tsFileName, + final long tsFileLength, + final String dataBaseName) + throws IOException { return new PipeTransferTsFileSealWithModReq() .convertToTPipeTransferSnapshotSealBytes( Arrays.asList(modFileName, tsFileName), Arrays.asList(modFileLength, tsFileLength), - new HashMap<>()); + generateDatabaseNameParameter(tsFileName, dataBaseName)); + } + + public static byte[] toTPipeTransferBytes( + final String tsFileName, final long tsFileLength, final String dataBaseName) + throws IOException { + return new PipeTransferTsFileSealWithModReq() + .convertToTPipeTransferSnapshotSealBytes( + Collections.singletonList(tsFileName), + Collections.singletonList(tsFileLength), + generateDatabaseNameParameter(tsFileName, dataBaseName)); } /////////////////////////////// Object /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 81e745dc6a7ff..b2af96a37c5f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -255,7 +255,7 @@ private void doTransfer( final Map, Double> pipe2WeightMap = batchToTransfer.deepCopyPipe2WeightMap(); for (final File tsFile : sealedFiles) { - doTransfer(pipe2WeightMap, socket, tsFile, null, tsFile.getName()); + doTransfer(pipe2WeightMap, socket, tsFile, null, null, tsFile.getName()); try { RetryUtils.retryOnException( () -> { @@ -379,6 +379,7 @@ private void doTransfer( pipeTsFileInsertionEvent.isWithMod() && supportModsIfIsDataNodeReceiver ? pipeTsFileInsertionEvent.getModFile() : null, + pipeTsFileInsertionEvent.getDatabaseName(), pipeTsFileInsertionEvent.toString()); } @@ -387,6 +388,7 @@ private void doTransfer( final AirGapSocket socket, final File tsFile, final File modFile, + final String dataBaseName, final String receiverStatusContext) throws PipeException, IOException { final String errorMessage = String.format("Seal file %s error. Socket %s.", tsFile, socket); @@ -397,7 +399,7 @@ private void doTransfer( if (!sendWeighted( socket, PipeTransferTsFileSealWithModReq.toTPipeTransferBytes( - modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()), + modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length(), dataBaseName), pipe2WeightMap)) { receiverStatusHandler.handle( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) @@ -411,7 +413,10 @@ private void doTransfer( transferFilePieces(pipe2WeightMap, tsFile, socket, false); if (!sendWeighted( socket, - PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), tsFile.length()), + dataBaseName == null + ? PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), tsFile.length()) + : PipeTransferTsFileSealWithModReq.toTPipeTransferBytes( + tsFile.getName(), tsFile.length(), dataBaseName), pipe2WeightMap)) { receiverStatusHandler.handle( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 9d0c1563c7883..b61cb4543c74e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -255,7 +255,8 @@ private void transferInBatchWithoutCheck( eventsHadBeenAddedToRetryQueue, sealedFile, null, - false)); + false, + null)); } } catch (final Exception e) { PipeLogger.log(LOGGER::warn, e, "Failed to transfer tsfile batch (%s).", sealedFiles); @@ -400,7 +401,8 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE pipeTsFileInsertionEvent.getTsFile(), pipeTsFileInsertionEvent.getModFile(), pipeTsFileInsertionEvent.isWithMod() - && clientManager.supportModsIfIsDataNodeReceiver()); + && clientManager.supportModsIfIsDataNodeReceiver(), + pipeTsFileInsertionEvent.getDatabaseName()); transfer(pipeTransferTsFileHandler); return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 8d9648f52921d..d7515141daeb8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -78,6 +78,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { private File currentFile; private final boolean transferMod; + private final String dataBaseName; private final int readFileBufferSize; private PipeTsFileMemoryBlock memoryBlock; @@ -98,7 +99,8 @@ public PipeTransferTsFileHandler( final AtomicBoolean eventsHadBeenAddedToRetryQueue, final File tsFile, final File modFile, - final boolean transferMod) + final boolean transferMod, + final String dataBaseName) throws InterruptedException { super(connector); @@ -111,6 +113,7 @@ public PipeTransferTsFileHandler( this.tsFile = tsFile; this.modFile = modFile; this.transferMod = transferMod; + this.dataBaseName = dataBaseName; currentFile = transferMod ? modFile : tsFile; // NOTE: Waiting for resource enough for slicing here may cause deadlock! @@ -191,8 +194,16 @@ public void transfer( final TPipeTransferReq uncompressedReq = transferMod ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq( - modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()) - : PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()); + modFile.getName(), + modFile.length(), + tsFile.getName(), + tsFile.length(), + dataBaseName) + : dataBaseName == null + ? PipeTransferTsFileSealReq.toTPipeTransferReq( + tsFile.getName(), tsFile.length()) + : PipeTransferTsFileSealWithModReq.toTPipeTransferReq( + tsFile.getName(), tsFile.length(), dataBaseName); final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq); pipeName2WeightMap.forEach( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index ef3d59f0d2a54..1bb0c383ff87b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -275,7 +275,7 @@ private void doTransfer(final PipeTabletEventTsFileBatch batchToTransfer) final Map, Double> pipe2WeightMap = batchToTransfer.deepCopyPipe2WeightMap(); for (final File tsFile : sealedFiles) { - doTransfer(pipe2WeightMap, tsFile, null); + doTransfer(pipe2WeightMap, tsFile, null, null); try { RetryUtils.retryOnException( () -> { @@ -428,7 +428,8 @@ private void doTransferWrapper(final PipeTsFileInsertionEvent pipeTsFileInsertio pipeTsFileInsertionEvent.getCreationTime()), 1.0), pipeTsFileInsertionEvent.getTsFile(), - pipeTsFileInsertionEvent.isWithMod() ? pipeTsFileInsertionEvent.getModFile() : null); + pipeTsFileInsertionEvent.isWithMod() ? pipeTsFileInsertionEvent.getModFile() : null, + pipeTsFileInsertionEvent.getDatabaseName()); } finally { pipeTsFileInsertionEvent.decreaseReferenceCount( IoTDBDataRegionSyncSink.class.getName(), false); @@ -438,7 +439,8 @@ private void doTransferWrapper(final PipeTsFileInsertionEvent pipeTsFileInsertio private void doTransfer( final Map, Double> pipeName2WeightMap, final File tsFile, - final File modFile) + final File modFile, + final String dataBaseName) throws PipeException, IOException { final Pair clientAndStatus = clientManager.getClient(); @@ -454,7 +456,11 @@ private void doTransfer( final TPipeTransferReq req = compressIfNeeded( PipeTransferTsFileSealWithModReq.toTPipeTransferReq( - modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length())); + modFile.getName(), + modFile.length(), + tsFile.getName(), + tsFile.length(), + dataBaseName)); pipeName2WeightMap.forEach( (pipePair, weight) -> @@ -479,7 +485,11 @@ private void doTransfer( try { final TPipeTransferReq req = compressIfNeeded( - PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length())); + dataBaseName == null + ? PipeTransferTsFileSealReq.toTPipeTransferReq( + tsFile.getName(), tsFile.length()) + : PipeTransferTsFileSealWithModReq.toTPipeTransferReq( + tsFile.getName(), tsFile.length(), dataBaseName)); pipeName2WeightMap.forEach( (pipePair, weight) -> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index 6e74ceed206ec..404b957c7865c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -20,7 +20,9 @@ package org.apache.iotdb.db.queryengine.plan.statement.crud; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.plan.statement.Statement; @@ -42,10 +44,13 @@ import java.util.List; import java.util.Map; +import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT; + public class LoadTsFileStatement extends Statement { private final File file; private int databaseLevel; + private String database; private boolean verifySchema = true; private boolean deleteAfterLoad = false; private boolean convertOnTypeMismatch = true; @@ -201,6 +206,14 @@ public int getDatabaseLevel() { return databaseLevel; } + public void setDatabase(String database) { + this.database = database; + } + + public String getDatabase() { + return database; + } + public void setVerifySchema(boolean verifySchema) { this.verifySchema = verifySchema; } @@ -281,6 +294,7 @@ public boolean isAsyncLoad() { private void initAttributes(final Map loadAttributes) { this.databaseLevel = LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes); + this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes); this.deleteAfterLoad = LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes); this.convertOnTypeMismatch = LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes); @@ -293,6 +307,28 @@ private void initAttributes(final Map loadAttributes) { } } + public void updateDatabaseLevelByTreeDatabase() { + final Integer databaseLevel = getDatabaseLevelByTreeDatabase(database); + if (databaseLevel != null) { + this.databaseLevel = databaseLevel; + } + } + + public static Integer getDatabaseLevelByTreeDatabase(final String database) { + if (database == null) { + return null; + } + try { + final String[] nodes = PathUtils.splitPathToDetachedNodes(database); + if (nodes.length > 1 && PATH_ROOT.equals(nodes[0])) { + return nodes.length - 1; + } + } catch (final IllegalPathException ignored) { + // Keep the configured database level when database is not a legal tree path. + } + return null; + } + public boolean reconstructStatementIfMiniFileConverted(final List isMiniTsFile) { int lastNonMiniTsFileIndex = -1; @@ -352,6 +388,7 @@ public List getSubStatements() { final LoadTsFileStatement statement = new LoadTsFileStatement(); statement.databaseLevel = this.databaseLevel; + statement.database = this.database; statement.verifySchema = this.verifySchema; statement.deleteAfterLoad = this.deleteAfterLoad; statement.convertOnTypeMismatch = this.convertOnTypeMismatch; @@ -395,6 +432,8 @@ public String toString() { + deleteAfterLoad + ", database-level=" + databaseLevel + + ", database=" + + database + ", verify-schema=" + verifySchema + ", convert-on-type-mismatch=" diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java index 965f2941dc659..2503b822b9eac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java @@ -46,6 +46,7 @@ public final class ActiveLoadPathHelper { private static final List KEY_ORDER = Collections.unmodifiableList( Arrays.asList( + LoadTsFileConfigurator.DATABASE_NAME_KEY, LoadTsFileConfigurator.DATABASE_LEVEL_KEY, LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY, LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY, @@ -62,8 +63,28 @@ public static Map buildAttributes( final Boolean verify, final Long tabletConversionThresholdBytes, final Boolean pipeGenerated) { + return buildAttributes( + null, + databaseLevel, + convertOnTypeMismatch, + verify, + tabletConversionThresholdBytes, + pipeGenerated); + } + + public static Map buildAttributes( + final String databaseName, + final Integer databaseLevel, + final Boolean convertOnTypeMismatch, + final Boolean verify, + final Long tabletConversionThresholdBytes, + final Boolean pipeGenerated) { final Map attributes = new LinkedHashMap<>(); + if (Objects.nonNull(databaseName) && !databaseName.isEmpty()) { + attributes.put(LoadTsFileConfigurator.DATABASE_NAME_KEY, databaseName); + } + if (Objects.nonNull(databaseLevel)) { attributes.put(LoadTsFileConfigurator.DATABASE_LEVEL_KEY, databaseLevel.toString()); } @@ -149,6 +170,10 @@ public static void applyAttributesToStatement( final LoadTsFileStatement statement, final boolean defaultVerify) { + Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY)) + .filter(name -> !name.isEmpty()) + .ifPresent(statement::setDatabase); + Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY)) .ifPresent( level -> { @@ -216,6 +241,11 @@ private static Optional extractAndValidateAttributeValue( private static void validateAttributeValue(final String key, final String value) { switch (key) { + case LoadTsFileConfigurator.DATABASE_NAME_KEY: + if (value == null || value.isEmpty()) { + throw new SemanticException("Database name must not be empty."); + } + break; case LoadTsFileConfigurator.DATABASE_LEVEL_KEY: LoadTsFileConfigurator.validateDatabaseLevelParam(value); break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java index 510d47b0b23df..8b689c6fb22ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java @@ -40,6 +40,7 @@ public static void validateParameters(final String key, final String value) { case ON_SUCCESS_KEY: validateOnSuccessParam(value); break; + case DATABASE_NAME_KEY: case TABLET_CONVERSION_THRESHOLD_KEY: break; case CONVERT_ON_TYPE_MISMATCH_KEY: @@ -87,6 +88,12 @@ public static int parseOrGetDefaultDatabaseLevel(final Map loadA DATABASE_LEVEL_KEY, String.valueOf(DATABASE_LEVEL_DEFAULT_VALUE))); } + public static final String DATABASE_NAME_KEY = "database-name"; + + public static String parseDatabaseName(final Map loadAttributes) { + return loadAttributes.get(DATABASE_NAME_KEY); + } + public static final String ON_SUCCESS_KEY = "on-success"; public static final String ON_SUCCESS_DELETE_VALUE = "delete"; public static final String ON_SUCCESS_NONE_VALUE = "none"; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java index 2b20f1d91efb8..756d11818251f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.receiver; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; @@ -62,4 +63,17 @@ public void testTTLIdempotency() { StatusUtils.OK, new TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode())))) .getCode()); } + + @Test + public void testDatabaseNotExistRuntimeExceptionClassification() { + Assert.assertEquals( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode(), + IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR + .process( + new InsertRowsStatement(), + new IoTDBRuntimeException( + "Create DataPartition failed because the database: root.test.sg_0 is not exists", + TSStatusCode.DATABASE_NOT_EXIST.getStatusCode())) + .getCode()); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java new file mode 100644 index 0000000000000..f41c44763f997 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.receiver.protocol.thrift; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; +import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; + +public class IoTDBDataNodeReceiverTest { + + @Test + public void testLoadTsFileSyncStatementUsesTreeDatabaseLevelFromDatabaseName() throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-tree-database-level", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + "root.test.sg_0", tsFile.toString(), true); + + Assert.assertEquals("root.test.sg_0", statement.getDatabase()); + Assert.assertEquals(2, statement.getDatabaseLevel()); + } finally { + Files.deleteIfExists(tsFile); + } + } + + @Test + public void testLoadTsFileAsyncAttributesUseTreeDatabaseLevelFromDatabaseName() throws Exception { + final Path tsFile = Files.createTempFile("pipe-async-load-tree-database-level", ".tsfile"); + try { + final Map attributes = + IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync( + "root.test.sg_0", true, true, true); + + Assert.assertEquals( + "root.test.sg_0", attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY)); + Assert.assertEquals("2", attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY)); + + final LoadTsFileStatement statement = LoadTsFileStatement.createUnchecked(tsFile.toString()); + ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, true); + Assert.assertEquals("root.test.sg_0", statement.getDatabase()); + Assert.assertEquals(2, statement.getDatabaseLevel()); + } finally { + Files.deleteIfExists(tsFile); + } + } + + @Test + public void testLoadTsFileSyncStatementKeepsDefaultDatabaseLevelWhenDatabaseNameIsNull() + throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-default-database-level", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null, tsFile.toString(), true); + + Assert.assertNull(statement.getDatabase()); + Assert.assertEquals( + IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(), + statement.getDatabaseLevel()); + } finally { + Files.deleteIfExists(tsFile); + } + } + + @Test + public void testRepeatedStatementExceptionLogIsReduced() throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-log-reducer", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + "root.test.sg_0", tsFile.toString(), true); + final long receiverId = System.nanoTime(); + final Exception exception = new RuntimeException("repeated receiver exception " + receiverId); + + Assert.assertTrue( + IoTDBDataNodeReceiver.shouldLogStatementException(receiverId, statement, exception)); + Assert.assertFalse( + IoTDBDataNodeReceiver.shouldLogStatementException(receiverId, statement, exception)); + Assert.assertTrue( + IoTDBDataNodeReceiver.shouldLogStatementException( + receiverId, statement, new RuntimeException("another receiver exception"))); + } finally { + Files.deleteIfExists(tsFile); + } + } +}