* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <filesystem>
#include <memory>
#include <string>
#include <system_error>
#include <unordered_map>
#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <folly/executors/IOThreadPoolExecutor.h>
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/config/Config.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/common/memory/MmapAllocator.h"
namespace gluten {
/// Identifier string ("velox") naming the Velox backend kind.
inline static const std::string kVeloxBackendKind{"velox"};
class VeloxBackend {
public:
~VeloxBackend() {
if (dynamic_cast<facebook::velox::cache::AsyncDataCache*>(asyncDataCache_.get())) {
LOG(INFO) << asyncDataCache_->toString();
for (const auto& entry : std::filesystem::directory_iterator(cachePathPrefix_)) {
if (entry.path().filename().string().find(cacheFilePrefix_) != std::string::npos) {
LOG(INFO) << "Removing cache file " << entry.path().filename().string();
std::filesystem::remove(cachePathPrefix_ + "/" + entry.path().filename().string());
}
}
asyncDataCache_->shutdown();
}
}
static void create(const std::unordered_map<std::string, std::string>& conf);
static VeloxBackend* get();
facebook::velox::cache::AsyncDataCache* getAsyncDataCache() const;
std::shared_ptr<facebook::velox::config::ConfigBase> getBackendConf() const {
return backendConf_;
}
void tearDown() {
ioExecutor_.reset();
}
private:
explicit VeloxBackend(const std::unordered_map<std::string, std::string>& conf) {
init(conf);
}
void init(const std::unordered_map<std::string, std::string>& conf);
void initCache();
void initConnector();
void initUdf();
void initJolFilesystem();
std::string getCacheFilePrefix() {
return "cache." + boost::lexical_cast<std::string>(boost::uuids::random_generator()()) + ".";
}
static std::unique_ptr<VeloxBackend> instance_;
std::shared_ptr<facebook::velox::cache::AsyncDataCache> asyncDataCache_;
std::unique_ptr<folly::IOThreadPoolExecutor> ssdCacheExecutor_;
std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
std::shared_ptr<facebook::velox::memory::MmapAllocator> cacheAllocator_;
std::string cachePathPrefix_;
std::string cacheFilePrefix_;
std::shared_ptr<facebook::velox::config::ConfigBase> backendConf_;
};
}