* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include <stdexcept>
#include "StreamRecord.h"
#include "basictypes/Tuple2.h"
#include "StreamElementSerializer.h"
namespace omnistream::datastream {
StreamElementSerializer::StreamElementSerializer(TypeSerializer* typeSerializer) : typeSerializer_(typeSerializer)
{
reUsableRecord_ = new StreamRecord();
reUsableWatermark_ = new Watermark(0);
if (!typeSerializer || (strcmp(typeSerializer_->getName(), "BinaryRowDataSerializer") == 0)) {
isDatastream = false;
} else {
isDatastream = true;
}
}
void* StreamElementSerializer::deserialize(DataInputView& source)
{
int tag = static_cast<uint8_t>(source.readByte());
#ifdef DEBUG
LOG("tag: " + std::to_string(tag));
#endif
if (tag == static_cast<int>(StreamElementTag::TAG_REC_WITH_TIMESTAMP)) {
long timestamp = source.readLong();
#ifdef DEBUG
LOG("timestamp: " + std::to_string(timestamp));
LOG("typeSerializer_: is kind of " << typeSerializer_->getName());
#endif
if (!isDatastream) {
auto binaryRowData = typeSerializer_->deserialize(source);
reUsableRecord_->setValue(binaryRowData);
reUsableRecord_->setTag(StreamElementTag::TAG_REC_WITH_TIMESTAMP);
reUsableRecord_->setTimestamp(timestamp);
} else {
Object* buffer = typeSerializer_->GetBuffer();
typeSerializer_->deserialize(buffer, source);
reUsableRecord_->setValue(buffer);
reUsableRecord_->setTag(StreamElementTag::TAG_REC_WITH_TIMESTAMP);
reUsableRecord_->setTimestamp(timestamp);
}
return reUsableRecord_;
} else if (tag == static_cast<int>(StreamElementTag::TAG_REC_WITHOUT_TIMESTAMP)) {
#ifdef DEBUG
LOG("typeSerializer_: is kind of " << typeSerializer_->getName());
#endif
if (!isDatastream) {
auto binaryRowData = typeSerializer_->deserialize(source);
reUsableRecord_->setValue(binaryRowData);
} else {
Object* buffer = typeSerializer_->GetBuffer();
typeSerializer_->deserialize(buffer, source);
reUsableRecord_->setValue(buffer);
}
return reUsableRecord_;
} else if (tag == static_cast<int>(StreamElementTag::TAG_WATERMARK)) {
long timestamp = source.readLong();
reUsableWatermark_->setTimestamp(timestamp);
reUsableWatermark_->setTag(StreamElementTag::TAG_WATERMARK);
return reUsableWatermark_;
} else if (tag == static_cast<int>(StreamElementTag::TAG_STREAM_STATUS)) {
return new Watermark(source.readInt());
} else {
THROW_LOGIC_EXCEPTION("Corrupt stream, found tag:" + std::to_string(tag));
}
}
void StreamElementSerializer::serialize(Object* input, DataOutputSerializer& target)
{
auto element = reinterpret_cast<StreamElement*>(input);
#ifdef DEBUG
LOG(">>>> Tag: " + std::to_string(static_cast<uint8_t>(element->getTag())));
#endif
if (element->getTag() == StreamElementTag::TAG_REC_WITH_TIMESTAMP ||
element->getTag() == StreamElementTag::TAG_REC_WITHOUT_TIMESTAMP) {
auto* asRecord = static_cast<StreamRecord*>(element);
if (asRecord->hasTimestamp()) {
target.write(static_cast<uint32_t>(StreamElementTag::TAG_REC_WITH_TIMESTAMP));
target.writeRecordTimestamp(asRecord->getTimestamp());
} else {
target.write(static_cast<uint32_t>(StreamElementTag::TAG_REC_WITHOUT_TIMESTAMP));
}
#ifdef DEBUG
LOG(">>> After write tag:" << typeSerializer_->getName());
#endif
typeSerializer_->serialize(static_cast<Object*>(asRecord->getValue()), target);
} else if (element->getTag() == StreamElementTag::TAG_WATERMARK) {
target.write(static_cast<uint8_t>(StreamElementTag::TAG_WATERMARK));
auto* asWatermark = static_cast<Watermark*>(element);
target.writeLong(asWatermark->getTimestamp());
} else {
std::cout << "type name: " << typeSerializer_->getName() << std::endl;
THROW_LOGIC_EXCEPTION(
"Unknown element_, can not serialize it" + std::to_string(static_cast<uint8_t>(element->getTag())));
}
}
const char* StreamElementSerializer::getName() const
{
return "StreamElementSerializer";
}
}