#include "content/browser/preloading/prefetch/prefetch_data_pipe_tee.h"
#include "base/containers/span.h"
#include "base/metrics/histogram_functions.h"
#include "base/notreached.h"
#include "base/strings/string_view_util.h"
#include "mojo/public/cpp/system/string_data_source.h"
#include "services/network/public/cpp/loading_params.h"
namespace content {
namespace {
MojoResult CreateDataPipeForServingData(
mojo::ScopedDataPipeProducerHandle& producer_handle,
mojo::ScopedDataPipeConsumerHandle& consumer_handle) {
MojoCreateDataPipeOptions options;
options.struct_size = sizeof(MojoCreateDataPipeOptions);
options.flags = MOJO_CREATE_DATA_PIPE_FLAG_NONE;
options.element_num_bytes = 1;
options.capacity_num_bytes = network::GetDataPipeDefaultAllocationSize(
network::DataPipeAllocationSize::kLargerSizeIfPossible);
return mojo::CreateDataPipe(&options, producer_handle, consumer_handle);
}
}
PrefetchDataPipeTee::PrefetchDataPipeTee(
mojo::ScopedDataPipeConsumerHandle source,
size_t buffer_limit)
: source_(std::move(source)),
source_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::SequencedTaskRunner::GetCurrentDefault()),
buffer_limit_(buffer_limit),
target_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC,
base::SequencedTaskRunner::GetCurrentDefault()) {
source_watcher_.Watch(source_.get(), MOJO_HANDLE_SIGNAL_READABLE,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&PrefetchDataPipeTee::OnReadable,
weak_factory_.GetWeakPtr()));
source_watcher_.ArmOrNotify();
}
PrefetchDataPipeTee::~PrefetchDataPipeTee() {
CHECK(!target_.first);
base::UmaHistogramEnumeration(
"Preloading.Prefetch.PrefetchDataPipeTeeDtorState", state_);
}
mojo::ScopedDataPipeConsumerHandle PrefetchDataPipeTee::Clone() {
++count_clone_called_;
switch (state_) {
case State::kLoading:
if (target_.first || pending_writes_) {
base::UmaHistogramCounts100(
"Preloading.Prefetch.PrefetchDataPipeTeeCloneFailed.Loading",
count_clone_called_);
return {};
}
break;
case State::kSizeExceededNoTarget:
CHECK(!target_.first);
CHECK_EQ(pending_writes_, 0u);
state_ = State::kSizeExceeded;
break;
case State::kSizeExceeded:
base::UmaHistogramCounts100(
"Preloading.Prefetch.PrefetchDataPipeTeeCloneFailed.SizeExceeded",
count_clone_called_);
return {};
case State::kLoaded:
break;
}
mojo::ScopedDataPipeConsumerHandle consumer_handle;
mojo::ScopedDataPipeProducerHandle producer_handle;
MojoResult rv =
CreateDataPipeForServingData(producer_handle, consumer_handle);
if (rv != MOJO_RESULT_OK) {
return {};
}
auto producer =
std::make_unique<mojo::DataPipeProducer>(std::move(producer_handle));
WriteData(std::make_pair(std::move(producer), base::WrapRefCounted(this)),
buffer_);
return consumer_handle;
}
void PrefetchDataPipeTee::OnReadable(MojoResult result,
const mojo::HandleSignalsState& state) {
if (pending_writes_) {
return;
}
switch (state_) {
case State::kLoading:
case State::kSizeExceeded:
break;
case State::kSizeExceededNoTarget:
return;
case State::kLoaded:
return;
}
base::span<const uint8_t> read_data;
MojoResult rv = source_->BeginReadData(MOJO_READ_DATA_FLAG_NONE, read_data);
if (rv == MOJO_RESULT_OK) {
switch (state_) {
case State::kLoading:
CHECK_LE(buffer_.size(), buffer_limit_);
if (buffer_.size() + read_data.size() <= buffer_limit_) {
buffer_.append(base::as_string_view(read_data));
if (target_.first) {
WriteData(ResetTarget({}),
std::string_view(buffer_).substr(buffer_.size() -
read_data.size()));
}
break;
}
if (!target_.first) {
read_data = read_data.first(buffer_limit_ - buffer_.size());
buffer_.append(base::as_string_view(read_data));
state_ = State::kSizeExceededNoTarget;
break;
}
buffer_.clear();
state_ = State::kSizeExceeded;
[[fallthrough]];
case State::kSizeExceeded:
CHECK(buffer_.empty());
if (!target_.first) {
break;
}
buffer_.append(base::as_string_view(read_data));
WriteData(ResetTarget({}), buffer_);
break;
case State::kSizeExceededNoTarget:
case State::kLoaded:
NOTREACHED();
}
source_->EndReadData(read_data.size());
source_watcher_.ArmOrNotify();
} else if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
switch (state_) {
case State::kLoading:
state_ = State::kLoaded;
ResetTarget({});
break;
case State::kSizeExceeded:
ResetTarget({});
break;
case State::kSizeExceededNoTarget:
case State::kLoaded:
NOTREACHED();
}
} else if (rv != MOJO_RESULT_SHOULD_WAIT) {
NOTREACHED() << "Unhandled MojoResult: " << rv;
}
}
PrefetchDataPipeTee::ProducerPair PrefetchDataPipeTee::ResetTarget(
ProducerPair target) {
auto old_target = std::move(target_);
target_ = std::move(target);
target_watcher_.Cancel();
if (target_.first) {
CHECK_EQ(target_.second.get(), this);
target_watcher_.Watch(
target_.first->GetProducerHandle(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&PrefetchDataPipeTee::OnWriteDataPipeClosed,
weak_factory_.GetWeakPtr()));
}
return old_target;
}
void PrefetchDataPipeTee::OnWriteDataPipeClosed(
MojoResult result,
const mojo::HandleSignalsState& state) {
CHECK(target_.first);
if (state.peer_closed()) {
ResetTarget({});
}
}
void PrefetchDataPipeTee::WriteData(ProducerPair target,
base::span<const char> data) {
CHECK_EQ(target.second.get(), this);
++pending_writes_;
auto* raw_target = target.first.get();
raw_target->Write(std::make_unique<mojo::StringDataSource>(
data, mojo::StringDataSource::AsyncWritingMode::
STRING_STAYS_VALID_UNTIL_COMPLETION),
base::BindOnce(&PrefetchDataPipeTee::OnDataWritten,
base::Unretained(this), std::move(target)));
}
void PrefetchDataPipeTee::OnDataWritten(ProducerPair target,
MojoResult result) {
CHECK_GT(pending_writes_, 0u);
--pending_writes_;
switch (state_) {
case State::kLoaded:
break;
case State::kSizeExceeded:
buffer_.clear();
[[fallthrough]];
case State::kLoading:
if (result == MOJO_RESULT_OK) {
ResetTarget(std::move(target));
}
if (pending_writes_ == 0) {
source_watcher_.ArmOrNotify();
}
break;
case State::kSizeExceededNoTarget:
NOTREACHED();
}
}
}