/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "SubstraitToVeloxExpr.h"
#include "TypeUtils.h"
#include "velox/connectors/hive/FileProperties.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/core/PlanNode.h"
#include "velox/dwio/common/Options.h"
namespace gluten {
class ResultIterator;
/// Plain data describing one split that is handed to the plan converter.
/// NOTE(review): the per-field meanings below are inferred from names and
/// types only; confirm them against the code that populates this struct.
struct SplitInfo {
  /// Defaults to false. NOTE(review): presumably marks a split that is backed
  /// by an input stream rather than by files — confirm.
  bool isStream = false;

  /// Zero-initialized so that a default-constructed SplitInfo never exposes an
  /// indeterminate value. NOTE(review): presumably the partition index.
  u_int32_t partitionIndex = 0;

  /// One string-to-string map per entry. NOTE(review): presumably the
  /// partition column name/value pairs for each path — confirm.
  std::vector<std::unordered_map<std::string, std::string>> partitionColumns;

  /// One string-to-string map per entry. NOTE(review): presumably the
  /// metadata column name/value pairs for each path — confirm.
  std::vector<std::unordered_map<std::string, std::string>> metadataColumns;

  /// NOTE(review): presumably the file paths covered by this split.
  std::vector<std::string> paths;

  /// NOTE(review): presumably the start offset for each entry of `paths`.
  std::vector<u_int64_t> starts;

  /// NOTE(review): presumably the length to read for each entry of `paths`.
  std::vector<u_int64_t> lengths;

  /// Value-initialized (underlying value 0) for the same reason as
  /// partitionIndex: reading it before assignment must be well defined.
  dwio::common::FileFormat format{};

  /// Optional file properties, one optional per entry.
  std::vector<std::optional<facebook::velox::FileProperties>> properties;

  /// Virtual so that a derived split description can be destroyed through a
  /// SplitInfo pointer.
  virtual ~SplitInfo() = default;
};
/// Converts Substrait plan/relation messages into Velox plan nodes
/// (core::PlanNodePtr). Members without a body are only declared in this
/// header; their definitions live elsewhere.
class SubstraitToVeloxPlanConverter {
 public:
  /// Stores the arguments in the matching members and does nothing else; in
  /// particular exprConverter_ is left empty by this constructor.
  /// @param pool Memory pool pointer, stored as-is.
  /// @param confMap String key/value configuration, copied into the converter.
  /// @param writeFilesTempPath Optional path, copied into the converter. Taken
  ///   by const reference so the optional is copied once instead of twice.
  /// @param validationMode Flag copied into validationMode_.
  SubstraitToVeloxPlanConverter(
      memory::MemoryPool* pool,
      const std::unordered_map<std::string, std::string>& confMap = {},
      const std::optional<std::string>& writeFilesTempPath = std::nullopt,
      bool validationMode = false)
      : pool_(pool), confMap_(confMap), writeFilesTempPath_(writeFilesTempPath), validationMode_(validationMode) {}

  /// toVeloxPlan overloads: one per Substrait relation message type, each
  /// returning a Velox plan node.
  core::PlanNodePtr toVeloxPlan(const ::substrait::WriteRel& writeRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::ExpandRel& expandRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::GenerateRel& generateRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::WindowRel& windowRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::WindowGroupLimitRel& windowGroupLimitRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::SetRel& setRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::JoinRel& joinRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::CrossRel& crossRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::AggregateRel& aggRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::ProjectRel& projectRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::FilterRel& filterRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::FetchRel& fetchRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::TopNRel& topNRel);

  /// ReadRel overload that additionally receives a row type.
  /// NOTE(review): how `type` is used is not visible in this header.
  core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& readRel, const RowTypePtr& type);

  core::PlanNodePtr toVeloxPlan(const ::substrait::SortRel& sortRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& sRead);

