* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifdef WITH_OMNISTATESTORE
#include <gtest/gtest.h>
#include "boost_state_db.h"
#include "bss_types.h"
#include "typeutils/BinaryRowDataSerializer.h"
#include "state/VoidNamespaceSerializer.h"
#include "state/InternalKeyContextImpl.h"
#include "state/RegisteredKeyValueStateBackendMetaInfo.h"
#include "state/bss/BssStateTable.h"
#include "state/bss/BssValueState.h"
#include "api/common/state/ValueStateDescriptor.h"
#include "globals.h"
#include "state/bss/BssListState.h"
#include "state/bss/BssMapState.h"
#include "api/common/state/ListStateDescriptor.h"
#include <random>
#include <cstdint>
#include <stdexcept>
using namespace ock::bss;
long newDataTime1 = 0;
long writeTotalTime1 = 0;
long writeRocksDBTime1 = 0;
long serializeTime1 = 0;
long readTotalTime1[] = {0, 0, 0, 0, 0};
long readRocksDBTime1[] = {0, 0, 0, 0, 0};
long desTime1[] = {0, 0, 0, 0, 0};
class UUIDGenerator {
public:
static uint32_t generateUUID()
{
thread_local std::random_device rd;
thread_local std::mt19937 gen(rd());
thread_local std::uniform_int_distribution<uint32_t> dis(1, UINT32_MAX);
return dis(gen);
}
};
void initialValueState(BssValueState<RowData*, VoidNamespace, RowData*>*& valueState)
{
BoostStateDBPtr nDB = BoostStateDBFactory::Create();
ConfigRef config = std::make_shared<Config>();
config->Init(NO_0, NO_127, NO_128);
config->mMemorySegmentSize = IO_SIZE_64M;
config->SetEvictMinSize(IO_SIZE_1K);
config->SetTaskSlotFlag(UUIDGenerator::generateUUID());
nDB->Open(config);
valueState->CreateTable(nDB);
}
void initialListState(BssListState<RowData*, int64_t, int64_t>*& listState)
{
BoostStateDBPtr nDB = BoostStateDBFactory::Create();
ConfigRef config = std::make_shared<Config>();
config->Init(NO_0, NO_127, NO_128);
config->mMemorySegmentSize = IO_SIZE_64M;
config->SetEvictMinSize(IO_SIZE_1K);
config->SetTaskSlotFlag(UUIDGenerator::generateUUID());
nDB->Open(config);
listState->CreateTable(nDB);
}
void initialMapState(BssMapState<RowData*, int64_t, int64_t, int64_t>*& mapState)
{
BoostStateDBPtr nDB = BoostStateDBFactory::Create();
ConfigRef config = std::make_shared<Config>();
config->Init(NO_0, NO_127, NO_128);
config->mMemorySegmentSize = IO_SIZE_64M;
config->SetEvictMinSize(IO_SIZE_1K);
config->SetTaskSlotFlag(UUIDGenerator::generateUUID());
nDB->Open(config);
mapState->CreateTable(nDB);
}
TableRef CreateFromDB(StateType keyedStateType, std::string tableName, BoostStateDB* db)
{
auto tblDesc = std::make_shared<ock::bss::TableDescription>(
keyedStateType, "dbTable", -1, ock::bss::TableSerializer{}, db->GetConfig());
return db->GetTableOrCreate(tblDesc);
}
TEST(BssStateTest, kvtable_long_test)
{
ConfigRef config = std::make_shared<Config>();
config->Init(NO_0, NO_127, NO_128);
config->mMemorySegmentSize = IO_SIZE_64M;
config->SetEvictMinSize(IO_SIZE_1K);
config->SetTaskSlotFlag(UUIDGenerator::generateUUID());
BoostStateDBPtr nDB = BoostStateDBFactory::Create();
nDB->Open(config);
KVTableRef kVTable = nullptr;
kVTable = std::dynamic_pointer_cast<KVTable>(CreateFromDB(StateType::VALUE, "kVTable", nDB));
DataOutputSerializer keyOutputSerializer;
OutputBufferStatus outputBufferStatus;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus);
LongSerializer longSerializer;
long vectorBatchId = 1;
longSerializer.serialize(&vectorBatchId, keyOutputSerializer);
BinaryData priKey(keyOutputSerializer.getData(), (int32_t)(keyOutputSerializer.getPosition()));
BinaryData val(keyOutputSerializer.getData(), (int32_t)(keyOutputSerializer.getPosition()));
uint32_t keyHashCode = HashCode::Hash(keyOutputSerializer.getData(), (int32_t)(keyOutputSerializer.getPosition()));
kVTable->Put(keyHashCode, priKey, val);
BinaryData readValue;
kVTable->Get(keyHashCode, priKey, readValue);
DataInputDeserializer serializedData(reinterpret_cast<const uint8_t*>(readValue.Data()), readValue.Length(), 0);
void* readLong = longSerializer.deserialize(serializedData);
ASSERT_EQ(*reinterpret_cast<long*>(readLong), 1);
nDB->Close();
delete nDB;
}
omnistream::VectorBatch* GetVectorBatch(int64_t rowCount)
{
auto vBatch = new omnistream::VectorBatch(rowCount);
for (int i = 0; i < 5; ++i) {
auto pVector = new omniruntime::vec::Vector<long>(rowCount);
for (int j = 0; j < rowCount; ++j) {
pVector->SetValue(j, 100 + j + i);
}
vBatch->Append(pVector);
}
return vBatch;
}
TEST(BssStateTest, BinaryRowDataSerialzerTest)
{
BinaryRowData* value = BinaryRowData::createBinaryRowDataWithMem(3);
value->setLong(0, 100);
value->setLong(1, 200);
value->setLong(2, 300);
DataOutputSerializer keyOutputSerializer;
OutputBufferStatus outputBufferStatus;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus);
BinaryRowDataSerializer* binaryRowDataSerializer = new BinaryRowDataSerializer(1);
binaryRowDataSerializer->serialize(value, keyOutputSerializer);
BinaryData priKey(keyOutputSerializer.getData(), (int32_t)(keyOutputSerializer.getPosition()));
DataInputDeserializer serializedData(priKey.Data(), priKey.Length(), 0);
void* readLong = binaryRowDataSerializer->deserialize(serializedData);
auto res = *reinterpret_cast<BinaryRowData*>(readLong);
EXPECT_EQ(*res.getLong(0), 100);
EXPECT_EQ(*res.getLong(1), 200);
}
TEST(BssStateTest, KVTable_putNGet_binaryRowData)
{
ConfigRef config = std::make_shared<Config>();
config->Init(NO_0, NO_127, NO_128);
config->mMemorySegmentSize = IO_SIZE_64M;
config->SetEvictMinSize(IO_SIZE_1K);
config->SetTaskSlotFlag(UUIDGenerator::generateUUID());
BoostStateDBPtr nDB = BoostStateDBFactory::Create();
nDB->Open(config);
KVTableRef kVTable = nullptr;
kVTable = std::dynamic_pointer_cast<KVTable>(CreateFromDB(StateType::VALUE, "kVTable", nDB));
DataOutputSerializer keyOutputSerializer;
OutputBufferStatus outputBufferStatus;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus);
BinaryRowDataSerializer* binaryRowDataSerializer = new BinaryRowDataSerializer(3);
BinaryRowData* value = BinaryRowData::createBinaryRowDataWithMem(3);
value->setLong(0, 100);
value->setLong(1, 200);
value->setLong(2, 300);
binaryRowDataSerializer->serialize(value, keyOutputSerializer);
BinaryData priKey(keyOutputSerializer.getData(), (int32_t)(keyOutputSerializer.getPosition()));
BinaryData val(keyOutputSerializer.getData(), (int32_t)(keyOutputSerializer.getPosition()));
uint32_t keyHashCode = HashCode::Hash(keyOutputSerializer.getData(), (int32_t)(keyOutputSerializer.getPosition()));
kVTable->Put(keyHashCode, priKey, val);
BinaryData readValue;
kVTable->Get(keyHashCode, priKey, readValue);
DataInputDeserializer serializedData(reinterpret_cast<const uint8_t*>(readValue.Data()), readValue.Length(), 0);
void* readLong = binaryRowDataSerializer->deserialize(serializedData);
auto res = *reinterpret_cast<BinaryRowData*>(readLong);
EXPECT_EQ(*res.getLong(0), 100);
EXPECT_EQ(*res.getLong(1), 200);
kVTable->Get(keyHashCode, priKey, readValue);
DataInputDeserializer serializedData1(reinterpret_cast<const uint8_t*>(readValue.Data()), readValue.Length(), 0);
readLong = binaryRowDataSerializer->deserialize(serializedData1);
res = *reinterpret_cast<BinaryRowData*>(readLong);
EXPECT_EQ(*res.getLong(0), 100);
EXPECT_EQ(*res.getLong(1), 200);
nDB->Close();
delete nDB;
}
TEST(BssStateTest, BssValueStateTest)
{
VoidNamespaceSerializer* voidNamespaceSerializer = new VoidNamespaceSerializer();
BinaryRowDataSerializer* binaryRowDataSerializer = new BinaryRowDataSerializer(3);
InternalKeyContextImpl<RowData*>* context = new InternalKeyContextImpl<RowData*>(new KeyGroupRange(0, 1), 1);
BinaryRowData* keyRowData = BinaryRowData::createBinaryRowDataWithMem(3);
keyRowData->setLong(0, 100);
keyRowData->setLong(1, 100);
keyRowData->setLong(2, 100);
context->setCurrentKey(keyRowData);
context->setCurrentKeyGroupIndex(1);
RegisteredKeyValueStateBackendMetaInfo* metaInfo =
new RegisteredKeyValueStateBackendMetaInfo("metaInfo", voidNamespaceSerializer, binaryRowDataSerializer);
BssStateTable<RowData*, VoidNamespace, RowData*> bssStateTable(context, metaInfo, binaryRowDataSerializer);
BssValueState<RowData*, VoidNamespace, RowData*>* valueState =
BssValueState<RowData*, VoidNamespace, RowData*>::create(
new ValueStateDescriptor<RowData*>("ValueStateTest", binaryRowDataSerializer),
&bssStateTable,
binaryRowDataSerializer);
initialValueState(valueState);
BinaryRowData* value = BinaryRowData::createBinaryRowDataWithMem(3);
value->setLong(0, 100);
value->setLong(1, 200);
value->setLong(2, 300);
valueState->update(value);
BinaryRowData* result = dynamic_cast<BinaryRowData*>(valueState->value());
EXPECT_EQ(*result->getLong(0), 100);
EXPECT_EQ(*result->getLong(1), 200);
EXPECT_EQ(*result->getLong(2), 300);
LOG("*******************************************");
value->setLong(0, 200);
result = dynamic_cast<BinaryRowData*>(valueState->value());
EXPECT_EQ(*result->getLong(0), 100);
EXPECT_EQ(*result->getLong(1), 200);
EXPECT_EQ(*result->getLong(2), 300);
LOG("*******************************************");
valueState->update(value);
result = dynamic_cast<BinaryRowData*>(valueState->value());
EXPECT_EQ(*result->getLong(0), 200);
EXPECT_EQ(*result->getLong(1), 200);
EXPECT_EQ(*result->getLong(2), 300);
LOG("*******************************************");
auto newKey = BinaryRowData::createBinaryRowDataWithMem(1);
context->setCurrentKey(BinaryRowData::createBinaryRowDataWithMem(1));
result = dynamic_cast<BinaryRowData*>(valueState->value());
ASSERT_EQ(nullptr, result);
LOG("*******************************************");
context->setCurrentKey(keyRowData);
result = dynamic_cast<BinaryRowData*>(valueState->value());
LOG("*******************************************");
EXPECT_EQ(*result->getLong(0), 200);
EXPECT_EQ(*result->getLong(1), 200);
EXPECT_EQ(*result->getLong(2), 300);
LOG("*******************************************");
valueState->clear();
result = dynamic_cast<BinaryRowData*>(valueState->value());
ASSERT_EQ(nullptr, result);
LOG("*******************************************");
delete newKey;
delete keyRowData;
delete value;
}
BinaryData GetBssUnifyingData(int64_t longValue, uint32_t& hashcode, DataOutputSerializer& keyOutputSerializer)
{
LongSerializer longSerializer;
longSerializer.serialize(&longValue, keyOutputSerializer);
BinaryData bssData(keyOutputSerializer.getData(), keyOutputSerializer.getPosition());
hashcode = HashCode::Hash(keyOutputSerializer.getData(), keyOutputSerializer.getPosition());
return bssData;
}
TEST(BssStateTest, Bss_KListState_put_get)
{
ConfigRef config = std::make_shared<Config>();
config->Init(NO_0, NO_127, NO_128);
config->mMemorySegmentSize = IO_SIZE_64M;
config->SetEvictMinSize(IO_SIZE_1K);
config->SetTaskSlotFlag(UUIDGenerator::generateUUID());
BoostStateDBPtr nDB = BoostStateDBFactory::Create();
nDB->Open(config);
KListTableRef kListTable;
kListTable = std::dynamic_pointer_cast<KListTable>(CreateFromDB(StateType::LIST, "kListTable", nDB));
int64_t key = 10000;
uint32_t keyHashCode;
OutputBufferStatus outputBufferStatus;
DataOutputSerializer keyOutputSerializer;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus);
BinaryData keyData = GetBssUnifyingData(key, keyHashCode, keyOutputSerializer);
int64_t value = 9999;
uint32_t valueHashCode;
OutputBufferStatus outputBufferStatus1;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus1);
BinaryData valueData = GetBssUnifyingData(value, valueHashCode, keyOutputSerializer);
kListTable->Put(keyHashCode, keyData, valueData);
auto serializer = new LongSerializer();
auto getResult = kListTable->Get(keyHashCode, keyData);
for (std::size_t i = 0; i < getResult.addresses.size(); ++i) {
DataInputDeserializer serializedData(
reinterpret_cast<const uint8_t*>(getResult.addresses.at(i)), getResult.lengths.at(i), 0);
void* resPtr = serializer->deserialize(serializedData);
LOG("result is " << *(long*)resPtr);
EXPECT_EQ(9999, *(long*)resPtr);
}
}
TEST(BssStateTest, Bss_KListState_add_get)
{
ConfigRef config = std::make_shared<Config>();
config->Init(NO_0, NO_127, NO_128);
config->mMemorySegmentSize = IO_SIZE_64M;
config->SetEvictMinSize(IO_SIZE_1K);
config->SetTaskSlotFlag(UUIDGenerator::generateUUID());
BoostStateDBPtr nDB = BoostStateDBFactory::Create();
nDB->Open(config);
KListTableRef kListTable;
kListTable = std::dynamic_pointer_cast<KListTable>(CreateFromDB(StateType::LIST, "kListTable", nDB));
int64_t key = 10000;
uint32_t keyHashCode;
OutputBufferStatus outputBufferStatus;
DataOutputSerializer keyOutputSerializer;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus);
const BinaryData keyData = GetBssUnifyingData(key, keyHashCode, keyOutputSerializer);
int64_t value = 9999;
uint32_t valueHashCode;
OutputBufferStatus outputBufferStatus1;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus1);
BinaryData valueData = GetBssUnifyingData(value, valueHashCode, keyOutputSerializer);
LongSerializer* serializer = new LongSerializer();
int64_t value1 = 9998;
uint32_t valueHashCode1;
OutputBufferStatus outputBufferStatus2;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus2);
BinaryData valueData1 = GetBssUnifyingData(value1, valueHashCode1, keyOutputSerializer);
int64_t value2 = 9997;
uint32_t valueHashCode2;
OutputBufferStatus outputBufferStatus3;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus3);
BinaryData valueData2 = GetBssUnifyingData(value2, valueHashCode2, keyOutputSerializer);
kListTable->Add(keyHashCode, keyData, valueData);
kListTable->Add(keyHashCode, keyData, valueData1);
kListTable->Add(keyHashCode, keyData, valueData2);
auto getResult = kListTable->Get(keyHashCode, keyData);
kListTable->CleanResource(getResult.resId);
std::vector<int64_t>* result = new std::vector<int64_t>();
for (int i = 0; i < getResult.addresses.size(); ++i) {
DataInputDeserializer serializedData(
reinterpret_cast<const uint8_t*>(getResult.addresses.at(i)), getResult.lengths.at(i), 0);
auto tempData = serializer->deserialize(serializedData);
LOG("tempData: " << (long)tempData);
LOG("tempData1: " << (long*)tempData);
result->push_back(*(long*)tempData);
LOG("==================================");
}
EXPECT_EQ(3, result->size());
EXPECT_EQ(9999, result->at(0));
EXPECT_EQ(9998, result->at(1));
EXPECT_EQ(9997, result->at(2));
}
TEST(BssStateTest, Bss_KMapState_add_get1)
{
ConfigRef config = std::make_shared<Config>();
config->Init(NO_0, NO_127, NO_128);
config->mMemorySegmentSize = IO_SIZE_64M;
config->SetEvictMinSize(IO_SIZE_1K);
config->SetTaskSlotFlag(UUIDGenerator::generateUUID());
BoostStateDBPtr nDB = BoostStateDBFactory::Create();
nDB->Open(config);
KMapTableRef kMapTable;
kMapTable = std::dynamic_pointer_cast<KMapTable>(CreateFromDB(StateType::MAP, "kMapTable", nDB));
int64_t key1 = 10000;
uint32_t keyHashCode1;
OutputBufferStatus outputBufferStatus;
DataOutputSerializer keyOutputSerializer;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus);
const BinaryData keyData1 = GetBssUnifyingData(key1, keyHashCode1, keyOutputSerializer);
int64_t key2 = 10001;
uint32_t keyHashCode2;
OutputBufferStatus outputBufferStatus1;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus1);
const BinaryData keyData2 = GetBssUnifyingData(key2, keyHashCode2, keyOutputSerializer);
int64_t key3 = 10002;
uint32_t keyHashCode3;
OutputBufferStatus outputBufferStatus2;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus2);
const BinaryData keyData3 = GetBssUnifyingData(key3, keyHashCode3, keyOutputSerializer);
int64_t value1 = 9998;
uint32_t valueHashCode1;
OutputBufferStatus outputBufferStatus3;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus3);
BinaryData valueData1 = GetBssUnifyingData(value1, valueHashCode1, keyOutputSerializer);
int64_t value2 = 9997;
uint32_t valueHashCode2;
OutputBufferStatus outputBufferStatus4;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus4);
BinaryData valueData2 = GetBssUnifyingData(value2, valueHashCode2, keyOutputSerializer);
int64_t value3 = 9996;
uint32_t valueHashCode3;
OutputBufferStatus outputBufferStatus5;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus5);
BinaryData valueData3 = GetBssUnifyingData(value3, valueHashCode3, keyOutputSerializer);
int64_t prikey = 100;
uint32_t prikeyHashCode;
OutputBufferStatus outputBufferStatus6;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus6);
BinaryData prikeyData = GetBssUnifyingData(prikey, prikeyHashCode, keyOutputSerializer);
LongSerializer* longSerializer = new LongSerializer();
kMapTable->Put(keyHashCode1, prikeyData, keyData1, valueData1);
kMapTable->Put(keyHashCode2, prikeyData, keyData2, valueData2);
kMapTable->Put(keyHashCode3, prikeyData, keyData3, valueData3);
auto iterator = kMapTable->EntryIteratorWrraper(keyHashCode1, prikeyData);
while (iterator->HasNext()) {
LOG("iterator1 test");
auto temp = iterator->Next();
auto keyData = temp[0];
auto valueData = temp[1];
DataInputDeserializer keySerializedData(reinterpret_cast<const uint8_t*>(keyData.Data()), keyData.Length(), 0);
auto keyRes = *(long*)longSerializer->deserialize(keySerializedData);
DataInputDeserializer valSerializedData(
reinterpret_cast<const uint8_t*>(valueData.Data()), valueData.Length(), 0);
auto valRes = *(long*)longSerializer->deserialize(valSerializedData);
EXPECT_EQ(keyRes, 10000);
EXPECT_EQ(valRes, 9998);
}
delete iterator;
auto iterator2 = kMapTable->EntryIteratorWrraper(keyHashCode2, prikeyData);
while (iterator2->HasNext()) {
LOG("iterator2 test");
auto temp = iterator2->Next();
auto keyData = temp[0];
auto valueData = temp[1];
DataInputDeserializer keySerializedData(reinterpret_cast<const uint8_t*>(keyData.Data()), keyData.Length(), 0);
auto keyRes = *(long*)longSerializer->deserialize(keySerializedData);
DataInputDeserializer valSerializedData(
reinterpret_cast<const uint8_t*>(valueData.Data()), valueData.Length(), 0);
auto valRes = *(long*)longSerializer->deserialize(valSerializedData);
EXPECT_EQ(keyRes, 10001);
EXPECT_EQ(valRes, 9997);
}
delete iterator2;
auto iterator3 = kMapTable->EntryIteratorWrraper(keyHashCode3, prikeyData);
while (iterator3->HasNext()) {
auto temp = iterator3->Next();
auto keyData = temp[0];
auto valueData = temp[1];
DataInputDeserializer keySerializedData(reinterpret_cast<const uint8_t*>(keyData.Data()), keyData.Length(), 0);
auto keyRes = *(long*)longSerializer->deserialize(keySerializedData);
DataInputDeserializer valSerializedData(
reinterpret_cast<const uint8_t*>(valueData.Data()), valueData.Length(), 0);
auto valRes = *(long*)longSerializer->deserialize(valSerializedData);
EXPECT_EQ(keyRes, 10002);
EXPECT_EQ(valRes, 9996);
}
delete iterator3;
}
TEST(BssStateTest, Bss_KMapState_add_get)
{
std::srand(static_cast<unsigned int>(std::time(nullptr)));
ConfigRef config = std::make_shared<Config>();
config->Init(NO_0, NO_127, NO_128);
config->mMemorySegmentSize = IO_SIZE_64M;
config->SetEvictMinSize(IO_SIZE_1K);
config->SetTaskSlotFlag(UUIDGenerator::generateUUID());
BoostStateDBPtr nDB = BoostStateDBFactory::Create();
nDB->Open(config);
KMapTableRef kMapTable;
kMapTable = std::dynamic_pointer_cast<KMapTable>(CreateFromDB(StateType::MAP, "kMapTable", nDB));
int64_t key = 10000;
uint32_t keyHashCode;
OutputBufferStatus outputBufferStatus;
DataOutputSerializer keyOutputSerializer;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus);
const BinaryData keyData = GetBssUnifyingData(key, keyHashCode, keyOutputSerializer);
int64_t key1 = 10000;
uint32_t keyHashCode1;
OutputBufferStatus outputBufferStatus1;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus1);
const BinaryData keyData1 = GetBssUnifyingData(key1, keyHashCode1, keyOutputSerializer);
int64_t key2 = 10000;
uint32_t keyHashCode2;
OutputBufferStatus outputBufferStatus2;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus2);
const BinaryData keyData2 = GetBssUnifyingData(key2, keyHashCode2, keyOutputSerializer);
int64_t key3 = 10000;
uint32_t keyHashCode3;
OutputBufferStatus outputBufferStatus3;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus3);
const BinaryData keyData3 = GetBssUnifyingData(key3, keyHashCode3, keyOutputSerializer);
int64_t value = 9999;
uint32_t valueHashCode;
OutputBufferStatus outputBufferStatus4;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus4);
BinaryData valueData = GetBssUnifyingData(value, valueHashCode, keyOutputSerializer);
int64_t value1 = 9998;
uint32_t valueHashCode1;
OutputBufferStatus outputBufferStatus5;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus5);
BinaryData valueData1 = GetBssUnifyingData(value1, valueHashCode1, keyOutputSerializer);
int64_t value2 = 9997;
uint32_t valueHashCode2;
OutputBufferStatus outputBufferStatus6;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus6);
BinaryData valueData2 = GetBssUnifyingData(value2, valueHashCode2, keyOutputSerializer);
int64_t prikey = 100;
uint32_t prikeyHashCode;
OutputBufferStatus outputBufferStatus7;
keyOutputSerializer.setBackendBuffer(&outputBufferStatus7);
BinaryData prikeyData = GetBssUnifyingData(prikey, prikeyHashCode, keyOutputSerializer);
kMapTable->Put(keyHashCode, prikeyData, keyData, valueData);
kMapTable->Put(keyHashCode, prikeyData, keyData, valueData1);
kMapTable->Put(keyHashCode, prikeyData, keyData, valueData2);
BinaryData valueDataFromMapState;
kMapTable->Get(keyHashCode, prikeyData, keyData, valueDataFromMapState);
DataInputDeserializer serializedData(
reinterpret_cast<const uint8_t*>(valueDataFromMapState.Data()), valueDataFromMapState.Length(), 0);
LongSerializer* longSerializer = new LongSerializer();
auto res = longSerializer->deserialize(serializedData);
auto size = *(long*)res;
EXPECT_EQ(size, 9997);
kMapTable->Remove(keyHashCode, prikeyData);
auto* wrapper = kMapTable->EntryIteratorWrraper(keyHashCode, prikeyData);
ASSERT_EQ(false, wrapper->HasNext());
delete wrapper;
}
TEST(BssStateTest, BssListState)
{
LongSerializer* serializer = LongSerializer::INSTANCE;
InternalKeyContextImpl<RowData*>* context = new InternalKeyContextImpl<RowData*>(new KeyGroupRange(0, 1), 1);
BinaryRowData* keyRowData = BinaryRowData::createBinaryRowDataWithMem(1);
keyRowData->setLong(0, 100);
context->setCurrentKey(keyRowData);
context->setCurrentKeyGroupIndex(1);
auto* metaInfo = new RegisteredKeyValueStateBackendMetaInfo("metaInfo", serializer, serializer);
BssListStateTable<RowData*, int64_t, int64_t> bssListStateTable(context, metaInfo, new BinaryRowDataSerializer(1));
auto* listState = new BssListState<RowData*, int64_t, int64_t>(&bssListStateTable, serializer, serializer);
initialListState(listState);
listState->setCurrentNamespace(1000);
listState->add(10);
auto list = listState->get();
EXPECT_EQ(list->size(), 1);
EXPECT_EQ((*list)[0], 10);
listState->add(10);
listState->add(20);
list = listState->get();
LOG("list size " << list->size());
EXPECT_EQ(list->size(), 3);
EXPECT_EQ((*list)[1], 10);
EXPECT_EQ((*list)[2], 20);
}
TEST(BssStateTest, BssMapState_putNget)
{
LongSerializer* serializer = LongSerializer::INSTANCE;
InternalKeyContextImpl<RowData*>* context = new InternalKeyContextImpl<RowData*>(new KeyGroupRange(0, 1), 1);
BinaryRowData* keyRowData = BinaryRowData::createBinaryRowDataWithMem(1);
keyRowData->setLong(0, 100);
context->setCurrentKey(keyRowData);
context->setCurrentKeyGroupIndex(1);
auto* metaInfo = new RegisteredKeyValueStateBackendMetaInfo("metaInfo", serializer, serializer);
BssMapStateTable<RowData*, int64_t, int64_t, int64_t> bssMapStateTable(
context, new BinaryRowDataSerializer(1), serializer, metaInfo);
auto* mapState = new BssMapState<RowData*, int64_t, int64_t, int64_t>(
&bssMapStateTable, new BinaryRowDataSerializer(1), serializer, serializer);
initialMapState(mapState);
mapState->setCurrentNamespace(1000);
mapState->put(10, 10);
auto* entries = mapState->entries();
LOG("map size is " << entries->size());
auto val = mapState->get(10);
EXPECT_EQ(val, 10);
mapState->put(10, 20);
entries = mapState->entries();
LOG("map size is " << entries->size());
val = mapState->get(10);
EXPECT_EQ(val, 20);
mapState->put(20, 10);
entries = mapState->entries();
LOG("map size is " << entries->size());
val = mapState->get(20);
EXPECT_EQ(val, 10);
mapState->put(20, 20);
entries = mapState->entries();
LOG("map size is " << entries->size());
val = mapState->get(20);
EXPECT_EQ(val, 20);
}
TEST(BssStateTest, BssMapState_putNentries)
{
LongSerializer* serializer = LongSerializer::INSTANCE;
InternalKeyContextImpl<RowData*>* context = new InternalKeyContextImpl<RowData*>(new KeyGroupRange(0, 1), 1);
BinaryRowData* keyRowData = BinaryRowData::createBinaryRowDataWithMem(1);
keyRowData->setLong(0, 100);
context->setCurrentKey(keyRowData);
context->setCurrentKeyGroupIndex(1);
auto* metaInfo = new RegisteredKeyValueStateBackendMetaInfo("metaInfo", serializer, serializer);
BssMapStateTable<RowData*, int64_t, int64_t, int64_t> bssMapStateTable(
context, new BinaryRowDataSerializer(1), serializer, metaInfo);
auto* mapState = new BssMapState<RowData*, int64_t, int64_t, int64_t>(
&bssMapStateTable, new BinaryRowDataSerializer(1), serializer, serializer);
initialMapState(mapState);
mapState->setCurrentNamespace(1000);
mapState->put(10, 10);
auto* entries = mapState->entries();
LOG("map size is " << entries->size());
LOG("================================================================================");
mapState->put(20, 10);
entries = mapState->entries();
LOG("map size is " << entries->size());
LOG("================================================================================");
mapState->put(10, 20);
mapState->put(30, 35);
entries = mapState->entries();
LOG("map size is " << entries->size());
mapState->remove(30);
entries = mapState->entries();
LOG("map size is " << entries->size());
LOG("================================================================================");
EXPECT_EQ(entries->size(), 2);
for (auto& pair : *entries) {
LOG("key : " << pair.first << ", value : " << pair.second);
}
EXPECT_EQ(entries->at(20), 10);
EXPECT_EQ(entries->at(10), 20);
}
TEST(BssStateTest, BssListState_vectorbatch)
{
LongSerializer* serializer = LongSerializer::INSTANCE;
InternalKeyContextImpl<RowData*>* context = new InternalKeyContextImpl<RowData*>(new KeyGroupRange(0, 1), 1);
BinaryRowData* keyRowData = BinaryRowData::createBinaryRowDataWithMem(1);
keyRowData->setLong(0, 100);
context->setCurrentKey(keyRowData);
context->setCurrentKeyGroupIndex(1);
auto* metaInfo = new RegisteredKeyValueStateBackendMetaInfo("metaInfo", serializer, serializer);
BssListStateTable<RowData*, int64_t, int64_t> bssListStateTable(context, metaInfo, new BinaryRowDataSerializer(1));
auto* listState = new BssListState<RowData*, int64_t, int64_t>(&bssListStateTable, serializer, serializer);
initialListState(listState);
listState->setCurrentNamespace(1000);
omnistream::VectorBatch* tmpBatch = GetVectorBatch(vecBatchRows);
auto batchid = listState->getVectorBatchesSize();
listState->addVectorBatch(tmpBatch);
LOG("vectorbatch id " << batchid);
auto res = listState->getVectorBatch(batchid);
LOG("vectorbatch row : " << tmpBatch->GetRowCount() << ", vector : " << tmpBatch->GetVectorCount());
for (int i = 0; i < tmpBatch->GetVectorCount(); ++i) {
for (int j = 0; j < tmpBatch->GetRowCount(); ++j) {
EXPECT_EQ(tmpBatch->GetValueAt<long>(i, j), res->GetValueAt<long>(i, j));
}
}
}
TEST(BssStateTest, BssMapState_vectorbatch)
{
LongSerializer* serializer = LongSerializer::INSTANCE;
InternalKeyContextImpl<RowData*>* context = new InternalKeyContextImpl<RowData*>(new KeyGroupRange(0, 1), 1);
BinaryRowData* keyRowData = BinaryRowData::createBinaryRowDataWithMem(1);
keyRowData->setLong(0, 100);
context->setCurrentKey(keyRowData);
context->setCurrentKeyGroupIndex(1);
auto* metaInfo = new RegisteredKeyValueStateBackendMetaInfo("metaInfo", serializer, serializer);
BssMapStateTable<RowData*, int64_t, int64_t, int64_t> bssMapStateTable(
context, new BinaryRowDataSerializer(1), serializer, metaInfo);
auto* mapState =
new BssMapState<RowData*, int64_t, int64_t, int64_t>(&bssMapStateTable, serializer, serializer, serializer);
initialMapState(mapState);
mapState->setCurrentNamespace(1000);
omnistream::VectorBatch* tmpBatch = GetVectorBatch(vecBatchRows);
mapState->addVectorBatch(tmpBatch);
LOG("vectorbatch id " << mapState->getVectorBatchesSize());
auto res = mapState->getVectorBatch(mapState->getVectorBatchesSize() - 1);
LOG("vectorbatch row : " << tmpBatch->GetRowCount() << ", vector : " << tmpBatch->GetVectorCount());
for (int i = 0; i < tmpBatch->GetVectorCount(); ++i) {
for (int j = 0; j < tmpBatch->GetRowCount(); ++j) {
EXPECT_EQ(tmpBatch->GetValueAt<long>(i, j), res->GetValueAt<long>(i, j));
}
}
omnistream::VectorBatch* tmpBatch2 = GetVectorBatch(vecBatchRows);
mapState->addVectorBatch(tmpBatch2);
LOG("vectorbatch id " << mapState->getVectorBatchesSize());
auto res1 = mapState->getVectorBatch(mapState->getVectorBatchesSize() - 1);
LOG("vectorbatch row : " << tmpBatch2->GetRowCount() << ", vector : " << tmpBatch->GetVectorCount());
for (int i = 0; i < tmpBatch2->GetVectorCount(); ++i) {
for (int j = 0; j < tmpBatch2->GetRowCount(); ++j) {
EXPECT_EQ(tmpBatch2->GetValueAt<long>(i, j), res1->GetValueAt<long>(i, j));
}
}
}
#endif