* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "RecordWriterOutput.h"
#include "streaming/runtime/streamrecord/StreamElementSerializer.h"
namespace omnistream::datastream {
RecordWriterOutput::RecordWriterOutput(
TypeSerializer* outSerializer, omnistream::datastream::RecordWriter* recordWriter)
: recordWriter_(recordWriter),
announcedStatus(new WatermarkStatus(WatermarkStatus::activeStatus))
{
LOG(">>>>>");
if (outSerializer != nullptr) {
serializationDelegate_ = new SerializationDelegate(new StreamElementSerializer(outSerializer));
}
}
RecordWriterOutput::~RecordWriterOutput()
{
delete reusableWatermark_;
}
void RecordWriterOutput::collect(void* record)
{
LOG(">>>>>>>");
auto* streamRecord = static_cast<StreamRecord*>(record);
auto* value = static_cast<Object*>(streamRecord->getValue());
collectAndCheckIfChained(streamRecord);
if (!value->isPool) {
value->putRefCount();
} else {
(dynamic_cast<Long*>(value))->putRefCount();
}
}
void RecordWriterOutput::close()
{
recordWriter_->close();
}
void RecordWriterOutput::emitWatermark(Watermark* mark)
{
if (announcedStatus->IsIdle()) {
return;
}
if (reusableWatermark_ == nullptr) {
reusableWatermark_ = new InternalWatermark(0);
}
watermarkGauge.setCurrentwatermark(mark->getTimestamp());
reusableWatermark_->setTimestamp(mark->getTimestamp());
serializationDelegate_->setInstance(reusableWatermark_);
recordWriter_->emit(serializationDelegate_);
}
void RecordWriterOutput::emitWatermarkStatus(WatermarkStatus* watermarkStatus)
{
if (!announcedStatus->Equals(watermarkStatus)) {
announcedStatus = watermarkStatus;
serializationDelegate_->setInstance(watermarkStatus);
recordWriter_->emit(serializationDelegate_);
}
}
}