* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_SUBTASKSIMPLEVERSIONEDSERIALIZER_H
#define OMNISTREAM_SUBTASKSIMPLEVERSIONEDSERIALIZER_H
#include <vector>
#include <string>
#include <iostream>
#include <sstream>
#include <cstdint>
#include "core/include/common.h"
#include "core/memory/DataOutputSerializer.h"
#include "core/memory/DataInputDeserializer.h"
#include "core/io/SimpleVersionedSerializer.h"
#include "core/io/SimpleVersionedSerialization.h"
#include "SubtaskCommittableManager.h"
#include "RequestSimpleVersionedSerializer.h"
template <typename CommT>
class SubTaskSimpleVersionedSerializer : public SimpleVersionedSerializer<SubtaskCommittableManager<CommT>> {
public:
SubTaskSimpleVersionedSerializer(
std::shared_ptr<SimpleVersionedSerializer<CommT>> committableSerializer,
int subtaskId,
int numberOfSubtasks,
long checkpointId)
: committableSerializer_(std::move(committableSerializer)),
subtaskId_(subtaskId),
numberOfSubtasks_(numberOfSubtasks),
checkpointId_(checkpointId)
{
}
int getVersion() const override
{
return 0;
}
std::vector<uint8_t> serialize(const SubtaskCommittableManager<CommT>& subtask) override
{
DataOutputSerializer out(256);
auto requests = subtask.GetRequests();
std::vector<CommitRequestImpl<CommT>> list;
list.reserve(requests.size());
for (const auto& item : requests) {
list.push_back(*(item->Copy()));
}
RequestSimpleVersionedSerializer<CommT> requestSerializer(committableSerializer_);
SimpleVersionedSerialization::writeVersionAndSerializeList(requestSerializer, list, out);
return std::vector<uint8_t>(out.getData(), out.getData() + out.length());
}
SubtaskCommittableManager<CommT>* deserialize(int version, std::vector<uint8_t>& serialized) override
{
DataInputDeserializer input(serialized.data(), serialized.size(), 0);
RequestSimpleVersionedSerializer<CommT> requestSerializer(committableSerializer_);
auto* list = SimpleVersionedSerialization::readVersionAndDeserializeList(requestSerializer, input);
std::vector<std::shared_ptr<CommitRequestImpl<CommT>>> requests;
requests.reserve(list->size());
for (auto& item : *list) {
requests.push_back(std::make_shared<CommitRequestImpl<CommT>>(std::move(item)));
}
delete list;
return new SubtaskCommittableManager<CommT>(
requests,
input.readInt(),
input.readInt(),
input.readInt(),
subtaskId_,
std::optional<long>(checkpointId_));
}
private:
std::shared_ptr<SimpleVersionedSerializer<CommT>> committableSerializer_;
int subtaskId_;
int numberOfSubtasks_;
long checkpointId_;
};
#endif