#include <gtest/gtest.h>
#include <string>
#include <memory>
#include <vector>
#include "runtime/checkpoint/TaskStateSnapshotDeserializer.h"
#include "runtime/checkpoint/TaskStateSnapshotSerializer.h"
#include "runtime/state/KeyGroupsSavepointStateHandle.h"
#include "runtime/state/memory/ByteStreamStateHandle.h"
TEST(SerializerTestSuite, PreservesKeyGroupsSavepointStateHandleType) {
KeyGroupRange keyGroupRange(0, 0);
KeyGroupRangeOffsets offsets(keyGroupRange, std::vector<int64_t>{0});
auto streamHandle = std::make_shared<ByteStreamStateHandle>(
"savepoint-handle",
std::vector<uint8_t>{});
auto savepointHandle = std::make_shared<KeyGroupsSavepointStateHandle>(offsets, streamHandle);
nlohmann::json handleJson = TaskStateSnapshotSerializer::parseKeyGroupsStateHandle(savepointHandle);
EXPECT_EQ(
handleJson["@class"].get<std::string>(),
"org.apache.flink.runtime.state.KeyGroupsSavepointStateHandle");
EXPECT_EQ(handleJson["stateHandleName"].get<std::string>(), "KeyGroupsSavepointStateHandle");
}
TEST(DeserializerTestSuite, DeserializesLocalStateSnapshotFromFile) {
const std::string json_content = R"({
"@class": "org.apache.flink.runtime.checkpoint.TaskStateSnapshot",
"subtaskStatesByOperatorID": {
"@class": "java.util.HashMap",
"ccb29b5204e83e8a588b3828afaa7015": {
"@class": "org.apache.flink.runtime.checkpoint.OperatorSubtaskState",
"managedOperatorState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"rawOperatorState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"managedKeyedState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"rawKeyedState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"inputChannelState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"resultSubpartitionState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"inputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"outputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"stateSize": 0, "checkpointedSize": 0, "finished": false
},
"4bf7c1955ffe56e2106d666433eaf137": {
"@class": "org.apache.flink.runtime.checkpoint.OperatorSubtaskState",
"managedOperatorState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"rawOperatorState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"managedKeyedState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", [{
"@class": "org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle",
"directoryStateHandle": {
"@class": "org.apache.flink.runtime.state.DirectoryStateHandle",
"directoryString": "/home/hudsonsheng/RocksDB/tm_localhost:35785-4d040b/localState/aid_23d801926c6066daab3c7eabda5e84e6/jid_095942d305df3523779193e8cf79060a/vtx_4bf7c1955ffe56e2106d666433eaf137_sti_0/chk_11/8d43f9058fcb491d95df87e4597464e4",
"directory": ["java.nio.file.Path", "file:///home/hudsonsheng/RocksDB/tm_localhost:35785-4d040b/localState/aid_23d801926c6066daab3c7eabda5e84e6/jid_095942d305df3523779193e8cf79060a/vtx_4bf7c1955ffe56e2106d666433eaf137_sti_0/chk_11/8d43f9058fcb491d95df87e4597464e4/"],
"stateSize": 0
},
"keyGroupRange": {"@class": "org.apache.flink.runtime.state.KeyGroupRange", "startKeyGroup": 0, "endKeyGroup": 127, "numberOfKeyGroups": 128},
"stateHandleId": {"@class": "org.apache.flink.runtime.state.StateHandleID", "keyString": "b1e89516-e1b3-459d-8073-b986e63cfb6a"},
"checkpointId": 11,
"backendIdentifier": "8d43f905-8fcb-491d-95df-87e4597464e4",
"metaDataState": {"@class": "org.apache.flink.runtime.state.filesystem.FileStateHandle", "stateSize": 81584, "streamStateHandleID": {"@class": "org.apache.flink.runtime.state.PhysicalStateHandleID", "keyString": "file:/home/hudsonsheng/RocksDB/tm_localhost:35785-4d040b/localState/aid_23d801926c6066daab3c7eabda5e84e6/jid_095942d305df3523779193e8cf79060a/vtx_4bf7c1955ffe56e2106d666433eaf137_sti_0/chk_11/e5e08ea2-de9c-4a46-a338-73e661d67186"}},
"sharedState": ["java.util.ArrayList", []], "sharedStateHandles": ["java.util.ArrayList", []],
"stateSize": 81584, "checkpointedSize": 81584
}]],
"rawKeyedState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"inputChannelState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"resultSubpartitionState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"inputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"outputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"stateSize": 81584, "checkpointedSize": 81584, "finished": false
}
},
"isTaskDeployedAsFinished": false, "isTaskFinished": false,
"inputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"outputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"taskDeployedAsFinished": false, "taskFinished": false,
"stateSize": 81584, "checkpointedSize": 81584
})";
auto snapshot = TaskStateSnapshotDeserializer::Deserialize(json_content);
EXPECT_FALSE(snapshot->GetIsTaskFinished());
EXPECT_EQ(snapshot->GetStateSize(), 81584);
const auto& subtask_states = snapshot->GetSubtaskStateMappings();
ASSERT_EQ(subtask_states.size(), 2);
OperatorID opIdWithState = TaskStateSnapshotDeserializer::HexStringToOperatorId<OperatorID>("4bf7c1955ffe56e2106d666433eaf137");
bool found = false;
std::shared_ptr<OperatorSubtaskState> state;
for (const auto& pair : subtask_states) {
if (pair.first == opIdWithState) {
found = true;
state = pair.second;
}
}
ASSERT_TRUE(found);
const auto& keyedStateHandles = state->GetManagedKeyedState();
ASSERT_EQ(keyedStateHandles.Size(), 1);
auto localHandle = std::dynamic_pointer_cast<IncrementalLocalKeyedStateHandle>(keyedStateHandles.ToArray().at(0));
ASSERT_NE(localHandle, nullptr);
const auto& dirHandle = localHandle->getDirectoryStateHandle();
ASSERT_NE(dirHandle, nullptr);
const std::string expected_dir = "/home/hudsonsheng/RocksDB/tm_localhost:35785-4d040b/localState/aid_23d801926c6066daab3c7eabda5e84e6/jid_095942d305df3523779193e8cf79060a/vtx_4bf7c1955ffe56e2106d666433eaf137_sti_0/chk_11/8d43f9058fcb491d95df87e4597464e4";
EXPECT_EQ(dirHandle->getDirectory().toString(), expected_dir);
std::string to_string_output = snapshot->ToString();
std::cout << "Global: " << to_string_output << std::endl;
nlohmann::json parsed_json;
ASSERT_NO_THROW({
parsed_json = nlohmann::json::parse(to_string_output);
}) << "The output of ToString() is not valid JSON. Output was: " << to_string_output;
EXPECT_EQ(parsed_json["stateHandleName"], "TaskStateSnapshot");
std::string opIdHex = "4bf7c1955ffe56e2106d666433eaf137";
ASSERT_TRUE(parsed_json["subtaskStatesByOperatorID"].contains(opIdHex));
}
TEST(DeserializerTestSuite, DeserializesRemoteStateSnapshotFromFile) {
const std::string json_content = R"({
"@class": "org.apache.flink.runtime.checkpoint.TaskStateSnapshot",
"subtaskStatesByOperatorID": {
"@class": "java.util.HashMap",
"ccb29b5204e83e8a588b3828afaa7015": {
"@class": "org.apache.flink.runtime.checkpoint.OperatorSubtaskState",
"managedOperatorState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []], "rawOperatorState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"managedKeyedState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []], "rawKeyedState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"inputChannelState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []], "resultSubpartitionState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"inputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"outputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"stateSize": 0, "checkpointedSize": 0, "finished": false
},
"4bf7c1955ffe56e2106d666433eaf137": {
"@class": "org.apache.flink.runtime.checkpoint.OperatorSubtaskState",
"managedOperatorState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []], "rawOperatorState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"managedKeyedState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", [{
"@class": "org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle",
"backendIdentifier": "8d43f905-8fcb-491d-95df-87e4597464e4",
"keyGroupRange": {"@class": "org.apache.flink.runtime.state.KeyGroupRange", "startKeyGroup": 0, "endKeyGroup": 127, "numberOfKeyGroups": 128},
"checkpointId": 1,
"sharedState": ["java.util.Collections$EmptyList", []],
"privateState": ["java.util.ArrayList", [
{
"handle": {"@class": "org.apache.flink.runtime.state.memory.ByteStreamStateHandle", "data": "Vvm4+BwAAQEabGV2ZWxkYi5CeXRld2lzZUNvbXBhcmF0b3KYr1imAgABAgBRgcd4BgABCQADBQQARtLoITEAAQEabGV2ZWxkYi5CeXRld2lzZUNvbXBhcmF0b3ICBQMIBADIAQHJAQlMZWZ0Q2FjaGU6cy3AMgABARpsZXZlbGRiLkJ5dGV3aXNlQ29tcGFyYXRvcgIFAwoEAMgBAskBClJpZ2h0Q2FjaGXvtbdrNwABARpsZXZlbGRiLkJ5dGV3aXNlQ29tcGFyYXRvcgIFAwwEAMgBA8kBD0VhcmxpZXN0RWxlbWVudLD4/3k6AAEBGmxldmVsZGIuQnl0ZXdpc2VDb21wYXJhdG9yAgUDDgQAyAEEyQESTGVmdENhY2hlQ2xlYW5UaW1lD5BHZzsAAQEabGV2ZWxkYi5CeXRld2lzZUNvbXBhcmF0b3ICBQMQBADIAQXJARNSaWdodENhY2hlQ2xlYW5UaW1lmNaHPDgAAQEabGV2ZWxkYi5CeXRld2lzZUNvbXBhcmF0b3ICBQMSBADIAQbJARBMZWZ0RHVwbGljYXRlUmNk0eMQ3DkAAQEabGV2ZWxkYi5CeXRld2lzZUNvbXBhcmF0b3ICBQMUBADIAQfJARFSaWdodER1cGxpY2F0ZVJjZBtIAPJDAAEBGmxldmVsZGIuQnl0ZXdpc2VDb21wYXJhdG9yAgUDFgQAyAEIyQEbTGVmdER1cGxpY2F0ZVJjZENsZWFuZXJUaW1lFa5C0EQAAQEabGV2ZWxkYi5CeXRld2lzZUNvbXBhcmF0b3ICBQMYBADIAQnJARxSaWdodER1cGxpY2F0ZVJjZENsZWFuZXJUaW1lYndGHTkAAQEabGV2ZWxkYi5CeXRld2lzZUNvbXBhcmF0b3ICBQMaBADIAQrJARFMZWZ0RGF0YVRvdGFsU2l6ZZqvcUZLAAEBGmxldmVsZGIuQnl0ZXdpc2VDb21wYXJhdG9yAgUDHAQAyAELyQEjX3RpbWVyX3N0YXRlL3Byb2Nlc3NpbmdfdXNlci10aW1lcnMatVTLRgABARpsZXZlbGRiLkJ5dGV3aXNlQ29tcGFyYXRvcgIFAx4EAMgBDMkBHl90aW1lcl9zdGF0ZS9ldmVudF91c2VyLXRpbWVyc4u+mDWhAAECBQkAAyEE1lBnACCAWTgAJmNoYW5uZWwgNDQ5LGFwcCA0NDksbnVsbCxyZXF1ZXN0IDQ0OSwAAAABmFLJqY0A+BgAAAAAADh/JmNoYW5uZWwgNjAwLGFwcCA2MDAsbnVsbCxyZXF1ZXN0IDYwMCwAAAABmFLJqY8AlCIAAAAAAKEc4kwFBbTDn8QGBgW+w5/EBgcACAdVbmtub3duAcgBAZILBJuhAAECBQkAAyIE1lBnACHI2gI4ACZjaGFubmVsIDI3NSxhcHAgMjc1LG51bGwscmVxdWVzdCAyNzUsAAAAAZhSyamKAZwHAAAAAAA4fyZjaGFubmVsIDg2NyxhcHAgODY3LG51bGwscmVxdWVzdCA4NjcsAAAAAZhSyamSAc8SAAAAAAAE5EwFBbTDn8QGBgW+w5/EBgcACAdVbmtub3duAcgBAg5AFPqRAAECBQkAAyME1lBnACKDzAIwACZjaGFubmVsIDI3NSxhcHAgMjc1LG51bGwscmVxdWVzdCAyNzUsAAGbBwAAAAAAMH8mY2hhbm5lbCA4NjcsYXBwIDg2NyxudWxsLHJlcXVlc3QgODY3LAABzhIAAAAAAAPjTAUFtMOfxAYGBb7Dn8QGBwAIB1Vua25vd24ByAEDLswS9JAAAQIFCQADJATWUGcAI81WMAAmY2hhbm5lbCA0NDksYXBwIDQ0OSxudWxsLHJlcXVlc3QgNDQ5LAAB0QQAAAAAADB/JmNoYW5uZWwgNjAwLGFwcCA2MDAsbnVsbCxyZXF1ZXN0IDYwMCwAAWsHAAAAAAAJiSMFBbTDn8QGBgW+w5/EBgcACAdVbmtub3duAcgBBAQuaxeRAAECBQkAAyUE1lBnACSDsgEwACZjaGFubmVsIDI3NSxhcHAgMjc1LG51bGwscmVxdWVzdCAyNzUsAAGdBwAAAAAAMH8mY2hhbm5lbCA4NjcsYXBwIDg2NyxudWxsLHJlcXVlc3QgODY3LAAB0BIAAAAAAAXlTAUFtMOfxAYGBb7Dn8QGBwAIB1Vua25vd24ByAEFaGa/MucAAQIFCQADJgTWUGcAJdOXAlsAJmNoYW5uZWwgMjc1LGFwcCAyNzUsbnVsbCxyZXF1ZXN0IDI3NSwAK2NoYW5uZWwgMjc1LGFwcCAyNzUsbnVsbCxyZXF1ZXN0IDI3NSw4NTI3NQGeGwAAAAAAW38mY2hhbm5lbCA4NjcsYXBwIDg2NyxudWxsLHJlcXVlc3QgODY3LAArY2hhbm5lbCA4NjcsYXBwIDg2NyxudWxsLHJlcXVlc3QgODY3LDg1ODY3ARwiAAAAAAAG8ksFBbTDn8QGBgW+w5/EBgcACAdVbmtub3duAcgBBgIFEFnnAAECBQkAAycE1lBnACaAmAJbACZjaGFubmVsIDI3NSxhcHAgMjc1LG51bGwscmVxdWVzdCAyNzUsACtjaGFubmVsIDI3NSxhcHAgMjc1LG51bGwscmVxdWVzdCAyNzUsODQyNzUBmQcAAAAAAFt/JmNoYW5uZWwgODY3LGFwcCA4NjcsbnVsbCxyZXF1ZXN0IDg2NywAK2NoYW5uZWwgODY3LGFwcCA4NjcsbnVsbCxyZXF1ZXN0IDg2Nyw4NDg2NwHMEgAAAAAAAeBMBQW0w5/EBgYFvsOfxAYHAAgHVW5rbm93bgHIAQdOUy0TkQABAgUJAAMoBNZQZwAnxbEBMAAmY2hhbm5lbCAyNzUsYXBwIDI3NSxudWxsLHJlcXVlc3QgMjc1LAABnxsAAAAAADB/JmNoYW5uZWwgODY3LGFwcCA4NjcsbnVsbCxyZXF1ZXN0IDg2NywAAR0iAAAAAAAH80sFBbTDn8QGBgW+w5/EBgcACAdVbmtub3duAcgBCBtpQlaRAAECBQkAAykE1lBnACj4sQEwACZjaGFubmVsIDI3NSxhcHAgMjc1LG51bGwscmVxdWVzdCAyNzUsAAGaBwAAAAAAMH8mY2hhbm5lbCA4NjcsYXBwIDg2NyxudWxsLHJlcXVlc3QgODY3LAABzRIAAAAAAALhTAUFtMOfxAYGBb7Dn8QGBwAIB1Vua25vd24ByAEJRei9HqIAAQIFCQADKgTWUGcAKZ2sAjgAgAABmFLb+QomY2hhbm5lbCAyNzUsYXBwIDI3NSxudWxsLHJlcXVlc3QgMjc1LAABeAoAAAAAADh/gAABmFLb+SEmY2hhbm5lbCA4NjcsYXBwIDg2NyxudWxsLHJlcXVlc3QgODY3LAABPicAAAAAAMgR1lAFBbTDn8QGBgW+w5/EBgcACAdVbmtub3duAcgBDA==", "handleName": "file:/home/hudsonsheng/flink/checkpoints/095942d305df3523779193e8cf79060a/chk-1/632e33a6-4441-403a-98ea-e9ceb4842279", "stateSize": 2602},
"localPath": "MANIFEST-000004", "stateSize": 2602
},
{
"handle": {"@class": "org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle", "stateSize": 22776, "relativePath": "5820ceb4-5f2e-489e-8185-749713ef79ce", "streamStateHandleID": {"@class": "org.apache.flink.runtime.state.PhysicalStateHandleID", "keyString": "file:/home/hudsonsheng/flink/checkpoints/095942d305df3523779193e8cf79060a/chk-1/5820ceb4-5f2e-489e-8185-749713ef79ce"}},
"localPath": "000040.sst", "stateSize": 22776
}
]],
"metaStateHandle": {"@class": "org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle", "stateSize": 81584, "relativePath": "696275ab-57d4-482f-ae52-97bfd07d891e", "streamStateHandleID": {"@class": "org.apache.flink.runtime.state.PhysicalStateHandleID", "keyString": "file:/home/hudsonsheng/flink/checkpoints/095942d305df3523779193e8cf79060a/chk-1/696275ab-57d4-482f-ae52-97bfd07d891e"}},
"stateHandleId": {"@class": "org.apache.flink.runtime.state.StateHandleID", "keyString": "13bbca3d-116b-4d9f-aeff-ad55c9cb29e1"},
"persistedSizeOfThisCheckpoint": 421936, "sharedStateHandles": ["java.util.Collections$EmptyList", []],
"stateSize": 421936, "checkpointedSize": 421936
}]],
"rawKeyedState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []], "inputChannelState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"resultSubpartitionState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"inputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"outputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"stateSize": 421936, "checkpointedSize": 421936, "finished": false
}
},
"isTaskDeployedAsFinished": false, "isTaskFinished": false,
"inputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"outputRescalingDescriptor": {"@class": "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor", "gateOrPartitionDescriptors": ["[Lorg.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$InflightDataGateOrPartitionRescalingDescriptor;", []]},
"taskDeployedAsFinished": false, "taskFinished": false,
"stateSize": 421936, "checkpointedSize": 421936
})";
auto snapshot = TaskStateSnapshotDeserializer::Deserialize(json_content);
std::cout << "Local: " << snapshot->ToString() << std::endl;
ASSERT_NE(snapshot, nullptr);
EXPECT_FALSE(snapshot->GetIsTaskFinished());
const auto& subtask_states = snapshot->GetSubtaskStateMappings();
ASSERT_EQ(subtask_states.size(), 2);
OperatorID opIdWithState = TaskStateSnapshotDeserializer::HexStringToOperatorId<OperatorID>("4bf7c1955ffe56e2106d666433eaf137");
bool found = false;
std::shared_ptr<OperatorSubtaskState> state;
for (const auto& pair : subtask_states) {
if (pair.first == opIdWithState) {
found = true;
state = pair.second;
break;
}
}
ASSERT_TRUE(found);
ASSERT_NE(state, nullptr);
const auto& keyedStateHandles = state->GetManagedKeyedState();
ASSERT_EQ(keyedStateHandles.Size(), 1);
auto remoteHandle = std::dynamic_pointer_cast<IncrementalRemoteKeyedStateHandle>(keyedStateHandles.ToArray().at(0));
ASSERT_NE(remoteHandle, nullptr);
EXPECT_EQ(remoteHandle->GetCheckpointId(), 1);
const auto& privateState = remoteHandle->GetPrivateState();
ASSERT_EQ(privateState.size(), 2);
auto byteStreamHandle = std::dynamic_pointer_cast<ByteStreamStateHandle>(privateState.at(0).getHandle());
ASSERT_NE(byteStreamHandle, nullptr);
EXPECT_EQ(byteStreamHandle->GetHandleName(), "file:/home/hudsonsheng/flink/checkpoints/095942d305df3523779193e8cf79060a/chk-1/632e33a6-4441-403a-98ea-e9ceb4842279");
EXPECT_EQ(privateState.at(0).getLocalPath(), "MANIFEST-000004");
const auto& decoded_data = byteStreamHandle->GetData();
auto relativeFileHandle = std::dynamic_pointer_cast<RelativeFileStateHandle>(privateState.at(1).getHandle());
ASSERT_NE(relativeFileHandle, nullptr);
EXPECT_EQ(relativeFileHandle->GetRelativePath(), "5820ceb4-5f2e-489e-8185-749713ef79ce");
EXPECT_EQ(privateState.at(1).getLocalPath(), "000040.sst");
EXPECT_EQ(relativeFileHandle->GetStateSize(), 22776);
}
TEST(DeserializerTestSuite, DeserializesRemoteStateSnapshotWithPlainJavaHandleArrays) {
const std::string json_content = R"({
"subtaskStatesByOperatorID": {
"4bf7c1955ffe56e2106d666433eaf137": {
"managedKeyedState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", [{
"@class": "IncrementalRemoteKeyedStateHandle",
"backendIdentifier": "8d43f905-8fcb-491d-95df-87e4597464e4",
"keyGroupRange": {"startKeyGroup": 0, "endKeyGroup": 127},
"checkpointId": 1,
"privateState": [{
"handle": {
"@class": "ByteStreamStateHandle",
"handleName": "MANIFEST-000001",
"data": "AQID"
},
"localPath": "MANIFEST-000001"
}],
"metaStateHandle": {
"@class": "ByteStreamStateHandle",
"handleName": "meta",
"data": "AQID"
},
"stateHandleId": {"keyString": "13bbca3d-116b-4d9f-aeff-ad55c9cb29e1"},
"persistedSizeOfThisCheckpoint": 3
}]],
"inputChannelState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []],
"resultSubpartitionState": ["org.apache.flink.runtime.checkpoint.StateObjectCollection", []]
}
},
"isTaskDeployedAsFinished": false,
"isTaskFinished": false
})";
auto snapshot = TaskStateSnapshotDeserializer::Deserialize(json_content);
ASSERT_NE(snapshot, nullptr);
OperatorID opIdWithState =
TaskStateSnapshotDeserializer::HexStringToOperatorId<OperatorID>("4bf7c1955ffe56e2106d666433eaf137");
auto state = snapshot->GetSubtaskStateByOperatorID(opIdWithState);
ASSERT_NE(state, nullptr);
const auto& keyedStateHandles = state->GetManagedKeyedState();
ASSERT_EQ(keyedStateHandles.Size(), 1);
auto remoteHandle = std::dynamic_pointer_cast<IncrementalRemoteKeyedStateHandle>(keyedStateHandles.ToArray().at(0));
ASSERT_NE(remoteHandle, nullptr);
EXPECT_TRUE(remoteHandle->GetSharedState().empty());
ASSERT_EQ(remoteHandle->GetPrivateState().size(), 1);
EXPECT_EQ(remoteHandle->GetPrivateState().at(0).getLocalPath(), "MANIFEST-000001");
}