Vector Vectors
Velox vector use used to represent columnar data in memory, they are used as input and ouput for most the velox components.
The basic memory layout extends the Apache Arrow format, and is composed of a size variable (denoting the number of rows represented in the Vector), the data type TypePtr, and an optional nullability bitmap BufferPtr to represent null values.
The BaseVector class also provides a collection of methods to help users copy, resize, hash, compare, and print Vectors.
Vector can represent fized size Primitive type like Integer, Float, Double, Boolean, etc. as well as variable types like String, Array, Map, and Row. Vector can also be nested in arbitrary ways and can leverage different encodings formats such as Flat, Dictionary, Constant & RLE.
/// Base class for all columnar-based vectors of any type.
class BaseVector {
public:
static constexpr uint64_t kNullHash = bits::kNullHash;
VectorEncoding::Simple encoding() const {
return encoding_;
}
velox::memory::MemoryPool* pool() const {
return pool_;
}
virtual bool isNullAt(vector_size_t idx) const {
VELOX_DCHECK_GE(idx, 0, "Index must not be negative");
VELOX_DCHECK_LT(idx, length_, "Index is too large");
return rawNulls_ ? bits::isBitNull(rawNulls_, idx) : false;
}
const TypePtr& type() const {
return type_;
}
TypeKind typeKind() const {
return typeKind_;
}
/// Returns a smart pointer to the null bitmap data for this vector. May hold
/// nullptr if there are no nulls. Not const because some vectors may generate
/// this on first access. For ConstantVector, this method returns a BufferPtr
/// of only size 1. For DictionaryVector, this method returns a BufferPtr for
/// only nulls in the top-level layer.
const BufferPtr& nulls() const {
return nulls_;
}
/// Ensures the vector has capacity for the nulls and returns the shared
/// pointer to the buffer containing them.
/// Optional parameter 'setNotNull' is passed to ensureNullsCapacity() and is
/// used to ensure all the rows will be 'not nulls' if set to true.
BufferPtr& mutableNulls(vector_size_t size, bool setNotNull = false) {
ensureNullsCapacity(size, setNotNull);
return nulls_;
}
/// @return the number of rows of data in this vector
vector_size_t size() const {
return length_;
}
/// @return true if this vector has the same value at the given index as the
/// other vector at the other vector's index (including if both are null),
/// false otherwise
/// @throws if the type_ of other doesn't match the type_ of this
virtual bool equalValueAt(
const BaseVector* other,
vector_size_t index,
vector_size_t otherIndex) const {
static constexpr CompareFlags kEqualValueAtFlags =
CompareFlags::equality(CompareFlags::NullHandlingMode::kNullAsValue);
// Will always have value because nullHandlingMode is NullAsValue.
return compare(other, index, otherIndex, kEqualValueAtFlags).value() == 0;
}
static int32_t countNulls(const BufferPtr& nulls, vector_size_t size) {
return countNulls(nulls, 0, size);
}
static VectorPtr createConstant(
const TypePtr& type,
Variant value,
vector_size_t size,
velox::memory::MemoryPool* pool);
static VectorPtr createNullConstant(
const TypePtr& type,
vector_size_t size,
velox::memory::MemoryPool* pool);
/// Wraps the 'vector' in the provided dictionary encoding. If the input
/// vector is constant and the nulls buffer is empty, this method may return a
/// ConstantVector. Additionally, if 'flattenIfRedundant' is true, this method
/// may return a flattened version of the expected dictionary vector if
/// applying the dictionary encoding would result in a suboptimally encoded
/// vector.
static VectorPtr wrapInDictionary(
BufferPtr nulls,
BufferPtr indices,
vector_size_t size,
VectorPtr vector,
bool flattenIfRedundant = false);
template <typename T = BaseVector>
static std::shared_ptr<T> create(
const TypePtr& type,
vector_size_t size,
velox::memory::MemoryPool* pool) {
return std::static_pointer_cast<T>(createInternal(type, size, pool));
}
static VectorPtr getOrCreateEmpty(
VectorPtr vector,
const TypePtr& type,
velox::memory::MemoryPool* pool) {
return vector ? vector : create(type, 0, pool);
}
/// Set 'nulls' to be the nulls buffer of this vector. This API should not be
/// used on ConstantVector.
void setNulls(const BufferPtr& nulls);
/// Returns a brief summary of the vector. If 'recursive' is true, includes a
/// summary of all the layers of encodings starting with the top layer.
///
/// For example,
/// with recursive 'false':
///
/// [DICTIONARY INTEGER: 5 elements, no nulls]
///
/// with recursive 'true':
///
/// [DICTIONARY INTEGER: 5 elements, no nulls], [FLAT INTEGER: 10
/// elements, no nulls]
std::string toString(bool recursive) const;
/// Same as toString(false). Provided to allow for easy invocation from LLDB.
std::string toString() const {
return toString(false);
}
}
TypePtr type_;
const TypeKind typeKind_;
const VectorEncoding::Simple encoding_;
BufferPtr nulls_;
// Caches raw pointer to 'nulls->as<uint64_t>().
const uint64_t* rawNulls_ = nullptr;
velox::memory::MemoryPool* pool_;
tsan_atomic<vector_size_t> length_{0};
};
using VectorPtr = std::shared_ptr<BaseVector>;
Flat Vector
FlatVector used to represent fixed size data types like Integer, Float, Double, Boolean, etc. And They are
/// FlatVector is marked final to allow for inlining on virtual methods called
/// on a pointer that has the static type FlatVector<T>; this can be a
/// significant performance win when these methods are called in loops.
template <typename T>
class FlatVector final : public SimpleVector<T> {
public:
using value_type = T;
/// Returns a smart pointer holding the values for
/// this vector. This is used during execution to process over the subset of
/// values when possible.
const BufferPtr& values() const override {
return values_;
}
/// Ensures that 'values_' is singly-referenced and has space for the current
/// size of the Vector. Sets any newly added elements to T() if the new size >
/// old size.
///
/// If 'values_' is nullptr, read-only, not uniquely-referenced, or doesn't
/// have capacity for 'size' elements allocates new buffer and copies data to
/// it. Updates 'rawValues_' to point to element 0 of
/// values_->as<T>().
BufferPtr mutableValues(vector_size_t /*ignored*/ = 0) {
const auto numNewBytes = BaseVector::byteSize<T>(BaseVector::length_);
if (values_ && !values_->isView() && values_->unique()) {
if (values_->size() < numNewBytes) {
AlignedBuffer::reallocate<T>(&values_, BaseVector::length_, T());
}
} else {
BufferPtr newValues = AlignedBuffer::allocate<T>(
BaseVector::length_, BaseVector::pool(), T());
if (values_) {
const auto numCopyBytes =
std::min<vector_size_t>(values_->size(), numNewBytes);
if constexpr (!std::is_same_v<T, bool>) {
auto dst = newValues->asMutable<char>();
auto src = values_->as<char>();
memcpy(dst, src, numCopyBytes);
} else {
auto dst = newValues->asMutable<T>();
auto src = values_->as<T>();
if constexpr (Buffer::is_pod_like_v<T>) {
memcpy(dst, src, numCopyBytes);
} else {
std::copy(src, src + numCopyBytes / sizeof(T), dst);
}
}
}
values_ = newValues;
}
rawValues_ = values_->asMutable<T>();
return values_;
}
/// Returns the raw values of this vector as a continuous array.
const T* rawValues() const;
template <typename As>
const As* rawValues() const {
return reinterpret_cast<const As*>(rawValues_);
}
/// Bool uses compact representation, use mutableRawValues<uint64_t> and
/// bits::setBit instead.
T* mutableRawValues() {
if (!(values_ && values_->isMutable())) {
BufferPtr newValues =
AlignedBuffer::allocate<T>(BaseVector::length_, BaseVector::pool());
if (values_) {
// This codepath is not yet enabled for OPAQUE types (asMutable will
// fail below)
int32_t numBytes = BaseVector::byteSize<T>(BaseVector::length_);
memcpy(newValues->asMutable<uint8_t>(), rawValues_, numBytes);
}
values_ = newValues;
rawValues_ = values_->asMutable<T>();
}
return rawValues_;
}
void setNull(vector_size_t idx, bool isNull) override {
BaseVector::setNull(idx, isNull);
if (!isNull) {
ensureValues();
}
}
// Contiguous values.
// If strings, these are velox::StringViews into memory held by
// 'stringBuffers_'
BufferPtr values_;
// Caches 'values->as<T>()'
T* rawValues_;
// If T is velox::StringView, the referenced is held by
// one of these.
std::vector<BufferPtr> stringBuffers_;
// Used by 'acquireSharedStringBuffers()' to fast check if a buffer to share
// has already been referenced by 'stringBuffers_'.
//
// NOTE: we need to ensure 'stringBuffers_' and 'stringBufferSet_' are
// always consistent.
folly::F14FastSet<const Buffer*> stringBufferSet_;
};
Buffer
Represents vector payloads, like arrays of numbers or strings or associated null flags. Buffers are reference counted and must be held by BufferPtr. Buffers can either own their memory or can be views on externally managed memory, see BufferView. A buffer that owns its memory (AlignedBuffer) allocates memory from a MemoryPool, which allows tracking and custom memory management. A buffer that owns its memory can be in a mutable state if there is a single reference to it. Copying the BufferPtr is possible only if the Buffer is in the immutable state. The typical use case has an operator reusing the same buffer for consecutive batches of output if the buffer is singly referenced. A new Buffer should be made if there are other references to the last used Buffer.
class Buffer {
public:
static std::string typeString(Type type);
Type type() const {
return type_;
}
int refCount() const noexcept {
return referenceCount_.load(std::memory_order_acquire);
}
void release() {
if (referenceCount_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
releaseResources();
if (pool_) {
freeToPool();
} else {
delete this;
}
}
}
template <typename T>
const T* as() const {
// We can't check actual types, but we can sanity-check POD/non-POD
// conversion. `void` is special as it's used in type-erased contexts
VELOX_DCHECK(std::is_void_v<T> || isPOD() == is_pod_like_v<T>);
return reinterpret_cast<const T*>(data_);
}
template <typename T>
T* asMutable() const {
// TODO: change this to isMutable(). See
// https://github.com/facebookincubator/velox/issues/6562.
VELOX_CHECK(!isView());
// We can't check actual types, but we can sanity-check POD/non-POD
// conversion. `void` is special as it's used in type-erased contexts
VELOX_DCHECK(std::is_void_v<T> || isPOD() == is_pod_like_v<T>);
return reinterpret_cast<T*>(data_);
}
bool unique() const noexcept {
return refCount() == 1;
}
velox::memory::MemoryPool* pool() const noexcept {
return pool_;
}
protected:
Buffer(
Type type,
uint8_t* data,
size_t capacity,
velox::memory::MemoryPool* pool)
: pool_{pool}, data_{data}, capacity_{capacity}, type_{type} {}
velox::memory::MemoryPool* const pool_;
uint8_t* const data_;
uint64_t size_{0};
uint64_t capacity_;
std::atomic_int32_t referenceCount_{0};
const Type type_;
// Needs to use setCapacity() from static method reallocate().
friend class AlignedBuffer;
};
using BufferPtr = boost::intrusive_ptr<Buffer>;
// Allocate memory to store 10 integer elements
constexpr int32_t size = 10;
BufferPtr buffer = AlignedBuffer::allocate<int32_t>(size, pool_.get(), 1);
const int32_t* values = buffer->as<int32_t>();
for (int i = 0; i < size; i++) {
EXPECT_EQ(values[i], 1);
}
int32_t* rawValue = buffer->asMutable<int32_t>();
for (int i = 0; i < size; i++) {
rawValue[i] = i + 10;
}
for (int i = 0; i < size; i++) {
EXPECT_EQ(values[i], i + 10);
}
std::cout << *buffer << std::endl;
// { size: 40, capacity: 96, refCount: 1, unique: true, isMutable: true, isView: false, data: [ 0a 00 00 00 0b 00 00 00 0c 00 00 00 0d 00 00 00 0e 00 00 00 0f 00 00 00 10 00 00 00 11 00 00 00 12 00 00 00 13 00 00 00 || <-- size | remaining allocated --> || 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ] }
EXPECT_EQ(buffer->refCount(), 1);
BufferPtr anotherBuffer = buffer;
EXPECT_EQ(anotherBuffer->refCount(), 2);
// Create a nulls buffer
BufferPtr nullsBuffer = AlignedBuffer::allocate<bool>(10, pool_.get(), bits::kNotNull);
const uint64_t* nulls = nullsBuffer->as<uint64_t>();
uint64_t* rawNulls = nullsBuffer->asMutable<uint64_t>();
for (int i = 0; i < 10; i++) {
if (i % 3 == 0) {
bits::setNull(rawNulls, i);
EXPECT_TRUE(bits::isBitNull(nulls, i));
}
}
TEST_F(VectorTest, simpleTest) {
constexpr size_t vectorSize = 10;
{
VectorPtr integerVector = BaseVector::create(INTEGER(), vectorSize, pool_.get());
// Alternate values should be null
// Vector should have square of number from to 10
BufferPtr values = integerVector->values();
ASSERT_TRUE(values);
int32_t* rawValues = values->asMutable<int32_t>();
uint64_t* rawNulls = integerVector->mutableRawNulls();
ASSERT_TRUE(rawNulls);
size_t nullCount = 0;
for (int i = 0; std::cmp_less(i , vectorSize); i++) {
rawValues[i] = i * i;
if (i % 2 == 0) {
bits::setNull(rawNulls, i);
nullCount++;
}
}
ASSERT_TRUE(integerVector->isFlatEncoding());
FlatVector<int32_t>* integerFlat = integerVector->asFlatVector<int32_t>();
ASSERT_TRUE(integerVector->isNullAt(0));
ASSERT_EQ(integerFlat->valueAt(1), 1);
ASSERT_TRUE(integerVector->isNullAt(2));
ASSERT_EQ(integerFlat->valueAt(3), 9);
}
{
// String Vector
std::vector<std::optional<std::string>> strings {
"a",
"bb",
"inlineString"
"this cannot be inlined",
"another lengthy string",
"cat",
std::nullopt,
"once again another lengthy string",
"I will be null :-(",
};
VectorPtr stringVector = BaseVector::create(VARCHAR(), strings.size(), pool_.get());
EXPECT_TRUE(stringVector->isFlatEncoding());
auto stringFlatVector = stringVector->asFlatVector<StringView>();
auto rawValues = stringFlatVector->mutableRawValues();
auto rawNulls = stringVector->mutableRawNulls();
size_t bufferCapacity = 0;
for (auto& s : strings) {
if (s.has_value() && s.value().size() > 12) {
bufferCapacity += s.value().size();
}
}
BufferPtr stringBuffer = AlignedBuffer::allocate<char>(bufferCapacity, pool_.get());
char* rawStringBuffer = stringBuffer->asMutable<char>();
size_t stringBufferOffset = 0;
for (int i = 0; i < strings.size(); i++) {
if (!strings[i].has_value()) {
bits::setNull(rawNulls, i);
continue;
}
if (strings[i].value().size() <= StringView::kInlineSize) {
rawValues[i] = StringView(strings[i].value());
} else {
// std::memcpy(rawStringBuffer + stringBufferOffset, strings[i].value().data(), strings[i].value().size());
strcpy(rawStringBuffer + stringBufferOffset, strings[i].value().data());
rawValues[i] = StringView(rawStringBuffer + stringBufferOffset, strings[i].value().size());
stringBufferOffset += strings[i].value().size();
}
}
stringFlatVector->setStringBuffers({std::move(stringBuffer)});
std::cout << stringFlatVector->toString(0, strings.size()) << std::endl;
}
}