* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "VeloxRuntime.h"
#include <algorithm>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include "VeloxBackend.h"
#include "compute/ResultIterator.h"
#include "compute/Runtime.h"
#include "compute/VeloxPlanConverter.h"
#include "config/VeloxConfig.h"
#include "operators/serializer/VeloxRowToColumnarConverter.h"
#include "operators/writer/VeloxArrowWriter.h"
#include "shuffle/VeloxShuffleReader.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "utils/ConfigExtractor.h"
#include "utils/VeloxArrowUtils.h"
#ifdef ENABLE_HDFS
#include "operators/writer/VeloxParquetDataSourceHDFS.h"
#endif
#ifdef ENABLE_S3
#include "operators/writer/VeloxParquetDataSourceS3.h"
#endif
#ifdef ENABLE_GCS
#include "operators/writer/VeloxParquetDataSourceGCS.h"
#endif
#ifdef ENABLE_ABFS
#include "operators/writer/VeloxParquetDataSourceABFS.h"
#endif
using namespace facebook;
namespace gluten {
// Constructs a Velox-backed runtime.
//
// Forwards `kind`, `vmm` and `confMap` to the base Runtime, then derives the
// Velox-side configuration object from a copy of confMap_ and applies the
// debug / glog settings found in it.
VeloxRuntime::VeloxRuntime(
    const std::string& kind,
    VeloxMemoryManager* vmm,
    const std::unordered_map<std::string, std::string>& confMap)
    : Runtime(kind, vmm, confMap) {
  // ConfigBase receives its own copy of the map; confMap_ itself stays intact.
  std::unordered_map<std::string, std::string> confCopy(confMap_);
  veloxCfg_ = std::make_shared<facebook::velox::config::ConfigBase>(std::move(confCopy));

  // Debug mode is off unless explicitly configured.
  debugModeEnabled_ = veloxCfg_->get<bool>(kDebugModeEnabled, false);

  // glog levels: a configured value wins, otherwise the current flag value is kept.
  const auto minLogLevel = veloxCfg_->get<uint32_t>(kGlogSeverityLevel, FLAGS_minloglevel);
  FLAGS_minloglevel = minLogLevel;
  const auto verboseLevel = veloxCfg_->get<uint32_t>(kGlogVerboseLevel, FLAGS_v);
  FLAGS_v = verboseLevel;
}
void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size, std::optional<std::string> dumpFile) {
if (debugModeEnabled_ || dumpFile.has_value()) {
try {
auto planJson = substraitFromPbToJson("Plan", data, size, dumpFile);
LOG_IF(INFO, debugModeEnabled_) << std::string(50, '#') << " received substrait::Plan: " << taskInfo_ << std::endl
<< planJson;
} catch (const std::exception& e) {
LOG(WARNING) << "Error converting Substrait plan to JSON: " << e.what();
}
}
GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed");
}
void VeloxRuntime::parseSplitInfo(const uint8_t* data, int32_t size, std::optional<std::string> dumpFile) {
if (debugModeEnabled_ || dumpFile.has_value()) {
try {
auto splitJson = substraitFromPbToJson("ReadRel.LocalFiles", data, size, dumpFile);
LOG_IF(INFO, debugModeEnabled_) << std::string(50, '#')
<< " received substrait::ReadRel.LocalFiles: " << taskInfo_ << std::endl
<< splitJson;
} catch (const std::exception& e) {
LOG(WARNING) << "Error converting Substrait plan to JSON: " << e.what();
}
}
::substrait::ReadRel_LocalFiles localFile;
GLUTEN_CHECK(parseProtobuf(data, size, &localFile) == true, "Parse substrait plan failed");
localFiles_.push_back(localFile);
}
// Classifies every leaf plan node as either a stream input or a scan.
//
// For each id in `leafPlanNodeIds` the matching SplitInfo is looked up in
// `splitInfoMap`:
//   - isStream == true  -> id is appended to `streamIds`
//   - otherwise         -> SplitInfo is appended to `scanInfos` and the id to
//                          `scanIds` (same position in both vectors)
//
// The output vectors are appended to, not cleared.
//
// Throws std::runtime_error if a leaf id has no entry in `splitInfoMap`.
void VeloxRuntime::getInfoAndIds(
    const std::unordered_map<velox::core::PlanNodeId, std::shared_ptr<SplitInfo>>& splitInfoMap,
    const std::unordered_set<velox::core::PlanNodeId>& leafPlanNodeIds,
    std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
    std::vector<velox::core::PlanNodeId>& scanIds,
    std::vector<velox::core::PlanNodeId>& streamIds) {
  for (const auto& leafPlanNodeId : leafPlanNodeIds) {
    const auto it = splitInfoMap.find(leafPlanNodeId);
    if (it == splitInfoMap.end()) {
      // Include the offending id so the failure can be traced back to the plan.
      throw std::runtime_error("Could not find leafPlanNodeId: " + leafPlanNodeId);
    }

    // Bind by reference: only the scan branch needs its own shared_ptr copy.
    const auto& splitInfo = it->second;
    if (splitInfo->isStream) {
      streamIds.emplace_back(leafPlanNodeId);
    } else {
      scanInfos.emplace_back(splitInfo);
      scanIds.emplace_back(leafPlanNodeId);
    }
  }
}
std::string VeloxRuntime::planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) {
std::vector<std::shared_ptr<ResultIterator>> inputs;
auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool();
VeloxPlanConverter veloxPlanConverter(inputs, veloxMemoryPool.get(), sessionConf, std::nullopt, true);
auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, localFiles_);
return veloxPlan->toString(details, true);
}
// Returns memoryManager_ viewed as a VeloxMemoryManager.
//
// Fails via GLUTEN_CHECK when the stored manager is not (or does not derive
// from) VeloxMemoryManager, i.e. when the dynamic_cast yields nullptr.
VeloxMemoryManager* VeloxRuntime::memoryManager() {
  auto* const veloxManager = dynamic_cast<VeloxMemoryManager*>(memoryManager_);
  GLUTEN_CHECK(veloxManager != nullptr, "Not a Velox memory manager");
  return veloxManager;
}
std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
const std::string& spillDir,
const std::vector<std::shared_ptr<ResultIterator>>& inputs,
const std::unordered_map<std::string, std::string>& sessionConf) {
LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" << printConfig(confMap_);
VeloxPlanConverter veloxPlanConverter(
inputs, memoryManager()->getLeafMemoryPool().get(), sessionConf, *localWriteFilesTempPath());
veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_));
std::vector<std::shared_ptr<SplitInfo>> scanInfos;
std::vector<velox::core::PlanNodeId> scanIds;
std::vector<velox::core::PlanNodeId> streamIds;
getInfoAndIds(veloxPlanConverter.splitInfos(), veloxPlan_->leafPlanNodeIds(), scanInfos, scanIds, streamIds);
auto wholestageIter = std::make_unique<WholeStageResultIterator>(
memoryManager(), veloxPlan_, scanIds, scanInfos, streamIds, spillDir, sessionConf, taskInfo_);
return std::make_shared<ResultIterator>(std::move(wholestageIter), this);
}
// Creates a VeloxColumnarToRowConverter backed by this runtime's leaf memory
// pool. `column2RowMemThreshold` is passed through to the converter unchanged.
std::shared_ptr<ColumnarToRowConverter> VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) {
  auto leafPool = memoryManager()->getLeafMemoryPool();
  auto converter = std::make_shared<VeloxColumnarToRowConverter>(leafPool, column2RowMemThreshold);
  return converter;
}
// Returns the cached zero-column batch holding `numRows` rows, creating and
// caching it in emptySchemaBatchLoopUp_ on first request for that row count.
std::shared_ptr<ColumnarBatch> VeloxRuntime::createOrGetEmptySchemaBatch(int32_t numRows) {
  auto& lookup = emptySchemaBatchLoopUp_;

  // Single lookup on the hit path; on a miss, reuse the iterator returned by
  // emplace instead of searching the map again.
  auto it = lookup.find(numRows);
  if (it == lookup.end()) {
    auto veloxPool = memoryManager()->getLeafMemoryPool();
    std::shared_ptr<VeloxColumnarBatch> batch =
        VeloxColumnarBatch::from(veloxPool.get(), gluten::createZeroColumnBatch(numRows));
    it = lookup.emplace(numRows, std::move(batch)).first;
  }
  return it->second;
}
// Returns a batch produced by VeloxColumnarBatch::select() for the given
// `columnIndices`, after converting `batch` to a VeloxColumnarBatch using this
// runtime's leaf memory pool.
std::shared_ptr<ColumnarBatch> VeloxRuntime::select(
    std::shared_ptr<ColumnarBatch> batch,
    const std::vector<int32_t>& columnIndices) {
  auto veloxPool = memoryManager()->getLeafMemoryPool();
  auto veloxBatch = gluten::VeloxColumnarBatch::from(veloxPool.get(), batch);
  // `columnIndices` is a const reference: std::move on it cannot move anything,
  // so pass it as-is.
  return veloxBatch->select(veloxPool.get(), columnIndices);
}
// Creates a VeloxRowToColumnarConverter for the Arrow C schema `cSchema`,
// backed by this runtime's leaf memory pool.
std::shared_ptr<RowToColumnarConverter> VeloxRuntime::createRow2ColumnarConverter(struct ArrowSchema* cSchema) {
  auto leafPool = memoryManager()->getLeafMemoryPool();
  auto converter = std::make_shared<VeloxRowToColumnarConverter>(cSchema, leafPool);
  return converter;
}
// Creates a Velox shuffle writer of the type selected by
// `options.shuffleWriterType`.
//
// Ownership of `partitionWriter` and the contents of `options` are transferred
// to the writer. The writer uses this runtime's leaf Velox pool and Arrow pool.
//
// Throws (via GLUTEN_ASSIGN_OR_THROW) when VeloxShuffleWriter::create reports
// a failure.
std::shared_ptr<ShuffleWriter> VeloxRuntime::createShuffleWriter(
    int numPartitions,
    std::unique_ptr<PartitionWriter> partitionWriter,
    ShuffleWriterOptions options) {
  auto veloxPool = memoryManager()->getLeafMemoryPool();
  auto arrowPool = memoryManager()->getArrowMemoryPool();

  // Read the writer type BEFORE `options` is moved: function arguments are
  // evaluated in an unspecified order, so reading a field of `options` and
  // moving `options` inside the same call is fragile.
  const auto shuffleWriterType = options.shuffleWriterType;

  GLUTEN_ASSIGN_OR_THROW(
      std::shared_ptr<ShuffleWriter> shuffleWriter,
      VeloxShuffleWriter::create(
          shuffleWriterType,
          numPartitions,
          std::move(partitionWriter),
          std::move(options),
          veloxPool,
          arrowPool));
  return shuffleWriter;
}
// Creates the parquet data source used to write to `filePath` with `schema`.
//
// The concrete implementation is picked by the first matching path predicate
// (HDFS, S3, GCS, ABFS); any other path gets the plain VeloxParquetDataSource.
// If the path matches a file system whose support was not compiled in
// (ENABLE_HDFS / ENABLE_S3 / ENABLE_GCS / ENABLE_ABFS), std::runtime_error is
// thrown.
//
// Every call allocates a new aggregate child pool named "datasource.<n>",
// where <n> comes from a function-local static atomic counter.
std::shared_ptr<VeloxDataSource> VeloxRuntime::createDataSource(
    const std::string& filePath,
    std::shared_ptr<arrow::Schema> schema) {
  static std::atomic_uint32_t nextDataSourceId{0UL};

  // Pools are set up before the path is inspected, exactly once per call.
  auto veloxPool = memoryManager()->getAggregateMemoryPool()->addAggregateChild(
      "datasource." + std::to_string(nextDataSourceId++));
  auto sinkPool = memoryManager()->getLeafMemoryPool();

  // Each branch below either returns or throws, so plain guard clauses suffice.
  if (isSupportedHDFSPath(filePath)) {
#ifdef ENABLE_HDFS
    return std::make_shared<VeloxParquetDataSourceHDFS>(filePath, veloxPool, sinkPool, schema);
#else
    throw std::runtime_error(
        "The write path is hdfs path but the HDFS haven't been enabled when writing parquet data in velox runtime!");
#endif
  }

  if (isSupportedS3SdkPath(filePath)) {
#ifdef ENABLE_S3
    return std::make_shared<VeloxParquetDataSourceS3>(filePath, veloxPool, sinkPool, schema);
#else
    throw std::runtime_error(
        "The write path is S3 path but the S3 haven't been enabled when writing parquet data in velox runtime!");
#endif
  }

  if (isSupportedGCSPath(filePath)) {
#ifdef ENABLE_GCS
    return std::make_shared<VeloxParquetDataSourceGCS>(filePath, veloxPool, sinkPool, schema);
#else
    throw std::runtime_error(
        "The write path is GCS path but the GCS haven't been enabled when writing parquet data in velox runtime!");
#endif
  }

  if (isSupportedABFSPath(filePath)) {
#ifdef ENABLE_ABFS
    return std::make_shared<VeloxParquetDataSourceABFS>(filePath, veloxPool, sinkPool, schema);
#else
    throw std::runtime_error(
        "The write path is ABFS path but the ABFS haven't been enabled when writing parquet data in velox runtime!");
#endif
  }

  // No special file system matched.
  return std::make_shared<VeloxParquetDataSource>(filePath, veloxPool, sinkPool, schema);
}
// Creates a VeloxShuffleReader that deserializes batches of the given Arrow
// `schema`.
//
// From `options` it uses: compressionType + codecBackend (Arrow IPC codec),
// compressionTypeStr (Velox compression kind), batchSize, bufferSize and
// shuffleWriterType. Memory comes from this runtime's Arrow pool and leaf
// Velox pool.
std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
    std::shared_ptr<arrow::Schema> schema,
    ShuffleReaderOptions options) {
  // Derive everything the deserializer factory needs, in the same order the
  // individual conversions were originally performed.
  auto veloxRowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
  auto ipcCodec = gluten::createArrowIpcCodec(options.compressionType, options.codecBackend);
  auto leafPool = memoryManager()->getLeafMemoryPool();
  auto compressionKind = facebook::velox::common::stringToCompressionKind(options.compressionTypeStr);

  auto factory = std::make_unique<gluten::VeloxColumnarBatchDeserializerFactory>(
      schema,
      std::move(ipcCodec),
      compressionKind,
      veloxRowType,
      options.batchSize,
      options.bufferSize,
      memoryManager()->getArrowMemoryPool(),
      leafPool,
      options.shuffleWriterType);

  return std::make_shared<VeloxShuffleReader>(std::move(factory));
}
// Creates a VeloxColumnarBatchSerializer for the Arrow C schema `cSchema`,
// using this runtime's Arrow pool and leaf Velox pool.
std::unique_ptr<ColumnarBatchSerializer> VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) {
  auto arrowMemoryPool = memoryManager()->getArrowMemoryPool();
  auto leafPool = memoryManager()->getLeafMemoryPool();
  auto serializer = std::make_unique<VeloxColumnarBatchSerializer>(arrowMemoryPool, leafPool, cSchema);
  return serializer;
}
void VeloxRuntime::dumpConf(const std::string& path) {
const auto& backendConfMap = VeloxBackend::get()->getBackendConf()->rawConfigs();
auto allConfMap = backendConfMap;
for (const auto& pair : confMap_) {
allConfMap.insert_or_assign(pair.first, pair.second);
}
std::ofstream outFile(path);
if (!outFile.is_open()) {
LOG(ERROR) << "Failed to open file for writing: " << path;
return;
}
size_t maxKeyLength = 0;
for (const auto& pair : allConfMap) {
maxKeyLength = std::max(maxKeyLength, pair.first.length());
}
outFile << "[Backend Conf]" << std::endl;
for (const auto& pair : backendConfMap) {
outFile << std::left << std::setw(maxKeyLength + 1) << pair.first << ' ' << pair.second << std::endl;
}
outFile << std::endl << "[Session Conf]" << std::endl;
for (const auto& pair : confMap_) {
outFile << std::left << std::setw(maxKeyLength + 1) << pair.first << ' ' << pair.second << std::endl;
}
outFile.close();
}
std::shared_ptr<ArrowWriter> VeloxRuntime::createArrowWriter(const std::string& path) {
int64_t batchSize = 4096;
if (auto it = confMap_.find(kSparkBatchSize); it != confMap_.end()) {
batchSize = std::atol(it->second.c_str());
}
return std::make_shared<VeloxArrowWriter>(path, batchSize, memoryManager()->getLeafMemoryPool().get());
}
}