Skip to content

Commit 5dfc6f9

Browse files
sdruzkinmeta-codesync[bot]
authored andcommitted
perf: Reduce Thrift+Nimble decoding CPU time by 32% by reusing ZSTD_DCtx across streams in Nimble deserialization (#639)
Summary: Pull Request resolved: #639 ZSTD_decompress() (one-shot API) internally allocates and frees a ZSTD_DCtx (~100-200KB) on every call. In the Nimble legacy deserialization path, this happens once per stream per batch, creating a hot alloc/free cycle through jemalloc's large extent allocator. This diff adds a persistent ZSTD_DCtx owned by nimble::Deserializer and threads it through StreamDataReader and StreamData to use ZSTD_decompressDCtx() instead. The change is backward-compatible: all new parameters default to nullptr, falling back to the original ZSTD_decompress() when no context is provided. Reusing ZSTD_DCtx should be safe according to https://facebook.github.io/zstd/zstd_manual.html#Chapter9 # Benchmark results on aidn_v2 CPU time went from 40 CPU seconds to 27 CPU seconds. Reviewed By: harsharastogi Differential Revision: D99456594
1 parent 1014d16 commit 5dfc6f9

5 files changed

Lines changed: 222 additions & 20 deletions

File tree

dwio/nimble/serializer/Deserializer.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,12 @@ class DeserializerImpl : public Decoder {
157157
// Add data starting at the given row offset.
158158
// version: the auto-detected serialization version, used to determine
159159
// encoding enabled and varint row count settings.
160+
// dctx: optional reusable ZSTD decompression context.
160161
void addBatch(
161162
uint32_t rowOffset,
162163
std::string_view data,
163-
SerializationVersion version) {
164+
SerializationVersion version,
165+
ZSTD_DCtx* dctx = nullptr) {
164166
if (data.empty()) {
165167
return;
166168
}
@@ -172,7 +174,8 @@ class DeserializerImpl : public Decoder {
172174
data,
173175
pool_,
174176
serde::StreamData::Options{
175-
.version = version, .bufferPool = bufferPool_.get()})});
177+
.version = version, .bufferPool = bufferPool_.get()},
178+
dctx)});
176179
}
177180

178181
// Record a segment where this key is present in every row (in-map stream
@@ -689,7 +692,8 @@ void Deserializer::deserialize(
689692
// Iterate batches and add stream data with row offsets. Streams missing from
690693
// a batch will have gaps that are filled later during reading.
691694
uint32_t rowOffset{0};
692-
serde::StreamDataReader reader{pool_, options_};
695+
auto* dctx = dctx_.get();
696+
serde::StreamDataReader reader{pool_, options_, dctx};
693697
for (auto sv : data) {
694698
const auto batchRows = reader.initialize(sv);
695699
const auto version = reader.version();
@@ -709,7 +713,7 @@ void Deserializer::deserialize(
709713
auto* decoder = deserializers_[offset];
710714
if (decoder != nullptr) {
711715
DeserializerImpl::toDecoderImpl(decoder)->addBatch(
712-
rowOffset, streamData, version);
716+
rowOffset, streamData, version, dctx);
713717
}
714718
}
715719
});

dwio/nimble/serializer/Deserializer.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
*/
1616
#pragma once
1717

18+
#include <zstd.h>
19+
1820
#include "dwio/nimble/serializer/Options.h"
1921
#include "dwio/nimble/velox/FieldReader.h"
2022
#include "folly/container/F14Map.h"
21-
#include "velox/common/memory/Memory.h"
2223
#include "velox/vector/BaseVector.h"
2324

