* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef FLINK_BENCHMARK_DYNAMICKAFKARECORDSERIALIZATIONSCHEMA_H
#define FLINK_BENCHMARK_DYNAMICKAFKARECORDSERIALIZATIONSCHEMA_H
#include <cstdint>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
#include <nlohmann/json.hpp>
#include "data/vectorbatch/VectorBatch.h"
#include "table/data/RowData.h"
#include "KafkaRecordSerializationSchema.h"
#include "table/data/GenericRowData.h"
#include "table/data/Row.h"
#include "KeyValueByteContainer.h"
#include "basictypes/String.h"
/**
 * Kafka record serialization schema specialised for RowData.
 *
 * Declares Serialize() overloads for several input representations
 * (a RowData, one row of a VectorBatch, a String, a Row), each returning a
 * KeyValueByteContainer. All member functions are defined out of line.
 *
 * NOTE(review): the object carries mutable scratch members (`j`, `oss`,
 * `timeBuffer`), so it is presumably not meant to be shared between threads
 * without external synchronisation - confirm against the implementation.
 */
class DynamicKafkaRecordSerializationSchema : public KafkaRecordSerializationSchema<RowData> {
public:
    using MetadataPositions = std::vector<int>;

    /**
     * Default constructor. Every scalar/array member has a default member
     * initializer, so a default-constructed schema never holds indeterminate
     * values.
     */
    DynamicKafkaRecordSerializationSchema() = default;

    /**
     * Constructs a schema from field names and field types.
     *
     * @param inputFields field names (presumably stored in inputFields_ - confirm in the definition)
     * @param inputTypes  field type names (presumably parallel to inputFields - confirm in the definition)
     */
    DynamicKafkaRecordSerializationSchema(std::vector<std::string>& inputFields, std::vector<std::string>& inputTypes);

    /** Converts a row to JSON; presumably the result is left in the `j` member - TODO confirm. */
    void RowToJson(RowData* row);

    /** Same as above for row `rowIndex` of a vector batch. */
    void RowToJson(omnistream::VectorBatch* input, int rowIndex);

    /** Serializes a single RowData into a key/value byte container. */
    KeyValueByteContainer Serialize(RowData* consumedRow);

    /** Serializes row `rowIndex` of `input` into a key/value byte container. */
    KeyValueByteContainer Serialize(omnistream::VectorBatch* input, int rowIndex);

    /** Serializes a String element into a key/value byte container. */
    KeyValueByteContainer Serialize(String* element);

    /** Serializes a Row into a key/value byte container. */
    KeyValueByteContainer Serialize(Row* row);

private:
    std::vector<FieldGetter> keyFieldGetters_;
    std::vector<FieldGetter> valueFieldGetters_;
    // Initialised in-class: the default constructor sets nothing itself, and a
    // constructor's member-initializer list still overrides these defaults.
    bool hasMetadata_{false};
    MetadataPositions metadataPositions_;
    bool upsertMode_{false};
    std::vector<std::string> inputFields_;
    std::vector<std::string> inputTypes_;
    // One pair per field; presumably (precision, scale) for decimal columns - TODO confirm.
    std::vector<std::pair<int32_t, int32_t>> decimalInfo;
    // Scratch members (JSON document, string stream, fixed-size character
    // buffer); presumably reused between calls - see the out-of-line definitions.
    nlohmann::ordered_json j;
    std::ostringstream oss;
    char timeBuffer[80]{};  // zero-filled so it always holds a valid (empty) C string

    /** Builds a GenericRowData from `sourceRow` using `getters`, tagged with `kind` (defined elsewhere). */
    static GenericRowData createProjectedRow(RowData& sourceRow, RowKind kind, std::vector<FieldGetter>& getters);
};
#endif