* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_CHANNEL_STATE_WRITE_REQUEST_EXECUTOR_IMPL_H
#define OMNISTREAM_CHANNEL_STATE_WRITE_REQUEST_EXECUTOR_IMPL_H
#include <deque>
#include <map>
#include <queue>
#include <set>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>
#include <functional>
#include <memory>
#include "ChannelStateWriteRequestExecutor.h"
#include "ChannelStateWriteRequestDispatcher.h"
#include "ChannelStateWriteRequest.h"
#include "runtime/jobgraph/JobVertexID.h"
namespace omnistream {
class ChannelStateWriteRequestExecutorImpl : public ChannelStateWriteRequestExecutor {
public:
ChannelStateWriteRequestExecutorImpl(
std::shared_ptr<ChannelStateWriteRequestDispatcher> dispatcher);
~ChannelStateWriteRequestExecutorImpl() override;
void start() override;
void submit(std::shared_ptr<ChannelStateWriteRequest> req) override;
void submitPriority(std::shared_ptr<ChannelStateWriteRequest> req) override;
void registerSubtask(const JobVertexID &jvid, int idx) override;
void releaseSubtask(const JobVertexID &jvid, int idx) override;
void shutdown() override;
private:
std::shared_ptr<ChannelStateWriteRequestDispatcher> dispatcher;
const int maxSubtasks;
std::function<void(ChannelStateWriteRequestExecutor *)> onRegistered;
std::mutex ®isterLock;
std::mutex mutex;
std::condition_variable cv;
std::deque<std::shared_ptr<ChannelStateWriteRequest>> readyQueue;
std::map<SubtaskID, std::queue<std::shared_ptr<ChannelStateWriteRequest>>> unreadyQueues;
std::set<SubtaskID> subtasks;
std::atomic<bool> isRegistering;
std::atomic<bool> stopped{false};
std::atomic<bool> started{false};
std::thread worker;
std::exception_ptr exceptionPtr;
void run();
void loop();
std::shared_ptr<ChannelStateWriteRequest> take();
void enqueue(std::shared_ptr<ChannelStateWriteRequest> req, bool priority);
void registerCallback(std::shared_ptr<ChannelStateWriteRequest> first, SubtaskID sid);
void cleanup();
void completeRegistration();
};
}
#endif