* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "EventDeserializer.h"
/**
 * Appends one event as a new row of the batch currently being assembled.
 *
 * Column 0 receives the event type as an int. The columns of the matching
 * event kind receive the event's fields, and the columns of the other two
 * kinds are marked null for this row.
 *
 * @param event the event to append; ownership is taken by this call.
 * @return reUseRecord (carrying the finished batch) once batchSize events
 *         have been collected, otherwise nullptr.
 * @throws std::runtime_error if event is null.
 */
StreamRecord* BatchEventDeserializer::deserialize(std::unique_ptr<Event> event)
{
    if (!event) {
        throw std::runtime_error("Event is null.");
    }

    // A row count of zero means no batch is in progress, so start a fresh one.
    if (collectedCnt == 0) {
        createNewEventBatch();
    }

    // Read the type up front: the event is moved away by the dispatch below.
    const auto kind = event->getEventType();
    auto* typeColumn = reinterpret_cast<Vector<int>*>(vb->GetVectors()[0]);
    typeColumn->SetValue(collectedCnt, static_cast<int>(kind));

    switch (kind) {
        case EventType::PERSON:
            convertPerson(std::move(event));
            setNullAuction();
            setNullBid();
            break;
        case EventType::AUCTION:
            setNullPerson();
            convertAuction(std::move(event));
            setNullBid();
            break;
        case EventType::BID:
            setNullPerson();
            setNullAuction();
            convertBid(std::move(event));
            break;
        default:
            // Any other type only gets its type column written.
            break;
    }

    ++collectedCnt;
    if (collectedCnt != batchSize) {
        return nullptr;
    }

    // Batch is full: publish it through the reusable record and reset the row count.
    reUseRecord->setValue(vb);
    collectedCnt = 0;
    return reUseRecord;
}
void BatchEventDeserializer::createNewEventBatch()
{
collectedCnt = 0;
vb = new omnistream::VectorBatch(batchSize);
std::vector<int> columnTypes = {DataTypeId::OMNI_INT};
columnTypes.insert(columnTypes.end(), Event::PersonTypes.begin(), Event::PersonTypes.end());
columnTypes.insert(columnTypes.end(), Event::AuctionTypes.begin(), Event::AuctionTypes.end());
columnTypes.insert(columnTypes.end(), Event::BidTypes.begin(), Event::BidTypes.end());
for (auto typeId : columnTypes) {
BaseVector* vec = nullptr;
switch (typeId) {
case DataTypeId::OMNI_INT:
vec = new Vector<int>(batchSize);
break;
case DataTypeId::OMNI_TIMESTAMP:
case DataTypeId::OMNI_LONG:
vec = new Vector<int64_t>(batchSize);
break;
case DataTypeId::OMNI_VARCHAR:
case DataTypeId::OMNI_CHAR:
vec = new Vector<LargeStringContainer<std::string_view>>(batchSize);
break;
default:
std::runtime_error("Not supported datatype");
}
vb->Append(vec);
}
}
void BatchEventDeserializer::convertPerson(std::unique_ptr<Event> event)
{
static int offset = 1;
std::unique_ptr<Person> ptr(dynamic_cast<Person*>(event.get()));
event.release();
auto vectors = vb->GetVectors();
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 0])->SetValue(collectedCnt, ptr->id);
std::string_view sv(ptr->name.data(), ptr->name.size());
reinterpret_cast<VarcharVec *>(vectors[offset + 1])->SetValue(collectedCnt, sv);
reinterpret_cast<VarcharVec *>(vectors[offset + 2])->SetValue(collectedCnt, ptr->emailAddress);
sv = std::string_view(ptr->creditCard.data(), ptr->creditCard.size());
reinterpret_cast<VarcharVec *>(vectors[offset + 3])->SetValue(collectedCnt, sv);
reinterpret_cast<VarcharVec *>(vectors[offset + 4])->SetValue(collectedCnt, ptr->city);
reinterpret_cast<VarcharVec *>(vectors[offset + 5])->SetValue(collectedCnt, ptr->state);
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 6])->SetValue(collectedCnt, ptr->dateTime);
reinterpret_cast<VarcharVec *>(vectors[offset + 7])->SetValue(collectedCnt, ptr->extra);
}
void BatchEventDeserializer::setNullPerson()
{
static int offset = 1;
auto vectors = vb->GetVectors();
for (int i = 0; i < 8; i++) {
vectors[offset + i]->SetNull(collectedCnt);
}
}
void BatchEventDeserializer::convertAuction(std::unique_ptr<Event> event)
{
static int offset = 1 + static_cast<int>(Event::PersonTypes.size());
std::unique_ptr<Auction> ptr(dynamic_cast<Auction*>(event.get()));
event.release();
auto vectors = vb->GetVectors();
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 0])->SetValue(collectedCnt, ptr->id);
reinterpret_cast<VarcharVec *>(vectors[offset + 1])->SetValue(collectedCnt, ptr->itemName);
reinterpret_cast<VarcharVec *>(vectors[offset + 2])->SetValue(collectedCnt, ptr->description);
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 3])->SetValue(collectedCnt, ptr->initialBid);
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 4])->SetValue(collectedCnt, ptr->reserve);
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 5])->SetValue(collectedCnt, ptr->dateTime);
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 6])->SetValue(collectedCnt, ptr->expires);
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 7])->SetValue(collectedCnt, ptr->seller);
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 8])->SetValue(collectedCnt, ptr->category);
reinterpret_cast<VarcharVec *>(vectors[offset + 9])->SetValue(collectedCnt, ptr->extra);
}
/**
 * Marks every auction column as null for row collectedCnt.
 *
 * Auction columns follow the event-type column and the person columns. Their
 * count is taken from Event::AuctionTypes, the same schema used to build the
 * batch, instead of a hard-coded number that could drift from it.
 */
