Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import com.arcadedb.GlobalConfiguration;
import com.arcadedb.TestHelper;
import com.arcadedb.exception.CommandParsingException;
import com.arcadedb.query.sql.executor.Result;
import com.arcadedb.query.sql.executor.ResultSet;
import com.arcadedb.schema.DocumentType;
import com.arcadedb.schema.LocalTimeSeriesType;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Tests for CREATE TIMESERIES TYPE SQL statement.
Expand Down Expand Up @@ -108,4 +110,24 @@ void createMinimal() {
assertThat(tsType.getShardCount()).isEqualTo(expectedShards);
assertThat(tsType.getRetentionMs()).isEqualTo(0L);
}

@Test
void precisionClauseIsNotSupported() {
  // Some documentation examples show a 'PRECISION NANOSECOND' clause after the
  // timestamp column, but no such clause exists in the grammar: the parser must
  // reject it loudly rather than silently ignore it.
  final String sql = "CREATE TIMESERIES TYPE BadPrecision TIMESTAMP ts PRECISION NANOSECOND FIELDS (value DOUBLE)";
  assertThatThrownBy(() -> database.command("sql", sql))
      .isInstanceOf(CommandParsingException.class);
}

@Test
void compactionIntervalTwoWordFormIsNotSupported() {
  // Only the underscored token 'COMPACTION_INTERVAL' is recognized by the grammar;
  // the two-word form 'COMPACTION INTERVAL' seen in some docs must fail to parse.
  final String sql = "CREATE TIMESERIES TYPE BadCompaction TIMESTAMP ts FIELDS (value DOUBLE) COMPACTION INTERVAL 30 DAYS";
  assertThatThrownBy(() -> database.command("sql", sql))
      .isInstanceOf(CommandParsingException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,32 @@
*/
package com.arcadedb.server.http.handler;

import com.arcadedb.database.DatabaseFactory;
import com.arcadedb.database.DatabaseInternal;
import com.arcadedb.engine.timeseries.ColumnDefinition;
import com.arcadedb.engine.timeseries.LineProtocolParser;
import com.arcadedb.engine.timeseries.LineProtocolParser.Precision;
import com.arcadedb.engine.timeseries.LineProtocolParser.Sample;
import com.arcadedb.engine.timeseries.TimeSeriesEngine;
import com.arcadedb.log.LogManager;
import com.arcadedb.schema.DocumentType;
import com.arcadedb.schema.LocalTimeSeriesType;
import com.arcadedb.serializer.json.JSONObject;
import com.arcadedb.server.http.HttpServer;
import com.arcadedb.server.security.ServerSecurityUser;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.StatusCodes;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.zip.GZIPInputStream;

/**
* HTTP handler for InfluxDB Line Protocol ingestion.
Expand Down Expand Up @@ -62,8 +72,34 @@ protected boolean requiresJsonPayload() {

@Override
protected String parseRequestPayload(final HttpServerExchange e) {
// Store the raw payload for Line Protocol parsing
rawPayload = super.parseRequestPayload(e);
if (!e.isInIoThread() && !e.isBlocking())
e.startBlocking();

final AtomicReference<byte[]> bytesRef = new AtomicReference<>();
e.getRequestReceiver().receiveFullBytes(
(exchange, data) -> bytesRef.set(data),
(exchange, err) -> {
LogManager.instance().log(this, Level.SEVERE, "receiveFullBytes completed with an error: %s", err, err.getMessage());
exchange.setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR);
exchange.getResponseSender().send("Invalid Request");
});

final byte[] rawBytes = bytesRef.get();
if (rawBytes == null) {
rawPayload = null;
return null;
}
Comment on lines +78 to +91
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The use of receiveFullBytes is asynchronous and does not block the calling thread. Since bytesRef.get() is called immediately after, it will likely return null because the request body has not been fully read yet, leading to an incorrect "Request body is empty" error. Since this handler runs on a worker thread and has called e.startBlocking(), you should use the blocking getInputStream() to read the payload.

    final byte[] rawBytes;
    try {
      rawBytes = e.getInputStream().readAllBytes();
    } catch (final IOException ex) {
      LogManager.instance().log(this, Level.SEVERE, "Error reading request payload", ex);
      return null;
    }

    if (rawBytes.length == 0) {
      rawPayload = null;
      return null;
    }

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushback: AbstractServerHttpHandler.parseRequestPayload (lines 63-73) uses this exact same pattern - AtomicReference + receiveFullBytes + .get(). When blocking mode is started and the handler runs on a worker thread, receiveFullBytes completes synchronously before returning. This is the established pattern across all handlers in the codebase.


final var contentEncoding = e.getRequestHeaders().get(Headers.CONTENT_ENCODING);
if (contentEncoding != null && !contentEncoding.isEmpty() && "gzip".equalsIgnoreCase(contentEncoding.getFirst())) {
try (final GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(rawBytes))) {
rawPayload = new String(gzip.readAllBytes(), DatabaseFactory.getDefaultCharset());
} catch (final IOException ex) {
throw new IllegalArgumentException("Failed to decompress gzip body: " + ex.getMessage(), ex);
}
} else {
rawPayload = new String(rawBytes, DatabaseFactory.getDefaultCharset());
}
return rawPayload;
}

Expand Down Expand Up @@ -94,17 +130,23 @@ protected ExecutionResponse execute(final HttpServerExchange exchange, final Ser

// Group by measurement and insert
int inserted = 0;
final Set<String> unknownTypes = new LinkedHashSet<>();
final Set<String> nonTimeSeriesTypes = new LinkedHashSet<>();
database.begin();
try {
for (final Sample sample : samples) {
final String measurement = sample.getMeasurement();

if (!database.getSchema().existsType(measurement))
continue; // skip unknown measurement types
if (!database.getSchema().existsType(measurement)) {
unknownTypes.add(measurement);
continue;
}

final DocumentType docType = database.getSchema().getType(measurement);
if (!(docType instanceof LocalTimeSeriesType tsType) || tsType.getEngine() == null)
continue; // skip non-timeseries types
if (!(docType instanceof LocalTimeSeriesType tsType) || tsType.getEngine() == null) {
nonTimeSeriesTypes.add(measurement);
continue;
}

final TimeSeriesEngine engine = tsType.getEngine();
final List<ColumnDefinition> columns = tsType.getTsColumns();
Expand Down Expand Up @@ -137,9 +179,18 @@ protected ExecutionResponse execute(final HttpServerExchange exchange, final Ser
throw e;
}

// Return 204 No Content (InfluxDB convention) or 200 with count
if (inserted == 0)
return new ExecutionResponse(204, "");
if (inserted == 0 && !unknownTypes.isEmpty())
return new ExecutionResponse(400,
new JSONObject().put("error", "Unknown timeseries type(s): " + String.join(", ", unknownTypes)
+ ". Create the type first with CREATE TIMESERIES TYPE.").toString());

if (!unknownTypes.isEmpty())
LogManager.instance().log(this, Level.WARNING,
"Skipped line protocol samples for unknown timeseries type(s): %s", null, unknownTypes);

if (!nonTimeSeriesTypes.isEmpty())
LogManager.instance().log(this, Level.WARNING,
"Skipped line protocol samples for non-timeseries type(s): %s", null, nonTimeSeriesTypes);

return new ExecutionResponse(204, "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import com.arcadedb.serializer.json.JSONObject;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPOutputStream;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -89,6 +91,45 @@ void emptyBody() throws Exception {
});
}

@Test
void unknownMeasurementTypeReturnsError() throws Exception {
  // If every sample targets a measurement with no matching TIMESERIES TYPE, a
  // silent 204 would hide data loss from the caller — the handler must answer 400.
  testEachServer((serverIndex) -> {
    final int statusCode = postLineProtocol(serverIndex, "ghost_metric,host=srv1 value=1.0 1000\n", "ms");
    assertThat(statusCode).isEqualTo(400);
  });
}

@Test
void gzipCompressedBodyIsAccepted() throws Exception {
  // Telegraf's [[outputs.influxdb]] plugin sends Content-Encoding: gzip by default.
  // The write handler must decompress the body before parsing it.
  testEachServer((serverIndex) -> {
    command(serverIndex,
        "CREATE TIMESERIES TYPE disk TIMESTAMP ts TAGS (host STRING) FIELDS (used DOUBLE)");

    final String lineProtocol = "disk,host=server1 used=42.0 1000000000\ndisk,host=server2 used=77.5 2000000000\n";

    // Close the GZIP stream BEFORE snapshotting the buffer: close() implicitly
    // finishes the deflate stream, so toByteArray() is guaranteed to see the
    // complete compressed body. (ByteArrayOutputStream.close() is a no-op, so the
    // buffer itself needs no try-with-resources.)
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (final GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
      gzip.write(lineProtocol.getBytes(StandardCharsets.UTF_8));
    }
    final byte[] compressed = baos.toByteArray();

    final int statusCode = postLineProtocolGzip(serverIndex, compressed, "ns");
    assertThat(statusCode).isEqualTo(204);

    final JSONObject result = executeCommand(serverIndex, "sql", "SELECT FROM disk");
    assertThat(result).isNotNull();
    final JSONArray records = result.getJSONObject("result").getJSONArray("records");
    assertThat(records.length()).isEqualTo(2);
  });
}

private int postLineProtocol(final int serverIndex, final String body, final String precision) throws Exception {
final HttpURLConnection connection = (HttpURLConnection) new URI(
"http://127.0.0.1:248" + serverIndex + "/api/v1/ts/graph/write?precision=" + precision)
Expand All @@ -108,4 +149,25 @@ private int postLineProtocol(final int serverIndex, final String body, final Str

return connection.getResponseCode();
}

/**
 * POSTs a pre-compressed line-protocol body to the write endpoint, advertising
 * gzip via the Content-Encoding header, and returns the HTTP status code.
 */
private int postLineProtocolGzip(final int serverIndex, final byte[] compressedBody, final String precision) throws Exception {
  final String url = "http://127.0.0.1:248" + serverIndex + "/api/v1/ts/graph/write?precision=" + precision;
  final HttpURLConnection connection = (HttpURLConnection) new URI(url).toURL().openConnection();

  final String credentials = "root:" + BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS;
  connection.setRequestMethod("POST");
  connection.setRequestProperty("Authorization", "Basic " + Base64.getEncoder().encodeToString(credentials.getBytes()));
  connection.setRequestProperty("Content-Type", "text/plain");
  connection.setRequestProperty("Content-Encoding", "gzip");
  connection.setDoOutput(true);

  try (final OutputStream os = connection.getOutputStream()) {
    os.write(compressedBody);
    os.flush();
  }

  return connection.getResponseCode();
}
}
Loading