918b9420创建于 2025年11月29日历史提交
/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#include "OutputConversionOperator.h"

OutputConversionOperator::OutputConversionOperator(const nlohmann::json& config,
    Output* output) : output(output),
    description(config) {
    this->collector = new TimestampedCollector(this->output);
    inputTypes = config["inputTypes"].get<std::vector<std::basic_string<char>>>();
    outputTypes = config["outputTypes"].get<std::vector<std::basic_string<char>>>();
}

void OutputConversionOperator::open() {
}

Row* OutputConversionOperator::toExternal(RowData *internalRecord)
{
    std::vector<std::any> fieldByPosition;
    std::map<std::string, std::any> fieldByName;
    std::map<std::string, int> positionByName;
    for (size_t i = 0; i < outputTypes.size(); i++) {
        if (outputTypes[i] == "BIGINT" || outputTypes[i].find("TIMESTAMP") != std::string::npos) {
            fieldByPosition.emplace_back(*internalRecord->getLong(i));
        } else if (outputTypes[i] == "INTEGER") {
            fieldByPosition.emplace_back(*internalRecord->getInt(i));
        } else if (outputTypes[i] == "DOUBLE") {
            fieldByPosition.emplace_back(*internalRecord->getLong(i));
        } else if (outputTypes[i] == "BOOLEAN") {
            fieldByPosition.emplace_back(*internalRecord->getInt(i));
        } else if (outputTypes[i] == "STRING"
        || outputTypes[i] == "VARCHAR" || outputTypes[i] == "VARCHAR(2147483647)") {
            BinaryStringData *stringRowData = internalRecord->getString(i);
            std::string strValue = std::string(stringRowData->toString()->begin(),
                                               stringRowData->toString()->end());
            fieldByPosition.emplace_back(strValue);
        } else {
            LOG("Data type not supported: " << outputTypes[i])
            throw std::runtime_error("Data type not supported");
        }
    }
    auto row = new Row(internalRecord->getRowKind(), fieldByPosition, fieldByName, positionByName);
    return row;
}

RowData* OutputConversionOperator::getEntireRow(omnistream::VectorBatch *batch, int rowId)
{
    int colsCount = batch->GetVectorCount();
    BinaryRowData *row = BinaryRowData::createBinaryRowDataWithMem(colsCount);
    for (int32_t i = 0; i < colsCount; i++) {
        int pos = i;
        if (inputTypes[i] == "BIGINT" || inputTypes[i].find("TIMESTAMP") != std::string::npos) {
            row->setLong(i, reinterpret_cast<omniruntime::vec::Vector<int64_t> *>(batch->Get(pos))->GetValue(rowId));
        } else if (inputTypes[i] == "INTEGER") {
            row->setInt(i, reinterpret_cast<omniruntime::vec::Vector<int32_t> *>(batch->Get(pos))->GetValue(rowId));
        } else if (inputTypes[i] == "DOUBLE") {
            row->setLong(i, reinterpret_cast<omniruntime::vec::Vector<double> *>(batch->Get(pos))->GetValue(rowId));
        } else if (inputTypes[i] == "BOOLEAN") {
            row->setInt(i, reinterpret_cast<omniruntime::vec::Vector<bool> *>(batch->Get(pos))->GetValue(rowId));
        } else if (inputTypes[i] == "STRING"
            || inputTypes[i] == "VARCHAR" || inputTypes[i] == "VARCHAR(2147483647)") {
                auto str =
                        reinterpret_cast
                        <omniruntime::vec::Vector<omniruntime::vec::LargeStringContainer<std::string_view>> *>
                        (batch->Get(pos))->GetValue(rowId);
                auto u32Str = std::make_unique<std::u32string>(str.begin(), str.end());
                row->setString(i, new BinaryStringData(u32Str.get()));
        } else {
            LOG("Data type not supported: " << inputTypes[i])
            delete row;
            throw std::runtime_error("Data type not supported");
        };
    }
    row->setRowKind(batch->getRowKind(rowId));
    return row;
}

void OutputConversionOperator::processBatch(StreamRecord *record)
{
    Row *externalRecord;
    omnistream::VectorBatch *input = reinterpret_cast<omnistream::VectorBatch *>(record->getValue());
    int rowCount = input->GetRowCount();
    for (int row = 0; row < rowCount; ++row) {
        RowData *currentRow = getEntireRow(input, row);
        externalRecord = toExternal(currentRow);
        externalRecord->setInternalRow(currentRow);
        collector->collectExternalRow(externalRecord);
        delete externalRecord;
    }
}

void OutputConversionOperator::processElement(StreamRecord *record) {}

Output* OutputConversionOperator::getOutput()
{
    return this->output;
}

void OutputConversionOperator::processWatermark(Watermark *watermark) {}

void OutputConversionOperator::processWatermarkStatus(WatermarkStatus *watermarkStatus) {}

OutputConversionOperator::~OutputConversionOperator()
{
    delete collector;
}