void BatchEventDeserializer::setNullAuction()
{
    static const int offset = 1 + static_cast<int>(Event::PersonTypes.size());
    static const int columnCount = static_cast<int>(Event::AuctionTypes.size());

    auto vectors = vb->GetVectors();
    for (int col = 0; col < columnCount; ++col) {
        vectors[offset + col]->SetNull(collectedCnt);
    }
}
void BatchEventDeserializer::convertBid(std::unique_ptr<Event> event)
{
static int offset = 1 + static_cast<int>(Event::PersonTypes.size()) + static_cast<int>(Event::AuctionTypes.size());
std::unique_ptr<Bid> ptr(dynamic_cast<Bid*>(event.get()));
event.release();
auto vectors = vb->GetVectors();
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 0])->SetValue(collectedCnt, ptr->auction);
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 1])->SetValue(collectedCnt, ptr->bidder);
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 2])->SetValue(collectedCnt, ptr->price);
reinterpret_cast<VarcharVec *>(vectors[offset + 3])->SetValue(collectedCnt, ptr->channel);
reinterpret_cast<VarcharVec *>(vectors[offset + 4])->SetValue(collectedCnt, ptr->url);
reinterpret_cast<Vector<int64_t>* >(vectors[offset + 5])->SetValue(collectedCnt, ptr->dateTime);
reinterpret_cast<VarcharVec *>(vectors[offset + 6])->SetValue(collectedCnt, ptr->extra);
}
/**
 * Marks every bid column as null for row collectedCnt.
 *
 * Bid columns follow the event-type, person and auction columns. Their count
 * is taken from Event::BidTypes, the same schema used to build the batch,
 * instead of a hard-coded number that could drift from it.
 */
void BatchEventDeserializer::setNullBid()
{
    static const int offset =
        1 + static_cast<int>(Event::PersonTypes.size()) + static_cast<int>(Event::AuctionTypes.size());
    static const int columnCount = static_cast<int>(Event::BidTypes.size());

    auto vectors = vb->GetVectors();
    for (int col = 0; col < columnCount; ++col) {
        vectors[offset + col]->SetNull(collectedCnt);
    }
}
/**
 * Releases the reusable output record and any batch still being filled.
 *
 * A non-zero collectedCnt means vb was allocated by createNewEventBatch() and
 * has not yet been handed to reUseRecord by deserialize(), so nothing else in
 * this file owns it.
 * NOTE(review): assumes no code outside this file takes ownership of a
 * partially filled batch — confirm against the class declaration.
 */
BatchEventDeserializer::~BatchEventDeserializer()
{
    if (collectedCnt > 0) {
        delete vb;
        vb = nullptr;
    }
    delete reUseRecord;
}