910e62b5创建于 1月15日历史提交
// Copyright 2018 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "components/cronet/native/upload_data_sink.h"

#include <inttypes.h>
#include <utility>

#include "base/check_op.h"
#include "base/functional/bind.h"
#include "base/memory/raw_ptr.h"
#include "base/strings/strcat.h"
#include "base/strings/stringprintf.h"
#include "base/task/single_thread_task_runner.h"
#include "components/cronet/cronet_upload_data_stream.h"
#include "components/cronet/native/engine.h"
#include "components/cronet/native/generated/cronet.idl_impl_struct.h"
#include "components/cronet/native/include/cronet_c.h"
#include "components/cronet/native/io_buffer_with_cronet_buffer.h"
#include "components/cronet/native/runnables.h"
#include "components/cronet/native/url_request.h"
#include "net/base/io_buffer.h"

namespace cronet {

// This class is called by Cronet's network stack as an implementation of
// CronetUploadDataStream::Delegate, and forwards the calls along to
// Cronet_UploadDataSinkImpl on the embedder's executor.
// This class is always called on the network thread and is destroyed in
// OnUploadDataStreamDestroyed() callback.
class Cronet_UploadDataSinkImpl::NetworkTasks
    : public CronetUploadDataStream::Delegate {
 public:
  NetworkTasks(Cronet_UploadDataSinkImpl* upload_data_sink,
               Cronet_Executor* upload_data_provider_executor);

  NetworkTasks(const NetworkTasks&) = delete;
  NetworkTasks& operator=(const NetworkTasks&) = delete;

  ~NetworkTasks() override;

 private:
  // CronetUploadDataStream::Delegate implementation:
  void InitializeOnNetworkThread(
      base::WeakPtr<CronetUploadDataStream> upload_data_stream) override;
  void Read(scoped_refptr<net::IOBuffer> buffer, int buf_len) override;
  void Rewind() override;
  void OnUploadDataStreamDestroyed() override;

  // Post |task| to client executor.
  void PostTaskToExecutor(base::OnceClosure task);

  // The upload data sink that is owned by url request and always accessed on
  // the client thread. It always outlives |this| callback.
  const raw_ptr<Cronet_UploadDataSinkImpl, AcrossTasksDanglingUntriaged>
      upload_data_sink_ = nullptr;

  // Executor for provider callback, used, but not owned, by |this|. Always
  // outlives |this| callback.
  Cronet_ExecutorPtr const upload_data_provider_executor_ = nullptr;

  THREAD_CHECKER(network_thread_checker_);
};

Cronet_UploadDataSinkImpl::NetworkTasks::NetworkTasks(
    Cronet_UploadDataSinkImpl* upload_data_sink,
    Cronet_Executor* upload_data_provider_executor)
    : upload_data_sink_(upload_data_sink),
      upload_data_provider_executor_(upload_data_provider_executor) {
  DETACH_FROM_THREAD(network_thread_checker_);
}

Cronet_UploadDataSinkImpl::NetworkTasks::~NetworkTasks() = default;

Cronet_UploadDataSinkImpl::Cronet_UploadDataSinkImpl(
    Cronet_UrlRequestImpl* url_request,
    Cronet_UploadDataProvider* upload_data_provider,
    Cronet_Executor* upload_data_provider_executor)
    : url_request_(url_request),
      upload_data_provider_executor_(upload_data_provider_executor),
      upload_data_provider_(upload_data_provider) {}

Cronet_UploadDataSinkImpl::~Cronet_UploadDataSinkImpl() = default;

void Cronet_UploadDataSinkImpl::InitRequest(CronetURLRequest* request) {
  int64_t length = upload_data_provider_->GetLength();
  if (length == -1) {
    is_chunked_ = true;
  } else {
    CHECK_GE(length, 0);
    length_ = static_cast<uint64_t>(length);
    remaining_length_ = length_;
  }

  request->SetUpload(std::make_unique<CronetUploadDataStream>(
      new NetworkTasks(this, upload_data_provider_executor_), length));
}

void Cronet_UploadDataSinkImpl::OnReadSucceeded(uint64_t bytes_read,
                                                bool final_chunk) {
  {
    base::AutoLock lock(lock_);
    CheckState(READ);
    in_which_user_callback_ = NOT_IN_CALLBACK;
    if (!upload_data_provider_)
      return;
  }
  if (url_request_->IsDone())
    return;
  if (close_when_not_in_callback_) {
    PostCloseToExecutor();
    return;
  }
  CHECK(bytes_read > 0 || (final_chunk && bytes_read == 0));
  // Bytes read exceeds buffer length.
  CHECK_LE(static_cast<size_t>(bytes_read), buffer_->io_buffer_len());
  if (!is_chunked_) {
    // Only chunked upload can have the final chunk.
    CHECK(!final_chunk);
    // Read upload data length exceeds specified length.
    if (bytes_read > remaining_length_) {
      PostCloseToExecutor();
      std::string error_message =
          base::StringPrintf("Read upload data length %" PRIu64
                             " exceeds expected length %" PRIu64,
                             length_ - remaining_length_ + bytes_read, length_);
      url_request_->OnUploadDataProviderError(error_message.c_str());
      return;
    }
    remaining_length_ -= bytes_read;
  }
  network_task_runner_->PostTask(
      FROM_HERE, base::BindOnce(&CronetUploadDataStream::OnReadSuccess,
                                upload_data_stream_, bytes_read, final_chunk));
}

void Cronet_UploadDataSinkImpl::OnReadError(Cronet_String error_message) {
  {
    base::AutoLock lock(lock_);
    CheckState(READ);
    in_which_user_callback_ = NOT_IN_CALLBACK;
    if (!upload_data_provider_)
      return;
  }
  if (url_request_->IsDone())
    return;
  PostCloseToExecutor();
  url_request_->OnUploadDataProviderError(error_message);
}

void Cronet_UploadDataSinkImpl::OnRewindSucceeded() {
  {
    base::AutoLock lock(lock_);
    CheckState(REWIND);
    in_which_user_callback_ = NOT_IN_CALLBACK;
    if (!upload_data_provider_)
      return;
  }
  remaining_length_ = length_;
  if (url_request_->IsDone())
    return;
  if (close_when_not_in_callback_) {
    PostCloseToExecutor();
    return;
  }
  network_task_runner_->PostTask(
      FROM_HERE, base::BindOnce(&CronetUploadDataStream::OnRewindSuccess,
                                upload_data_stream_));
}

void Cronet_UploadDataSinkImpl::OnRewindError(Cronet_String error_message) {
  {
    base::AutoLock lock(lock_);
    CheckState(REWIND);
    in_which_user_callback_ = NOT_IN_CALLBACK;
    if (!upload_data_provider_)
      return;
  }
  if (url_request_->IsDone())
    return;
  PostCloseToExecutor();
  url_request_->OnUploadDataProviderError(error_message);
}

void Cronet_UploadDataSinkImpl::InitializeUploadDataStream(
    base::WeakPtr<CronetUploadDataStream> upload_data_stream,
    scoped_refptr<base::SingleThreadTaskRunner> network_task_runner) {
  DCHECK(!upload_data_stream_);
  DCHECK(!network_task_runner_.get());
  upload_data_stream_ = upload_data_stream;
  network_task_runner_ = network_task_runner;
}

void Cronet_UploadDataSinkImpl::PostCloseToExecutor() {
  Cronet_RunnablePtr runnable = new cronet::OnceClosureRunnable(base::BindOnce(
      &Cronet_UploadDataSinkImpl::Close, base::Unretained(this)));
  // |runnable| is passed to executor, which destroys it after execution.
  Cronet_Executor_Execute(upload_data_provider_executor_, runnable);
}

void Cronet_UploadDataSinkImpl::Read(scoped_refptr<net::IOBuffer> buffer,
                                     int buf_len) {
  if (url_request_->IsDone())
    return;
  Cronet_UploadDataProviderPtr upload_data_provider = nullptr;
  {
    base::AutoLock lock(lock_);
    if (!upload_data_provider_)
      return;
    CheckState(NOT_IN_CALLBACK);
    in_which_user_callback_ = READ;
    upload_data_provider = upload_data_provider_;
  }
  buffer_ =
      std::make_unique<Cronet_BufferWithIOBuffer>(std::move(buffer), buf_len);
  Cronet_UploadDataProvider_Read(upload_data_provider, this,
                                 buffer_->cronet_buffer());
}

void Cronet_UploadDataSinkImpl::Rewind() {
  if (url_request_->IsDone())
    return;
  Cronet_UploadDataProviderPtr upload_data_provider = nullptr;
  {
    base::AutoLock lock(lock_);
    if (!upload_data_provider_)
      return;
    CheckState(NOT_IN_CALLBACK);
    in_which_user_callback_ = REWIND;
    upload_data_provider = upload_data_provider_;
  }
  Cronet_UploadDataProvider_Rewind(upload_data_provider, this);
}

void Cronet_UploadDataSinkImpl::Close() {
  Cronet_UploadDataProviderPtr upload_data_provider = nullptr;
  {
    base::AutoLock lock(lock_);
    // If |upload_data_provider_| is already closed from OnResponseStarted(),
    // don't close it again from OnError() or OnCanceled().
    if (!upload_data_provider_)
      return;
    if (in_which_user_callback_ != NOT_IN_CALLBACK) {
      // If currently in the callback, then wait until return from callback
      // before closing.
      close_when_not_in_callback_ = true;
      return;
    }
    upload_data_provider = upload_data_provider_;
    upload_data_provider_ = nullptr;
  }
  Cronet_UploadDataProvider_Close(upload_data_provider);
}

void Cronet_UploadDataSinkImpl::CheckState(UserCallback expected_state) {
  lock_.AssertAcquired();
  CHECK(in_which_user_callback_ == expected_state);
}

void Cronet_UploadDataSinkImpl::NetworkTasks::InitializeOnNetworkThread(
    base::WeakPtr<CronetUploadDataStream> upload_data_stream) {
  DCHECK_CALLED_ON_VALID_THREAD(network_thread_checker_);
  PostTaskToExecutor(
      base::BindOnce(&Cronet_UploadDataSinkImpl::InitializeUploadDataStream,
                     base::Unretained(upload_data_sink_), upload_data_stream,
                     base::SingleThreadTaskRunner::GetCurrentDefault()));
}

void Cronet_UploadDataSinkImpl::NetworkTasks::Read(
    scoped_refptr<net::IOBuffer> buffer,
    int buf_len) {
  DCHECK_CALLED_ON_VALID_THREAD(network_thread_checker_);
  PostTaskToExecutor(base::BindOnce(&Cronet_UploadDataSinkImpl::Read,
                                    base::Unretained(upload_data_sink_),
                                    std::move(buffer), buf_len));
}

void Cronet_UploadDataSinkImpl::NetworkTasks::Rewind() {
  DCHECK_CALLED_ON_VALID_THREAD(network_thread_checker_);
  PostTaskToExecutor(base::BindOnce(&Cronet_UploadDataSinkImpl::Rewind,
                                    base::Unretained(upload_data_sink_)));
}

void Cronet_UploadDataSinkImpl::NetworkTasks::OnUploadDataStreamDestroyed() {
  DCHECK_CALLED_ON_VALID_THREAD(network_thread_checker_);
  delete this;
}

void Cronet_UploadDataSinkImpl::NetworkTasks::PostTaskToExecutor(
    base::OnceClosure task) {
  Cronet_RunnablePtr runnable =
      new cronet::OnceClosureRunnable(std::move(task));
  // |runnable| is passed to executor, which destroys it after execution.
  Cronet_Executor_Execute(upload_data_provider_executor_, runnable);
}

}  // namespace cronet