* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef FLINK_TNEL_DATAINPUTDESERIALIZER_H
#define FLINK_TNEL_DATAINPUTDESERIALIZER_H
#include <cstdint>
#include <vector>
#include "DataInputView.h"
#include "basictypes/Double.h"
#include "include/common.h"
class DataInputDeserializer : public DataInputView {
public:
explicit DataInputDeserializer(const uint8_t *buffer = nullptr, int anEnd = 0, int position = 0)
: buffer_(buffer), end_(anEnd), position_(position) {};
~DataInputDeserializer() override = default;
void setBuffer(const uint8_t *buffer, int bufferLength, int start, int len);
int readUnsignedByte() override;
uint8_t readByte() override;
int64_t readLong() override;
int readInt() override;
std::string readUTF() override;
int readUnsignedShort() override;
bool readBoolean() override;
void readFully(uint8_t* b, int size, int off, int len) override;
void *GetBuffer() override
{
return const_cast<void *>(static_cast<const void *>(buffer_));
}
int Available()
{
if (position_ < end_) {
return end_ - position_;
} else {
return 0;
}
}
double readDouble() override;
const uint8_t* getData() override {
return buffer_;
}
size_t getPosition() override {
return position_;
}
void setPosition(size_t position) override {
position_ = position;
}
private:
const uint8_t *buffer_;
int end_ = 0;
int position_ = 0;
std::vector<uint8_t> byteArr;
std::vector<char> charArr;
};
inline void DataInputDeserializer::setBuffer(const uint8_t *buffer, int bufferLength, int start, int len)
{
LOG("DataInputDeserializer::setBuffer " << reinterpret_cast<long>(buffer)
<< " bufferLength " << bufferLength
<< " start " << start
<< " len " << len)
if (start < 0 || len < 0 || start + len > bufferLength) {
THROW_LOGIC_EXCEPTION("Invalid bounds.");
}
buffer_ = buffer;
position_ = start;
end_ = start + len;
}
inline uint8_t DataInputDeserializer::readByte()
{
LOG("DataInputDeserializer::readByte " << reinterpret_cast<long>(buffer_)
<< " position_ " << position_
<< " end_ " << end_)
if (unlikely(position_ < 0 || position_ > end_)) {
THROW_LOGIC_EXCEPTION("readByte EOFException.");
}
return buffer_[position_++];
}
inline int64_t DataInputDeserializer::readLong()
{
if (unlikely(position_ < 0 || position_ + sizeof(int64_t) > static_cast<size_t>(end_))) {
THROW_LOGIC_EXCEPTION("EOFException, position_: " << position_ << ", end_: " << end_);
}
auto ret = static_cast <int64_t>
((static_cast<uint64_t>(buffer_[position_]) << 56) |
(static_cast<uint64_t>(buffer_[position_ + 1]) << 48) |
(static_cast<uint64_t>(buffer_[position_ + 2]) << 40) |
(static_cast<uint64_t>(buffer_[position_ + 3]) << 32) |
(static_cast<uint64_t>(buffer_[position_ + 4]) << 24) |
(static_cast<uint64_t>(buffer_[position_ + 5]) << 16) |
(static_cast<uint64_t>(buffer_[position_ + 6]) << 8) |
static_cast<uint64_t>(buffer_[position_ + 7]));
position_ += 8;
return ret;
}
inline int DataInputDeserializer::readInt()
{
if (unlikely(position_ < 0 || position_ + static_cast<int>(sizeof(uint32_t)) > end_)) {
THROW_LOGIC_EXCEPTION("readInt EOFException");
}
uint32_t value = (static_cast<uint32_t>(buffer_[position_]) << 24) |
(static_cast<uint32_t>(buffer_[position_ + 1]) << 16) |
(static_cast<uint32_t>(buffer_[position_ + 2]) << 8) |
static_cast<uint32_t>(buffer_[position_ + 3]);
position_ += sizeof(uint32_t);
LOG("position_ " << position_ << " address " << reinterpret_cast<long>(buffer_) \
<< " value " << value)
return static_cast<int>(value);
}
inline void DataInputDeserializer::readFully(uint8_t* b, int size, int off, int len)
{
if (likely(len >= 0 && off <= size && position_ <= end_ - len)) {
std::copy(buffer_ + position_, buffer_ + position_ + len, b + off);
position_ += len;
} else {
THROW_LOGIC_EXCEPTION("EOFException, position: " << position_ << ", end_: " << end_ << ", len: " << len);
}
}
inline int DataInputDeserializer::readUnsignedByte()
{
if (likely(position_ < end_)) {
return (buffer_[position_++] & 0xff);
} else {
THROW_LOGIC_EXCEPTION("EOFException")
}
}
inline std::string DataInputDeserializer::readUTF()
{
auto utflen = readUnsignedShort();
byteArr.clear();
byteArr.resize(utflen);
charArr.clear();
charArr.resize(utflen);
int c;
int char2;
int char3;
int count = 0;
int chararrCount = 0;
readFully(byteArr.data(), utflen, 0, utflen);
while (count < utflen) {
c = (int) byteArr[count] & 0xff;
if (c > 127) {
break;
}
count++;
charArr[chararrCount++] = (char) c;
}
while (count < utflen) {
c = (int) byteArr[count] & 0xff;
switch (c >> 4) {
case 0:
case 1:
case 2:
case 3:
case 4:
case 5:
case 6:
case 7:
count++;
charArr[chararrCount++] = (char) c;
break;
case 12:
case 13:
count += 2;
if (unlikely(count > utflen)) {
THROW_LOGIC_EXCEPTION("malformed input: partial character at end");
}
char2 = (int) byteArr[count - 1];
if (unlikely((char2 & 0xC0) != 0x80)) {
THROW_LOGIC_EXCEPTION("malformed input around byte " + count);
}
charArr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
break;
case 14:
count += 3;
if (unlikely(count > utflen)) {
THROW_LOGIC_EXCEPTION("malformed input: partial character at end");
}
char2 = (int) byteArr[count - 2];
char3 = (int) byteArr[count - 1];
if (unlikely(((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))) {
THROW_LOGIC_EXCEPTION("malformed input around byte " + (count - 1));
}
charArr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
break;
default:
THROW_LOGIC_EXCEPTION("malformed input around byte " + count);
}
}
return std::string(charArr.data(), charArr.size());
}
inline int DataInputDeserializer::readUnsignedShort()
{
if (likely(position_ < end_ - 1)) {
int f1 = position_++;
return ((buffer_[f1] & 0xff) << 8) | (buffer_[position_++] & 0xff);
} else {
THROW_LOGIC_EXCEPTION("readUnsignedShort EOFException, end_: " << end_);
}
}
inline bool DataInputDeserializer::readBoolean()
{
if (likely(position_ < end_)) {
return buffer_[position_++] != 0;
} else {
THROW_LOGIC_EXCEPTION("readBoolean EOFException, endl_: " << end_);
}
}
inline double DataInputDeserializer::readDouble()
{
return Double::doubleToLongBits(readLong());
}
#endif