910e62b5创建于 1月15日历史提交
// Copyright 2023 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "content/browser/preloading/prefetch/prefetch_data_pipe_tee.h"

#include "base/containers/span.h"
#include "base/metrics/histogram_functions.h"
#include "base/notreached.h"
#include "base/strings/string_view_util.h"
#include "mojo/public/cpp/system/string_data_source.h"
#include "services/network/public/cpp/loading_params.h"

namespace content {

namespace {

MojoResult CreateDataPipeForServingData(
    mojo::ScopedDataPipeProducerHandle& producer_handle,
    mojo::ScopedDataPipeConsumerHandle& consumer_handle) {
  MojoCreateDataPipeOptions options;

  options.struct_size = sizeof(MojoCreateDataPipeOptions);
  options.flags = MOJO_CREATE_DATA_PIPE_FLAG_NONE;
  options.element_num_bytes = 1;
  options.capacity_num_bytes = network::GetDataPipeDefaultAllocationSize(
      network::DataPipeAllocationSize::kLargerSizeIfPossible);

  return mojo::CreateDataPipe(&options, producer_handle, consumer_handle);
}

}  // namespace

PrefetchDataPipeTee::PrefetchDataPipeTee(
    mojo::ScopedDataPipeConsumerHandle source,
    size_t buffer_limit)
    : source_(std::move(source)),
      source_watcher_(FROM_HERE,
                      mojo::SimpleWatcher::ArmingPolicy::MANUAL,
                      base::SequencedTaskRunner::GetCurrentDefault()),
      buffer_limit_(buffer_limit),
      target_watcher_(FROM_HERE,
                      mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC,
                      base::SequencedTaskRunner::GetCurrentDefault()) {
  source_watcher_.Watch(source_.get(), MOJO_HANDLE_SIGNAL_READABLE,
                        MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
                        base::BindRepeating(&PrefetchDataPipeTee::OnReadable,
                                            weak_factory_.GetWeakPtr()));
  source_watcher_.ArmOrNotify();
}

PrefetchDataPipeTee::~PrefetchDataPipeTee() {
  CHECK(!target_.first);

  base::UmaHistogramEnumeration(
      "Preloading.Prefetch.PrefetchDataPipeTeeDtorState", state_);
}

mojo::ScopedDataPipeConsumerHandle PrefetchDataPipeTee::Clone() {
  ++count_clone_called_;

  switch (state_) {
    case State::kLoading:
      if (target_.first || pending_writes_) {
        base::UmaHistogramCounts100(
            "Preloading.Prefetch.PrefetchDataPipeTeeCloneFailed.Loading",
            count_clone_called_);
        return {};
      }
      break;
    case State::kSizeExceededNoTarget:
      CHECK(!target_.first);
      CHECK_EQ(pending_writes_, 0u);
      state_ = State::kSizeExceeded;
      break;
    case State::kSizeExceeded:
      base::UmaHistogramCounts100(
          "Preloading.Prefetch.PrefetchDataPipeTeeCloneFailed.SizeExceeded",
          count_clone_called_);
      return {};
    case State::kLoaded:
      break;
  }

  mojo::ScopedDataPipeConsumerHandle consumer_handle;
  mojo::ScopedDataPipeProducerHandle producer_handle;
  MojoResult rv =
      CreateDataPipeForServingData(producer_handle, consumer_handle);
  if (rv != MOJO_RESULT_OK) {
    return {};
  }

  auto producer =
      std::make_unique<mojo::DataPipeProducer>(std::move(producer_handle));

  // Send `buffer_` (== the whole data read so far) to the new target.
  WriteData(std::make_pair(std::move(producer), base::WrapRefCounted(this)),
            buffer_);

  return consumer_handle;
}

void PrefetchDataPipeTee::OnReadable(MojoResult result,
                                     const mojo::HandleSignalsState& state) {
  if (pending_writes_) {
    // Reading is blocked while writing to a target is ongoing.
    return;
  }

  switch (state_) {
    case State::kLoading:
    case State::kSizeExceeded:
      break;
    case State::kSizeExceededNoTarget:
      // Reading is blocked until the first target is added, because there are
      // no buffer space to store the read data until the target is added.
      return;
    case State::kLoaded:
      // No further read is needed.
      return;
  }

  base::span<const uint8_t> read_data;
  MojoResult rv = source_->BeginReadData(MOJO_READ_DATA_FLAG_NONE, read_data);
  if (rv == MOJO_RESULT_OK) {
    switch (state_) {
      case State::kLoading:
        CHECK_LE(buffer_.size(), buffer_limit_);
        if (buffer_.size() + read_data.size() <= buffer_limit_) {
          buffer_.append(base::as_string_view(read_data));
          if (target_.first) {
            WriteData(ResetTarget({}),  //
                      std::string_view(buffer_).substr(buffer_.size() -
                                                       read_data.size()));
          }
          break;
        }

        // If there are no targets yet, stop reading and keep `buffer_` (== the
        // whole data read so far) until the first target is added.
        if (!target_.first) {
          read_data = read_data.first(buffer_limit_ - buffer_.size());
          buffer_.append(base::as_string_view(read_data));
          state_ = State::kSizeExceededNoTarget;
          break;
        }

        // If there is a target, clear the current `buffer_` because it was
        // already written to the target. The current read data is written to
        // the target below.
        buffer_.clear();
        state_ = State::kSizeExceeded;
        [[fallthrough]];

      case State::kSizeExceeded:
        CHECK(buffer_.empty());
        if (!target_.first) {
          // The target was already removed. Discard the current read data,
          // because anyway no targets can be added to `this`.
          break;
        }

        buffer_.append(base::as_string_view(read_data));
        WriteData(ResetTarget({}), buffer_);
        break;
      case State::kSizeExceededNoTarget:
      case State::kLoaded:
        NOTREACHED();
    }
    source_->EndReadData(read_data.size());
    source_watcher_.ArmOrNotify();
  } else if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
    switch (state_) {
      case State::kLoading:
        state_ = State::kLoaded;
        // Closes the producer handle, if any.
        ResetTarget({});
        break;
      case State::kSizeExceeded:
        // Closes the producer handle, if any.
        ResetTarget({});
        break;
      case State::kSizeExceededNoTarget:
      case State::kLoaded:
        NOTREACHED();
    }
  } else if (rv != MOJO_RESULT_SHOULD_WAIT) {
    NOTREACHED() << "Unhandled MojoResult: " << rv;
  }
}

