/**
 * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * Description: Stream data page pool
 */
#ifndef DATASYSTEM_WORKER_STREAM_CACHE_STREAM_DATA_POOL_H
#define DATASYSTEM_WORKER_STREAM_CACHE_STREAM_DATA_POOL_H
#include <memory>
#include <tbb/concurrent_hash_map.h>
#include "datasystem/common/rpc/rpc_server_stream_base.h"
#include "datasystem/common/util/bitmask_enum.h"
#include "datasystem/common/stream_cache/stream_data_page.h"
#include "datasystem/common/stream_cache/stream_fields.h"
#include "datasystem/worker/stream_cache/page_queue/shared_page_queue.h"
namespace datasystem {
namespace worker {
namespace stream_cache {
class ClientWorkerSCServiceImpl;
class StreamManager;
class StreamDataPool {
public:
struct ScanInfo {
uint64_t cursor_;
std::vector<std::string> dest_;
std::unique_ptr<std::future<Status>> future_;
std::unique_ptr<std::shared_timed_mutex> mux_;
std::chrono::high_resolution_clock::time_point start_;
ScanInfo(uint64_t cursor, std::vector<std::string> dest, std::unique_ptr<std::future<Status>> future);
virtual Status GetPageQueue(std::shared_ptr<PageQueueBase> &pageQueue) = 0;
};
struct StreamScanInfo : ScanInfo {
std::shared_ptr<StreamManager> mgr_;
StreamScanInfo(std::shared_ptr<StreamManager> mgr, uint64_t cursor, std::vector<std::string> dest,
std::unique_ptr<std::future<Status>> future);
virtual Status GetPageQueue(std::shared_ptr<PageQueueBase> &pageQueue) override;
};
struct SharedPageScanInfo : ScanInfo {
std::weak_ptr<SharedPageQueue> sharedPageQueue_;
SharedPageScanInfo(std::shared_ptr<SharedPageQueue> sharedPageQueue, uint64_t cursor,
std::vector<std::string> dest, std::unique_ptr<std::future<Status>> future);
virtual Status GetPageQueue(std::shared_ptr<PageQueueBase> &pageQueue) override;
};
StreamDataPool();
~StreamDataPool();
* Initialization
* @return
*/
Status Init();
* @brief Add a stream data object to scan list
* @param[in] streamMgr The stream manager of the actual stream, for exclusive page queue purpose.
* @param[in] streamName The stream name to scan.
* @param[in] dest The remote worker destination.
* @param[in] lastAckCursor The last ack cursor.
* @return Status of the call.
*/
Status AddStreamObject(std::shared_ptr<StreamManager> streamMgr, const std::string &streamName,
const std::vector<std::string> &dest, uint64_t lastAckCursor);
* @brief Add a shared page queue to scan list.
* @param[in] sharedPageQueue The shared page queue to scan.
* @param[in] streamName The actual stream name to scan.
* @param[in] dest The remote worker destination.
* @param[in] lastAckCursor The last ack cursor.
* @return Status of the call.
*/
Status AddSharedPageObject(std::shared_ptr<SharedPageQueue> sharedPageQueue, const std::string &streamName,
const std::vector<std::string> &dest, uint64_t lastAckCursor);
* @brief Remove a stream object from scan list
* @param mgr
* @return
*/
Status RemoveStreamObject(const std::string &streamName, const std::vector<std::string> &dest);
* @brief Reset the scan position
* @param streamName
* @return
*/
Status ResetStreamScanPosition(const std::string &streamName);
private:
std::atomic<bool> interrupt_;
const int numPartitions_;
std::unique_ptr<ThreadPool> threadPool_;
Thread scanner_;
mutable std::shared_timed_mutex queueIdMux_;
std::unordered_map<std::string, std::unordered_set<std::string>> queueIdMap_;
struct ObjectPartition {
uint64_t myId_;
std::atomic<bool> interrupt_;
mutable std::shared_timed_mutex objMux_;
std::unordered_map<std::string, std::shared_ptr<ScanInfo>> objMap_;
explicit ObjectPartition(uint64_t i) : myId_(i), interrupt_(false)
{
}
~ObjectPartition() = default;
template <typename T, typename S>
Status AddScanObject(const std::shared_ptr<T> &streamObj, const std::string &keyName,
const std::vector<std::string> &dest, uint64_t lastAckCursor,
std::unique_ptr<ThreadPool> &pool);
Status RemoveScanObject(const std::string &streamName, const std::vector<std::string> &dest);
Status ScanChangesAndEval(std::unordered_map<std::string, std::shared_ptr<ScanInfo>>::iterator &iter);
Status ResetStreamScanPosition(const std::string &streamName);
Status SendElementsToRemote(const std::string &streamName);
void ScanChanges(std::unique_ptr<ThreadPool> &pool);
};
std::vector<std::unique_ptr<ObjectPartition>> partitionList_;
void ScanChanges();
void Stop();
uint64_t GetPartId(const std::string &streamName) const;
};
}
}
}
#endif