#include "base/message_loop/message_pump_fuchsia.h"
#include <lib/async-loop/cpp/loop.h>
#include <lib/async-loop/default.h>
#include <lib/fdio/io.h>
#include <lib/fdio/unsafe.h>
#include <lib/zx/time.h>
#include <zircon/errors.h>
#include "base/auto_reset.h"
#include "base/check.h"
#include "base/fuchsia/fuchsia_logging.h"
#include "base/logging.h"
#include "base/notreached.h"
#include "base/trace_event/trace_event.h"
namespace base {
MessagePumpFuchsia::ZxHandleWatchController::ZxHandleWatchController(
const Location& from_here)
: async_wait_t({}), created_from_location_(from_here) {}
MessagePumpFuchsia::ZxHandleWatchController::~ZxHandleWatchController() {
const bool success = StopWatchingZxHandle();
CHECK(success);
}
bool MessagePumpFuchsia::ZxHandleWatchController::WaitBegin() {
DCHECK(!handler);
async_wait_t::handler = &HandleSignal;
zx_status_t status =
async_begin_wait(weak_pump_->async_loop_->dispatcher(), this);
if (status != ZX_OK) {
ZX_DLOG(ERROR, status) << "async_begin_wait():"
<< created_from_location_.ToString();
async_wait_t::handler = nullptr;
return false;
}
return true;
}
bool MessagePumpFuchsia::ZxHandleWatchController::StopWatchingZxHandle() {
if (was_stopped_) {
DCHECK(!*was_stopped_);
*was_stopped_ = true;
was_stopped_ = nullptr;
}
if (!weak_pump_) {
return true;
}
if (!is_active()) {
return true;
}
async_wait_t::handler = nullptr;
zx_status_t result =
async_cancel_wait(weak_pump_->async_loop_->dispatcher(), this);
ZX_DLOG_IF(ERROR, result != ZX_OK, result)
<< "async_cancel_wait(): " << created_from_location_.ToString();
return result == ZX_OK;
}
void MessagePumpFuchsia::ZxHandleWatchController::HandleSignal(
async_dispatcher_t* async,
async_wait_t* wait,
zx_status_t status,
const zx_packet_signal_t* signal) {
ZxHandleWatchController* controller =
static_cast<ZxHandleWatchController*>(wait);
DCHECK_EQ(controller->handler, &HandleSignal);
Delegate::ScopedDoWorkItem scoped_do_work_item;
if (controller->weak_pump_ && controller->weak_pump_->run_state_) {
scoped_do_work_item =
controller->weak_pump_->run_state_->delegate->BeginWorkItem();
}
TRACE_EVENT0("toplevel", "ZxHandleSignal");
if (status != ZX_OK) {
ZX_DLOG(WARNING, status) << "async wait failed: "
<< controller->created_from_location_.ToString();
return;
}
controller->handler = nullptr;
bool was_stopped = false;
controller->was_stopped_ = &was_stopped;
controller->watcher_->OnZxHandleSignalled(wait->object, signal->observed);
if (was_stopped) {
return;
}
controller->was_stopped_ = nullptr;
if (controller->persistent_) {
controller->WaitBegin();
}
}
void MessagePumpFuchsia::FdWatchController::OnZxHandleSignalled(
zx_handle_t handle,
zx_signals_t signals) {
uint32_t events;
fdio_unsafe_wait_end(io_, signals, &events);
const uint32_t desired_events = desired_events_ | FDIO_EVT_ERROR;
const uint32_t filtered_events = events & desired_events;
DCHECK_NE(filtered_events, 0u) << events << " & " << desired_events;
bool* was_stopped = was_stopped_;
if (filtered_events & FDIO_EVT_WRITABLE) {
watcher_->OnFileCanWriteWithoutBlocking(fd_);
}
if (!*was_stopped && (filtered_events & FDIO_EVT_READABLE)) {
watcher_->OnFileCanReadWithoutBlocking(fd_);
}
}
MessagePumpFuchsia::FdWatchController::FdWatchController(
const Location& from_here)
: FdWatchControllerInterface(from_here),
ZxHandleWatchController(from_here) {}
MessagePumpFuchsia::FdWatchController::~FdWatchController() {
const bool success = StopWatchingFileDescriptor();
CHECK(success);
}
bool MessagePumpFuchsia::FdWatchController::WaitBegin() {
fdio_unsafe_wait_begin(io_, desired_events_, &object, &trigger);
if (async_wait_t::object == ZX_HANDLE_INVALID) {
DLOG(ERROR) << "fdio_wait_begin failed: "
<< ZxHandleWatchController::created_from_location_.ToString();
return false;
}
return MessagePumpFuchsia::ZxHandleWatchController::WaitBegin();
}
bool MessagePumpFuchsia::FdWatchController::StopWatchingFileDescriptor() {
bool success = StopWatchingZxHandle();
if (io_) {
fdio_unsafe_release(io_);
io_ = nullptr;
}
return success;
}
MessagePumpFuchsia::MessagePumpFuchsia()
: async_loop_(new async::Loop(&kAsyncLoopConfigAttachToCurrentThread)),
weak_factory_(this) {}
MessagePumpFuchsia::~MessagePumpFuchsia() = default;
bool MessagePumpFuchsia::WatchFileDescriptor(int fd,
bool persistent,
int mode,
FdWatchController* controller,
FdWatcher* delegate) {
DCHECK_GE(fd, 0);
DCHECK(controller);
DCHECK(delegate);
const bool success = controller->StopWatchingFileDescriptor();
CHECK(success);
controller->fd_ = fd;
controller->watcher_ = delegate;
DCHECK(!controller->io_);
controller->io_ = fdio_unsafe_fd_to_io(fd);
if (!controller->io_) {
DLOG(ERROR) << "Failed to get IO for FD";
return false;
}
switch (mode) {
case WATCH_READ:
controller->desired_events_ = FDIO_EVT_READABLE;
break;
case WATCH_WRITE:
controller->desired_events_ = FDIO_EVT_WRITABLE;
break;
case WATCH_READ_WRITE:
controller->desired_events_ = FDIO_EVT_READABLE | FDIO_EVT_WRITABLE;
break;
default:
NOTREACHED() << "unexpected mode: " << mode;
}
return WatchZxHandle(ZX_HANDLE_INVALID, persistent, 1, controller,
controller);
}
bool MessagePumpFuchsia::WatchZxHandle(zx_handle_t handle,
bool persistent,
zx_signals_t signals,
ZxHandleWatchController* controller,
ZxHandleWatcher* delegate) {
DCHECK_NE(0u, signals);
DCHECK(controller);
DCHECK(delegate);
DCHECK(handle == ZX_HANDLE_INVALID || !controller->is_active() ||
handle == controller->async_wait_t::object);
const bool success = controller->StopWatchingZxHandle();
CHECK(success);
controller->async_wait_t::object = handle;
controller->persistent_ = persistent;
controller->async_wait_t::trigger = signals;
controller->watcher_ = delegate;
controller->weak_pump_ = weak_factory_.GetWeakPtr();
return controller->WaitBegin();
}
bool MessagePumpFuchsia::HandleIoEventsUntil(zx_time_t deadline) {
zx_status_t status = async_loop_->Run(zx::time(deadline), true);
switch (status) {
case ZX_OK:
return true;
case ZX_ERR_CANCELED:
async_loop_->ResetQuit();
return true;
case ZX_ERR_TIMED_OUT:
return false;
default:
ZX_DLOG(FATAL, status) << "unexpected wait status";
return false;
}
}
void MessagePumpFuchsia::Run(Delegate* delegate) {
RunState run_state(delegate);
AutoReset<RunState*> auto_reset_run_state(&run_state_, &run_state);
for (;;) {
const Delegate::NextWorkInfo next_work_info = delegate->DoWork();
if (run_state.should_quit) {
break;
}
const bool did_handle_io_event = HandleIoEventsUntil(0);
if (run_state.should_quit) {
break;
}
bool attempt_more_work =
next_work_info.is_immediate() || did_handle_io_event;
if (attempt_more_work) {
continue;
}
delegate->DoIdleWork();
if (run_state.should_quit) {
break;
}
delegate->BeforeWait();
zx_time_t deadline = next_work_info.delayed_run_time.is_max()
? ZX_TIME_INFINITE
: next_work_info.delayed_run_time.ToZxTime();
HandleIoEventsUntil(deadline);
}
}
void MessagePumpFuchsia::Quit() {
CHECK(run_state_);
run_state_->should_quit = true;
}
void MessagePumpFuchsia::ScheduleWork() {
async_loop_->Quit();
}
void MessagePumpFuchsia::ScheduleDelayedWork(
const Delegate::NextWorkInfo& next_work_info) {
}
}