#include "services/audio/input_sync_writer.h"
#include <algorithm>
#include <cinttypes>
#include <memory>
#include <utility>
#include "base/check.h"
#include "base/compiler_specific.h"
#include "base/containers/heap_array.h"
#include "base/containers/span.h"
#include "base/containers/span_reader.h"
#include "base/format_macros.h"
#include "base/logging.h"
#include "base/metrics/histogram_functions.h"
#include "base/numerics/safe_conversions.h"
#include "base/strings/stringprintf.h"
#include "base/trace_event/trace_event.h"
#include "build/build_config.h"
#include "media/base/audio_bus.h"
#include "media/base/audio_glitch_info.h"
#include "media/base/media_switches.h"
#include "services/audio/input_glitch_counter.h"
namespace audio {
namespace {
// Two-valued outcome describing whether any glitches were observed.
// kMaxValue aliases the largest enumerator.
// NOTE(review): nothing in the visible part of this file references this
// enum; the kMaxValue alias looks like the histogram-enum convention —
// confirm whether it is still needed.
enum class AudioGlitchResult {
kNoGlitches = 0,
kGlitches = 1,
kMaxValue = kGlitches
};
}
// Bundles one captured buffer with the metadata that has to accompany it when
// it is eventually written out: the volume, the capture timestamp and the
// glitch info. |glitch_info| is copied; ownership of |audio_bus| is taken.
InputSyncWriter::OverflowData::OverflowData(
double volume,
base::TimeTicks capture_time,
const media::AudioGlitchInfo& glitch_info,
std::unique_ptr<media::AudioBus> audio_bus)
: volume_(volume),
capture_time_(capture_time),
glitch_info_(glitch_info),
audio_bus_(std::move(audio_bus)) {}
// Nothing to release by hand: every member cleans up after itself.
InputSyncWriter::OverflowData::~OverflowData() = default;
// Member-wise move construction (compiler-generated, defined out of line).
InputSyncWriter::OverflowData::OverflowData(InputSyncWriter::OverflowData&&) =
default;
// Member-wise move assignment (compiler-generated, defined out of line).
InputSyncWriter::OverflowData& InputSyncWriter::OverflowData::operator=(
InputSyncWriter::OverflowData&& other) = default;
// Takes ownership of the log callback, the shared memory region, the socket
// and the glitch counter, maps the region, and wraps every segment of the
// mapping in an AudioBus stored in |audio_buses_|.
//
// CHECK-fails when |shared_memory_segment_count| is zero, when the socket,
// the region or the mapping is missing/invalid, or when the mapping is not
// exactly |shared_memory_segment_count| segments of
// sizeof(media::AudioInputBufferParameters) plus the AudioBus memory size
// computed for |params|.
InputSyncWriter::InputSyncWriter(
    base::RepeatingCallback<void(const std::string&)> log_callback,
    base::UnsafeSharedMemoryRegion shared_memory,
    std::unique_ptr<base::CancelableSyncSocket> socket,
    uint32_t shared_memory_segment_count,
    const media::AudioParameters& params,
    std::unique_ptr<InputGlitchCounter> glitch_counter)
    : log_callback_(std::move(log_callback)),
      socket_(std::move(socket)),
      shared_memory_region_(std::move(shared_memory)),
      shared_memory_mapping_(shared_memory_region_.Map()),
      shared_memory_segment_size_([&]() {
        // Reject a zero count before it is used as a divisor.
        CHECK(shared_memory_segment_count > 0);
        return shared_memory_mapping_.size() / shared_memory_segment_count;
      }()),
      creation_time_(base::TimeTicks::Now()),
      audio_bus_memory_size_(base::checked_cast<uint32_t>(
          media::AudioBus::CalculateMemorySize(params))),
      glitch_counter_(std::move(glitch_counter)),
      confirm_reads_via_shmem_(
          base::FeatureList::IsEnabled(media::kAudioInputConfirmReadsViaShmem)),
      dropped_buffer_glitch_{.duration = params.GetBufferDuration(),
                             .count = 1} {
  DCHECK(log_callback_);
  CHECK(socket_);
  CHECK(shared_memory_region_.IsValid());
  CHECK(shared_memory_mapping_.IsValid());

  // The mapping must split into whole segments, and each segment must be
  // exactly one parameter header followed by one AudioBus worth of memory.
  CHECK_EQ(shared_memory_segment_size_ * shared_memory_segment_count,
           shared_memory_mapping_.size());
  CHECK_EQ(shared_memory_segment_size_,
           audio_bus_memory_size_ + sizeof(media::AudioInputBufferParameters));

  SendLogMessage("%s({shared_memory_segment_count=%u}, {params=%s})", __func__,
                 shared_memory_segment_count,
                 params.AsHumanReadableString().c_str());
  SendLogMessage(
      "%s => (shared_memory_segment_size=[%u], audio_bus_memory_size=[%u])",
      __func__, shared_memory_segment_size_, audio_bus_memory_size_);
  DCHECK(glitch_counter_);

  // One AudioBus per segment, each viewing the bytes that follow the
  // segment's parameter header.
  audio_buses_.resize(shared_memory_segment_count);
  const base::span<uint8_t> mapped_bytes =
      shared_memory_mapping_.GetMemoryAsSpan<uint8_t>();
  CHECK(!mapped_bytes.empty());
  base::SpanReader<uint8_t> segment_reader(mapped_bytes);
  for (auto& wrapped_bus : audio_buses_) {
    const auto segment = *segment_reader.Read(shared_memory_segment_size_);
    const auto samples =
        segment.subspan<sizeof(media::AudioInputBufferParameters)>();
    CHECK_EQ(samples.size(), audio_bus_memory_size_);
    CHECK(base::IsAligned(samples.data(), media::AudioBus::kChannelAlignment));
    wrapped_bus = media::AudioBus::WrapMemory(params, samples);
  }
  // Every byte of the mapping must have been handed to some segment.
  CHECK(segment_reader.remaining_span().empty());
}
// Compiler-generated destructor, defined out of line.
InputSyncWriter::~InputSyncWriter() = default;
// Factory: allocates the shared memory region, creates the socket pair (the
// peer end is written to |foreign_socket|) and the glitch counter, then
// constructs the writer from them.
//
// Returns nullptr when |shared_memory_segment_count| is zero, when the
// required buffer size does not fit in uint32_t, when the region cannot be
// created, or when the socket pair cannot be created.
std::unique_ptr<InputSyncWriter> InputSyncWriter::Create(
    base::RepeatingCallback<void(const std::string&)> log_callback,
    uint32_t shared_memory_segment_count,
    const media::AudioParameters& params,
    base::CancelableSyncSocket* foreign_socket) {
  if (shared_memory_segment_count == 0) {
    return nullptr;
  }

  const base::CheckedNumeric<uint32_t> total_bytes =
      ComputeAudioInputBufferSizeChecked(params, shared_memory_segment_count);
  if (!total_bytes.IsValid()) {
    return nullptr;
  }

  base::UnsafeSharedMemoryRegion region =
      base::UnsafeSharedMemoryRegion::Create(total_bytes.ValueOrDie());
  if (!region.IsValid()) {
    return nullptr;
  }

  auto local_socket = std::make_unique<base::CancelableSyncSocket>();
  const bool paired = base::CancelableSyncSocket::CreatePair(
      local_socket.get(), foreign_socket);
  if (!paired) {
    return nullptr;
  }

  // The counter receives its own copy of the callback; the writer takes the
  // original.
  auto counter = std::make_unique<InputGlitchCounter>(log_callback);
  return std::make_unique<InputSyncWriter>(
      std::move(log_callback), std::move(region), std::move(local_socket),
      shared_memory_segment_count, params, std::move(counter));
}
// Hands the shared memory region to the caller by moving it out of this
// object. DCHECKs that the region is still valid when called.
base::UnsafeSharedMemoryRegion InputSyncWriter::TakeSharedMemoryRegion() {
  DCHECK(shared_memory_region_.IsValid());
  base::UnsafeSharedMemoryRegion region = std::move(shared_memory_region_);
  return region;
}
// Delivers one captured buffer to the consumer.
//
// Steps, in order:
//   1. Accumulate |glitch_info| into |pending_glitch_info_|.
//   2. Collect read confirmations, which may free shared memory segments.
//   3. Report whether every segment was still occupied at that point.
//   4. Move queued buffers from |overflow_data_| into free segments, oldest
//      first. An entry whose write fails is still removed from the queue; its
//      glitch info plus one dropped-buffer glitch are carried over.
//   5. Write |data| into a free segment if one exists, otherwise queue it.
//
// |pending_glitch_info_| is reset once it has been handed off together with
// |data| (written or queued); on failure a dropped-buffer glitch is added.
void InputSyncWriter::Write(const media::AudioBus* data,
                            double volume,
                            base::TimeTicks capture_time,
                            const media::AudioGlitchInfo& glitch_info) {
  TRACE_EVENT("audio", "InputSyncWriter::Write", "capture_time (ms)",
              (capture_time - base::TimeTicks()).InMillisecondsF(),
              "capture_delay (ms)",
              (base::TimeTicks::Now() - capture_time).InMillisecondsF());
  glitch_info.MaybeAddTraceEvent();
  CheckTimeSinceLastWrite();
  pending_glitch_info_ += glitch_info;

  ReceiveReadConfirmationsFromConsumer();

  const size_t segment_count = audio_buses_.size();
  const bool all_segments_occupied =
      number_of_filled_segments_ == segment_count;
  glitch_counter_->ReportMissedReadDeadline(all_segments_occupied);

  // Drain the overflow queue first so buffers reach the consumer in order.
  if (!overflow_data_.empty()) {
    auto queued = overflow_data_.begin();
    for (; queued != overflow_data_.end() &&
           number_of_filled_segments_ < segment_count;
         ++queued) {
      const bool written = WriteDataToCurrentSegment(
          *queued->audio_bus_, queued->volume_, queued->capture_time_,
          queued->glitch_info_);
      glitch_counter_->ReportDroppedData(!written);
      if (written) {
        continue;
      }
      // The entry is lost: keep its glitch info and account for the drop.
      pending_glitch_info_ += queued->glitch_info_;
      pending_glitch_info_ += dropped_buffer_glitch_;
    }
    overflow_data_.erase(overflow_data_.begin(), queued);
    if (overflow_data_.empty()) {
      SendLogMessage("%s => (FIFO emptied)", __func__);
    }
  }

  // No free segment: park the buffer in the overflow queue.
  if (number_of_filled_segments_ >= segment_count) {
    if (!PushDataToFifo(*data, volume, capture_time, pending_glitch_info_)) {
      glitch_counter_->ReportDroppedData(true);
      pending_glitch_info_ += dropped_buffer_glitch_;
    } else {
      pending_glitch_info_ = {};
    }
    return;
  }

  // A segment is free, which implies the queue was fully drained above.
  DCHECK(overflow_data_.empty());
  const bool written = WriteDataToCurrentSegment(*data, volume, capture_time,
                                                 pending_glitch_info_);
  glitch_counter_->ReportDroppedData(!written);
  if (!written) {
    pending_glitch_info_ += dropped_buffer_glitch_;
  } else {
    pending_glitch_info_ = {};
  }
}
// Closes this writer's end of the socket.
void InputSyncWriter::Close() {
socket_->Close();
}
// Logs timing information about incoming writes and records the time of this
// one in |last_write_time_|. Compiled to a no-op on Android.
//
// The first call logs the delay since construction; later calls log a warning
// only when more than 500 ms passed since the previous call.
void InputSyncWriter::CheckTimeSinceLastWrite() {
#if !BUILDFLAG(IS_ANDROID)
  static constexpr base::TimeDelta kLongDelayThreshold =
      base::Milliseconds(500);

  const base::TimeTicks now = base::TimeTicks::Now();
  const bool is_first_write = last_write_time_.is_null();
  // Measure from construction for the first write, otherwise from the
  // previous write.
  const base::TimeDelta elapsed =
      now - (is_first_write ? creation_time_ : last_write_time_);

  if (is_first_write) {
    SendLogMessage(
        "%s => (audio input data received for the first time: delay=%" PRId64
        " ms)",
        __func__, elapsed.InMilliseconds());
  } else if (elapsed > kLongDelayThreshold) {
    SendLogMessage(
        "%s => (WARNING: audio input data delay unexpectedly long: "
        "delay=%" PRId64 " ms)",
        __func__, elapsed.InMilliseconds());
  }
  last_write_time_ = now;
#endif
}
// Processes the consumer's read confirmations and releases the corresponding
// segments by decrementing |number_of_filled_segments_|.
//
// Two mechanisms exist, selected by |confirm_reads_via_shmem_|:
//  * Shared memory: walk the written-but-unconfirmed buffers in order and
//    release each one whose |has_unread_data| flag is zero, stopping at the
//    first one that is still non-zero.
//  * Socket: read every complete uint32_t currently available on the socket;
//    each value must equal the next expected buffer index (CHECKed).
void InputSyncWriter::ReceiveReadConfirmationsFromConsumer() {
  if (confirm_reads_via_shmem_) {
    while (next_read_buffer_index_ < next_buffer_id_) {
      media::AudioInputBuffer* buffer =
          GetSharedInputBuffer(next_read_buffer_index_ % audio_buses_.size());
      std::atomic_ref<uint32_t> has_unread_data(buffer->params.has_unread_data);
      // Acquire, not relaxed: observing 0 is what permits this side to
      // overwrite the segment later, so the consumer's accesses to the
      // segment must be ordered before our subsequent writes.
      // NOTE(review): this pairs with a release store of 0 on the consumer
      // side, which is not visible in this file — confirm.
      if (has_unread_data.load(std::memory_order_acquire)) {
        break;
      }
      ++next_read_buffer_index_;
      CHECK_GT(number_of_filled_segments_, 0u);
      --number_of_filled_segments_;
    }
    return;
  }

  // Only whole indices are consumed; the division discards a partial one.
  size_t number_of_indices_available = socket_->Peek() / sizeof(uint32_t);
  if (number_of_indices_available > 0) {
    auto indices =
        base::HeapArray<uint32_t>::WithSize(number_of_indices_available);
    size_t bytes_received =
        socket_->Receive(base::as_writable_bytes(indices.as_span()));
    CHECK_EQ(number_of_indices_available * sizeof(indices[0]), bytes_received);
    for (size_t i = 0; i < number_of_indices_available; ++i) {
      // Confirmations must arrive strictly in sequence.
      ++next_read_buffer_index_;
      CHECK_EQ(indices[i], next_read_buffer_index_);
      CHECK_GT(number_of_filled_segments_, 0u);
      --number_of_filled_segments_;
    }
  }
}
// Queues a copy of |data| together with |volume|, |capture_time| and
// |glitch_info| at the back of |overflow_data_|.
//
// Returns true when the buffer was queued. Returns false, storing nothing,
// when the queue already holds kMaxOverflowBusesSize entries; that case is
// logged on the 1st, 11th, ..., 51st occurrence, after which logging stops.
bool InputSyncWriter::PushDataToFifo(
    const media::AudioBus& data,
    double volume,
    base::TimeTicks capture_time,
    const media::AudioGlitchInfo& glitch_info) {
  // "capture_time (ms)" uses the same key as the other trace events in this
  // file. The fifo delay is converted explicitly so that the value is in
  // milliseconds, as its label says.
  TRACE_EVENT("audio", "InputSyncWriter::PushDataToFifo", "capture_time (ms)",
              (capture_time - base::TimeTicks()).InMillisecondsF(),
              "capture_delay (ms)",
              (base::TimeTicks::Now() - capture_time).InMillisecondsF(),
              "fifo delay (ms)",
              ((number_of_filled_segments_ + overflow_data_.size()) *
               dropped_buffer_glitch_.duration)
                  .InMillisecondsF());

  // ">=" rather than "==" so the queue can never grow past its cap.
  if (overflow_data_.size() >= static_cast<size_t>(kMaxOverflowBusesSize)) {
    TRACE_EVENT_INSTANT0(
        "audio", "InputSyncWriter::PushDataToFifo - overflow - dropped data",
        TRACE_EVENT_SCOPE_THREAD);
    // Rate-limit: log every 10th overflow, and stop for good after the 51st.
    if (fifo_full_count_ <= 50 && fifo_full_count_ % 10 == 0) {
      SendLogMessage("%s => (WARNING: no room in FIFO)", __func__);
      if (fifo_full_count_ == 50) {
        SendLogMessage(
            "%s => (WARNING: log cap reached, suppressing further FIFO "
            "overflow logs)",
            __func__);
      }
    }
    ++fifo_full_count_;
    return false;
  }

  if (overflow_data_.empty()) {
    SendLogMessage("%s => (starting to use the FIFO)", __func__);
  }

  // |data| is only borrowed, so the queue needs its own copy of the samples.
  std::unique_ptr<media::AudioBus> audio_bus =
      media::AudioBus::Create(data.channels(), data.frames());
  data.CopyTo(audio_bus.get());
  overflow_data_.emplace_back(volume, capture_time, glitch_info,
                              std::move(audio_bus));
  DCHECK_LE(overflow_data_.size(), static_cast<size_t>(kMaxOverflowBusesSize));
  return true;
}
// Fills the shared memory segment at |current_segment_id_| with |data| and
// its metadata, then signals the consumer.
//
// Requires at least one free segment (CHECKed). Returns the result of
// SignalDataWrittenAndUpdateCounters(): true when the consumer was notified
// and the counters advanced, false when the notification could not be sent.
bool InputSyncWriter::WriteDataToCurrentSegment(
    const media::AudioBus& data,
    double volume,
    base::TimeTicks capture_time,
    const media::AudioGlitchInfo& glitch_info) {
  CHECK_LT(number_of_filled_segments_, audio_buses_.size());
  // The fifo delay is converted explicitly so that the value is in
  // milliseconds, as its label says.
  TRACE_EVENT("audio", "WriteDataToCurrentSegment", "glitches",
              glitch_info.count, "glitch_duration (ms)",
              glitch_info.duration.InMillisecondsF(), "capture_time (ms)",
              (capture_time - base::TimeTicks()).InMillisecondsF(),
              "capture_delay (ms)",
              (base::TimeTicks::Now() - capture_time).InMillisecondsF(),
              "fifo delay (ms)",
              (number_of_filled_segments_ * dropped_buffer_glitch_.duration)
                  .InMillisecondsF());

  // Parameter header of the segment.
  media::AudioInputBuffer* buffer = GetSharedInputBuffer(current_segment_id_);
  buffer->params.volume = volume;
  buffer->params.size = audio_bus_memory_size_;
  buffer->params.capture_time_us =
      (capture_time - base::TimeTicks()).InMicroseconds();
  buffer->params.id = next_buffer_id_;
  buffer->params.glitch_duration_us = glitch_info.duration.InMicroseconds();
  buffer->params.glitch_count = glitch_info.count;

  if (confirm_reads_via_shmem_) {
    // Mark the segment as holding data the consumer has not read yet.
    // NOTE(review): relaxed ordering presumably relies on the socket
    // notification below to publish the segment contents — confirm against
    // the consumer.
    std::atomic_ref<uint32_t> has_unread_data(buffer->params.has_unread_data);
    has_unread_data.store(1, std::memory_order_relaxed);
  }

  // Audio payload: copied into the AudioBus that wraps this segment.
  data.CopyTo(audio_buses_[current_segment_id_].get());
  return SignalDataWrittenAndUpdateCounters();
}
// Notifies the consumer by sending |current_segment_id_| over the socket.
//
// On success: clears the socket error state, advances |current_segment_id_|
// (wrapping to 0 after the last segment), counts one more filled segment,
// advances |next_buffer_id_| and returns true.
// On failure: returns false with all counters untouched. Only the first
// failure of a consecutive run is logged and traced.
bool InputSyncWriter::SignalDataWrittenAndUpdateCounters() {
  const size_t bytes_sent =
      socket_->Send(base::byte_span_from_ref(current_segment_id_));

  if (bytes_sent == sizeof(current_segment_id_)) {
    had_socket_error_ = false;
    ++current_segment_id_;
    if (current_segment_id_ >= audio_buses_.size()) {
      current_segment_id_ = 0;
    }
    ++number_of_filled_segments_;
    CHECK_LE(number_of_filled_segments_, audio_buses_.size());
    ++next_buffer_id_;
    return true;
  }

  if (!had_socket_error_) {
    had_socket_error_ = true;
    SendLogMessage("%s => (WARNING: no room in socket buffer, dropped data)",
                   __func__);
    TRACE_EVENT_INSTANT0(
        "audio", "InputSyncWriter: No room in socket buffer - dropped data",
        TRACE_EVENT_SCOPE_THREAD);
  }
  return false;
}
// Returns a pointer to the start of segment |segment_id| inside the shared
// memory mapping, viewed as a media::AudioInputBuffer.
// CHECK-fails when |segment_id| is not a valid segment index.
media::AudioInputBuffer* InputSyncWriter::GetSharedInputBuffer(
    uint32_t segment_id) {
  CHECK_LT(segment_id, audio_buses_.size());
  uint8_t* segment_start =
      static_cast<uint8_t*>(shared_memory_mapping_.memory());
  // Segments are laid out back to back, each |shared_memory_segment_size_|
  // bytes long.
  UNSAFE_TODO(segment_start += segment_id * shared_memory_segment_size_);
  return reinterpret_cast<media::AudioInputBuffer*>(segment_start);
}
// printf-style logging helper. Formats the arguments according to |format|,
// prefixes the result with "AISW::", appends the address of this object, and
// passes the line to |log_callback_|. Does nothing when the callback is null.
void InputSyncWriter::SendLogMessage(const char* format, ...) {
  if (log_callback_.is_null()) {
    return;
  }

  va_list args;
  va_start(args, format);
  const std::string message = base::StringPrintV(format, args);
  va_end(args);

  const std::string instance_tag = base::StringPrintf(
      " [this=0x%" PRIXPTR "]", reinterpret_cast<uintptr_t>(this));
  log_callback_.Run(base::StrCat({"AISW::", message, instance_tag}));
}
}