* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "utils/qpl/QplJobPool.h"
#include "utils/Macros.h"
#include <arrow/util/logging.h>
#include <iostream>
namespace gluten {
namespace qpl {
std::array<qpl_job*, QplJobHWPool::MAX_JOB_NUMBER> QplJobHWPool::jobPool;
std::array<std::atomic_bool, QplJobHWPool::MAX_JOB_NUMBER> QplJobHWPool::jobLocks;
bool QplJobHWPool::jobPoolReady = false;
std::unique_ptr<uint8_t[]> QplJobHWPool::hwJobsBuffer;
QplJobHWPool& QplJobHWPool::GetInstance() {
static QplJobHWPool pool;
return pool;
}
QplJobHWPool::QplJobHWPool() : randomEngine(std::random_device()()), distribution(0, MAX_JOB_NUMBER - 1) {
uint64_t initTime = 0;
TIME_NANO(initTime, InitJobPool());
DLOG(INFO) << "Init job pool took " << 1.0 * initTime / 1e6 << "ms";
}
QplJobHWPool::~QplJobHWPool() {
for (uint32_t i = 0; i < MAX_JOB_NUMBER; ++i) {
if (jobPool[i]) {
while (!tryLockJob(i))
;
qpl_fini_job(jobPool[i]);
unLockJob(i);
jobPool[i] = nullptr;
}
}
jobPoolReady = false;
}
void QplJobHWPool::InitJobPool() {
uint32_t jobSize = 0;
const char* qpl_version = qpl_get_library_version();
qpl_get_job_size(qpl_path_hardware, &jobSize);
hwJobsBuffer = std::make_unique<uint8_t[]>(jobSize * MAX_JOB_NUMBER);
for (uint32_t index = 0; index < MAX_JOB_NUMBER; ++index) {
qpl_job* qplJobPtr = reinterpret_cast<qpl_job*>(hwJobsBuffer.get() + index * jobSize);
if (auto status = qpl_init_job(qpl_path_hardware, qplJobPtr); status != QPL_STS_OK) {
jobPoolReady = false;
ARROW_LOG(WARNING)
<< "Initialization of hardware-assisted DeflateQpl codec failed at index: " << index
<< ", falling back to SW codec. (Details: QplJobHWPool->qpl_init_job with error code: " << status
<< " - please refer to qpl_status in ./contrib/qpl/include/qpl/c_api/status.h). "
<< "Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up. QPL Version: "
<< qpl_version;
return;
}
jobPool[index] = qplJobPtr;
jobLocks[index].store(false);
}
ARROW_LOG(WARNING) << "Initialization of hardware-assisted DeflateQpl codec succeeded.";
jobPoolReady = true;
}
qpl_job* QplJobHWPool::AcquireJob(uint32_t& jobId) {
if (!IsJobPoolReady()) {
return nullptr;
}
uint32_t retry = 0;
auto index = distribution(randomEngine);
while (!tryLockJob(index)) {
index = distribution(randomEngine);
retry++;
if (retry > MAX_JOB_NUMBER) {
return nullptr;
}
}
jobId = MAX_JOB_NUMBER - index;
DLOG(INFO) << "Acquired job index " << index << " after " << retry << " retries.";
return jobPool[index];
}
void QplJobHWPool::ReleaseJob(uint32_t jobId) {
if (IsJobPoolReady()) {
auto index = MAX_JOB_NUMBER - jobId;
unLockJob(index);
}
}
bool QplJobHWPool::tryLockJob(uint32_t index) {
CheckJobIndex(index);
bool expected = false;
return jobLocks[index].compare_exchange_strong(expected, true);
}
void QplJobHWPool::unLockJob(uint32_t index) {
CheckJobIndex(index);
jobLocks[index].store(false);
}
}
}