diff --git a/velox/experimental/wave/CMakeLists.txt b/velox/experimental/wave/CMakeLists.txt
index 709bed6bdfcc..cb76c28c72e1 100644
--- a/velox/experimental/wave/CMakeLists.txt
+++ b/velox/experimental/wave/CMakeLists.txt
@@ -15,3 +15,4 @@
 add_subdirectory(common)
 add_subdirectory(exec)
 add_subdirectory(vector)
+add_subdirectory(dwio/decode)
diff --git a/velox/experimental/wave/dwio/decode/CMakeLists.txt b/velox/experimental/wave/dwio/decode/CMakeLists.txt
new file mode 100644
index 000000000000..cb6bedda4d49
--- /dev/null
+++ b/velox/experimental/wave/dwio/decode/CMakeLists.txt
@@ -0,0 +1,21 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+add_subdirectory(tests)
+
+add_library(velox_wave_decode GpuDecoder.cu)
+
+set_target_properties(velox_wave_decode PROPERTIES CUDA_ARCHITECTURES native)
+
+target_link_libraries(velox_wave_decode velox_wave_common)
diff --git a/velox/experimental/wave/dwio/decode/DecodeStep.h b/velox/experimental/wave/dwio/decode/DecodeStep.h
new file mode 100644
index 000000000000..7ed2c2ebaeb8
--- /dev/null
+++ b/velox/experimental/wave/dwio/decode/DecodeStep.h
@@ -0,0 +1,236 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/experimental/wave/common/Buffer.h"
+#include "velox/experimental/wave/common/GpuArena.h"
+#include "velox/experimental/wave/vector/Operand.h"
+
+namespace facebook::velox::wave {
+
+/// Instructions for GPU decode. These can be decoding steps proper, or
+/// pre/post-processing other than decoding.
+enum class DecodeStep {
+  kConstant32,
+  kConstant64,
+  kConstantChar,
+  kConstantBool,
+  kConstantBytes,
+  kTrivial,
+  kTrivialNoOp,
+  kMainlyConstant,
+  kBitpack32,
+  kBitpack64,
+  kRleTotalLength,
+  kRleBool,
+  kRle,
+  kDictionary,
+  kDictionaryOnBitpack,
+  kVarint,
+  kNullable,
+  kSentinel,
+  kSparseBool,
+  kMakeScatterIndices,
+  kScatter32,
+  kScatter64,
+  kLengthToOffset,
+  kMissing,
+  kStruct,
+  kArray,
+  kMap,
+  kFlatMap,
+  kFlatMapNode,
+  kUnsupported,
+};
+
+/// Describes a decoding loop's input and result disposition.
+struct GpuDecode {
+  // The operation to perform. Decides which branch of the union to use.
+  DecodeStep step;
+
+  struct Trivial {
+    // Type of the input and result data.
+    WaveTypeKind dataType;
+    // Input data.
+    const void* input;
+    // Begin position for input, scatter and result.
+    int begin;
+    // End position (exclusive) for input, scatter and result.
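+    // For example, with begin = 0, end = 3 and scatter = {0, 2, 4}, the
+    // first three input values are written to result[0], result[2] and
+    // result[4].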
+    int end;
+    // If not null, contains the output position relative to the result
+    // pointer.
+    const int32_t* scatter;
+    // Starting address of the result.
+    void* result;
+  };
+
+  struct MainlyConstant {
+    // Type of the values and result.
+    WaveTypeKind dataType;
+    // Number of total values that should be written to result.
+    int count;
+    // Common value that is repeated.
+    const void* commonValue;
+    // Sparse values that will be picked up when isCommon is false.
+    const void* otherValues;
+    // Bitmask indicating whether a value is common or not.
+    const uint8_t* isCommon;
+    // Temporary storage for non-common value indices. Should be
+    // preallocated with at least 'count' elements.
+    int32_t* otherIndices;
+    // Starting address of the result.
+    void* result;
+    // If non-null, the count of non-common elements will be written to this
+    // address.
+    int32_t* otherCount;
+  };
+
+  struct DictionaryOnBitpack {
+    // Type of the alphabet and result.
+    WaveTypeKind dataType;
+    // Dictionary alphabet.
+    const void* alphabet;
+    // Indices into the alphabet.
+    const uint64_t* indices;
+    // Begin position for indices, scatter and result.
+    int begin;
+    // End position (exclusive) for indices, scatter and result.
+    int end;
+    // Bit width of each index.
+    int bitWidth;
+    // If not null, contains the output position relative to the result
+    // pointer.
+    const int32_t* scatter;
+    // All indices are offset by this baseline.
+    int64_t baseline;
+    // Starting address of the result.
+    void* result;
+  };
+
+  struct Varint {
+    // Address of the input data.
+    const char* input;
+    // Byte size of the input data.
+    int size;
+    // Temporary storage for whether each byte is the terminal byte of a
+    // number. Should be allocated with at least 'size' elements.
+    bool* ends;
+    // Temporary storage for the end position of each number. Should be
+    // allocated with at least 'size' elements.
+    int32_t* endPos;
+    // Type of the result number.
+    WaveTypeKind resultType;
+    // Starting address of the result.
+    void* result;
+    // Count of the numbers in the result.
+    int resultSize;
+  };
+
+  struct SparseBool {
+    // Number of bits in the result.
+    int totalCount;
+    // Sparse value; the common value is the opposite of this.
+    bool sparseValue;
+    // Bit indices where the sparse value should be stored.
+    const int32_t* sparseIndices;
+    // Number of sparse values.
+    int sparseCount;
+    // Temporary storage for the bool representation of the bits. Should be
+    // at least totalCount large.
+    bool* bools;
+    // Address of the result bits. We do not support an unaligned result
+    // because different blocks could access the same byte and cause a race
+    // condition.
+    uint8_t* result;
+  };
+
+  struct RleTotalLength {
+    // Input data to be summed.
+    const int32_t* input;
+    // Number of input values.
+    int count;
+    // The sum of the input data.
+    int64_t* result;
+  };
+
+  struct Rle {
+    // Type of values and result.
+    WaveTypeKind valueType;
+    // Values that will be repeated.
+    const void* values;
+    // Run length of each value.
+    const int32_t* lengths;
+    // Number of values and lengths.
+    int count;
+    // Starting address of the result.
+    void* result;
+  };
+
+  struct MakeScatterIndices {
+    // Input bits.
+    const uint8_t* bits;
+    // Whether set or unset bits should be found in the input.
+    bool findSetBits;
+    // Begin bit position of the input.
+    int begin;
+    // End bit position (exclusive) of the input.
+    int end;
+    // Address of the result indices.
+    int32_t* indices;
+    // If non-null, the number of indices written is stored to this address.
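+    // For example, bits = 0b00001010 with findSetBits = true, begin = 0 and
+    // end = 8 yields indices = {1, 3} and an indicesCount of 2 (bit 0 is the
+    // least significant bit of the first byte).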
+    int32_t* indicesCount;
+  };
+
+  union {
+    Trivial trivial;
+    MainlyConstant mainlyConstant;
+    DictionaryOnBitpack dictionaryOnBitpack;
+    Varint varint;
+    SparseBool sparseBool;
+    RleTotalLength rleTotalLength;
+    Rle rle;
+    MakeScatterIndices makeScatterIndices;
+  } data;
+
+  /// Returns the amount of shared memory for a standard-size thread block
+  /// running 'step'.
+  int32_t sharedMemorySize() const;
+};
+
+struct DecodePrograms {
+  // Set of decode programs submitted as a unit. Each inner vector is run on
+  // its own thread block. The DecodeSteps in one program run consecutively,
+  // so a step can depend on the results of the previous ones.
+  std::vector<std::vector<std::unique_ptr<GpuDecode>>> programs;
+
+  /// Unified or device memory buffer where steps in 'programs' write results
+  /// for the host. Decode results stay on the device; only control
+  /// information like filter result counts or length sums comes to the host
+  /// via this buffer. If nullptr, no data transfer is scheduled. 'result'
+  /// should be nullptr if all steps are unconditional, like simple decoding.
+  WaveBufferPtr result;
+  /// Host addressable copy of 'result'. If 'result' is unified memory this
+  /// can be nullptr and we just enqueue a prefetch to host. If 'result' is
+  /// device memory, this should be pinned host memory with the same size.
+  WaveBufferPtr hostResult;
+};
+
+void launchDecode(
+    const DecodePrograms& programs,
+    GpuArena* arena,
+    WaveBufferPtr& extra,
+    Stream* stream);
+
+} // namespace facebook::velox::wave
diff --git a/velox/experimental/wave/dwio/decode/GpuDecoder-inl.cuh b/velox/experimental/wave/dwio/decode/GpuDecoder-inl.cuh
new file mode 100644
index 000000000000..8295b84ec151
--- /dev/null
+++ b/velox/experimental/wave/dwio/decode/GpuDecoder-inl.cuh
@@ -0,0 +1,597 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cub/cub.cuh> // @manual
+
+namespace facebook::velox::wave {
+
+namespace detail {
+
+template <typename T>
+__device__ void decodeTrivial(GpuDecode::Trivial& op) {
+  auto address = reinterpret_cast<uint64_t>(op.input);
+  constexpr uint64_t kMask = sizeof(T) - 1;
+  int32_t lowShift = (address & kMask) * 8;
+  int32_t highShift = sizeof(T) * 8 - lowShift;
+  auto source = reinterpret_cast<const T*>(address & ~kMask);
+  int32_t end = op.end;
+  T* result = reinterpret_cast<T*>(op.result);
+  auto scatter = op.scatter;
+  if (scatter) {
+    for (auto i = op.begin + threadIdx.x; i < end; i += blockDim.x) {
+      result[scatter[i]] =
+          (source[i] >> lowShift) | (source[i + 1] << highShift);
+    }
+  } else {
+    for (auto i = op.begin + threadIdx.x; i < end; i += blockDim.x) {
+      result[i] = (source[i] >> lowShift) | (source[i + 1] << highShift);
+    }
+  }
+}
+
+__device__ inline void decodeTrivial(GpuDecode& plan) {
+  auto& op = plan.data.trivial;
+  switch (op.dataType) {
+    case WaveTypeKind::TINYINT:
+      decodeTrivial<uint8_t>(op);
+      break;
+    case WaveTypeKind::SMALLINT:
+      decodeTrivial<uint16_t>(op);
+      break;
+    case WaveTypeKind::INTEGER:
+    case WaveTypeKind::REAL:
+      decodeTrivial<uint32_t>(op);
+      break;
+    case WaveTypeKind::BIGINT:
+    case WaveTypeKind::DOUBLE:
+      decodeTrivial<uint64_t>(op);
+      break;
+    default:
+      if (threadIdx.x == 0) {
+        printf("ERROR: Unsupported data type for Trivial\n");
+      }
+  }
+}
+
+enum class ResultOp { kDict, kBaseline, kDictScatter, kBaselineScatter };
+
+template <typename T, ResultOp op>
+__device__ void storeResult(
+    int32_t i,
+    int32_t bitfield,
+    T baseline,
+    const T* dict,
+    const int32_t* scatter,
+    T* result) {
+  if constexpr (op == ResultOp::kBaseline) {
+    result[i] = bitfield + baseline;
+  } else if constexpr (op == ResultOp::kBaselineScatter) {
+    result[scatter[i]] = bitfield + baseline;
+  } else if constexpr (op == ResultOp::kDict) {
+    result[i] = dict[bitfield + baseline];
+  } else if constexpr (op == ResultOp::kDictScatter) {
+    result[scatter[i]] = dict[bitfield + baseline];
+  }
+}
+
+template <typename T, ResultOp resultOp>
+__device__ void decodeDictionaryOnBitpack(GpuDecode::DictionaryOnBitpack& op) {
+  int32_t i = op.begin + threadIdx.x;
+  auto end = op.end;
+  auto address = reinterpret_cast<uint64_t>(op.indices);
+  int32_t alignOffset = (address & 7) * 8;
+  address &= ~7UL;
+  auto words = reinterpret_cast<const uint64_t*>(address);
+  const T* dict = reinterpret_cast<const T*>(op.alphabet);
+  auto scatter = op.scatter;
+  auto baseline = op.baseline;
+  auto bitWidth = op.bitWidth;
+  uint64_t mask = (1LU << bitWidth) - 1;
+  auto result = reinterpret_cast<T*>(op.result);
+  for (; i < end; i += blockDim.x) {
+    int32_t bitIndex = i * bitWidth + alignOffset;
+    int32_t wordIndex = bitIndex >> 6;
+    int32_t bit = bitIndex & 63;
+    uint64_t word = words[wordIndex];
+    uint64_t low = word >> bit;
+    if (bitWidth + bit <= 64) {
+      int32_t index = low & mask;
+      storeResult<T, resultOp>(i, index, baseline, dict, scatter, result);
+    } else {
+      uint64_t nextWord = words[wordIndex + 1];
+      int32_t bitsFromNext = bitWidth - (64 - bit);
+      int32_t index =
+          low | ((nextWord & ((1 << bitsFromNext) - 1)) << (64 - bit));
+      storeResult<T, resultOp>(i, index, baseline, dict, scatter, result);
+    }
+  }
+}
+
+template <typename T>
+__device__ void decodeDictionaryOnBitpack(GpuDecode::DictionaryOnBitpack& op) {
+  if (!op.scatter) {
+    if (op.alphabet) {
+      decodeDictionaryOnBitpack<T, ResultOp::kDict>(op);
+    } else {
+      decodeDictionaryOnBitpack<T, ResultOp::kBaseline>(op);
+    }
+  } else {
+    if (op.alphabet) {
+      decodeDictionaryOnBitpack<T, ResultOp::kDictScatter>(op);
+    } else {
+      decodeDictionaryOnBitpack<T, ResultOp::kBaselineScatter>(op);
+    }
+  }
+}
+
+__device__ inline void decodeDictionaryOnBitpack(GpuDecode& plan) {
+  auto& op = plan.data.dictionaryOnBitpack;
+  switch (op.dataType) {
+    case WaveTypeKind::TINYINT:
+      decodeDictionaryOnBitpack<int8_t>(op);
+      break;
+    case WaveTypeKind::SMALLINT:
+      decodeDictionaryOnBitpack<int16_t>(op);
+      break;
+    case WaveTypeKind::INTEGER:
+      decodeDictionaryOnBitpack<int32_t>(op);
+      break;
+    case WaveTypeKind::BIGINT:
+      decodeDictionaryOnBitpack<int64_t>(op);
+      break;
+    default:
+      if (threadIdx.x == 0) {
+        printf("ERROR: Unsupported data type for DictionaryOnBitpack\n");
+        assert(false);
+      }
+  }
+}
+
+template <int32_t kBlockSize, typename T>
+__device__ int scatterIndices(
+    const T* values,
+    T value,
+    int32_t begin,
+    int32_t end,
+    int32_t* indices) {
+  typedef cub::BlockScan<int32_t, kBlockSize> BlockScan;
+  extern __shared__ __align__(
+      alignof(typename BlockScan::TempStorage)) char smem[];
+  auto* scanStorage = reinterpret_cast<typename BlockScan::TempStorage*>(smem);
+  int numMatch;
+  bool match;
+  int32_t k = 0;
+  for (auto j = begin; j < end; j += kBlockSize) {
+    auto jt = j + threadIdx.x;
+    numMatch = match = (jt < end && values[jt] == value);
+    int subtotal;
+    BlockScan(*scanStorage).ExclusiveSum(numMatch, numMatch, subtotal);
+    __syncthreads();
+    if (match) {
+      indices[k + numMatch] = jt - begin;
+    }
+    k += subtotal;
+  }
+  return k;
+}
+
+template <int32_t kBlockSize>
+__device__ int scatterIndices(
+    const uint8_t* bits,
+    bool value,
+    int32_t begin,
+    int32_t end,
+    int32_t* indices) {
+  typedef cub::BlockScan<int32_t, kBlockSize> BlockScan;
+  extern __shared__ __align__(
+      alignof(typename BlockScan::TempStorage)) char smem[];
+  auto* scanStorage = reinterpret_cast<typename BlockScan::TempStorage*>(smem);
+  constexpr int kPerThread = 8;
+  int numMatch[kPerThread];
+  bool match[kPerThread];
+  int32_t k = 0;
+  constexpr auto kBitsPerBlock = kBlockSize * kPerThread;
+  for (auto j = begin; j < end; j += kBitsPerBlock) {
+    auto jt = j + threadIdx.x * kPerThread;
+    for (auto i = 0; i < kPerThread; ++i) {
+      numMatch[i] = match[i] = jt + i < end && isSet(bits, jt + i) == value;
+    }
+    int subtotal;
+    BlockScan(*scanStorage).ExclusiveSum(numMatch, numMatch, subtotal);
+    __syncthreads();
+    for (auto i = 0; i < kPerThread; ++i) {
+      if (match[i]) {
+        indices[k + numMatch[i]] = jt + i - begin;
+      }
+    }
+    k += subtotal;
+  }
+  return k;
+}
+
+__device__ inline void
+bitpackBools(const bool* input, int count, uint8_t* out) {
+  int nbytes = count / 8;
+  for (auto i = threadIdx.x; i < nbytes; i += blockDim.x) {
+    uint8_t value = 0;
+    for (int j = 0; j < 8; ++j) {
+      value |= input[8 * i + j] << j;
+    }
+    out[i] = value;
+  }
+  if (threadIdx.x == 0 && count % 8 != 0) {
+    auto extra = count % 8;
+    for (int j = count - extra; j < count; ++j) {
+      setBit(out, j, input[j]);
+    }
+  }
+}
+
+__device__ inline void decodeSparseBool(GpuDecode::SparseBool& op) {
+  for (int i = threadIdx.x; i < op.totalCount; i += blockDim.x) {
+    op.bools[i] = !op.sparseValue;
+  }
+  __syncthreads();
+  for (int i = threadIdx.x; i < op.sparseCount; i += blockDim.x) {
+    op.bools[op.sparseIndices[i]] = op.sparseValue;
+  }
+  __syncthreads();
+  bitpackBools(op.bools, op.totalCount, op.result);
+}
+
+__device__ inline uint32_t readVarint32(const char** pos) {
+  uint32_t value = (**pos) & 127;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= (**pos & 127) << 7;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= (**pos & 127) << 14;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= (**pos & 127) << 21;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= (*((*pos)++) & 127) << 28;
+  return value;
+}
+
+__device__ inline uint64_t readVarint64(const char** pos) {
+  uint64_t value = (**pos) & 127;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= (**pos & 127) << 7;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= (**pos & 127) << 14;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= (**pos & 127) << 21;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= static_cast<uint64_t>(**pos & 127) << 28;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= static_cast<uint64_t>(**pos & 127) << 35;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= static_cast<uint64_t>(**pos & 127) << 42;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= static_cast<uint64_t>(**pos & 127) << 49;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= static_cast<uint64_t>(**pos & 127) << 56;
+  if (!(*((*pos)++) & 128)) {
+    return value;
+  }
+  value |= static_cast<uint64_t>(*((*pos)++) & 127) << 63;
+  return value;
+}
+
+template <int kBlockSize, typename T>
+__device__ int decodeVarint(
+    const char* input,
+    int size,
+    bool* ends,
+    int32_t* endPos,
+    T* output) {
+  for (auto i = threadIdx.x; i < size; i += blockDim.x) {
+    ends[i] = ~input[i] & 0x80;
+  }
+  auto numOut = scatterIndices<kBlockSize>(ends, true, 0, size, endPos);
+  for (auto i = threadIdx.x; i < numOut; i += blockDim.x) {
+    auto* pos = input + (i == 0 ? 0 : (endPos[i - 1] + 1));
+    if constexpr (sizeof(T) == 4) {
+      output[i] = readVarint32(&pos);
+    } else {
+      static_assert(sizeof(T) == 8);
+      output[i] = readVarint64(&pos);
+    }
+  }
+  return numOut;
+}
+
+template <int kBlockSize>
+__device__ void decodeVarint(GpuDecode& plan) {
+  auto& op = plan.data.varint;
+  int resultSize;
+  switch (op.resultType) {
+    case WaveTypeKind::INTEGER:
+      resultSize = decodeVarint<kBlockSize>(
+          op.input, op.size, op.ends, op.endPos, (uint32_t*)op.result);
+      break;
+    case WaveTypeKind::BIGINT:
+      resultSize = decodeVarint<kBlockSize>(
+          op.input, op.size, op.ends, op.endPos, (uint64_t*)op.result);
+      break;
+    default:
+      if (threadIdx.x == 0) {
+        printf("ERROR: Unsupported result type for varint decoder\n");
+        assert(false);
+      }
+  }
+  if (threadIdx.x == 0) {
+    op.resultSize = resultSize;
+  }
+}
+
+template <int kBlockSize, typename T>
+__device__ void decodeMainlyConstant(GpuDecode::MainlyConstant& op) {
+  auto otherCount = scatterIndices<kBlockSize>(
+      op.isCommon, false, 0, op.count, op.otherIndices);
+  auto commonValue = *(const T*)op.commonValue;
+  auto* otherValues = (const T*)op.otherValues;
+  auto* result = (T*)op.result;
+  for (int i = threadIdx.x; i < op.count; i += blockDim.x) {
+    result[i] = commonValue;
+  }
+  __syncthreads();
+  for (int i = threadIdx.x; i < otherCount; i += blockDim.x) {
+    result[op.otherIndices[i]] = otherValues[i];
+  }
+  if (threadIdx.x == 0 && op.otherCount) {
+    *op.otherCount = otherCount;
+  }
+}
+
+template <int kBlockSize>
+__device__ void decodeMainlyConstant(GpuDecode& plan) {
+  auto& op = plan.data.mainlyConstant;
+  switch (op.dataType) {
+    case WaveTypeKind::TINYINT:
+      decodeMainlyConstant<kBlockSize, uint8_t>(op);
+      break;
+    case WaveTypeKind::SMALLINT:
+      decodeMainlyConstant<kBlockSize, uint16_t>(op);
+      break;
+    case WaveTypeKind::INTEGER:
+    case WaveTypeKind::REAL:
+      decodeMainlyConstant<kBlockSize, uint32_t>(op);
+      break;
+    case WaveTypeKind::BIGINT:
+    case WaveTypeKind::DOUBLE:
+      decodeMainlyConstant<kBlockSize, uint64_t>(op);
+      break;
+    default:
+      if (threadIdx.x == 0) {
+        printf("ERROR: Unsupported data type for MainlyConstant\n");
+        assert(false);
+      }
+  }
+}
+
+template <typename T, int kBlockSize, typename U>
+__device__ T sum(const U* values, int size) {
+  using Reduce = cub::BlockReduce<T, kBlockSize>;
+  extern __shared__ __align__(
+      alignof(typename Reduce::TempStorage)) char smem[];
+  auto* reduceStorage = reinterpret_cast<typename Reduce::TempStorage*>(smem);
+  T total = 0;
+  for (int i = 0; i < size; i += kBlockSize) {
+    auto numValid = min(size - i, kBlockSize);
+    T value;
+    if (threadIdx.x < numValid) {
+      value = values[i + threadIdx.x];
+    }
+    total += Reduce(*reduceStorage).Sum(value, numValid);
+    __syncthreads();
+  }
+  return total;
+}
+
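+// Worked example for the two RLE steps below: with lengths = {3, 1, 2} and
+// values = {7, 8, 9}, kRleTotalLength sums the lengths into 6 and kRle then
+// expands the runs into {7, 7, 7, 8, 9, 9}.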
+template <int32_t kBlockSize>
+__device__ void rleTotalLength(GpuDecode::RleTotalLength& op) {
+  auto result = sum<int64_t, kBlockSize>(op.input, op.count);
+  if (threadIdx.x == 0) {
+    *op.result = result;
+  }
+}
+
+template <typename T>
+__device__ int upperBound(const T* data, int size, T target) {
+  int lo = 0, hi = size;
+  while (lo < hi) {
+    int i = (lo + hi) / 2;
+    if (data[i] <= target) {
+      lo = i + 1;
+    } else {
+      hi = i;
+    }
+  }
+  return lo;
+}
+
+template <int32_t kBlockSize, typename T>
+__device__ void decodeRle(GpuDecode::Rle& op) {
+  using BlockScan = cub::BlockScan<int32_t, kBlockSize>;
+  extern __shared__ __align__(
+      alignof(typename BlockScan::TempStorage)) char smem[];
+  auto* scanStorage = reinterpret_cast<typename BlockScan::TempStorage*>(smem);
+
+  static_assert(sizeof(*scanStorage) >= sizeof(int32_t) * kBlockSize);
+  auto* offsets = (int32_t*)scanStorage;
+  auto* values = (const T*)op.values;
+  auto* result = (T*)op.result;
+  int total = 0;
+  for (int i = 0; i < op.count; i += blockDim.x) {
+    auto ti = threadIdx.x + i;
+    auto len = ti < op.count ? op.lengths[ti] : 0;
+    int32_t offset, subtotal;
+    __syncthreads();
+    BlockScan(*scanStorage).InclusiveSum(len, offset, subtotal);
+    __syncthreads();
+    offsets[threadIdx.x] = offset;
+    __syncthreads();
+    for (int j = threadIdx.x; j < subtotal; j += blockDim.x) {
+      result[total + j] = values[i + upperBound(offsets, blockDim.x, j)];
+    }
+    total += subtotal;
+  }
+}
+
+template <int32_t kBlockSize>
+__device__ void decodeRle(GpuDecode& plan) {
+  auto& op = plan.data.rle;
+  switch (op.valueType) {
+    case WaveTypeKind::TINYINT:
+      decodeRle<kBlockSize, int8_t>(op);
+      break;
+    case WaveTypeKind::SMALLINT:
+      decodeRle<kBlockSize, int16_t>(op);
+      break;
+    case WaveTypeKind::INTEGER:
+    case WaveTypeKind::REAL:
+      decodeRle<kBlockSize, int32_t>(op);
+      break;
+    case WaveTypeKind::BIGINT:
+    case WaveTypeKind::DOUBLE:
+      decodeRle<kBlockSize, int64_t>(op);
+      break;
+    default:
+      if (threadIdx.x == 0) {
+        printf("ERROR: Unsupported value type for Rle\n");
+        assert(false);
+      }
+  }
+}
+
+template <int32_t kBlockSize>
+__device__ void makeScatterIndices(GpuDecode::MakeScatterIndices& op) {
+  auto indicesCount = scatterIndices<kBlockSize>(
+      op.bits, op.findSetBits, op.begin, op.end, op.indices);
+  if (threadIdx.x == 0 && op.indicesCount) {
+    *op.indicesCount = indicesCount;
+  }
+}
+
+template <int32_t kBlockSize>
+__device__ void decodeSwitch(GpuDecode& op) {
+  switch (op.step) {
+    case DecodeStep::kTrivial:
+      detail::decodeTrivial(op);
+      break;
+    case DecodeStep::kDictionaryOnBitpack:
+      detail::decodeDictionaryOnBitpack(op);
+      break;
+    case DecodeStep::kSparseBool:
+      detail::decodeSparseBool(op.data.sparseBool);
+      break;
+    case DecodeStep::kMainlyConstant:
+      detail::decodeMainlyConstant<kBlockSize>(op);
+      break;
+    case DecodeStep::kVarint:
+      detail::decodeVarint<kBlockSize>(op);
+      break;
+    case DecodeStep::kRleTotalLength:
+      detail::rleTotalLength<kBlockSize>(op.data.rleTotalLength);
+      break;
+    case DecodeStep::kRle:
+      detail::decodeRle<kBlockSize>(op);
+      break;
+    case DecodeStep::kMakeScatterIndices:
+      detail::makeScatterIndices<kBlockSize>(op.data.makeScatterIndices);
+      break;
+    default:
+      if (threadIdx.x == 0) {
+        printf("ERROR: Unsupported DecodeStep (with shared memory)\n");
+      }
+  }
+}
+
+template <int kBlockSize>
+__global__ void decodeGlobal(GpuDecode* plan) {
+  decodeSwitch<kBlockSize>(plan[blockIdx.x]);
+}
+
+template <int32_t kBlockSize>
+int32_t sharedMemorySizeForDecode(DecodeStep step) {
+  using Reduce32 = cub::BlockReduce<int32_t, kBlockSize>;
+  using BlockScan32 = cub::BlockScan<int32_t, kBlockSize>;
+  switch (step) {
+    case DecodeStep::kTrivial:
+    case DecodeStep::kDictionaryOnBitpack:
+    case DecodeStep::kSparseBool:
+      return 0;
+
+    case DecodeStep::kRleTotalLength:
+      return sizeof(typename Reduce32::TempStorage);
+    case DecodeStep::kMainlyConstant:
+    case DecodeStep::kRleBool:
+    case DecodeStep::kRle:
+    case DecodeStep::kVarint:
+    case DecodeStep::kMakeScatterIndices:
+    case DecodeStep::kLengthToOffset:
+      return sizeof(typename BlockScan32::TempStorage);
+    default:
+      assert(false); // Undefined.
+      return 0;
+  }
+}
+
+} // namespace detail
+
+template <int kBlockSize>
+void decodeGlobal(GpuDecode* plan, int numBlocks, cudaStream_t stream) {
+  int32_t sharedSize = 0;
+  for (auto i = 0; i < numBlocks; ++i) {
+    sharedSize = std::max(
+        sharedSize,
+        detail::sharedMemorySizeForDecode<kBlockSize>(plan[i].step));
+  }
+  if (sharedSize > 0) {
+    sharedSize += 15; // Allow aligning at 16.
+  }
+
+  detail::decodeGlobal<kBlockSize>
+      <<<numBlocks, kBlockSize, sharedSize, stream>>>(plan);
+}
+
+} // namespace facebook::velox::wave
diff --git a/velox/experimental/wave/dwio/decode/GpuDecoder.cu b/velox/experimental/wave/dwio/decode/GpuDecoder.cu
new file mode 100644
index 000000000000..b0f84cf84f21
--- /dev/null
+++ b/velox/experimental/wave/dwio/decode/GpuDecoder.cu
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/experimental/wave/common/Buffer.h"
+#include "velox/experimental/wave/common/Cuda.h"
+#include "velox/experimental/wave/common/CudaUtil.cuh"
+#include "velox/experimental/wave/common/GpuArena.h"
+#include "velox/experimental/wave/dwio/decode/GpuDecoder.cuh"
+
+namespace facebook::velox::wave {
+
+int32_t GpuDecode::sharedMemorySize() const {
+  return detail::sharedMemorySizeForDecode<kBlockSize>(step);
+}
+
+/// Describes multiple sequences of decode ops. Each thread block executes a
+/// sequence of decode steps. The data area starts with a range of
+/// instruction numbers for each thread block: the first block runs from 0 to
+/// ends[0], the nth from ends[n - 1] to ends[n]. After gridDim.x 'ends'
+/// entries, we round up to an 8-aligned offset, followed by an array of
+/// GpuDecodes.
+struct GpuDecodeParams {
+  // If more ops than this have to be represented, use a dynamically
+  // allocated description referenced by 'external'.
+  static constexpr int32_t kMaxInlineOps = 50;
+
+  // Pointer to a standalone description of the work. If nullptr, the
+  // description of the work fits inline in 'this'.
+  GpuDecodeParams* external{nullptr};
+  // The end of each decode program. The first starts at 0. The end is
+  // ends[blockIdx.x].
+  int32_t ends
+      [kMaxInlineOps * (sizeof(GpuDecode) + sizeof(int32_t)) /
+       sizeof(int32_t)] = {};
+};
+
+__global__ void decodeKernel(GpuDecodeParams inlineParams) {
+  GpuDecodeParams* params =
+      inlineParams.external ? inlineParams.external : &inlineParams;
+  int32_t programStart = blockIdx.x == 0 ? 0 : params->ends[blockIdx.x - 1];
+  int32_t programEnd = params->ends[blockIdx.x];
+  GpuDecode* ops =
+      reinterpret_cast<GpuDecode*>(&params->ends[0] + roundUp(gridDim.x, 2));
+  for (auto i = programStart; i < programEnd; ++i) {
+    detail::decodeSwitch<kBlockSize>(ops[i]);
+  }
+}
+
+void launchDecode(
+    const DecodePrograms& programs,
+    GpuArena* arena,
+    WaveBufferPtr& extra,
+    Stream* stream) {
+  int32_t numBlocks = programs.programs.size();
+  int32_t numOps = 0;
+  int32_t shared = 0;
+  for (auto& program : programs.programs) {
+    numOps += program.size();
+    for (auto& step : program) {
+      shared = std::max(
+          shared, detail::sharedMemorySizeForDecode<kBlockSize>(step->step));
+    }
+  }
+  if (shared > 0) {
+    shared += 15; // Allow aligning at 16.
+  }
+  GpuDecodeParams localParams;
+  GpuDecodeParams* params = &localParams;
+  if (numOps > GpuDecodeParams::kMaxInlineOps) {
+    extra = arena->allocate<char>(
+        (numOps + 1) * (sizeof(GpuDecode) + sizeof(int32_t)));
+    params = extra->as<GpuDecodeParams>();
+  }
+  GpuDecode* decodes =
+      reinterpret_cast<GpuDecode*>(&params->ends[0] + roundUp(numBlocks, 2));
+  int32_t fill = 0;
+  for (auto i = 0; i < programs.programs.size(); ++i) {
+    params->ends[i] =
+        (i == 0 ? 0 : params->ends[i - 1]) + programs.programs[i].size();
+    for (auto& op : programs.programs[i]) {
+      decodes[fill++] = *op;
+    }
+  }
+  if (extra) {
+    localParams.external = params;
+  }
+
+  decodeKernel<<<numBlocks, kBlockSize, shared, stream->stream()->stream>>>(
+      localParams);
+  CUDA_CHECK(cudaGetLastError());
+  if (programs.result) {
+    if (!programs.hostResult) {
+      stream->prefetch(
+          nullptr, programs.result->as<char>(), programs.result->size());
+    } else {
+      stream->deviceToHostAsync(
+          programs.hostResult->as<char>(),
+          programs.result->as<char>(),
+          programs.hostResult->size());
+    }
+  }
+}
+
+} // namespace facebook::velox::wave
diff --git a/velox/experimental/wave/dwio/decode/GpuDecoder.cuh b/velox/experimental/wave/dwio/decode/GpuDecoder.cuh
new file mode 100644
index 000000000000..7ed68d940416
--- /dev/null
+++ b/velox/experimental/wave/dwio/decode/GpuDecoder.cuh
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cuda_runtime_api.h>
+#include "velox/experimental/wave/common/CudaUtil.cuh"
+#include "velox/experimental/wave/dwio/decode/DecodeStep.h"
+
+namespace facebook::velox::wave {
+
+/// Returns the amount of shared memory per thread block for 'op'.
+int32_t sharedMemorySize(GpuDecode& op);
+
+/// Decodes the plans passed; one thread block runs on each plan.
+__device__ void decodeNoSharedMemory(GpuDecode* plan);
+
+/// Decodes the plans passed; this version requires shared memory, so the
+/// concurrency level will be lower.
+template <int kBlockSize>
+__device__ void decodeWithSharedMemory(GpuDecode* plan);
+
+/// Decodes plans, initiated from host code.
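+///
+/// A minimal usage sketch (assuming 'plans' is a managed-memory array of
+/// GpuDecode with one entry per thread block and 'stream' is a valid
+/// cudaStream_t):
+///
+///   decodeGlobal<256>(plans, numBlocks, stream);
+///   cudaStreamSynchronize(stream);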
+template <int kBlockSize>
+void decodeGlobal(GpuDecode* plan, int numBlocks, cudaStream_t stream = 0);
+
+__host__ __device__ inline bool isSet(const uint8_t* bits, int32_t idx) {
+  return bits[idx / 8] & (1 << (idx & 7));
+}
+
+__host__ __device__ inline void setBit(uint8_t* bits, uint32_t idx) {
+  bits[idx / 8] |= (1 << (idx % 8));
+}
+
+__host__ __device__ inline void clearBit(uint8_t* bits, uint32_t idx) {
+  static constexpr uint8_t kZeroBitmasks[] = {
+      static_cast<uint8_t>(~(1 << 0)),
+      static_cast<uint8_t>(~(1 << 1)),
+      static_cast<uint8_t>(~(1 << 2)),
+      static_cast<uint8_t>(~(1 << 3)),
+      static_cast<uint8_t>(~(1 << 4)),
+      static_cast<uint8_t>(~(1 << 5)),
+      static_cast<uint8_t>(~(1 << 6)),
+      static_cast<uint8_t>(~(1 << 7)),
+  };
+  bits[idx / 8] &= kZeroBitmasks[idx % 8];
+}
+
+__host__ __device__ inline void
+setBit(uint8_t* bits, uint32_t idx, bool value) {
+  value ? setBit(bits, idx) : clearBit(bits, idx);
+}
+
+} // namespace facebook::velox::wave
+
+#include "velox/experimental/wave/dwio/decode/GpuDecoder-inl.cuh"
diff --git a/velox/experimental/wave/dwio/decode/tests/CMakeLists.txt b/velox/experimental/wave/dwio/decode/tests/CMakeLists.txt
new file mode 100644
index 000000000000..aac1f7e4e0ec
--- /dev/null
+++ b/velox/experimental/wave/dwio/decode/tests/CMakeLists.txt
@@ -0,0 +1,33 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+add_executable(velox_wave_decode_test GpuDecoderTest.cu)
+
+set_target_properties(velox_wave_decode_test PROPERTIES CUDA_ARCHITECTURES
+                                                        native)
+
+add_test(velox_wave_decode_test velox_wave_decode_test)
+
+target_link_libraries(
+  velox_wave_decode_test
+  velox_wave_decode
+  velox_wave_common
+  velox_memory
+  velox_time
+  velox_exception
+  gtest
+  gtest_main
+  gflags::gflags
+  glog::glog
+  Folly::folly)
diff --git a/velox/experimental/wave/dwio/decode/tests/GpuDecoderTest.cu b/velox/experimental/wave/dwio/decode/tests/GpuDecoderTest.cu
new file mode 100644
index 000000000000..4f391df7a10f
--- /dev/null
+++ b/velox/experimental/wave/dwio/decode/tests/GpuDecoderTest.cu
@@ -0,0 +1,631 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fmt/format.h>
+#include <folly/init/Init.h>
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <iostream>
+#include "velox/experimental/gpu/Common.h"
+#include "velox/experimental/wave/dwio/decode/GpuDecoder.cuh"
+
+DEFINE_int32(device_id, 0, "");
+DEFINE_bool(benchmark, false, "");
+
+namespace facebook::velox::wave {
+namespace {
+
+using namespace facebook::velox;
+
+// Returns the number of bytes the "values" will occupy after varint encoding.
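+// Each value takes ceil(significant bits / 7) bytes, e.g. 300 (9 significant
+// bits) takes 2 bytes; the lookup table below maps __builtin_clzll of a
+// value to that byte count.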
+uint64_t bulkVarintSize(const uint64_t* values, int count) {
+  constexpr uint8_t kLookupSizeTable64[64] = {
+      10, 9, 9, 9, 9, 9, 9, 9, 8, 8, 8, 8, 8, 8, 8, 7, 7, 7, 7, 7, 7, 7,
+      6,  6, 6, 6, 6, 6, 6, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 4, 4, 4, 4, 3,
+      3,  3, 3, 3, 3, 3, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1};
+  uint64_t size = 0;
+  for (int i = 0; i < count; ++i) {
+    size += kLookupSizeTable64[__builtin_clzll(values[i] | 1ULL)];
+  }
+  return size;
+}
+
+template <typename T>
+void writeVarint(T val, char** pos) noexcept {
+  while (val >= 128) {
+    *((*pos)++) = 0x80 | (val & 0x7f);
+    val >>= 7;
+  }
+  *((*pos)++) = val;
+}
+
+template <typename T>
+gpu::CudaPtr<T[]> allocate(int n) {
+  T* ptr;
+  CUDA_CHECK_FATAL(cudaMallocManaged(&ptr, n * sizeof(T)));
+  return gpu::CudaPtr<T[]>(ptr);
+}
+
+template <typename T>
+void fillRandom(T* values, int32_t numValues) {
+  uint64_t seed = 0xafbe1647deba879LU;
+  for (auto i = 0; i < numValues; ++i) {
+    values[i] = seed;
+    seed = (seed * 0x5def1) ^ (seed >> 21);
+  }
+}
+
+// Generates random bits that are true with probability "p" and false with
+// probability "1 - p".
+void fillRandomBits(uint8_t* bits, double p, int numValues) {
+  for (int i = 0; i < numValues; ++i) {
+    setBit(bits, i, (double)rand() / RAND_MAX < p);
+  }
+}
+
+template <typename T>
+inline T* addBytes(T* ptr, int bytes) {
+  return reinterpret_cast<T*>(reinterpret_cast<char*>(ptr) + bytes);
+}
+
+template <typename T>
+inline const T* addBytes(const T* ptr, int bytes) {
+  return reinterpret_cast<const T*>(
+      reinterpret_cast<const char*>(ptr) + bytes);
+}
+
+template <typename T>
+void makeBitpackDict(
+    int32_t bitWidth,
+    int32_t numValues,
+    gpu::CudaPtr<char[]>& cudaPtr,
+    T*& dict,
+    uint64_t*& bits,
+    T*& result,
+    int32_t** scatter) {
+  int64_t dictBytes = sizeof(T) << bitWidth;
+  int64_t bitBytes = (roundUp(numValues * bitWidth, 128) / 8) + 16;
+  int64_t resultBytes = numValues * sizeof(T);
+  int scatterBytes =
+      scatter ? roundUp(numValues * sizeof(int32_t), sizeof(T)) : 0;
+  if (scatterBytes) {
+    resultBytes += resultBytes / 2;
+  }
+  cudaPtr = allocate<char>(dictBytes + bitBytes + scatterBytes + resultBytes);
+  T* memory = (T*)cudaPtr.get();
+
+  dict = memory;
+
+  static int sequence = 1;
+  ++sequence;
+  for (auto i = 0; i < dictBytes / sizeof(T); ++i) {
+    dict[i] = (10 + sequence) * i;
+  }
+
+  // The bit packed data does not start at a word boundary.
+  bits = addBytes(reinterpret_cast<uint64_t*>(memory), dictBytes + 1);
+  fillRandom(bits, bitBytes / 8);
+
+  if (scatterBytes) {
+    // Make a scatter vector that makes gaps in the result sequence.
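+    // With i + i / 4 the target indices run 0, 1, 2, 3, 5, 6, 7, 8, 10, ...,
+    // leaving every fifth result slot unwritten.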
+    *scatter =
+        addBytes(reinterpret_cast<int32_t*>(memory), dictBytes + bitBytes);
+    for (auto i = 0; i < numValues; ++i) {
+      (*scatter)[i] = i + i / 4;
+    }
+  }
+  result = addBytes(
+      reinterpret_cast<T*>(memory), dictBytes + bitBytes + scatterBytes);
+}
+
+class GpuDecoderTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    CUDA_CHECK_FATAL(cudaEventCreate(&startEvent_));
+    CUDA_CHECK_FATAL(cudaEventCreate(&stopEvent_));
+  }
+
+  void TearDown() override {
+    CUDA_CHECK_FATAL(cudaEventDestroy(startEvent_));
+    CUDA_CHECK_FATAL(cudaEventDestroy(stopEvent_));
+  }
+
+  void testCase(
+      const std::string& label,
+      std::function<void()> func,
+      int64_t bytes,
+      int32_t numReps) {
+    func();
+    CUDA_CHECK_FATAL(cudaGetLastError());
+    if (!FLAGS_benchmark) {
+      CUDA_CHECK_FATAL(cudaDeviceSynchronize());
+      return;
+    }
+    CUDA_CHECK_FATAL(cudaEventRecord(startEvent_, 0));
+    for (auto count = 0; count < numReps; ++count) {
+      func();
+    }
+    CUDA_CHECK_FATAL(cudaEventRecord(stopEvent_, 0));
+    CUDA_CHECK_FATAL(cudaEventSynchronize(stopEvent_));
+    float ms;
+    CUDA_CHECK_FATAL(cudaEventElapsedTime(&ms, startEvent_, stopEvent_));
+    printf(
+        "%s %.2f (%d at %.2f us each)\n",
+        label.c_str(),
+        bytes * numReps * 1e-6 / ms,
+        numReps,
+        ms * 1000 / numReps);
+  }
+
+  template <typename T>
+  void testCopyPlan(int64_t numValues, int numBlocks, bool useScatter) {
+    auto source = allocate<T>(numValues);
+    auto result = allocate<T>(numValues * 4 / 3);
+    gpu::CudaPtr<int32_t[]> scatter;
+    if (useScatter) {
+      scatter = allocate<int32_t>(numValues);
+      for (auto i = 0; i < numValues; ++i) {
+        scatter[i] = i * 5 / 4;
+      }
+    }
+    fillRandom(source.get(), numValues);
+    result[numValues] = 0xdeadbeef;
+    int valuesPerOp = roundUp(numValues / numBlocks, kBlockSize);
+    int numOps = roundUp(numValues, valuesPerOp) / valuesPerOp;
+    auto ops = allocate<GpuDecode>(numOps);
+    for (auto i = 0; i < numOps; ++i) {
+      int32_t begin = i * valuesPerOp;
+      ops[i].step = DecodeStep::kTrivial;
+      auto& op = ops[i].data.trivial;
+      op.dataType = WaveTypeTrait<T>::typeKind;
+      op.begin = begin;
+      op.end = std::min<int64_t>(numValues, (i + 1) * valuesPerOp);
+      op.result = result.get();
+      op.input = source.get();
+      op.scatter = scatter.get();
+    }
+    testCase(
+        fmt::format(
+            "copy plan {} numValues={} useScatter={}",
+            sizeof(T) * 8,
+            numValues,
+            useScatter),
+        [&] { decodeGlobal<kBlockSize>(ops.get(), numOps); },
+        numValues * sizeof(T),
+        10);
+    if (!scatter) {
+      EXPECT_EQ(0xdeadbeef, result[numValues]);
+    }
+    for (auto i = 0; i < numValues; ++i) {
+      ASSERT_EQ(source[i], result[scatter ? scatter[i] : i]);
+    }
+  }
+
+  template <typename T>
+  void dictTestPlan(
+      int32_t bitWidth,
+      int64_t numValues,
+      int numBlocks,
+      bool useScatter) {
+    gpu::CudaPtr<char[]> ptr;
+    T* dict;
+    uint64_t* bits;
+    T* result;
+    int32_t* scatter = nullptr;
+    makeBitpackDict(
+        bitWidth,
+        numValues,
+        ptr,
+        dict,
+        bits,
+        result,
+        useScatter ? &scatter : nullptr);
+    result[numValues] = 0xdeadbeef;
+    int valuesPerOp = roundUp(numValues / numBlocks, kBlockSize);
+    int numOps = roundUp(numValues, valuesPerOp) / valuesPerOp;
+    auto ops = allocate<GpuDecode>(numOps);
+    for (auto i = 0; i < numOps; ++i) {
+      int32_t begin = i * valuesPerOp;
+      ops[i].step = DecodeStep::kDictionaryOnBitpack;
+      auto& op = ops[i].data.dictionaryOnBitpack;
+      op.begin = begin;
+      op.end = std::min<int64_t>(numValues, (i + 1) * valuesPerOp);
+      op.result = result;
+      op.bitWidth = bitWidth;
+      op.indices = bits;
+      op.alphabet = dict;
+      op.scatter = scatter;
+      op.baseline = 0;
+      op.dataType = WaveTypeTrait<T>::typeKind;
+    }
+    testCase(
+        fmt::format(
+            "bitpack dictplan {} numValues={} useScatter={}",
+            sizeof(T) * 8,
+            numValues,
+            useScatter),
+        [&] { decodeGlobal<kBlockSize>(ops.get(), numOps); },
+        numValues * sizeof(T),
+        10);
+    if (!scatter) {
+      EXPECT_EQ(0xdeadbeef, result[numValues]);
+    }
+    auto mask = (1u << bitWidth) - 1;
+    for (auto i = 0; i < numValues; ++i) {
+      int32_t bit = i * bitWidth;
+      uint64_t word = *addBytes(bits, bit / 8);
+      T expected = dict[(word >> (bit & 7)) & mask];
+      ASSERT_EQ(result[scatter ? scatter[i] : i], expected) << i;
+    }
+  }
+
+  template <int kBlockSize>
+  void testSparseBool(int numValues, int numBlocks) {
+    auto expected = allocate<uint8_t>((numValues + 7) / 8);
+    fillRandomBits(expected.get(), 0.99, numValues);
+    auto indices = allocate<int32_t>(numValues);
+    int indicesCount = 0;
+    for (int i = 0; i < numValues; ++i) {
+      if (!isSet(expected.get(), i)) {
+        indices[indicesCount++] = i;
+      }
+    }
+    auto bools = allocate<bool>(numValues * numBlocks);
+    auto resultSize = (numValues + 7) / 8;
+    auto result = allocate<uint8_t>(resultSize * numBlocks);
+    auto ops = allocate<GpuDecode>(numBlocks);
+    for (int i = 0; i < numBlocks; ++i) {
+      ops[i].step = DecodeStep::kSparseBool;
+      auto& op = ops[i].data.sparseBool;
+      op.totalCount = numValues;
+      op.sparseValue = false;
+      op.sparseIndices = indices.get();
+      op.sparseCount = indicesCount;
+      op.bools = bools.get() + i * numValues;
+      op.result = result.get() + i * resultSize;
+    }
+    testCase(
+        "",
+        [&] { decodeGlobal<kBlockSize>(ops.get(), numBlocks); },
+        resultSize * numBlocks,
+        3);
+    for (int j = 0; j < numBlocks; ++j) {
+      auto* actual = ops[j].data.sparseBool.result;
+      for (int i = 0; i < numValues; ++i) {
+        ASSERT_EQ(isSet(actual, i), isSet(expected.get(), i)) << i;
+      }
+    }
+  }
+
+  template <int kBlockSize>
+  void testVarint(int numValues, int numBlocks) {
+    std::vector<uint64_t> expected(numValues);
+    fillRandom(expected.data(), numValues);
+    for (int i = 0; i < numValues; ++i) {
+      if (i % 100 != 0) {
+        expected[i] %= 128;
+      }
+    }
+    auto inputSize = bulkVarintSize(expected.data(), numValues);
+    auto input = allocate<char>(inputSize);
+    auto* rawInput = input.get();
+    for (int i = 0; i < numValues; ++i) {
+      writeVarint(expected[i], &rawInput);
+    }
+    auto ends = allocate<bool>(inputSize * numBlocks);
+    auto endPos = allocate<int32_t>(inputSize * numBlocks);
+    auto result = allocate<uint64_t>(inputSize * numBlocks);
+    auto ops = allocate<GpuDecode>(numBlocks);
+    for (int i = 0; i < numBlocks; ++i) {
+      ops[i].step = DecodeStep::kVarint;
+      auto& op = ops[i].data.varint;
+      op.input = input.get();
+      op.size = inputSize;
+      op.ends = ends.get() + i * inputSize;
+      op.endPos = endPos.get() + i * inputSize;
+      op.resultType = WaveTypeKind::BIGINT;
+      op.result = result.get() + i * inputSize;
+    }
+    testCase(
+        "",
+        [&] { decodeGlobal<kBlockSize>(ops.get(), numBlocks); },
+        numValues * sizeof(uint64_t) * numBlocks,
+        3);
+    for (int j = 0; j < numBlocks; ++j) {
+      auto& op = ops[j].data.varint;
+      ASSERT_EQ(op.resultSize, numValues);
+      for (int i = 0; i < numValues; ++i) {
+        ASSERT_EQ(
+            reinterpret_cast<const uint64_t*>(op.result)[i], expected[i]);
+      }
+    }
+  }
+
+  template <typename T>
+  void testMainlyConstant(int numValues, int numBlocks) {
+    auto isCommon = allocate<uint8_t>((numValues + 7) / 8);
+    fillRandomBits(isCommon.get(), 0.99, numValues);
+    auto values = allocate<T>(numValues + 1);
+    fillRandom(values.get(), numValues + 1);
+    auto otherIndices = allocate<int32_t>(numValues * numBlocks);
+    auto result = allocate<T>(numValues * numBlocks);
+    auto otherCounts = allocate<int32_t>(numBlocks);
+    auto ops = allocate<GpuDecode>(numBlocks);
+    for (int i = 0; i < numBlocks; ++i) {
+      ops[i].step = DecodeStep::kMainlyConstant;
+      auto& op = ops[i].data.mainlyConstant;
+      op.dataType = WaveTypeTrait<T>::typeKind;
+      op.count = numValues;
+      op.commonValue = &values[numValues];
+      op.otherValues = values.get();
+      op.isCommon = isCommon.get();
+      op.otherIndices = otherIndices.get() + i * numValues;
+      op.result = result.get() + i * numValues;
+      op.otherCount = otherCounts.get() + i;
+    }
+    testCase(
+        "",
+        [&] { decodeGlobal<kBlockSize>(ops.get(), numBlocks); },
+        numValues * numBlocks * sizeof(T),
+        3);
+    for (int k = 0; k < numBlocks; ++k) {
+      auto& op = ops[k].data.mainlyConstant;
+      auto* result = (const T*)op.result;
+      int j = 0;
+      for (int i = 0; i < numValues; ++i) {
+        if (isSet(isCommon.get(), i)) {
+          ASSERT_EQ(result[i], values[numValues]);
+        } else {
+          ASSERT_EQ(result[i], values[j++]);
+        }
+      }
+      ASSERT_EQ(*op.otherCount, j);
+    }
+  }
+
+  template <int kBlockSize>
+  void testRleTotalLength(int numValues, int numBlocks) {
+    auto values = allocate<int32_t>(numValues);
+    fillRandom(values.get(), numValues);
+    int valuesPerOp = (numValues + numBlocks - 1) / numBlocks;
+    auto ops = allocate<GpuDecode>(numBlocks);
+    auto lengths = allocate<int64_t>(numBlocks);
+    for (auto i = 0; i < numBlocks; ++i) {
+      ops[i].step = DecodeStep::kRleTotalLength;
+      auto& op = ops[i].data.rleTotalLength;
+      op.input = values.get() + i * valuesPerOp;
+      op.count = std::min(valuesPerOp, numValues - i * valuesPerOp);
+      op.result = &lengths[i];
+    }
+    testCase(
+        "",
+        [&] { decodeGlobal<kBlockSize>(ops.get(), numBlocks); },
+        numValues * sizeof(int32_t),
+        5);
+    for (int i = 0; i < numBlocks; ++i) {
+      auto& op = ops[i].data.rleTotalLength;
+      int64_t expected = 0;
+      for (int j = 0; j < op.count; ++j) {
+        expected += op.input[j];
+      }
+      ASSERT_EQ(*op.result, expected);
+    }
+  }
+
+  template <typename T>
+  void testRle(int numValues, int numBlocks) {
+    auto values = allocate<T>(numValues);
+    auto lengths = allocate<int32_t>(numValues);
+    int totalLength = 0;
+    fillRandom(values.get(), numValues);
+    fillRandom(lengths.get(), numValues);
+    for (int i = 0; i < numValues; ++i) {
+      int limit = i % 1000 == 0 ? 1000 : 10;
+      lengths[i] = (uint32_t)lengths[i] % limit;
+      totalLength += lengths[i];
+    }
+    auto ops = allocate<GpuDecode>(numBlocks);
+    auto results = allocate<int64_t>(numBlocks);
+    int valuesPerOp = (numValues + numBlocks - 1) / numBlocks;
+    for (int i = 0; i < numBlocks; ++i) {
+      ops[i].step = DecodeStep::kRleTotalLength;
+      auto& op = ops[i].data.rleTotalLength;
+      op.input = lengths.get() + i * valuesPerOp;
+      op.count = std::min(valuesPerOp, numValues - i * valuesPerOp);
+      op.result = &results[i];
+    }
+    decodeGlobal<kBlockSize>(ops.get(), numBlocks);
+    CUDA_CHECK_FATAL(cudaGetLastError());
+    CUDA_CHECK_FATAL(cudaDeviceSynchronize());
+    auto result = allocate<T>(totalLength);
+    int lengthSofar = 0;
+    for (int i = 0; i < numBlocks; ++i) {
+      int subtotal = *ops[i].data.rleTotalLength.result;
+      ops[i].step = DecodeStep::kRle;
+      auto& op = ops[i].data.rle;
+      op.valueType = WaveTypeTrait<T>::typeKind;
+      op.values = values.get() + i * valuesPerOp;
+      op.lengths = lengths.get() + i * valuesPerOp;
+      op.count = std::min(valuesPerOp, numValues - i * valuesPerOp);
+      op.result = result.get() + lengthSofar;
+      lengthSofar += subtotal;
+    }
+    testCase(
+        "",
+        [&] { decodeGlobal<kBlockSize>(ops.get(), numBlocks); },
+        totalLength * sizeof(T),
+        3);
+    for (int i = 0, j = 0; i < numValues; ++i) {
+      for (int k = 0; k < lengths[i]; ++k) {
+        ASSERT_EQ(result[j++], values[i]);
+      }
+    }
+  }
+
+  template <int kBlockSize>
+  void testMakeScatterIndices(int numValues, int numBlocks) {
+    auto bits = allocate<uint8_t>((numValues * numBlocks + 7) / 8);
+    fillRandomBits(bits.get(), 0.5, numValues * numBlocks);
+    auto indices = allocate<int32_t>(numValues * numBlocks);
+    auto indicesCounts = allocate<int32_t>(numBlocks);
+    auto ops = allocate<GpuDecode>(numBlocks);
+    for (int i = 0; i < numBlocks; ++i) {
+      ops[i].step = DecodeStep::kMakeScatterIndices;
+      auto& op = ops[i].data.makeScatterIndices;
+      op.bits = bits.get();
+      op.findSetBits = true;
+      op.begin = i * numValues;
+      op.end = op.begin + numValues;
+      op.indices = indices.get() + i * numValues;
+      op.indicesCount = indicesCounts.get() + i;
+    }
+    testCase(
+        "",
+        [&] { decodeGlobal<kBlockSize>(ops.get(), numBlocks); },
+        numValues * numBlocks * sizeof(int32_t),
+        3);
+    for (int i = 0; i < numBlocks; ++i) {
+      auto& op = ops[i].data.makeScatterIndices;
+      int k = 0;
+      for (int j = 0; j < numValues; ++j) {
+        if (isSet(bits.get(), j + i * numValues)) {
+          ASSERT_LT(k, *op.indicesCount);
+          ASSERT_EQ(op.indices[k++], j);
+        }
+      }
+      ASSERT_EQ(k, *op.indicesCount);
+    }
+  }
+
+  void testMakeScatterIndicesStream(int numValues, int numBlocks) {
+    auto bits = allocate<uint8_t>((numValues * numBlocks + 7) / 8);
+    fillRandomBits(bits.get(), 0.5, numValues * numBlocks);
+    auto indices = allocate<int32_t>(numValues * numBlocks);
+    auto indicesCounts = allocate<int32_t>(numBlocks);
+    DecodePrograms programs;
+    for (int i = 0; i < numBlocks; ++i) {
+      programs.programs.emplace_back();
+      programs.programs.back().push_back(std::make_unique<GpuDecode>());
+      auto opPtr = programs.programs.back().front().get();
+      opPtr->step = DecodeStep::kMakeScatterIndices;
+      auto& op = opPtr->data.makeScatterIndices;
+      op.bits = bits.get();
+      op.findSetBits = true;
+      op.begin = i * numValues;
+      op.end = op.begin + numValues;
+      op.indices = indices.get() + i * numValues;
+      op.indicesCount = indicesCounts.get() + i;
+    }
+    auto stream = std::make_unique<Stream>();
+    auto arena =
+        std::make_unique<GpuArena>(100000000, getAllocator(getDevice()));
+    WaveBufferPtr extra;
+    launchDecode(programs, arena.get(), extra, stream.get());
+    stream->wait();
+    for (int i = 0; i < numBlocks; ++i) {
+      auto& op = programs.programs[i].front()->data.makeScatterIndices;
+      int k = 0;
+      for (int j = 0; j < numValues; ++j) {
+        if (isSet(bits.get(), j + i * numValues)) {
+          ASSERT_LT(k, *op.indicesCount);
+          ASSERT_EQ(op.indices[k++], j);
+        }
+      }
+      ASSERT_EQ(k, *op.indicesCount);
+    }
+  }
+
+ private:
+  cudaEvent_t startEvent_;
+  cudaEvent_t stopEvent_;
+};
+
+TEST_F(GpuDecoderTest, trivial) {
+  testCopyPlan<int64_t>(40'000'003, 1024, false);
+  testCopyPlan<int64_t>(40'000'003, 1024, true);
+}
+
+TEST_F(GpuDecoderTest, dictionaryOnBitpack) {
+  dictTestPlan<int32_t>(11, 4'000'037, 1024, false);
+  dictTestPlan<int64_t>(11, 4'000'037, 1024, false);
+  dictTestPlan<int32_t>(11, 40'000'003, 1024, false);
+  dictTestPlan<int64_t>(11, 40'000'003, 1024, false);
+  dictTestPlan<int64_t>(11, 40'000'003, 1024, true);
+}
+
+TEST_F(GpuDecoderTest, sparseBool) {
+  testSparseBool<256>(40013, 1024);
+}
+
+TEST_F(GpuDecoderTest, varint) {
+  testVarint<256>(4001, 1024);
+}
+
+TEST_F(GpuDecoderTest, mainlyConstant) {
+  testMainlyConstant<int32_t>(40013, 1024);
+}
+
+TEST_F(GpuDecoderTest, rleTotalLength) {
+  testRleTotalLength<256>(40'000'003, 1024);
+}
+
+TEST_F(GpuDecoderTest, rle) {
+  testRle<int32_t>(40'000'003, 1024);
+}
+
+TEST_F(GpuDecoderTest, makeScatterIndices) {
+  testMakeScatterIndices<256>(40013, 1024);
+}
+
+TEST_F(GpuDecoderTest, streamApi) {
+  // One call with few blocks, another with many, to cover inline and
+  // out-of-line params.
+  testMakeScatterIndicesStream(100, 20);
+  testMakeScatterIndicesStream(999, 999);
+}
+} // namespace
+} // namespace facebook::velox::wave
+
+void printFuncAttrs(
+    const std::string& heading,
+    const cudaFuncAttributes& attrs) {
+  std::cout << heading << " sharedSizeBytes=" << attrs.sharedSizeBytes
+            << " constSizeBytes=" << attrs.constSizeBytes
+            << " localSizeBytes=" << attrs.localSizeBytes
+            << " maxThreadsPerBlock=" << attrs.maxThreadsPerBlock
+            << " numRegs=" << attrs.numRegs
+            << " maxDynamicSharedSizeBytes=" << attrs.maxDynamicSharedSizeBytes
+            << std::endl;
+}
+
+using namespace facebook::velox::wave;
+
+int main(int argc, char** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  folly::Init init{&argc, &argv};
+
+  cudaDeviceProp prop;
+  CUDA_CHECK_FATAL(cudaGetDeviceProperties(&prop, FLAGS_device_id));
+  printf("Running on device: %s\n", prop.name);
+  CUDA_CHECK_FATAL(cudaSetDevice(FLAGS_device_id));
+  cudaFuncAttributes attrs;
+  CUDA_CHECK_FATAL(cudaFuncGetAttributes(&attrs, detail::decodeGlobal<128>));
+  printFuncAttrs("decode blocksize 128", attrs);
+  CUDA_CHECK_FATAL(cudaFuncGetAttributes(&attrs, detail::decodeGlobal<256>));
+  printFuncAttrs("decode blocksize 256", attrs);
+  CUDA_CHECK_FATAL(cudaFuncGetAttributes(&attrs, detail::decodeGlobal<512>));
+  printFuncAttrs("decode blocksize 512", attrs);
+  CUDA_CHECK_FATAL(cudaFuncGetAttributes(&attrs, detail::decodeGlobal<1024>));
+  printFuncAttrs("decode blocksize 1024", attrs);
+  return RUN_ALL_TESTS();
+}
diff --git a/velox/experimental/wave/vector/Operand.h b/velox/experimental/wave/vector/Operand.h
index 5ba78813086f..cc3cdb8b75b9 100644
--- a/velox/experimental/wave/vector/Operand.h
+++ b/velox/experimental/wave/vector/Operand.h
@@ -23,6 +23,55 @@
 /// side files.
 namespace facebook::velox::wave {
 
+/// Copy of TypeKind in velox/type/Type.h. Type.h is incompatible with Cuda
+/// headers, therefore duplicated here.
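+/// The numeric values must stay in sync with velox::TypeKind so that one can
+/// be cast to the other.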
+enum class WaveTypeKind : int8_t {
+  BOOLEAN = 0,
+  TINYINT = 1,
+  SMALLINT = 2,
+  INTEGER = 3,
+  BIGINT = 4,
+  REAL = 5,
+  DOUBLE = 6,
+  VARCHAR = 7,
+  VARBINARY = 8,
+  TIMESTAMP = 9,
+  HUGEINT = 10,
+  // Enum values for ComplexTypes start after 30 to leave
+  // some value space to accommodate adding new scalar/native
+  // types above.
+  ARRAY = 30,
+  MAP = 31,
+  ROW = 32,
+  UNKNOWN = 33,
+  FUNCTION = 34,
+  OPAQUE = 35,
+  INVALID = 36
+};
+
+template <typename T>
+struct WaveTypeTrait {};
+
+template <>
+struct WaveTypeTrait<int32_t> {
+  static constexpr WaveTypeKind typeKind = WaveTypeKind::INTEGER;
+};
+
+template <>
+struct WaveTypeTrait<uint32_t> {
+  static constexpr WaveTypeKind typeKind = WaveTypeKind::INTEGER;
+};
+
+template <>
+struct WaveTypeTrait<int64_t> {
+  static constexpr WaveTypeKind typeKind = WaveTypeKind::BIGINT;
+};
+
+template <>
+struct WaveTypeTrait<uint64_t> {
+  static constexpr WaveTypeKind typeKind = WaveTypeKind::BIGINT;
+};
+
 // Normal thread block size for Wave kernels
 constexpr int32_t kBlockSize = 256;
 
 using OperandId = int32_t;