  /// NOTE(review): presumably builds the plan node for the input stream with
  /// index `streamIdx` via valueStreamNodeFactory_ — confirm in the definition.
  core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx);

  /// Overloads for the generic relation, the relation root and the whole plan.
  core::PlanNodePtr toVeloxPlan(const ::substrait::Rel& sRel);

  core::PlanNodePtr toVeloxPlan(const ::substrait::RelRoot& sRoot);

  core::PlanNodePtr toVeloxPlan(const ::substrait::Plan& substraitPlan);

  /// Returns a non-owning pointer to the expression converter. The pointer is
  /// null while exprConverter_ is empty; this header never assigns it.
  SubstraitVeloxExprConverter* getExprConverter() {
    return exprConverter_.get();
  }

  /// NOTE(review): presumably fills functionMap_ from the plan — confirm.
  void constructFunctionMap(const ::substrait::Plan& substraitPlan);

  /// Read-only access to the function id -> string map.
  const std::unordered_map<uint64_t, std::string>& getFunctionMap() const {
    return functionMap_;
  }

  /// Read-only access to the plan node id -> SplitInfo map.
  const std::unordered_map<core::PlanNodeId, std::shared_ptr<SplitInfo>>& splitInfos() const {
    return splitInfoMap_;
  }

  /// Registers (or replaces) the node stored under `inputIdx` and overwrites
  /// the converter's plan node id counter with `planNodeId`.
  void insertInputNode(uint64_t inputIdx, const std::shared_ptr<const core::PlanNode>& inputNode, int planNodeId) {
    inputNodesMap_[inputIdx] = inputNode;
    planNodeId_ = planNodeId;
  }

  /// Replaces the stored split infos. The by-value argument is moved into the
  /// member, matching setInputIters and avoiding a second vector copy.
  void setSplitInfos(std::vector<std::shared_ptr<SplitInfo>> splitInfos) {
    splitInfos_ = std::move(splitInfos);
  }

  /// Replaces the factory callable stored in valueStreamNodeFactory_.
  void setValueStreamNodeFactory(
      std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t, RowTypePtr)> factory) {
    valueStreamNodeFactory_ = std::move(factory);
  }

  /// Replaces the stored input iterators.
  void setInputIters(std::vector<std::shared_ptr<ResultIterator>> inputIters) {
    inputIters_ = std::move(inputIters);
  }

  /// NOTE(review): presumably derives the input stream index of a ReadRel;
  /// the meaning of the returned value is not visible here.
  int32_t getStreamIndex(const ::substrait::ReadRel& sRel);

  /// NOTE(review): presumably looks up `id` in functionMap_ — confirm.
  std::string findFuncSpec(uint64_t id);

  /// Collects field references from `joinExpression` into the two output
  /// vectors. NOTE(review): the left/right split rule is in the definition.
  void extractJoinKeys(
      const ::substrait::Expression& joinExpression,
      std::vector<const ::substrait::Expression::FieldReference*>& leftExprs,
      std::vector<const ::substrait::Expression::FieldReference*>& rightExprs);

  /// Maps a Substrait aggregate relation to a Velox aggregation step.
  core::AggregationNode::Step toAggregationStep(const ::substrait::AggregateRel& sAgg);

  /// Maps a single Substrait aggregate function to a Velox aggregation step.
  core::AggregationNode::Step toAggregationFunctionStep(const ::substrait::AggregateFunction& sAggFuc);

  /// Returns a function name derived from `baseName` and `step`.
  /// NOTE(review): the naming scheme is in the definition, not shown here.
  std::string toAggregationFunctionName(const std::string& baseName, const core::AggregationNode::Step& step);

  /// Returns the field-access expressions and sort orders produced from
  /// `sortField`, given the input row type.
  std::pair<std::vector<core::FieldAccessTypedExprPtr>, std::vector<core::SortOrder>> processSortField(
      const ::google::protobuf::RepeatedPtrField<::substrait::SortField>& sortField,
      const RowTypePtr& inputType);

 private:
  /// NOTE(review): presumably applies the emit information of `relCommon` on
  /// top of `noEmitNode` — confirm in the definition.
  core::PlanNodePtr processEmit(const ::substrait::RelCommon& relCommon, const core::PlanNodePtr& noEmitNode);

  /// NOTE(review): presumably validates the type extensions of the plan.
  static bool checkTypeExtension(const ::substrait::Plan& substraitPlan);

  /// NOTE(review): presumably produces the next id from planNodeId_.
  std::string nextPlanNodeId();

  /// Builds the aggregation node for `sAgg` on top of `childNode` using the
  /// given aggregation step.
  std::shared_ptr<const core::PlanNode> toVeloxAgg(
      const ::substrait::AggregateRel& sAgg,
      const std::shared_ptr<const core::PlanNode>& childNode,
      const core::AggregationNode::Step& aggStep);

  /// Converts the single input of `rel`; fails the VELOX_CHECK when `rel` has
  /// no input. The relation is taken by const reference because it is only
  /// read, so the message is not copied.
  template <typename T>
  core::PlanNodePtr convertSingleInput(const T& rel) {
    VELOX_CHECK(rel.has_input(), "Child Rel is expected here.");
    return toVeloxPlan(rel.input());
  }

  /// Creates a window frame from the two bounds, the window type and the
  /// input row type.
  const core::WindowNode::Frame createWindowFrame(
      const ::substrait::Expression_WindowFunction_Bound& lower_bound,
      const ::substrait::Expression_WindowFunction_Bound& upper_bound,
      const ::substrait::WindowType& type,
      const RowTypePtr& inputType);

  /// Plan node id counter; starts at 0 and is overwritten by insertInputNode.
  int planNodeId_ = 0;

  /// Function id -> string map exposed through getFunctionMap().
  std::unordered_map<uint64_t, std::string> functionMap_;

  /// Plan node id -> SplitInfo map exposed through splitInfos().
  std::unordered_map<core::PlanNodeId, std::shared_ptr<SplitInfo>> splitInfoMap_;

  /// Factory callable installed by setValueStreamNodeFactory().
  std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t, RowTypePtr)> valueStreamNodeFactory_;

  /// Input iterators installed by setInputIters().
  std::vector<std::shared_ptr<ResultIterator>> inputIters_;

  /// Input index -> plan node map filled by insertInputNode().
  std::unordered_map<uint64_t, std::shared_ptr<const core::PlanNode>> inputNodesMap_;

  /// Starts at 0. NOTE(review): presumably a cursor into splitInfos_.
  int32_t splitInfoIdx_{0};

  /// Split infos installed by setSplitInfos().
  std::vector<std::shared_ptr<SplitInfo>> splitInfos_;

  /// Owned expression converter; empty until assigned outside this header.
  std::unique_ptr<SubstraitVeloxExprConverter> exprConverter_;

  /// Memory pool pointer received by the constructor.
  memory::MemoryPool* pool_;

  /// Copy of the configuration map received by the constructor.
  std::unordered_map<std::string, std::string> confMap_;

  /// Copy of the optional path received by the constructor.
  std::optional<std::string> writeFilesTempPath_;

  /// Copy of the validation flag received by the constructor.
  bool validationMode_ = false;
};
}