#include "services/network/chunked_data_pipe_upload_data_stream.h"
#include "base/check_op.h"
#include "base/containers/span.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/location.h"
#include "base/numerics/safe_conversions.h"
#include "base/task/sequenced_task_runner.h"
#include "mojo/public/c/system/types.h"
#include "net/base/io_buffer.h"
namespace network {
ChunkedDataPipeUploadDataStream::ChunkedDataPipeUploadDataStream(
scoped_refptr<ResourceRequestBody> resource_request_body,
mojo::PendingRemote<mojom::ChunkedDataPipeGetter> chunked_data_pipe_getter,
#if BUILDFLAG(ARKWEB_SCHEME_HANDLER)
bool has_null_source,
bool get_size_when_initialize)
#else
bool has_null_source)
#endif
: net::UploadDataStream(true,
has_null_source,
resource_request_body->identifier()),
resource_request_body_(std::move(resource_request_body)),
chunked_data_pipe_getter_(std::move(chunked_data_pipe_getter)),
#if BUILDFLAG(ARKWEB_SCHEME_HANDLER)
handle_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::SequencedTaskRunner::GetCurrentDefault()),
has_null_source_(has_null_source),
get_size_when_initialize_(get_size_when_initialize) {
#else
handle_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::SequencedTaskRunner::GetCurrentDefault()) {
#endif
CHECK(chunked_data_pipe_getter_.is_bound());
#if BUILDFLAG(ARKWEB_SCHEME_HANDLER)
if (!get_size_when_initialize) {
ArkWebInitInternal();
}
#else
chunked_data_pipe_getter_.set_disconnect_handler(
base::BindOnce(&ChunkedDataPipeUploadDataStream::OnDataPipeGetterClosed,
base::Unretained(this)));
chunked_data_pipe_getter_->GetSize(
base::BindOnce(&ChunkedDataPipeUploadDataStream::OnSizeReceived,
base::Unretained(this)));
#endif
}
ChunkedDataPipeUploadDataStream::~ChunkedDataPipeUploadDataStream() {}
bool ChunkedDataPipeUploadDataStream::AllowHTTP1() const {
return resource_request_body_->AllowHTTP1ForStreamingUpload();
}
int ChunkedDataPipeUploadDataStream::InitInternal(
const net::NetLogWithSource& net_log) {
#if BUILDFLAG(ARKWEB_SCHEME_HANDLER)
if (get_size_when_initialize_) {
ArkWebInitInternal();
}
#endif
if (status_ != net::OK)
return status_;
if (!chunked_data_pipe_getter_.is_connected())
return net::ERR_FAILED;
switch (cache_state_) {
case CacheState::kActive:
if (data_pipe_.is_valid())
return net::OK;
else
break;
case CacheState::kExhausted:
return net::ERR_FAILED;
case CacheState::kDisabled:
break;
}
mojo::ScopedDataPipeProducerHandle data_pipe_producer;
mojo::ScopedDataPipeConsumerHandle data_pipe_consumer;
MojoResult result =
mojo::CreateDataPipe(nullptr, data_pipe_producer, data_pipe_consumer);
if (result != MOJO_RESULT_OK)
return net::ERR_INSUFFICIENT_RESOURCES;
chunked_data_pipe_getter_->StartReading(std::move(data_pipe_producer));
data_pipe_ = std::move(data_pipe_consumer);
return net::OK;
}
int ChunkedDataPipeUploadDataStream::ReadInternal(net::IOBuffer* buf,
int buf_len) {
CHECK(!buf_);
CHECK(buf);
DCHECK_GT(buf_len, 0);
if (status_ != net::OK)
return status_;
if (size_ && bytes_read_ == *size_) {
DCHECK(!IsEOF());
SetIsFinalChunk();
return net::OK;
}
int cache_read_len = ReadFromCacheIfNeeded(buf, buf_len);
if (cache_read_len > 0)
return cache_read_len;
if (!handle_watcher_.IsWatching()) {
handle_watcher_.Watch(
data_pipe_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
base::BindRepeating(&ChunkedDataPipeUploadDataStream::OnHandleReadable,
base::Unretained(this)));
}
size_t num_bytes = base::checked_cast<size_t>(buf_len);
if (size_ && num_bytes > *size_ - bytes_read_)
num_bytes = *size_ - bytes_read_;
MojoResult rv = data_pipe_->ReadData(MOJO_READ_DATA_FLAG_NONE,
buf->first(num_bytes), num_bytes);
if (rv == MOJO_RESULT_OK) {
bytes_read_ += num_bytes;
if (size_ && *size_ == bytes_read_)
SetIsFinalChunk();
WriteToCacheIfNeeded(buf, num_bytes);
return num_bytes;
}
if (rv == MOJO_RESULT_SHOULD_WAIT) {
handle_watcher_.ArmOrNotify();
buf_ = buf;
buf_len_ = buf_len;
return net::ERR_IO_PENDING;
}
if (!size_) {
buf_ = buf;
buf_len_ = buf_len;
handle_watcher_.Cancel();
data_pipe_.reset();
return net::ERR_IO_PENDING;
}
DCHECK_LT(bytes_read_, *size_);
return net::ERR_FAILED;
}
void ChunkedDataPipeUploadDataStream::ResetInternal() {
buf_ = nullptr;
buf_len_ = 0;
handle_watcher_.Cancel();
bytes_read_ = 0;
if (cache_state_ != CacheState::kDisabled)
return;
data_pipe_.reset();
}
void ChunkedDataPipeUploadDataStream::OnSizeReceived(int32_t status,
uint64_t size) {
DCHECK(!size_);
DCHECK_EQ(net::OK, status_);
status_ = status;
if (status == net::OK) {
size_ = size;
if (size == bytes_read_) {
if (buf_)
SetIsFinalChunk();
} else if (size < bytes_read_ || (buf_ && !data_pipe_.is_valid())) {
status_ = net::ERR_FAILED;
}
}
if (buf_ && (IsEOF() || status_ != net::OK)) {
handle_watcher_.Cancel();
data_pipe_.reset();
buf_ = nullptr;
buf_len_ = 0;
chunked_data_pipe_getter_.reset();
if (status_ < net::ERR_IO_PENDING) {
LOG(ERROR) << "OnSizeReceived failed with Error: " << status_;
}
OnReadCompleted(status_);
}
}
void ChunkedDataPipeUploadDataStream::OnHandleReadable(MojoResult result) {
CHECK(buf_);
scoped_refptr<net::IOBuffer> buf(std::move(buf_));
int buf_len = buf_len_;
buf_len_ = 0;
int rv = ReadInternal(buf.get(), buf_len);
if (rv != net::ERR_IO_PENDING) {
if (rv < net::ERR_IO_PENDING) {
LOG(ERROR) << "OnHandleReadable failed with Error: " << rv;
}
OnReadCompleted(rv);
}
}
void ChunkedDataPipeUploadDataStream::OnDataPipeGetterClosed() {
if (status_ == net::OK && !size_)
OnSizeReceived(net::ERR_FAILED, 0);
}
void ChunkedDataPipeUploadDataStream::EnableCache(size_t dst_window_size) {
DCHECK_EQ(bytes_read_, 0u);
DCHECK_EQ(cache_state_, CacheState::kDisabled);
DCHECK(cache_.empty());
cache_state_ = CacheState::kActive;
dst_window_size_ = dst_window_size;
}
void ChunkedDataPipeUploadDataStream::WriteToCacheIfNeeded(net::IOBuffer* buf,
size_t num_bytes) {
if (cache_state_ != CacheState::kActive)
return;
if (cache_.size() >= bytes_read_)
return;
if (cache_.size() >= dst_window_size_) {
cache_state_ = CacheState::kExhausted;
return;
}
auto to_write = buf->first(num_bytes);
cache_.insert(cache_.end(), to_write.begin(), to_write.end());
}
int ChunkedDataPipeUploadDataStream::ReadFromCacheIfNeeded(net::IOBuffer* buf,
int buf_len) {
if (cache_state_ != CacheState::kActive)
return 0;
if (cache_.size() <= bytes_read_)
return 0;
int read_size =
std::min(static_cast<int>(cache_.size() - bytes_read_), buf_len);
buf->span().copy_prefix_from(base::as_byte_span(cache_).subspan(
bytes_read_, base::checked_cast<size_t>(read_size)));
bytes_read_ += read_size;
return read_size;
}
}