#include "chromeos/process_proxy/process_output_watcher.h"
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <cstdio>
#include <cstring>
#include "base/compiler_specific.h"
#include "base/functional/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
#include "base/task/single_thread_task_runner.h"
#include "base/third_party/icu/icu_utf.h"
namespace {
constexpr int kAckWindow = 30;
size_t UTF8SizeFromLeadingByte(uint8_t leading_byte) {
size_t byte_count = 0;
uint8_t mask = 1 << 7;
uint8_t error_mask = 1 << (7 - CBU8_MAX_LENGTH);
while (leading_byte & mask) {
if (mask & error_mask)
return 1;
mask >>= 1;
++byte_count;
}
return byte_count ? byte_count : 1;
}
}
namespace chromeos {
ProcessOutputWatcher::ProcessOutputWatcher(
int out_fd,
const ProcessOutputCallback& callback)
: read_buffer_size_(0),
process_output_file_(out_fd),
on_read_callback_(callback) {
CHECK_GE(out_fd, 0);
read_buffer_capacity_ = std::size(read_buffer_) - 1;
}
ProcessOutputWatcher::~ProcessOutputWatcher() = default;
void ProcessOutputWatcher::Start() {
WatchProcessOutput();
}
void ProcessOutputWatcher::OnProcessOutputCanReadWithoutBlocking() {
output_file_watcher_.reset();
ReadFromFd(process_output_file_.GetPlatformFile());
}
void ProcessOutputWatcher::WatchProcessOutput() {
output_file_watcher_ = base::FileDescriptorWatcher::WatchReadable(
process_output_file_.GetPlatformFile(),
base::BindRepeating(
&ProcessOutputWatcher::OnProcessOutputCanReadWithoutBlocking,
base::Unretained(this)));
}
void ProcessOutputWatcher::ReadFromFd(int fd) {
DCHECK_GT(read_buffer_capacity_, read_buffer_size_);
ssize_t bytes_read = UNSAFE_TODO(
HANDLE_EINTR(read(fd, &read_buffer_[read_buffer_size_],
read_buffer_capacity_ - read_buffer_size_)));
if (bytes_read > 0) {
ReportOutput(PROCESS_OUTPUT_TYPE_OUT, bytes_read);
return;
}
if (bytes_read < 0)
DPLOG(WARNING) << "read from buffer failed";
on_read_callback_.Run(PROCESS_OUTPUT_TYPE_EXIT, "");
}
size_t ProcessOutputWatcher::OutputSizeWithoutIncompleteUTF8() {
int last_lead_byte = read_buffer_size_ - 1;
while (true) {
if (read_buffer_size_ - last_lead_byte > CBU8_MAX_LENGTH)
return read_buffer_size_;
if (last_lead_byte < 0)
return read_buffer_size_;
if (!UNSAFE_TODO(CBU8_IS_TRAIL(read_buffer_[last_lead_byte]))) {
break;
}
--last_lead_byte;
}
size_t last_length =
UTF8SizeFromLeadingByte(UNSAFE_TODO(read_buffer_[last_lead_byte]));
if (!last_length || last_length + last_lead_byte <= read_buffer_size_)
return read_buffer_size_;
return last_lead_byte;
}
void ProcessOutputWatcher::ReportOutput(ProcessOutputType type,
size_t new_bytes_count) {
read_buffer_size_ += new_bytes_count;
size_t output_to_report = OutputSizeWithoutIncompleteUTF8();
on_read_callback_.Run(type, std::string(read_buffer_, output_to_report));
if (output_to_report < read_buffer_size_) {
for (size_t i = output_to_report; i < read_buffer_size_; ++i) {
UNSAFE_TODO(read_buffer_[i - output_to_report]) =
UNSAFE_TODO(read_buffer_[i]);
}
}
read_buffer_size_ -= output_to_report;
if (++unacked_outputs_ <= kAckWindow) {
WatchProcessOutput();
}
}
void ProcessOutputWatcher::AckOutput() {
if (--unacked_outputs_ == kAckWindow) {
WatchProcessOutput();
}
}
}