/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */
#include "ChainingOutput.h"
#include "table/data/RowData.h"
#include "metrics/groups/TaskMetricGroup.h"
#include "streaming/api/operators/AbstractStreamOperator.h"


/**
 * Creates a chaining output that forwards to the given operator.
 *
 * Allocates a fresh WatermarkGauge and an initial announced WatermarkStatus
 * built from WatermarkStatus::activeStatus. No record counter is set up by
 * this overload.
 *
 * @param op operator that receives everything emitted through this output
 */
ChainingOutput::ChainingOutput(Input *op)
    : operator_(op), watermarkGauge(new WatermarkGauge()),
      announcedStatus(new WatermarkStatus(WatermarkStatus::activeStatus))
{
}

/**
 * Creates a chaining output that forwards to the given operator and, when a
 * metric group is supplied, counts emitted rows.
 *
 * @param op          operator that receives everything emitted through this output
 * @param metricGroup metric group queried for the "numRecordsOut" metric; may be
 *                    null, in which case no counter is attached
 * @param opConfig    operator configuration; its name is used as the lookup key
 */
ChainingOutput::ChainingOutput(Input *op, const std::shared_ptr<omnistream::TaskMetricGroup>& metricGroup,
                               omnistream::OperatorPOD &opConfig)
    : operator_(op), watermarkGauge(new WatermarkGauge()),
      announcedStatus(new WatermarkStatus(WatermarkStatus::activeStatus))
{
    if (metricGroup != nullptr) {
        auto metric = metricGroup->GetInternalOperatorIOMetric(opConfig.getName(), "numRecordsOut");
        LOG("numRecordsOut add" << reinterpret_cast<long>(metric.get()))
        // Convert between shared_ptr specializations with static_pointer_cast
        // instead of reinterpreting the shared_ptr object itself: the latter
        // violates the aliasing rules and skips any pointer adjustment.
        // NOTE(review): assumes the returned metric really is a SimpleCounter
        // (or statically convertible to one) — confirm against TaskMetricGroup.
        numRecordsOut = std::static_pointer_cast<omnistream::SimpleCounter>(metric);
    } else {
        numRecordsOut = nullptr;
    }
}

void ChainingOutput::collect(void *record)
{
    LOG(" ChainingOutput collect >>>>>>>>")
    LOG("operator address " + std::to_string(reinterpret_cast<long>(operator_)))
    LOG("pass to operator: " + std::string(operator_->getName()))
    LOG("stream record address  " + std::to_string(reinterpret_cast<long>(record)))
    if (numRecordsOut != nullptr) {
        numRecordsOut->Inc(reinterpret_cast<omnistream::VectorBatch*>(
            reinterpret_cast<StreamRecord *>(record)->getValue())->GetRowCount());
    }
    operator_->processBatch(reinterpret_cast<StreamRecord *>(record));
}

/**
 * Closes this output. Intentionally a no-op: nothing is released or flushed here.
 */
void ChainingOutput::close()
{
    // Intentionally empty.
}
/**
 * Records the watermark's timestamp in the watermark gauge and then forwards
 * the watermark to the chained operator.
 *
 * @param mark watermark to emit; dereferenced without a null check
 */
void ChainingOutput::emitWatermark(Watermark *mark)
{
    const auto timestamp = mark->getTimestamp();
    LOG("ChainingOutput::emitWatermark: " << timestamp << " name: " << operator_->getName())
    watermarkGauge->setCurrentwatermark(timestamp);
    operator_->ProcessWatermark(mark);
}

/**
 * Forwards a watermark status to the chained operator only when it differs
 * from the status announced last; repeated equal statuses are dropped.
 *
 * @param watermarkStatus status to announce; on a change the pointer itself is
 *                        stored as the new announced status
 */
void ChainingOutput::emitWatermarkStatus(WatermarkStatus *watermarkStatus)
{
    // Nothing to do when the status has not changed since the last announcement.
    if (announcedStatus->Equals(watermarkStatus)) {
        return;
    }

    // NOTE(review): the previously announced object is not released here;
    // ownership of these pointers is not visible from this file — confirm.
    announcedStatus = watermarkStatus;
    operator_->processWatermarkStatus(watermarkStatus);
}