#include <gtest/gtest.h>
#include <string>
#include "runtime/taskexecutor/TaskManagerServices.h"
#include "nlohmann/json.hpp"
#include "runtime/taskexecutor/OmniTaskExecutor.h"
#include "runtime/taskmanager/OmniTask.h"
#include "streaming/runtime/tasks/omni/OmniStreamTask.h"
#include "test/functionaltest/e2e/FrameworkConfig.h"
class TaskManagerTest : public FrameworkTestConfig {
public:
void SetUp() override {
FrameworkTestConfig::SetUp();
std::ifstream taskMCSConfFile("./input/taskMCSConf.json");
taskMCSConfJson_ = nlohmann::json::parse(taskMCSConfFile);
std::ifstream jobFile("./input/job.json");
nlohmann::json jobJson = nlohmann::json::parse(jobFile);
std::ifstream sourceTaskFile("./input/sourceTask.json");
nlohmann::json sourceTaskJson = nlohmann::json::parse(sourceTaskFile);
std::ifstream sourceTddFile("./input/sourceTdd.json");
nlohmann::json sourceTddJson = nlohmann::json::parse(sourceTddFile);
std::ifstream sinkTaskFile("./input/sinkTask.json");
nlohmann::json sinkTaskJson = nlohmann::json::parse(sinkTaskFile);
std::ifstream sinkTddFile("./input/sinkTdd.json");
nlohmann::json sinkTddJson = nlohmann::json::parse(sinkTddFile);
jobInfo_ = jobJson;
srcTaskInfo_ = sourceTaskJson;
srcTddInfo_ = sourceTddJson;
sinkTaskInfo_ = sinkTaskJson;
sinkTddInfo_ = sinkTddJson;
clearOutputFile();
}
};
TEST_F(TaskManagerTest, DISABLED_TaskManagerAndStreamTaskTest) {
taskManagerServices_ = std::shared_ptr<omnistream::TaskManagerServices>(omnistream::TaskManagerServices::fromConfiguration(taskMCSConfJson_));
taskExecutor_ = std::make_shared<omnistream::OmniTaskExecutor>(taskManagerServices_);
sourceTask_ = std::shared_ptr<omnistream::OmniTask>(taskExecutor_->submitTask(jobInfo_, srcTaskInfo_, srcTddInfo_,nullptr,nullptr,nullptr,nullptr));
sinkTask_ = std::shared_ptr<omnistream::OmniTask>(taskExecutor_->submitTask(jobInfo_, sinkTaskInfo_, sinkTddInfo_,nullptr,nullptr,nullptr,nullptr));
std::thread sourceThread([&]() {
sourceStreamTask_ = std::shared_ptr<omnistream::OmniStreamTask>(reinterpret_cast<omnistream::OmniStreamTask*>(sourceTask_->setupStreamTask(sourceTask_->SOURCE_STREAM_TASK)));
sourceTask_->doRun(reinterpret_cast<long>(sourceStreamTask_.get()));
});
std::thread sinkThread([&]() {
sinkStreamTask_ = std::shared_ptr<omnistream::OmniStreamTask>(reinterpret_cast<omnistream::OmniStreamTask*>(sinkTask_->setupStreamTask(sinkTask_->ONEINTPUT_STREAM_TASK)));
sinkTask_->doRun(reinterpret_cast<long>(sinkStreamTask_.get()));
});
sourceThread.join();
sinkThread.join();
std::string output = getOutputFile();
std::string expectedOutput = "+I,1\n+I,4\n+I,7\n+I,10\n";
EXPECT_EQ(output, expectedOutput);
}