|
22 | 22 | #include "dwio/nimble/common/EncodingPrimitives.h" |
23 | 23 | #include "dwio/nimble/common/EncodingType.h" |
24 | 24 | #include "dwio/nimble/common/Exceptions.h" |
| 25 | +#include "dwio/nimble/common/Types.h" |
25 | 26 | #include "dwio/nimble/common/Vector.h" |
26 | 27 | #include "dwio/nimble/encodings/Encoding.h" |
| 28 | +#include "dwio/nimble/encodings/EncodingIdentifier.h" |
| 29 | +#include "dwio/nimble/encodings/EncodingSelection.h" |
27 | 30 | #include "dwio/nimble/encodings/legacy/EncodingFactory.h" |
28 | 31 |
|
29 | 32 | // Stores integer data in a delta encoding. We use three child encodings: |
@@ -69,8 +72,16 @@ class DeltaEncoding final |
69 | 72 | void skip(uint32_t rowCount) final; |
70 | 73 | void materialize(uint32_t rowCount, void* buffer) final; |
71 | 74 |
|
| 75 | + template <typename DecoderVisitor> |
| 76 | + void readWithVisitor(DecoderVisitor& visitor, ReadWithVisitorParams& params); |
| 77 | + |
72 | 78 | std::string debugString(int offset) const final; |
73 | 79 |
|
| 80 | + static std::string_view encode( |
| 81 | + EncodingSelection<physicalType>& selection, |
| 82 | + std::span<const physicalType> values, |
| 83 | + Buffer& buffer); |
| 84 | + |
74 | 85 | private: |
75 | 86 | physicalType currentValue_; |
76 | 87 | std::unique_ptr<Encoding> deltas_; |
@@ -174,162 +185,112 @@ void DeltaEncoding<T>::materialize(uint32_t rowCount, void* buffer) { |
174 | 185 | } |
175 | 186 | } |
176 | 187 |
|
177 | | -// namespace internal { |
| 188 | +template <typename T> |
| 189 | +template <typename DecoderVisitor> |
| 190 | +void DeltaEncoding<T>::readWithVisitor( |
| 191 | + DecoderVisitor& visitor, |
| 192 | + ReadWithVisitorParams& params) { |
| 193 | + detail::readWithVisitorSlow( |
| 194 | + visitor, |
| 195 | + params, |
| 196 | + [&](auto toSkip) { skip(toSkip); }, |
| 197 | + [&] { |
| 198 | + bool isRestatement; |
| 199 | + isRestatements_->materialize(1, &isRestatement); |
| 200 | + if (isRestatement) { |
| 201 | + restatements_->materialize(1, ¤tValue_); |
| 202 | + } else { |
| 203 | + physicalType delta; |
| 204 | + deltas_->materialize(1, &delta); |
| 205 | + currentValue_ += delta; |
| 206 | + } |
| 207 | + return currentValue_; |
| 208 | + }); |
| 209 | +} |
| 210 | + |
| 211 | +namespace internal { |
| 212 | + |
| 213 | +template <typename physicalType> |
| 214 | +void computeDeltas( |
| 215 | + std::span<const physicalType> values, |
| 216 | + Vector<physicalType>* deltas, |
| 217 | + Vector<physicalType>* restatements, |
| 218 | + Vector<bool>* isRestatements) { |
| 219 | + isRestatements->emplace_back(true); |
| 220 | + restatements->emplace_back(values[0]); |
| 221 | + if constexpr (isSignedIntegralType<physicalType>()) { |
| 222 | + for (uint32_t i = 1; i < values.size(); ++i) { |
| 223 | + const bool crossesZero = values[i] > 0 && values[i - 1] < 0; |
| 224 | + if (FOLLY_LIKELY(values[i] >= values[i - 1] && !crossesZero)) { |
| 225 | + isRestatements->emplace_back(false); |
| 226 | + deltas->emplace_back(values[i] - values[i - 1]); |
| 227 | + } else { |
| 228 | + isRestatements->emplace_back(true); |
| 229 | + restatements->emplace_back(values[i]); |
| 230 | + } |
| 231 | + } |
| 232 | + } else { |
| 233 | + for (uint32_t i = 1; i < values.size(); ++i) { |
| 234 | + if (FOLLY_LIKELY(values[i] >= values[i - 1])) { |
| 235 | + isRestatements->emplace_back(false); |
| 236 | + deltas->emplace_back(values[i] - values[i - 1]); |
| 237 | + } else { |
| 238 | + isRestatements->emplace_back(true); |
| 239 | + restatements->emplace_back(values[i]); |
| 240 | + } |
| 241 | + } |
| 242 | + } |
| 243 | +} |
| 244 | + |
| 245 | +} // namespace internal |
178 | 246 |
|
179 | | -// template <typename physicalType> |
180 | | -// void computeDeltas( |
181 | | -// std::span<const physicalType> values, |
182 | | -// Vector<physicalType>* deltas, |
183 | | -// Vector<physicalType>* restatements, |
184 | | -// Vector<bool>* isRestatements) { |
185 | | -// isRestatements->push_back(true); |
186 | | -// restatements->push_back(values[0]); |
187 | | -// // For signed integer types we avoid the potential overflow in the |
188 | | -// // delta by restating whenever the last value was negative and the |
189 | | -// // next is positive. We could be more elegant by storing the |
190 | | -// // deltas as the appropriate unsigned type. |
191 | | -// if constexpr (isSignedIntegralType<physicalType>()) { |
192 | | -// for (uint32_t i = 1; i < values.size(); ++i) { |
193 | | -// const bool crossesZero = values[i] > 0 && values[i - 1] < 0; |
194 | | -// if (values[i] >= values[i - 1] && !crossesZero) { |
195 | | -// isRestatements->push_back(false); |
196 | | -// deltas->push_back(values[i] - values[i - 1]); |
197 | | -// } else { |
198 | | -// isRestatements->push_back(true); |
199 | | -// restatements->push_back(values[i]); |
200 | | -// } |
201 | | -// } |
202 | | -// } else { |
203 | | -// for (uint32_t i = 1; i < values.size(); ++i) { |
204 | | -// if (values[i] >= values[i - 1]) { |
205 | | -// isRestatements->push_back(false); |
206 | | -// deltas->push_back(values[i] - values[i - 1]); |
207 | | -// } else { |
208 | | -// isRestatements->push_back(true); |
209 | | -// restatements->push_back(values[i]); |
210 | | -// } |
211 | | -// } |
212 | | -// } |
213 | | -// } |
| 247 | +template <typename T> |
| 248 | +std::string_view DeltaEncoding<T>::encode( |
| 249 | + EncodingSelection<physicalType>& selection, |
| 250 | + std::span<const physicalType> values, |
| 251 | + Buffer& buffer) { |
| 252 | + if (values.empty()) { |
| 253 | + NIMBLE_INCOMPATIBLE_ENCODING("DeltaEncoding can't be used with 0 rows."); |
| 254 | + } |
214 | 255 |
|
215 | | -// } // namespace internal |
| 256 | + const uint32_t rowCount = static_cast<uint32_t>(values.size()); |
| 257 | + Vector<physicalType> deltas(&buffer.getMemoryPool()); |
| 258 | + Vector<physicalType> restatements(&buffer.getMemoryPool()); |
| 259 | + Vector<bool> isRestatements(&buffer.getMemoryPool()); |
216 | 260 |
|
217 | | -// template <typename T> |
218 | | -// bool DeltaEncoding<T>::estimateSize( |
219 | | -// velox::memory::MemoryPool& memoryPool, |
220 | | -// std::span<const T> dataValues, |
221 | | -// OptimalSearchParams searchParams, |
222 | | -// encodings::EncodingParameters& encodingParameters, |
223 | | -// uint32_t* size) { |
224 | | -// auto values = |
225 | | -// EncodingPhysicalType<T>::asEncodingPhysicalTypeSpan(dataValues); if |
226 | | -// (values.empty()) { |
227 | | -// return false; |
228 | | -// } |
229 | | -// Vector<physicalType> deltas(&memoryPool); |
230 | | -// Vector<physicalType> restatements(&memoryPool); |
231 | | -// Vector<bool> isRestatements(&memoryPool); |
232 | | -// internal::computeDeltas(values, &deltas, &restatements, &isRestatements); |
233 | | -// uint32_t deltasSize; |
234 | | -// auto& deltaEncodingParameters = encodingParameters.set_delta(); |
235 | | -// estimateOptimalEncodingSize<physicalType>( |
236 | | -// memoryPool, |
237 | | -// deltas, |
238 | | -// searchParams, |
239 | | -// &deltasSize, |
240 | | -// deltaEncodingParameters.deltasParameters().ensure()); |
241 | | -// uint32_t restatementsSize; |
242 | | -// estimateOptimalEncodingSize<physicalType>( |
243 | | -// memoryPool, |
244 | | -// restatements, |
245 | | -// searchParams, |
246 | | -// &restatementsSize, |
247 | | -// deltaEncodingParameters.restatementsParameters().ensure()); |
248 | | -// uint32_t isRestatementsSize; |
249 | | -// estimateOptimalEncodingSize<bool>( |
250 | | -// memoryPool, |
251 | | -// isRestatements, |
252 | | -// searchParams, |
253 | | -// &isRestatementsSize, |
254 | | -// deltaEncodingParameters.isRestatementsParameters().ensure()); |
255 | | -// *size = Encoding::kPrefixSize + 8 + deltasSize + restatementsSize + |
256 | | -// isRestatementsSize; |
257 | | -// return true; |
258 | | -// } |
| 261 | + internal::computeDeltas(values, &deltas, &restatements, &isRestatements); |
259 | 262 |
|
260 | | -// template <typename T> |
261 | | -// std::string_view DeltaEncoding<T>::serialize( |
262 | | -// std::span<const T> values, |
263 | | -// Buffer* buffer) { |
264 | | -// uint32_t unusedSize; |
265 | | -// encodings::EncodingParameters encodingParameters; |
266 | | -// // Hrm should we pass these in? This call won't normally be used outside of |
267 | | -// // testing. |
268 | | -// OptimalSearchParams searchParams; |
269 | | -// estimateSize( |
270 | | -// buffer->getMemoryPool(), |
271 | | -// values, |
272 | | -// searchParams, |
273 | | -// encodingParameters, |
274 | | -// &unusedSize); |
275 | | -// return serialize(values, encodingParameters, buffer); |
276 | | -// } |
| 263 | + Buffer tempBuffer{buffer.getMemoryPool()}; |
277 | 264 |
|
278 | | -// template <typename T> |
279 | | -// std::string_view DeltaEncoding<T>::serialize( |
280 | | -// std::span<const T> dataValues, |
281 | | -// const encodings::EncodingParameters& encodingParameters, |
282 | | -// Buffer* buffer) { |
283 | | -// auto values = |
284 | | -// EncodingPhysicalType<T>::asEncodingPhysicalTypeSpan(dataValues); if |
285 | | -// (values.empty()) { |
286 | | -// NIMBLE_INCOMPATIBLE_ENCODING("DeltaEncoding can't be used with 0 rows."); |
287 | | -// } |
288 | | -// NIMBLE_CHECK( |
289 | | -// encodingParameters.getType() == |
290 | | -// encodings::EncodingParameters::Type::delta && |
291 | | -// encodingParameters.delta_ref().has_value() && |
292 | | -// encodingParameters.delta_ref()->deltasParameters().has_value() && |
293 | | -// encodingParameters.delta_ref() |
294 | | -// ->restatementsParameters() |
295 | | -// .has_value() && |
296 | | -// encodingParameters.delta_ref() |
297 | | -// ->isRestatementsParameters() |
298 | | -// .has_value(), |
299 | | -// "Incomplete or incompatible Delta encoding parameters."); |
| 265 | + const std::string_view serializedDeltas = |
| 266 | + selection.template encodeNested<physicalType>( |
| 267 | + EncodingIdentifiers::Delta::Deltas, deltas, tempBuffer); |
| 268 | + const std::string_view serializedRestatements = |
| 269 | + selection.template encodeNested<physicalType>( |
| 270 | + EncodingIdentifiers::Delta::Restatements, restatements, tempBuffer); |
| 271 | + const std::string_view serializedIsRestatements = |
| 272 | + selection.template encodeNested<bool>( |
| 273 | + EncodingIdentifiers::Delta::IsRestatements, |
| 274 | + isRestatements, |
| 275 | + tempBuffer); |
300 | 276 |
|
301 | | -// auto& memoryPool = buffer->getMemoryPool(); |
302 | | -// const uint32_t rowCount = values.size(); |
303 | | -// Vector<physicalType> deltas(&memoryPool); |
304 | | -// Vector<physicalType> restatements(&memoryPool); |
305 | | -// Vector<bool> isRestatements(&memoryPool); |
306 | | -// auto& deltaEncodingParameters = encodingParameters.delta_ref().value(); |
307 | | -// internal::computeDeltas(values, &deltas, &restatements, &isRestatements); |
308 | | -// std::string_view serializedDeltas = serializeEncoding<physicalType>( |
309 | | -// deltas, deltaEncodingParameters.deltasParameters().value(), buffer); |
310 | | -// std::string_view serializedRestatements = serializeEncoding<physicalType>( |
311 | | -// restatements, |
312 | | -// deltaEncodingParameters.restatementsParameters().value(), |
313 | | -// buffer); |
314 | | -// std::string_view serializedIsRestatements = serializeEncoding<bool>( |
315 | | -// isRestatements, |
316 | | -// deltaEncodingParameters.isRestatementsParameters().value(), |
317 | | -// buffer); |
318 | | -// const uint32_t encodingSize = Encoding::kPrefixSize + 8 + |
319 | | -// serializedDeltas.size() + serializedRestatements.size() + |
320 | | -// serializedIsRestatements.size(); |
321 | | -// char* reserved = buffer->reserve(encodingSize); |
322 | | -// char* pos = reserved; |
323 | | -// Encoding::serializePrefix( |
324 | | -// EncodingType::Delta, TypeTraits<T>::dataType, rowCount, pos); |
325 | | -// encoding::writeUint32(serializedDeltas.size(), pos); |
326 | | -// encoding::writeUint32(serializedRestatements.size(), pos); |
327 | | -// encoding::writeBytes(serializedDeltas, pos); |
328 | | -// encoding::writeBytes(serializedRestatements, pos); |
329 | | -// encoding::writeBytes(serializedIsRestatements, pos); |
330 | | -// NIMBLE_DCHECK_EQ(pos - reserved, encodingSize, "Encoding size mismatch."); |
331 | | -// return {reserved, encodingSize}; |
332 | | -// } |
| 277 | + const uint32_t encodingSize = Encoding::kPrefixSize + 8 + |
| 278 | + static_cast<uint32_t>(serializedDeltas.size()) + |
| 279 | + static_cast<uint32_t>(serializedRestatements.size()) + |
| 280 | + static_cast<uint32_t>(serializedIsRestatements.size()); |
| 281 | + char* reserved = buffer.reserve(encodingSize); |
| 282 | + char* pos = reserved; |
| 283 | + Encoding::serializePrefix( |
| 284 | + EncodingType::Delta, TypeTraits<T>::dataType, rowCount, false, pos); |
| 285 | + encoding::writeUint32(static_cast<uint32_t>(serializedDeltas.size()), pos); |
| 286 | + encoding::writeUint32( |
| 287 | + static_cast<uint32_t>(serializedRestatements.size()), pos); |
| 288 | + encoding::writeBytes(serializedDeltas, pos); |
| 289 | + encoding::writeBytes(serializedRestatements, pos); |
| 290 | + encoding::writeBytes(serializedIsRestatements, pos); |
| 291 | + NIMBLE_DCHECK_EQ(pos - reserved, encodingSize, "Encoding size mismatch."); |
| 292 | + return {reserved, encodingSize}; |
| 293 | +} |
333 | 294 |
|
334 | 295 | template <typename T> |
335 | 296 | std::string DeltaEncoding<T>::debugString(int offset) const { |
|
0 commit comments