* Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved.
* Description: Expand operator source file
*/
#include "expand.h"
#include "expression/jsonparser/jsonparser.h"
#include "util/config/QueryConfig.h"
#include "util/config_util.h"
#include "vector/vector_helper.h"
using namespace omniruntime::vec;
using namespace omniruntime::expressions;
using namespace omniruntime::codegen;
namespace omniruntime {
namespace op {
bool ExpandOperator::needsInput()
{
return GetStatus() != OMNI_STATUS_FINISHED && !noMoreInput_ && rowIndex_ == 0;
}
int32_t ExpandOperator::AddInput(VectorBatch *vecBatch)
{
if (vecBatch->GetRowCount() > 0) {
SetInputVecBatch(vecBatch);
} else {
VectorHelper::FreeVecBatch(vecBatch);
ResetInputVecBatch();
SetStatus(OMNI_STATUS_NORMAL);
}
return 0;
}
int32_t ExpandOperator::GetOutput(VectorBatch **outputVecBatch)
{
if (this->inputVecBatch == nullptr) {
if (noMoreInput_) {
SetStatus(OMNI_STATUS_FINISHED);
}
return 0;
}
projectedVecs = this->exprEvaluators[rowIndex_]->Evaluate(inputVecBatch, executionContext.get());
int rowCount = this->projectedVecs->GetRowCount();
*outputVecBatch = this->projectedVecs;
this->projectedVecs = nullptr;
++rowIndex_;
if (rowIndex_ == this->exprEvaluators.size()) {
rowIndex_ = 0;
VectorHelper::FreeVecBatch(inputVecBatch);
ResetInputVecBatch();
}
return rowCount;
}
OmniStatus ExpandOperator::Close()
{
if (projectedVecs != nullptr) {
VectorHelper::FreeVecBatch(projectedVecs);
projectedVecs = nullptr;
}
return OMNI_STATUS_NORMAL;
}
ExpandOperatorFactory *CreateExpandOperatorFactory(
std::shared_ptr<const ExpandNode> expandNode, const config::QueryConfig &queryConfig)
{
auto projections = expandNode->GetProjections();
auto sourceTypes = *(expandNode->InputType());
auto overflowConfig = queryConfig.IsOverFlowASNull() == true ? new OverflowConfig(OVERFLOW_CONFIG_NULL)
: new OverflowConfig(OVERFLOW_CONFIG_EXCEPTION);
std::vector<std::shared_ptr<ExpressionEvaluator>> exprEvaluators;
for (const auto& projection : projections) {
auto exprEvaluator = std::make_shared<ExpressionEvaluator>(projection, sourceTypes, overflowConfig);
exprEvaluators.push_back(exprEvaluator);
}
return new ExpandOperatorFactory(move(exprEvaluators));
}
omniruntime::op::Operator *ExpandOperatorFactory::CreateOperator()
{
return new ExpandOperator(this->exprEvaluators);
}
}
}