2425
namespace facebook::nimble {
25-
2626
class Deserializer {
2727
public:
2828
using Options = DeserializerOptions;
@@ -43,6 +43,12 @@ class Deserializer {
4343
velox::VectorPtr& vector) const;
4444

4545
private:
46+
struct ZstdDCtxDeleter {
47+
void operator()(ZSTD_DCtx* ctx) const {
48+
ZSTD_freeDCtx(ctx);
49+
}
50+
};
51+
4652
// Creates deserializers for a type and its FlatMap inMap streams.
4753
void createDeserializersForType(const Type& type, uint32_t depth);
4854

@@ -82,6 +88,9 @@ class Deserializer {
8288
// Offsets that were set in inMapPresentOffsets_ this batch (for efficient
8389
// reset).
8490
mutable std::vector<uint32_t> inMapPresentOffsetsList_;
91+
92+
// Reusable ZSTD decompression context shared across all StreamData objects.
93+
mutable std::unique_ptr<ZSTD_DCtx, ZstdDCtxDeleter> dctx_{ZSTD_createDCtx()};
8594
};
8695

8796
} // namespace facebook::nimble

dwio/nimble/serializer/DeserializerImpl.cpp

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ StreamData::StreamData(
3232
ScalarKind kind,
3333
std::string_view data,
3434
velox::memory::MemoryPool* pool,
35-
const Options& options)
35+
const Options& options,
36+
ZSTD_DCtx* dctx)
3637
: kind_{kind},
3738
pool_{pool},
39+
dctx_{dctx},
3840
encodingEnabled_{nonLegacyFormat(options.version)},
3941
useVarintRowCount_{!isTabletRawFormat(options.version)},
4042
bufferPool_{options.bufferPool} {
@@ -63,10 +65,14 @@ uint32_t StreamData::decodeStrings(uint32_t count, std::string_view* output) {
6365
return index;
6466
}
6567

66-
void StreamData::reset(std::string_view data, SerializationVersion version) {
68+
void StreamData::reset(
69+
std::string_view data,
70+
SerializationVersion version,
71+
ZSTD_DCtx* dctx) {
6772
readRows_ = 0;
6873
encoding_.reset();
6974
stringBuffers_.clear();
75+
dctx_ = dctx;
7076
encodingEnabled_ = nonLegacyFormat(version);
7177
useVarintRowCount_ = !isTabletRawFormat(version);
7278
// Re-initialize with new data.
@@ -110,8 +116,17 @@ void StreamData::decompress() {
110116
decompressedSize != ZSTD_CONTENTSIZE_UNKNOWN,
111117
"Error determining decompressed size");
112118
decompressionBuffer_.resize(decompressedSize);
113-
const auto ret = ZSTD_decompress(
114-
decompressionBuffer_.data(), decompressedSize, pos_, compressedSize);
119+
const auto ret = dctx_ ? ZSTD_decompressDCtx(
120+
dctx_,
121+
decompressionBuffer_.data(),
122+
decompressedSize,
123+
pos_,
124+
compressedSize)
125+
: ZSTD_decompress(
126+
decompressionBuffer_.data(),
127+
decompressedSize,
128+
pos_,
129+
compressedSize);
115130
NIMBLE_CHECK(!ZSTD_isError(ret), "Error decompressing data");
116131
pos_ = decompressionBuffer_.data();
117132
end_ = pos_ + decompressionBuffer_.size();
@@ -201,8 +216,12 @@ uint32_t StreamData::decode(
201216

202217
StreamDataReader::StreamDataReader(
203218
velox::memory::MemoryPool* pool,
204-
const DeserializerOptions& options)
205-
: options_{options}, pool_{pool}, chunkStrippingBuffer_{pool_} {
219+
const DeserializerOptions& options,
220+
ZSTD_DCtx* dctx)
221+
: options_{options},
222+
pool_{pool},
223+
dctx_{dctx},
224+
chunkStrippingBuffer_{pool_} {
206225
NIMBLE_CHECK_NOT_NULL(pool_);
207226
}
208227

@@ -345,11 +364,17 @@ void StreamDataReader::appendChunkData(
345364
"Error determining decompressed size");
346365
const auto offset = chunkStrippingBuffer_.size();
347366
chunkStrippingBuffer_.resize(offset + decompressedSize);
348-
const auto ret = ZSTD_decompress(
349-
chunkStrippingBuffer_.data() + offset,
350-
decompressedSize,
351-
data,
352-
length);
367+
const auto ret = dctx_ ? ZSTD_decompressDCtx(
368+
dctx_,
369+
chunkStrippingBuffer_.data() + offset,
370+
decompressedSize,
371+
data,
372+
length)
373+
: ZSTD_decompress(
374+
chunkStrippingBuffer_.data() + offset,
375+
decompressedSize,
376+
data,
377+
length);
353378
NIMBLE_CHECK(!ZSTD_isError(ret), "Error decompressing chunk data");
354379
break;
355380
}

dwio/nimble/serializer/DeserializerImpl.h

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include <functional>
2020
#include <optional>
2121

22+
#include <zstd.h>
23+
2224
#include "dwio/nimble/common/BufferPool.h"
2325
#include "dwio/nimble/common/Vector.h"
2426
#include "dwio/nimble/encodings/Encoding.h"
@@ -51,11 +53,15 @@ class StreamData {
5153
/// @param data Stream data to initialize with.
5254
/// @param pool Memory pool for encoding buffer allocation.
5355
/// @param options Decode configuration (version, bufferPool).
56+
/// @param dctx Optional reusable ZSTD decompression context. When non-null,
57+
/// ZSTD_decompressDCtx is used instead of ZSTD_decompress, avoiding
58+
/// per-call allocation of a ~100-200KB DCtx.
5459
StreamData(
5560
ScalarKind kind,
5661
std::string_view data,
5762
velox::memory::MemoryPool* pool,
58-
const Options& options);
63+
const Options& options,
64+
ZSTD_DCtx* dctx = nullptr);
5965

6066
uint32_t copyTo(char* output, uint32_t bufferSize);
6167

@@ -79,7 +85,11 @@ class StreamData {
7985
/// @param data New stream data to initialize with.
8086
/// @param version Serialization version determining encoding and row count
8187
/// format.
82-
void reset(std::string_view data, SerializationVersion version);
88+
/// @param dctx Optional reusable ZSTD decompression context.
89+
void reset(
90+
std::string_view data,
91+
SerializationVersion version,
92+
ZSTD_DCtx* dctx = nullptr);
8393

8494
ScalarKind kind() const {
8595
return kind_;
@@ -114,6 +124,9 @@ class StreamData {
114124

115125
const ScalarKind kind_{ScalarKind::Undefined};
116126
velox::memory::MemoryPool* const pool_{nullptr};
127+
// Optional reusable ZSTD decompression context. Non-owning; caller manages
128+
// lifetime. When non-null, avoids per-call ZSTD_DCtx allocation (~100-200KB).
129+
ZSTD_DCtx* dctx_{nullptr};
117130
// Whether nimble encoding is enabled. Non-const to allow reset() to change.
118131
bool encodingEnabled_{false};
119132
// Whether encoding headers use varint row counts (true for kCompact/
@@ -150,7 +163,8 @@ class StreamDataReader {
150163
public:
151164
StreamDataReader(
152165
velox::memory::MemoryPool* pool,
153-
const DeserializerOptions& options);
166+
const DeserializerOptions& options,
167+
ZSTD_DCtx* dctx = nullptr);
154168

155169
/// Returns number of rows serialized.
156170
/// Validates that the version in serialized data matches options.
@@ -202,6 +216,7 @@ class StreamDataReader {
202216

203217
const DeserializerOptions& options_;
204218
velox::memory::MemoryPool* const pool_;
219+
ZSTD_DCtx* const dctx_{nullptr};
205220

206221
// Serialization version detected from data. If the data has a version
207222
// header, this is read from the first byte; otherwise defaults to kLegacy.

dwio/nimble/serializer/tests/SerializerImplTest.cpp

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1604,3 +1604,152 @@ TEST_F(TabletRawChunkStripTest, largePayloadMultipleChunks) {
16041604
ASSERT_EQ(result.size(), 1);
16051605
EXPECT_EQ(result[0].second, payload);
16061606
}
1607+
1608+
// Tests for ZSTD_DCtx reuse in StreamData and StreamDataReader (D99456594).
1609+
// Inherits from TabletRawChunkStripTest for shared helpers
1610+
// (buildCompressedChunk, pool_, etc.).
1611+
1612+
class ZstdDCtxReuseTest : public TabletRawChunkStripTest {
1613+
protected:
1614+
void SetUp() override {
1615+
TabletRawChunkStripTest::SetUp();
1616+
dctx_.reset(ZSTD_createDCtx());
1617+
}
1618+
1619+
// Build legacy-format compressed stream data for a non-string scalar:
1620+
// [CompressionType::Zstd (1B)][ZSTD-compressed payload]
1621+
static std::string buildLegacyCompressedData(std::string_view payload) {
1622+
std::string result;
1623+
result.push_back(static_cast<char>(CompressionType::Zstd));
1624+
const auto maxSize = ZSTD_compressBound(payload.size());
1625+
const auto offset = result.size();
1626+
result.resize(offset + maxSize);
1627+
const auto compressedSize = ZSTD_compress(
1628+
result.data() + offset, maxSize, payload.data(), payload.size(), 1);
1629+
NIMBLE_CHECK(!ZSTD_isError(compressedSize));
1630+
result.resize(offset + compressedSize);
1631+
return result;
1632+
}
1633+
1634+
// Like TabletRawChunkStripTest::deserializeTabletRaw, but passes
1635+
// the fixture's shared dctx to StreamDataReader.
1636+
std::vector<std::pair<uint32_t, std::string>> deserializeTabletRawWithDCtx(
1637+
uint32_t rowCount,
1638+
const std::vector<std::pair<uint32_t, std::string>>& streams) {
1639+
uint32_t maxOffset = 0;
1640+
for (const auto& [offset, _] : streams) {
1641+
maxOffset = std::max(maxOffset, offset);
1642+
}
1643+
std::vector<uint32_t> sizes(streams.empty() ? 0 : maxOffset + 1, 0);
1644+
std::string streamData;
1645+
for (const auto& [offset, data] : streams) {
1646+
sizes[offset] = static_cast<uint32_t>(data.size());
1647+
streamData.append(data);
1648+
}
1649+
1650+
std::string buffer;
1651+
serde::detail::writeHeader(
1652+
buffer, SerializationVersion::kTabletRaw, rowCount);
1653+
buffer.append(streamData);
1654+
serde::detail::writeRawTrailer(sizes, EncodingType::Trivial, buffer);
1655+
1656+
DeserializerOptions options{.hasHeader = true};
1657+
StreamDataReader reader(pool_.get(), options, dctx_.get());
1658+
auto actualRows = reader.initialize(std::string_view(buffer));
1659+
EXPECT_EQ(actualRows, rowCount);
1660+
1661+
std::vector<std::pair<uint32_t, std::string>> result;
1662+
reader.iterateStreams([&](uint32_t offset, std::string_view data) {
1663+
result.emplace_back(offset, std::string(data));
1664+
});
1665+
return result;
1666+
}
1667+
1668+
struct DCtxDeleter {
1669+
void operator()(ZSTD_DCtx* ctx) const {
1670+
ZSTD_freeDCtx(ctx);
1671+
}
1672+
};
1673+
1674+
std::unique_ptr<ZSTD_DCtx, DCtxDeleter> dctx_;
1675+
};
1676+
1677+
TEST_F(ZstdDCtxReuseTest, streamDataLegacyZstdWithDCtx) {
1678+
const std::vector<int32_t> expected = {10, 20, 30, 40};
1679+
std::string_view payload(
1680+
reinterpret_cast<const char*>(expected.data()),
1681+
expected.size() * sizeof(int32_t));
1682+
auto compressed = buildLegacyCompressedData(payload);
1683+
1684+
serde::StreamData sd(
1685+
ScalarKind::Int32,
1686+
SerializationVersion::kLegacy,
1687+
compressed,
1688+
pool_.get(),
1689+
dctx_.get());
1690+
1691+
std::vector<int32_t> output(expected.size());
1692+
sd.copyTo(
1693+
reinterpret_cast<char*>(output.data()), output.size() * sizeof(int32_t));
1694+
EXPECT_EQ(output, expected);
1695+
}
1696+
1697+
TEST_F(ZstdDCtxReuseTest, streamDataDCtxReusedAcrossReset) {
1698+
const std::vector<int32_t> values1 = {1, 2, 3};
1699+
std::string_view payload1(
1700+
reinterpret_cast<const char*>(values1.data()),
1701+
values1.size() * sizeof(int32_t));
1702+
auto compressed1 = buildLegacyCompressedData(payload1);
1703+
1704+
const std::vector<int32_t> values2 = {100, 200};
1705+
std::string_view payload2(
1706+
reinterpret_cast<const char*>(values2.data()),
1707+
values2.size() * sizeof(int32_t));
1708+
auto compressed2 = buildLegacyCompressedData(payload2);
1709+
1710+
// First decompression with dctx.
1711+
serde::StreamData sd(
1712+
ScalarKind::Int32,
1713+
SerializationVersion::kLegacy,
1714+
compressed1,
1715+
pool_.get(),
1716+
dctx_.get());
1717+
1718+
std::vector<int32_t> output1(values1.size());
1719+
sd.copyTo(
1720+
reinterpret_cast<char*>(output1.data()),
1721+
output1.size() * sizeof(int32_t));
1722+
EXPECT_EQ(output1, values1);
1723+
1724+
// Reset with new data, same dctx.
1725+
sd.reset(compressed2, SerializationVersion::kLegacy, dctx_.get());
1726+
1727+
std::vector<int32_t> output2(values2.size());
1728+
sd.copyTo(
1729+
reinterpret_cast<char*>(output2.data()),
1730+
output2.size() * sizeof(int32_t));
1731+
EXPECT_EQ(output2, values2);
1732+
}
1733+
1734+
TEST_F(ZstdDCtxReuseTest, streamDataReaderCompressedChunkWithDCtx) {
1735+
std::string payload = "compressed data that should be zstd encoded for test";
1736+
auto chunk = buildCompressedChunk(payload);
1737+
auto result = deserializeTabletRawWithDCtx(10, {{0, chunk}});
1738+
1739+
ASSERT_EQ(result.size(), 1);
1740+
EXPECT_EQ(result[0].first, 0);
1741+
EXPECT_EQ(result[0].second, payload);
1742+
}
1743+
1744+
TEST_F(ZstdDCtxReuseTest, streamDataReaderDCtxReusedAcrossStreams) {
1745+
std::string payload1 = "first compressed stream payload data";
1746+
std::string payload2 = "second compressed stream payload data";
1747+
auto chunk1 = buildCompressedChunk(payload1);
1748+
auto chunk2 = buildCompressedChunk(payload2);
1749+
1750+
auto result = deserializeTabletRawWithDCtx(10, {{0, chunk1}, {1, chunk2}});
1751+
1752+
ASSERT_EQ(result.size(), 2);
1753+
EXPECT_EQ(result[0].second, payload1);
1754+
EXPECT_EQ(result[1].second, payload2);
1755+
}

0 commit comments

Comments
 (0)