/**
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Description: The interface of worker liveness check.
*/
#ifndef DATASYSTEM_WORKER_WORKER_LIVENESS_CHECK_H
#define DATASYSTEM_WORKER_WORKER_LIVENESS_CHECK_H
#include <atomic>
#include <cstdint>
#include <ctime>
#include <functional>
#include <memory>
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>

#include "datasystem/common/ak_sk/ak_sk_manager.h"
#include "datasystem/common/util/thread.h"
#include "datasystem/common/util/timer.h"
#include "datasystem/protos/master_object.stub.rpc.pb.h"
#include "datasystem/protos/object_posix.stub.rpc.pb.h"
#include "datasystem/protos/share_memory.stub.rpc.pb.h"
#include "datasystem/protos/worker_object.stub.rpc.pb.h"
#include "datasystem/utils/status.h"
namespace datasystem {
namespace worker {
class WorkerOCServer;
class WorkerLivenessCheck {
public:
WorkerLivenessCheck(WorkerOCServer *workerOcServer, std::string probeFileName, uint32_t probeTimeoutMs,
HostPort hostPort, std::string workerUuid, std::shared_ptr<AkSkManager> akSkManager);
~WorkerLivenessCheck();
WorkerLivenessCheck(const WorkerLivenessCheck &) = delete;
WorkerLivenessCheck &operator=(const WorkerLivenessCheck &) = delete;
* @brief Init WorkerLivenessCheck instance.
* @return Status of this call.
*/
Status Init();
* @brief Stop liveness check.
*/
void Stop();
protected:
* @brief Set liveness probe.
* @param[in] success Mark as success liveness probe or not.
* @return Status of this call
*/
Status SetLivenessProbe(bool success);
* @brief Reset liveness probe, if liveness probe exist, delete it
* @return Status of this call
*/
Status ResetLivenessProbe();
* @brief Check liveness for all services.
* @return Status of this call.
*/
Status CheckWorkerServices();
* @brief Check liveness for rocksdb service.
* @return Status of this call.
*/
Status CheckRocksDbService();
* @brief Check current node is master or not.
* @return true Current node is master.
*/
bool IsMasterNode();
* @brief Get liveness status.
* @param[in/out] timer check timer.
* @param[in] lastStatus last check status.
* return Status of the call.
*/
Status CheckLivenessProbeFile(Timer &timer, const Status &lastStatus);
private:
void Run();
Status DoLivenessCheck();
std::unique_ptr<Thread> checkThread_;
std::atomic<bool> exitFlag_{ false };
WorkerOCServer *workerOcServer_;
std::string probeFileName_;
uint32_t probeTimeoutMs_;
struct Policy {
std::string name;
std::function<Status()> func;
};
std::vector<Policy> policies_;
HostPort hostPort_;
std::time_t lastSuccessTimeUs_ = 0;
const uint32_t TIME_UNIT_CONVERSION = 1000;
std::string livenessKey_;
std::string workerUuid_;
std::shared_ptr<AkSkManager> akSkManager_;
std::vector<std::string> servicesNames_;
std::shared_mutex lock_;
};
}
}
#endif