* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "audio_codec_worker.h"
#include "avcodec_trace.h"
#include "avcodec_errors.h"
#include "avcodec_log.h"
#include "utils.h"
namespace {
constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_AUDIO, "AvCodec-AudioCodecWorker"};
constexpr uint8_t LOGD_FREQUENCY = 5;
}
namespace OHOS {
namespace MediaAVCodec {
constexpr short DEFAULT_TRY_DECODE_TIME = 10;
constexpr short DEFAULT_BUFFER_COUNT = 8;
constexpr int TIMEOUT_MS = 1000;
const std::string_view INPUT_BUFFER = "inputBuffer";
const std::string_view OUTPUT_BUFFER = "outputBuffer";
const std::string_view ASYNC_HANDLE_INPUT = "OS_AuCodecIn";
const std::string_view ASYNC_DECODE_FRAME = "OS_AuCodecOut";
AudioCodecWorker::AudioCodecWorker(const std::shared_ptr<AudioBaseCodec> &codec,
const std::shared_ptr<AVCodecCallback> &callback)
: isFirFrame_(true),
isRunning(true),
codec_(codec),
inputBufferSize(codec_->GetInputBufferSize()),
outputBufferSize(codec_->GetOutputBufferSize()),
bufferCount(DEFAULT_BUFFER_COUNT),
name_(codec->GetCodecType()),
inputTask_(std::make_unique<TaskThread>(ASYNC_HANDLE_INPUT)),
outputTask_(std::make_unique<TaskThread>(ASYNC_DECODE_FRAME)),
callback_(callback),
inputBuffer_(std::make_shared<AudioBuffersManager>(inputBufferSize, INPUT_BUFFER, DEFAULT_BUFFER_COUNT)),
outputBuffer_(std::make_shared<AudioBuffersManager>(outputBufferSize, OUTPUT_BUFFER, DEFAULT_BUFFER_COUNT))
{
inputTask_->RegisterHandler([this] { ProduceInputBuffer(); });
outputTask_->RegisterHandler([this] { ConsumerOutputBuffer(); });
}
AudioCodecWorker::~AudioCodecWorker()
{
AVCODEC_LOGD("release all data of %{public}s codec worker in destructor.", name_.data());
Dispose();
ResetTask();
ReleaseAllInBufferQueue();
ReleaseAllInBufferAvaQueue();
inputBuffer_->ReleaseAll();
outputBuffer_->ReleaseAll();
if (codec_) {
codec_ = nullptr;
}
if (callback_) {
callback_.reset();
callback_ = nullptr;
}
}
bool AudioCodecWorker::PushInputData(const uint32_t &index)
{
AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "%{public}s Worker PushInputData enter,index:%{public}u", name_.data(), index);
if (!isRunning) {
return true;
}
if (!callback_) {
AVCODEC_LOGE("push input buffer failed in worker, callback is nullptr, please check the callback.");
Dispose();
return false;
}
if (!codec_) {
AVCODEC_LOGE("push input buffer failed in worker, codec is nullptr, please check the codec.");
Dispose();
return false;
}
std::lock_guard<std::mutex> lock(stateMutex_);
inBufIndexQue_.push(index);
outputCondition_.notify_all();
return true;
}
bool AudioCodecWorker::Start()
{
AVCODEC_SYNC_TRACE;
AVCODEC_LOGD("Worker Start enter");
if (!callback_) {
AVCODEC_LOGE("Start failed in worker, callback is nullptr, please check the callback.");
return false;
}
if (!codec_) {
AVCODEC_LOGE("Start failed in worker, codec_ is nullptr, please check the codec_.");
return false;
}
bool result = Begin();
return result;
}
bool AudioCodecWorker::Stop()
{
AVCODEC_SYNC_TRACE;
AVCODEC_LOGD("Worker Stop enter");
Dispose();
if (inputTask_) {
inputTask_->StopAsync();
} else {
AVCODEC_LOGE("Stop failed in worker, inputTask_ is nullptr, please check the inputTask_.");
return false;
}
if (outputTask_) {
outputTask_->StopAsync();
} else {
AVCODEC_LOGE("Stop failed in worker, outputTask_ is nullptr, please check the outputTask_.");
return false;
}
ReleaseAllInBufferQueue();
ReleaseAllInBufferAvaQueue();
inputBuffer_->ReleaseAll();
outputBuffer_->ReleaseAll();
return true;
}
bool AudioCodecWorker::Pause()
{
AVCODEC_SYNC_TRACE;
AVCODEC_LOGD("Worker Pause enter");
Dispose();
if (inputTask_) {
inputTask_->PauseAsync();
} else {
AVCODEC_LOGE("Pause failed in worker, inputTask_ is nullptr, please check the inputTask_.");
return false;
}
if (outputTask_) {
outputTask_->PauseAsync();
} else {
AVCODEC_LOGE("Pause failed in worker, outputTask_ is nullptr, please check the outputTask_.");
return false;
}
ReleaseAllInBufferQueue();
ReleaseAllInBufferAvaQueue();
inputBuffer_->ReleaseAll();
outputBuffer_->ReleaseAll();
return true;
}
bool AudioCodecWorker::Resume()
{
AVCODEC_SYNC_TRACE;
AVCODEC_LOGD("Worker Resume enter");
if (!callback_) {
AVCODEC_LOGE("Resume failed in worker, callback_ is nullptr, please check the callback_.");
return false;
}
if (!codec_) {
AVCODEC_LOGE("Resume failed in worker, codec_ is nullptr, please check the codec_.");
return false;
}
bool result = Begin();
return result;
}
bool AudioCodecWorker::Release()
{
AVCODEC_SYNC_TRACE;
AVCODEC_LOGD("Worker Release enter");
Dispose();
ResetTask();
ReleaseAllInBufferQueue();
ReleaseAllInBufferAvaQueue();
inputBuffer_->ReleaseAll();
outputBuffer_->ReleaseAll();
if (codec_) {
codec_ = nullptr;
}
if (callback_) {
callback_.reset();
callback_ = nullptr;
}
AVCODEC_LOGD("Worker Release end");
return true;
}
std::shared_ptr<AudioBuffersManager> AudioCodecWorker::GetInputBuffer() const noexcept
{
AVCODEC_LOGD("Worker GetInputBuffer enter");
return inputBuffer_;
}
std::shared_ptr<AudioBuffersManager> AudioCodecWorker::GetOutputBuffer() const noexcept
{
AVCODEC_LOGD("Worker GetOutputBuffer enter");
return outputBuffer_;
}
std::shared_ptr<AudioBufferInfo> AudioCodecWorker::GetOutputBufferInfo(const uint32_t &index) const noexcept
{
return outputBuffer_->getMemory(index);
}
std::shared_ptr<AudioBufferInfo> AudioCodecWorker::GetInputBufferInfo(const uint32_t &index) const noexcept
{
return inputBuffer_->getMemory(index);
}
void AudioCodecWorker::ProduceInputBuffer()
{
AVCODEC_SYNC_TRACE;
AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker produceInputBuffer enter");
if (!isRunning) {
usleep(DEFAULT_TRY_DECODE_TIME);
return;
}
std::unique_lock lock(inputMutex_);
while (!inBufAvaIndexQue_.empty() && isRunning) {
uint32_t index;
{
std::lock_guard<std::mutex> avaLock(inAvaMutex_);
index = inBufAvaIndexQue_.front();
inBufAvaIndexQue_.pop();
}
auto inputBuffer = GetInputBufferInfo(index);
if (!inputBuffer) {
AVCODEC_LOGE("Failed to get input buffer at index %{public}u", index);
continue;
}
inputBuffer->SetBufferOwned();
callback_->OnInputBufferAvailable(index, inputBuffer->GetBuffer());
}
inputCondition_.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS),
[this] { return (!inBufAvaIndexQue_.empty() || !isRunning); });
AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker produceInputBuffer exit");
}
bool AudioCodecWorker::HandInputBuffer(int32_t &ret)
{
uint32_t inputIndex;
{
std::lock_guard<std::mutex> lock(stateMutex_);
inputIndex = inBufIndexQue_.front();
inBufIndexQue_.pop();
}
AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "handle input buffer. index:%{public}u", inputIndex);
auto inputBuffer = GetInputBufferInfo(inputIndex);
if (inputBuffer == nullptr) {
AVCODEC_LOGE("inputBuffer is nullptr");
return false;
}
bool isEos = inputBuffer->CheckIsEos();
ret = codec_->ProcessSendData(inputBuffer);
inputBuffer_->ReleaseBuffer(inputIndex);
{
std::lock_guard<std::mutex> lock(inAvaMutex_);
inBufAvaIndexQue_.push(inputIndex);
inputCondition_.notify_all();
}
if (ret == AVCodecServiceErrCode::AVCS_ERR_INVALID_DATA) {
callback_->OnError(AVCodecErrorType::AVCODEC_ERROR_INTERNAL, ret);
}
return isEos;
}
void AudioCodecWorker::ReleaseOutputBuffer(const uint32_t &index, const int32_t &ret)
{
outputBuffer_->ReleaseBuffer(index);
callback_->OnError(AVCodecErrorType::AVCODEC_ERROR_INTERNAL, ret);
}
void AudioCodecWorker::SetFirstAndEosStatus(std::shared_ptr<AudioBufferInfo> &outBuffer, bool isEos, uint32_t index)
{
if (isEos && outBuffer != nullptr) {
AVCODEC_LOGD("set buffer EOS. index:%{public}u", index);
outBuffer->SetEos(isEos);
}
if (isFirFrame_ && outBuffer != nullptr) {
outBuffer->SetFirstFrame();
isFirFrame_ = false;
}
}
void AudioCodecWorker::ConsumerOutputBuffer()
{
AVCODEC_SYNC_TRACE;
AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker consumerOutputBuffer enter");
if (!isRunning) {
usleep(DEFAULT_TRY_DECODE_TIME);
return;
}
std::unique_lock lock(outputMutex_);
while (!inBufIndexQue_.empty() && isRunning) {
int32_t ret = AVCodecServiceErrCode::AVCS_ERR_INVALID_DATA;
bool isEos = HandInputBuffer(ret);
if (ret == AVCodecServiceErrCode::AVCS_ERR_NOT_ENOUGH_DATA) {
AVCODEC_LOGW("current input buffer is not enough,skip this frame");
continue;
}
if (ret != AVCodecServiceErrCode::AVCS_ERR_OK && ret != AVCodecServiceErrCode::AVCS_ERR_END_OF_STREAM) {
AVCODEC_LOGE("input error!");
return;
}
uint32_t index;
if (outputBuffer_->RequestAvailableIndex(index)) {
auto outBuffer = GetOutputBufferInfo(index);
if (!outBuffer) {
AVCODEC_LOGE("outBuffer is null!");
continue;
}
SetFirstAndEosStatus(outBuffer, isEos, index);
ret = codec_->ProcessRecieveData(outBuffer);
if (ret == AVCodecServiceErrCode::AVCS_ERR_NOT_ENOUGH_DATA) {
AVCODEC_LOGD("current ouput buffer is not enough,skip this frame. index:%{public}u", index);
outputBuffer_->ReleaseBuffer(index);
continue;
}
if (ret != AVCodecServiceErrCode::AVCS_ERR_OK && ret != AVCodecServiceErrCode::AVCS_ERR_END_OF_STREAM) {
AVCODEC_LOGE("process output buffer error! index:%{public}u", index);
ReleaseOutputBuffer(index, ret);
return;
}
AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Work %{public}s consumerOutputBuffer callback_ index:%{public}u",
name_.data(), index);
lock.unlock();
callback_->OnOutputBufferAvailable(index, outBuffer->GetBufferAttr(), outBuffer->GetFlag(),
outBuffer->GetBuffer());
lock.lock();
}
}
outputCondition_.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS),
[this] { return (inBufIndexQue_.size() > 0 || !isRunning); });
AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Work consumerOutputBuffer exit");
}
void AudioCodecWorker::Dispose()
{
AVCODEC_LOGD("Worker dispose enter");
isRunning = false;
outputBuffer_->DisableRunning();
{
std::unique_lock lock(inputMutex_);
inputCondition_.notify_all();
}
{
std::unique_lock lock(outputMutex_);
outputCondition_.notify_all();
}
}
bool AudioCodecWorker::Begin()
{
AVCODEC_LOGD("Worker begin enter");
for (uint32_t i = 0; i < static_cast<uint32_t>(bufferCount); i++) {
inBufAvaIndexQue_.push(i);
}
isRunning = true;
inputBuffer_->SetRunning();
outputBuffer_->SetRunning();
if (inputTask_) {
inputTask_->Start();
} else {
return false;
}
if (outputTask_) {
outputTask_->Start();
} else {
return false;
}
inputCondition_.notify_all();
outputCondition_.notify_all();
return true;
}
void AudioCodecWorker::ReleaseAllInBufferQueue()
{
std::lock_guard<std::mutex> lock(stateMutex_);
while (!inBufIndexQue_.empty()) {
inBufIndexQue_.pop();
}
}
void AudioCodecWorker::ReleaseAllInBufferAvaQueue()
{
std::lock_guard<std::mutex> lock(inAvaMutex_);
while (!inBufAvaIndexQue_.empty()) {
inBufAvaIndexQue_.pop();
}
}
void AudioCodecWorker::ResetTask()
{
if (inputTask_) {
inputTask_->Stop();
inputTask_.reset();
inputTask_ = nullptr;
}
if (outputTask_) {
outputTask_->Stop();
outputTask_.reset();
outputTask_ = nullptr;
}
}
}
}