PrefetchDataPipeTee::ProducerPair PrefetchDataPipeTee::ResetTarget(
    ProducerPair target) {
  auto old_target = std::move(target_);
  target_ = std::move(target);

  target_watcher_.Cancel();
  if (target_.first) {
    CHECK_EQ(target_.second.get(), this);
    // Detect disconnection during `target_` is set and no ongoing writes for
    // `target_` is performed. Disconnection during ongoing writes are detected
    // and handled by `OnDataWritten()`.
    target_watcher_.Watch(
        target_.first->GetProducerHandle(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
        MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
        base::BindRepeating(&PrefetchDataPipeTee::OnWriteDataPipeClosed,
                            weak_factory_.GetWeakPtr()));
  }

  return old_target;
}

void PrefetchDataPipeTee::OnWriteDataPipeClosed(
    MojoResult result,
    const mojo::HandleSignalsState& state) {
  CHECK(target_.first);
  if (state.peer_closed()) {
    ResetTarget({});
  }
}

void PrefetchDataPipeTee::WriteData(ProducerPair target,
                                    base::span<const char> data) {
  CHECK_EQ(target.second.get(), this);
  ++pending_writes_;
  auto* raw_target = target.first.get();
  raw_target->Write(std::make_unique<mojo::StringDataSource>(
                        data, mojo::StringDataSource::AsyncWritingMode::
                                  STRING_STAYS_VALID_UNTIL_COMPLETION),
                    base::BindOnce(&PrefetchDataPipeTee::OnDataWritten,
                                   base::Unretained(this), std::move(target)));
}

void PrefetchDataPipeTee::OnDataWritten(ProducerPair target,
                                        MojoResult result) {
  CHECK_GT(pending_writes_, 0u);
  --pending_writes_;

  switch (state_) {
    case State::kLoaded:
      // Destruct `target`, because all data (== `buffer_`) is written to
      // `target`.
      break;
    case State::kSizeExceeded:
      // Data are streamed and thus cleared after written.
      // `buffer_` is kept until here because
      // `STRING_STAYS_VALID_UNTIL_COMPLETION` is used.
      buffer_.clear();
      [[fallthrough]];
    case State::kLoading:
      // Continue writing on `target` by setting it to `target_` again on
      // successful writes, while destruct `target` on errors.
      if (result == MOJO_RESULT_OK) {
        ResetTarget(std::move(target));
      }
      // In either case continue loading because `buffer_` and upcoming data can
      // be still used for future targets.
      if (pending_writes_ == 0) {
        source_watcher_.ArmOrNotify();
      }
      break;
    case State::kSizeExceededNoTarget:
      NOTREACHED();
  }
}

}  // namespace content