#ifdef UNSAFE_BUFFERS_BUILD
#pragma allow_unsafe_buffers
#endif
#include "mojo/public/c/system/data_pipe.h"
#include <stddef.h>
#include <stdint.h>
#include <array>
#include <memory>
#include "base/check_op.h"
#include "base/containers/span.h"
#include "base/functional/bind.h"
#include "base/location.h"
#include "base/numerics/safe_conversions.h"
#include "base/run_loop.h"
#include "base/task/sequenced_task_runner.h"
#include "base/test/bind.h"
#include "base/test/task_environment.h"
#include "build/build_config.h"
#include "mojo/core/embedder/embedder.h"
#include "mojo/core/test/mojo_test_base.h"
#include "mojo/public/c/system/functions.h"
#include "mojo/public/c/system/message_pipe.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/message_pipe.h"
#include "mojo/public/cpp/system/simple_watcher.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
namespace core {
namespace {
const uint32_t kSizeOfOptions =
static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
const size_t kMaxPoll = 100;
const size_t kMultiprocessCapacity = 37;
const char kMultiprocessTestData[] = "hello i'm a string that is 36 bytes";
const int kMultiprocessMaxIter = 5;
constexpr size_t kOversizedCapacity = std::numeric_limits<uint32_t>::max();
base::TimeDelta EpsilonDeadline() {
const int64_t tiny_timeout = TestTimeouts::tiny_timeout().InMicroseconds();
#if BUILDFLAG(IS_WIN) || BUILDFLAG(IS_ANDROID)
const int64_t deadline = (tiny_timeout * 3) / 10;
#else
const int64_t deadline = (tiny_timeout * 2) / 10;
#endif
return base::Microseconds(deadline);
}
class DataPipeTest : public test::MojoTestBase {
public:
DataPipeTest()
: producer_(MOJO_HANDLE_INVALID), consumer_(MOJO_HANDLE_INVALID) {}
DataPipeTest(const DataPipeTest&) = delete;
DataPipeTest& operator=(const DataPipeTest&) = delete;
~DataPipeTest() override {
if (producer_ != MOJO_HANDLE_INVALID)
CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_));
if (consumer_ != MOJO_HANDLE_INVALID)
CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_));
}
MojoResult ReadEmptyMessageWithHandles(MojoHandle pipe,
MojoHandle* out_handles,
uint32_t num_handles) {
std::vector<uint8_t> bytes;
std::vector<ScopedHandle> handles;
MojoResult rv = ReadMessageRaw(MessagePipeHandle(pipe), &bytes, &handles,
MOJO_READ_MESSAGE_FLAG_NONE);
if (rv == MOJO_RESULT_OK) {
CHECK_EQ(0u, bytes.size());
CHECK_EQ(num_handles, handles.size());
for (size_t i = 0; i < num_handles; ++i)
out_handles[i] = handles[i].release().value();
}
return rv;
}
MojoResult Create(const MojoCreateDataPipeOptions* options) {
return MojoCreateDataPipe(options, &producer_, &consumer_);
}
MojoResult WriteData(const void* elements,
uint32_t* num_bytes,
bool all_or_none = false) {
MojoWriteDataOptions options;
options.struct_size = sizeof(options);
options.flags = all_or_none ? MOJO_WRITE_DATA_FLAG_ALL_OR_NONE
: MOJO_WRITE_DATA_FLAG_NONE;
return MojoWriteData(producer_, elements, num_bytes, &options);
}
MojoResult ReadData(void* elements,
uint32_t* num_bytes,
bool all_or_none = false,
bool peek = false) {
MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
if (all_or_none)
flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
if (peek)
flags |= MOJO_READ_DATA_FLAG_PEEK;
MojoReadDataOptions options;
options.struct_size = sizeof(options);
options.flags = flags;
return MojoReadData(consumer_, &options, elements, num_bytes);
}
MojoResult QueryData(uint32_t* num_bytes) {
MojoReadDataOptions options;
options.struct_size = sizeof(options);
options.flags = MOJO_READ_DATA_FLAG_QUERY;
return MojoReadData(consumer_, &options, nullptr, num_bytes);
}
MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) {
MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD;
if (all_or_none)
flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
MojoReadDataOptions options;
options.struct_size = sizeof(options);
options.flags = flags;
return MojoReadData(consumer_, &options, nullptr, num_bytes);
}
MojoResult BeginReadData(const void** elements, uint32_t* num_bytes) {
return MojoBeginReadData(consumer_, nullptr, elements, num_bytes);
}
MojoResult EndReadData(uint32_t num_bytes_read) {
return MojoEndReadData(consumer_, num_bytes_read, nullptr);
}
MojoResult BeginWriteData(void** elements, uint32_t* num_bytes) {
return MojoBeginWriteData(producer_, nullptr, elements, num_bytes);
}
MojoResult EndWriteData(uint32_t num_bytes_written) {
return MojoEndWriteData(producer_, num_bytes_written, nullptr);
}
MojoResult CloseProducer() {
MojoResult rv = MojoClose(producer_);
producer_ = MOJO_HANDLE_INVALID;
return rv;
}
MojoResult CloseConsumer() {
MojoResult rv = MojoClose(consumer_);
consumer_ = MOJO_HANDLE_INVALID;
return rv;
}
MojoHandle producer_, consumer_;
};
TEST_F(DataPipeTest, Basic) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
1000 * sizeof(int32_t)
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
int32_t elements[10] = {};
uint32_t num_bytes = 0;
num_bytes = static_cast<uint32_t>(std::size(elements) * sizeof(elements[0]));
elements[0] = 123;
elements[1] = 456;
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes));
MojoHandleSignalsState state;
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &state));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
state.satisfied_signals);
elements[0] = -1;
elements[1] = -1;
ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes));
ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
ASSERT_EQ(elements[0], 123);
ASSERT_EQ(elements[1], 456);
}
TEST_F(DataPipeTest, CreateAndMaybeTransfer) {
auto test_options = std::to_array<MojoCreateDataPipeOptions>({
{},
{kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1,
1000},
{kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
4,
4000},
{kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
100,
0}
,
});
for (size_t i = 0; i < std::size(test_options); i++) {
MojoHandle producer_handle, consumer_handle;
MojoCreateDataPipeOptions* options = i ? &test_options[i] : nullptr;
ASSERT_EQ(MOJO_RESULT_OK,
MojoCreateDataPipe(options, &producer_handle, &consumer_handle));
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle));
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle));
}
}
TEST_F(DataPipeTest, SimpleReadWrite) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
1000 * sizeof(int32_t)
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
int32_t elements[10] = {};
uint32_t num_bytes = 0;
num_bytes = static_cast<uint32_t>(std::size(elements) * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes));
num_bytes = 0;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(0u, num_bytes);
num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes));
num_bytes = sizeof(elements[0]) + 1;
ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes));
elements[0] = 123;
elements[1] = 456;
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
ASSERT_EQ(2u * sizeof(elements[0]), num_bytes);
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
num_bytes = 0;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(2 * sizeof(elements[0]), num_bytes);
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes));
ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
ASSERT_EQ(123, elements[0]);
ASSERT_EQ(-1, elements[1]);
num_bytes = 0;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true));
ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
ASSERT_EQ(456, elements[0]);
ASSERT_EQ(-1, elements[1]);
num_bytes = 0;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
ReadData(elements, &num_bytes, true, false));
ASSERT_EQ(-1, elements[0]);
ASSERT_EQ(-1, elements[1]);
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false));
ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
ASSERT_EQ(456, elements[0]);
ASSERT_EQ(-1, elements[1]);
num_bytes = 0;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(0u, num_bytes);
}
TEST_F(DataPipeTest, BasicProducerWaiting) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
2 * sizeof(int32_t)
};
Create(&options);
MojoHandleSignalsState hss;
hss = GetSignalsState(producer_);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
int32_t elements[2] = {123, 456};
uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
ASSERT_EQ(123, elements[0]);
ASSERT_EQ(-1, elements[1]);
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false));
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
ASSERT_EQ(123, elements[0]);
ASSERT_EQ(-1, elements[1]);
void* buffer = nullptr;
num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
EXPECT_TRUE(buffer);
ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
static_cast<int32_t*>(buffer)[0] = 789;
ASSERT_EQ(MOJO_RESULT_OK,
EndWriteData(static_cast<uint32_t>(1u * sizeof(elements[0]))));
const void* read_buffer = nullptr;
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
EXPECT_TRUE(read_buffer);
ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
ASSERT_EQ(MOJO_RESULT_OK,
EndReadData(static_cast<uint32_t>(1u * sizeof(elements[0]))));
elements[0] = 123;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
CloseConsumer();
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
}
TEST_F(DataPipeTest, PeerClosedProducerWaiting) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
2 * sizeof(int32_t)
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
CloseConsumer();
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
}
TEST_F(DataPipeTest, PeerClosedConsumerWaiting) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
2 * sizeof(int32_t)
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
CloseProducer();
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
}
TEST_F(DataPipeTest, BasicConsumerWaiting) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
1000 * sizeof(int32_t)
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
EXPECT_EQ(0u, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
int32_t elements[2] = {123, 456};
uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
ASSERT_EQ(456, elements[0]);
ASSERT_EQ(-1, elements[1]);
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
ASSERT_EQ(456, elements[0]);
ASSERT_EQ(-1, elements[1]);
elements[0] = 789;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
CloseProducer();
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_TRUE(hss.satisfied_signals & (MOJO_HANDLE_SIGNAL_READABLE |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfiable_signals);
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfiable_signals);
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
ASSERT_EQ(789, elements[0]);
ASSERT_EQ(-1, elements[1]);
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
}
TEST_F(DataPipeTest, ConsumerNewDataReadable) {
const MojoCreateDataPipeOptions create_options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
1000 * sizeof(int32_t)
};
EXPECT_EQ(MOJO_RESULT_OK, Create(&create_options));
int32_t elements[2] = {123, 456};
uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
EXPECT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE));
EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals &
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
int32_t read_elements[6];
uint32_t num_read_bytes = sizeof(read_elements);
MojoReadDataOptions read_options;
read_options.struct_size = sizeof(read_options);
read_options.flags = MOJO_READ_DATA_FLAG_ALL_OR_NONE;
EXPECT_EQ(
MOJO_RESULT_OUT_OF_RANGE,
MojoReadData(consumer_, &read_options, read_elements, &num_read_bytes));
EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals &
MOJO_HANDLE_SIGNAL_READABLE);
EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
EXPECT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE));
EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(consumer_, &read_options,
read_elements, &num_read_bytes));
EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
MOJO_HANDLE_SIGNAL_READABLE);
EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
}
TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
1000 * sizeof(int32_t)
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
int32_t* elements = nullptr;
void* buffer = nullptr;
uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
EXPECT_TRUE(buffer);
EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
elements = static_cast<int32_t*>(buffer);
elements[0] = 123;
elements[1] = 456;
ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0])));
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
const void* read_buffer = nullptr;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
EXPECT_TRUE(read_buffer);
ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
ASSERT_EQ(123, read_elements[0]);
ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
read_buffer = nullptr;
num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
EXPECT_TRUE(read_buffer);
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
read_elements = static_cast<const int32_t*>(read_buffer);
ASSERT_EQ(456, read_elements[0]);
ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
CloseProducer();
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
}
TEST_F(DataPipeTest, BasicTwoPhaseWaiting) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
1000 * sizeof(int32_t)
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
hss = GetSignalsState(producer_);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
void* write_ptr = nullptr;
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
EXPECT_TRUE(write_ptr);
EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
hss = GetSignalsState(consumer_);
ASSERT_EQ(0u, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
static_cast<int32_t*>(write_ptr)[0] = 123;
ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t)));
hss = GetSignalsState(producer_);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
write_ptr = nullptr;
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
EXPECT_TRUE(write_ptr);
EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u));
num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
const void* read_ptr = nullptr;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
EXPECT_TRUE(read_ptr);
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
hss = GetSignalsState(producer_);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u));
hss = GetSignalsState(consumer_);
ASSERT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
}
void Seq(int32_t start, size_t count, int32_t* out) {
for (size_t i = 0; i < count; i++)
out[i] = start + static_cast<int32_t>(i);
}
TEST_F(DataPipeTest, AllOrNone) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
10 * sizeof(int32_t)
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
uint32_t num_bytes = 20u * sizeof(int32_t);
int32_t buffer[100];
Seq(0, std::size(buffer), buffer);
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
num_bytes = ~0u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(0u, num_bytes);
num_bytes = 5u * sizeof(int32_t);
Seq(100, std::size(buffer), buffer);
ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
num_bytes = 6u * sizeof(int32_t);
Seq(200, std::size(buffer), buffer);
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
num_bytes = 11u * sizeof(int32_t);
memset(buffer, 0xab, sizeof(buffer));
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
int32_t expected_buffer[100];
memset(expected_buffer, 0xab, sizeof(expected_buffer));
ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
num_bytes = 11u * sizeof(int32_t);
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
num_bytes = 2u * sizeof(int32_t);
Seq(300, std::size(buffer), buffer);
ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
num_bytes = 3u * sizeof(int32_t);
Seq(400, std::size(buffer), buffer);
ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
for (size_t i = 0; i < kMaxPoll; i++) {
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
if (num_bytes >= 10u * sizeof(int32_t))
break;
base::PlatformThread::Sleep(EpsilonDeadline());
}
ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
num_bytes = 5u * sizeof(int32_t);
memset(buffer, 0xab, sizeof(buffer));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
memset(expected_buffer, 0xab, sizeof(expected_buffer));
Seq(100, 5, expected_buffer);
ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
num_bytes = 6u * sizeof(int32_t);
memset(buffer, 0xab, sizeof(buffer));
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
memset(expected_buffer, 0xab, sizeof(expected_buffer));
ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
num_bytes = 6u * sizeof(int32_t);
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
num_bytes = 2u * sizeof(int32_t);
ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
CloseProducer();
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED);
EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED);
num_bytes = 4u * sizeof(int32_t);
memset(buffer, 0xab, sizeof(buffer));
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
ReadData(buffer, &num_bytes, true));
memset(expected_buffer, 0xab, sizeof(expected_buffer));
ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
num_bytes = 4u * sizeof(int32_t);
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true));
num_bytes = 2u * sizeof(int32_t);
memset(buffer, 0xab, sizeof(buffer));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
memset(expected_buffer, 0xab, sizeof(expected_buffer));
Seq(400, 2, expected_buffer);
ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
num_bytes = 1u * sizeof(int32_t);
ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
num_bytes = ~0u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(0u, num_bytes);
}
TEST_F(DataPipeTest, WrapAround) {
if (IsMojoIpczEnabled()) {
GTEST_SKIP() << "This test covers implementation details that are only "
<< "relevant with MojoIpcz disabled; namely that a data pipe "
<< "is backed by a circular ring buffer.";
}
std::array<unsigned char, 1000> test_data;
for (size_t i = 0; i < std::size(test_data); i++)
test_data[i] = static_cast<unsigned char>(i);
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1u,
100u
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
uint32_t num_bytes = 20u;
ASSERT_EQ(MOJO_RESULT_OK, WriteData(&test_data[0], &num_bytes, true));
ASSERT_EQ(20u, num_bytes);
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
unsigned char read_buffer[1000] = {};
num_bytes = 10u;
ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes, true));
ASSERT_EQ(10u, num_bytes);
ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
void* write_buffer_ptr = nullptr;
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
EXPECT_TRUE(write_buffer_ptr);
ASSERT_EQ(80u, num_bytes);
ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0));
size_t total_num_bytes = 0;
while (total_num_bytes < 90) {
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
ASSERT_EQ(hss.satisfied_signals, MOJO_HANDLE_SIGNAL_WRITABLE);
ASSERT_EQ(hss.satisfiable_signals, MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_PEER_REMOTE);
num_bytes = 100;
ASSERT_EQ(MOJO_RESULT_OK,
WriteData(&test_data[20 + total_num_bytes], &num_bytes, false));
total_num_bytes += num_bytes;
}
ASSERT_EQ(90u, total_num_bytes);
num_bytes = 0;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(100u, num_bytes);
const void* read_buffer_ptr = nullptr;
num_bytes = 0;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
EXPECT_TRUE(read_buffer_ptr);
ASSERT_EQ(90u, num_bytes);
ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0));
num_bytes =
static_cast<uint32_t>(std::size(read_buffer) * sizeof(read_buffer[0]));
memset(read_buffer, 0, num_bytes);
ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes));
ASSERT_EQ(100u, num_bytes);
ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
}
TEST_F(DataPipeTest, WriteCloseProducerRead) {
const char kTestData[] = "hello world";
const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1u,
1000u
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
uint32_t num_bytes = kTestDataSize;
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
ASSERT_EQ(kTestDataSize, num_bytes);
num_bytes = kTestDataSize;
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
ASSERT_EQ(kTestDataSize, num_bytes);
void* write_buffer_ptr = nullptr;
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
EXPECT_TRUE(write_buffer_ptr);
EXPECT_GT(num_bytes, 0u);
for (size_t i = 0; i < kMaxPoll; i++) {
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
if (num_bytes >= 2u * kTestDataSize)
break;
base::PlatformThread::Sleep(EpsilonDeadline());
}
ASSERT_GE(num_bytes, kTestDataSize);
const void* read_buffer_ptr = nullptr;
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
EXPECT_TRUE(read_buffer_ptr);
ASSERT_GE(num_bytes, kTestDataSize);
CloseProducer();
ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize));
read_buffer_ptr = nullptr;
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
EXPECT_TRUE(read_buffer_ptr);
ASSERT_EQ(kTestDataSize, num_bytes);
}
TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) {
const char kTestData[] = "hello world";
const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1u,
1000u
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
uint32_t num_bytes = kTestDataSize;
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
ASSERT_EQ(kTestDataSize, num_bytes);
void* write_buffer_ptr = nullptr;
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
EXPECT_TRUE(write_buffer_ptr);
ASSERT_GT(num_bytes, kTestDataSize);
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
const void* read_buffer_ptr = nullptr;
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
EXPECT_TRUE(read_buffer_ptr);
ASSERT_EQ(kTestDataSize, num_bytes);
CloseConsumer();
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
memcpy(write_buffer_ptr, kTestData, kTestDataSize);
ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize));
num_bytes = kTestDataSize;
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes));
write_buffer_ptr = nullptr;
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
BeginWriteData(&write_buffer_ptr, &num_bytes));
}
TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) {
const uint32_t kTestDataSize = 15u;
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1u,
1000u
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
void* write_buffer_ptr = nullptr;
uint32_t num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
EXPECT_TRUE(write_buffer_ptr);
ASSERT_GT(num_bytes, kTestDataSize);
}
TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
const char kTestData[] = "hello world";
const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1u,
1000u
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
uint32_t num_bytes = kTestDataSize;
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
ASSERT_EQ(kTestDataSize, num_bytes);
CloseProducer();
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfiable_signals);
char buffer[1000];
num_bytes = static_cast<uint32_t>(sizeof(buffer));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true));
ASSERT_EQ(kTestDataSize, num_bytes);
ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
memset(buffer, 0, 1000);
num_bytes = static_cast<uint32_t>(sizeof(buffer));
ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes));
ASSERT_EQ(kTestDataSize, num_bytes);
ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
num_bytes = static_cast<uint32_t>(sizeof(buffer));
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes));
const void* read_buffer_ptr = nullptr;
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
BeginReadData(&read_buffer_ptr, &num_bytes));
num_bytes = 10u;
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
}
TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) {
const char kTestData[] = "hello world";
const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1u,
1000u
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
uint32_t num_bytes = kTestDataSize;
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
ASSERT_EQ(kTestDataSize, num_bytes);
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
const void* read_buffer_ptr = nullptr;
uint32_t read_buffer_size = 0u;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size));
const char kExtraData[] = "bye world";
const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
num_bytes = kExtraDataSize;
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
ASSERT_EQ(kExtraDataSize, num_bytes);
CloseProducer();
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfiable_signals);
ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
EndReadData(read_buffer_size);
}
TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
10 * sizeof(int32_t)
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
uint32_t num_bytes = 1000u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(0u, num_bytes);
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
EndWriteData(1u * sizeof(int32_t)));
base::PlatformThread::Sleep(EpsilonDeadline());
num_bytes = 1000u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(0u, num_bytes);
num_bytes = 0u;
void* write_ptr = nullptr;
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
base::PlatformThread::Sleep(EpsilonDeadline());
num_bytes = 1000u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(0u, num_bytes);
num_bytes = 0u;
write_ptr = nullptr;
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
EXPECT_GE(num_bytes, 1u);
ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u));
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
base::PlatformThread::Sleep(EpsilonDeadline());
num_bytes = 1000u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(0u, num_bytes);
int32_t element = 123;
num_bytes = 1u * sizeof(int32_t);
ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes));
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t)));
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
num_bytes = 0u;
const void* read_ptr = nullptr;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
num_bytes = 0u;
read_ptr = nullptr;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u));
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
}
TEST_F(DataPipeTest, SendProducer) {
const char kTestData[] = "hello world";
const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1u,
1000u
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
MojoHandleSignalsState hss;
uint32_t num_bytes = kTestDataSize;
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
ASSERT_EQ(kTestDataSize, num_bytes);
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
const void* read_buffer = nullptr;
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
ASSERT_EQ(0, memcmp(read_buffer, kTestData, kTestDataSize));
EndReadData(num_bytes);
MojoHandle pipe0, pipe1;
ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
ASSERT_EQ(MOJO_RESULT_OK,
WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &producer_, 1,
MOJO_WRITE_MESSAGE_FLAG_NONE));
producer_ = MOJO_HANDLE_INVALID;
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &hss));
ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &producer_, 1));
const char kExtraData[] = "bye world";
const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
num_bytes = kExtraDataSize;
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
ASSERT_EQ(kExtraDataSize, num_bytes);
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
ASSERT_EQ(0, memcmp(read_buffer, kExtraData, kExtraDataSize));
EndReadData(num_bytes);
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
}
TEST_F(DataPipeTest, ConsumerWithClosedProducerSent) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
static_cast<uint32_t>(sizeof(int32_t)),
1000 * sizeof(int32_t)
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
int32_t data = 123;
uint32_t num_bytes = sizeof(data);
ASSERT_EQ(MOJO_RESULT_OK, WriteData(&data, &num_bytes));
ASSERT_EQ(MOJO_RESULT_OK, CloseProducer());
MojoHandleSignalsState state;
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
state.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
state.satisfiable_signals);
MojoHandle pipe0, pipe1;
ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
ASSERT_EQ(MOJO_RESULT_OK,
WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &consumer_, 1,
MOJO_WRITE_MESSAGE_FLAG_NONE));
consumer_ = MOJO_HANDLE_INVALID;
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &state));
ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &consumer_, 1));
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
state.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
state.satisfiable_signals);
int32_t read_data;
ASSERT_EQ(MOJO_RESULT_OK, ReadData(&read_data, &num_bytes));
ASSERT_EQ(sizeof(read_data), num_bytes);
ASSERT_EQ(data, read_data);
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
}
bool WriteAllData(MojoHandle producer,
const void* elements,
uint32_t num_bytes) {
for (size_t i = 0; i < kMaxPoll; i++) {
uint32_t write_bytes = num_bytes;
MojoResult result =
MojoWriteData(producer, elements, &write_bytes, nullptr);
if (result == MOJO_RESULT_OK) {
num_bytes -= write_bytes;
elements = static_cast<const uint8_t*>(elements) + write_bytes;
if (num_bytes == 0)
return true;
} else {
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
}
MojoHandleSignalsState hss = MojoHandleSignalsState();
EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals(
producer, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_WRITABLE);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
hss.satisfiable_signals);
}
return false;
}
bool ReadAllData(MojoHandle consumer,
void* elements,
uint32_t num_bytes,
bool expect_empty) {
for (size_t i = 0; i < kMaxPoll; i++) {
uint32_t read_bytes = num_bytes;
MojoResult result = MojoReadData(consumer, nullptr, elements, &read_bytes);
if (result == MOJO_RESULT_OK) {
num_bytes -= read_bytes;
elements = static_cast<uint8_t*>(elements) + read_bytes;
if (num_bytes == 0) {
if (expect_empty) {
base::PlatformThread::Sleep(TestTimeouts::tiny_timeout());
MojoReadDataOptions options;
options.struct_size = sizeof(options);
options.flags = MOJO_READ_DATA_FLAG_QUERY;
MojoReadData(consumer, &options, nullptr, &num_bytes);
EXPECT_EQ(0u, num_bytes);
}
return true;
}
} else {
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
}
MojoHandleSignalsState hss = MojoHandleSignalsState();
EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals(
consumer, MOJO_HANDLE_SIGNAL_READABLE, &hss));
EXPECT_TRUE(MOJO_HANDLE_SIGNAL_READABLE & hss.satisfied_signals);
EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED);
}
return num_bytes == 0;
}
TEST_F(DataPipeTest, CreateOversized) {
if (IsMojoIpczEnabled()) {
GTEST_SKIP() << "Data pipes do not allocate dedicated capacity when "
<< "MojoIpcz is enabled, so capacity limits are not enforced "
<< "and therefore cannot be tested.";
}
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1,
kOversizedCapacity,
};
ASSERT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, Create(&options));
}
#if BUILDFLAG(USE_BLINK)
constexpr size_t kNoSpuriousEvents_NumIterations = 1000;
TEST_F(DataPipeTest, NoSpuriousEvents) {
RunTestClient("NoSpuriousEventsHost", [&](MojoHandle host) {
RunTestClient("NoSpuriousEventsClient", [&](MojoHandle client) {
MojoHandle host_to_client;
MojoHandle client_to_host;
MojoCreateMessagePipe(nullptr, &host_to_client, &client_to_host);
WriteMessageWithHandles(host, "x", &host_to_client, 1);
WriteMessageWithHandles(client, "x", &client_to_host, 1);
EXPECT_EQ("done", ReadMessage(client));
WriteMessage(client, "bye");
});
EXPECT_EQ("done", ReadMessage(host));
WriteMessage(host, "bye");
});
}
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(NoSpuriousEventsHost, DataPipeTest, parent) {
const std::vector<uint8_t> kData(512, 'x');
MojoHandle client;
EXPECT_EQ("x", ReadMessageWithHandles(parent, &client, 1));
for (size_t j = 0; j < kNoSpuriousEvents_NumIterations; ++j) {
ScopedDataPipeProducerHandle producer;
ScopedDataPipeConsumerHandle consumer;
CHECK_EQ(MOJO_RESULT_OK, mojo::CreateDataPipe(2048, producer, consumer));
MojoHandle ch = consumer.release().value();
WriteMessageWithHandles(client, "hi", &ch, 1);
for (size_t i = 0; i < 9; ++i) {
WaitForSignals(producer.get().value(), MOJO_HANDLE_SIGNAL_WRITABLE);
size_t bytes_written = 0;
producer->WriteData(base::as_byte_span(kData), MOJO_WRITE_DATA_FLAG_NONE,
bytes_written);
}
}
WriteMessage(parent, "done");
EXPECT_EQ("bye", ReadMessage(parent));
MojoClose(client);
MojoClose(parent);
}
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(NoSpuriousEventsClient,
DataPipeTest,
parent) {
base::test::TaskEnvironment task_environment;
MojoHandle host;
EXPECT_EQ("x", ReadMessageWithHandles(parent, &host, 1));
size_t num_spurious_events = 0;
for (size_t j = 0; j < kNoSpuriousEvents_NumIterations; ++j) {
MojoHandle ch;
ASSERT_EQ("hi", ReadMessageWithHandles(host, &ch, 1));
ScopedDataPipeConsumerHandle consumer(DataPipeConsumerHandle{ch});
SimpleWatcher watcher(FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL);
base::RunLoop loop;
watcher.Watch(consumer.get(), MOJO_HANDLE_SIGNAL_READABLE,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindLambdaForTesting(
[&](MojoResult result, const HandleSignalsState& state) {
if (result == MOJO_RESULT_OK) {
if (!state.readable()) {
++num_spurious_events;
}
base::span<const uint8_t> buffer;
consumer->BeginReadData(0, buffer);
consumer->EndReadData(buffer.size());
watcher.ArmOrNotify();
} else {
CHECK(state.never_readable());
loop.Quit();
}
}));
watcher.ArmOrNotify();
loop.Run();
}
EXPECT_EQ(0u, num_spurious_events);
WriteMessage(parent, "done");
EXPECT_EQ("bye", ReadMessage(parent));
MojoClose(host);
MojoClose(parent);
}
TEST_F(DataPipeTest, Multiprocess) {
const uint32_t kTestDataSize =
static_cast<uint32_t>(sizeof(kMultiprocessTestData));
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1,
kMultiprocessCapacity
};
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
RunTestClient("MultiprocessClient", [&](MojoHandle server_mp) {
uint32_t num_bytes = kTestDataSize;
ASSERT_EQ(MOJO_RESULT_OK,
WriteData(kMultiprocessTestData, &num_bytes, true));
ASSERT_EQ(kTestDataSize, num_bytes);
ASSERT_EQ(MOJO_RESULT_OK,
WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0,
&consumer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
uint8_t buffer[100];
int seq = 0;
for (int i = 0; i < kMultiprocessMaxIter; ++i) {
for (uint32_t size = 1; size <= kMultiprocessCapacity; size++) {
for (unsigned int j = 0; j < size; ++j)
buffer[j] = seq + j;
EXPECT_TRUE(WriteAllData(producer_, buffer, size));
seq += size;
}
}
ASSERT_TRUE(WriteAllData(producer_, kMultiprocessTestData, kTestDataSize));
ASSERT_EQ(MOJO_RESULT_OK,
WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0,
&producer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
producer_ = MOJO_HANDLE_INVALID;
MojoHandleSignalsState hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(server_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
ASSERT_EQ(MOJO_RESULT_OK,
ReadEmptyMessageWithHandles(server_mp, &consumer_, 1));
for (int i = 0; i < 2; ++i) {
EXPECT_TRUE(ReadAllData(consumer_, buffer, kTestDataSize, i == 1));
EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
}
WriteMessage(server_mp, "quit");
});
}
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient, DataPipeTest, client_mp) {
const uint32_t kTestDataSize =
static_cast<uint32_t>(sizeof(kMultiprocessTestData));
MojoHandle consumer = MOJO_HANDLE_INVALID;
MojoHandleSignalsState hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
ASSERT_EQ(MOJO_RESULT_OK,
ReadEmptyMessageWithHandles(client_mp, &consumer, 1));
int32_t buffer[100];
EXPECT_TRUE(ReadAllData(consumer, buffer, kTestDataSize, false));
EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
int seq = 0;
std::array<uint8_t, 100> expected_buffer;
for (int i = 0; i < kMultiprocessMaxIter; ++i) {
for (uint32_t size = 1; size <= kMultiprocessCapacity; ++size) {
for (unsigned int j = 0; j < size; ++j)
expected_buffer[j] = seq + j;
EXPECT_TRUE(ReadAllData(consumer, buffer, size, false));
EXPECT_EQ(0, memcmp(buffer, expected_buffer.data(), size));
seq += size;
}
}
ASSERT_EQ(MOJO_RESULT_OK,
WriteMessageRaw(MessagePipeHandle(client_mp), nullptr, 0, &consumer,
1, MOJO_WRITE_MESSAGE_FLAG_NONE));
MojoHandle producer = MOJO_HANDLE_INVALID;
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
ASSERT_EQ(MOJO_RESULT_OK,
ReadEmptyMessageWithHandles(client_mp, &producer, 1));
EXPECT_TRUE(WriteAllData(producer, kMultiprocessTestData, kTestDataSize));
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));
EXPECT_EQ("quit", ReadMessage(client_mp));
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(client_mp));
}
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer, DataPipeTest, h) {
MojoHandle p;
std::string message = ReadMessageWithHandles(h, &p, 1);
uint32_t num_bytes = static_cast<uint32_t>(message.size());
EXPECT_EQ(MOJO_RESULT_OK,
MojoWriteData(p, message.data(), &num_bytes, nullptr));
EXPECT_EQ(num_bytes, static_cast<uint32_t>(message.size()));
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
EXPECT_EQ("quit", ReadMessage(h));
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(h));
}
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer, DataPipeTest, h) {
MojoHandle c;
std::string expected_message = ReadMessageWithHandles(h, &c, 1);
EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE));
uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
std::vector<char> bytes(expected_message.size());
EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(c, nullptr, bytes.data(), &num_bytes));
EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
std::string message(bytes.data(), bytes.size());
EXPECT_EQ(expected_message, message);
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
EXPECT_EQ("quit", ReadMessage(h));
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(h));
}
TEST_F(DataPipeTest, SendConsumerAndCloseProducer) {
MojoHandle p, c;
EXPECT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &p, &c));
RunTestClient("WriteAndCloseProducer", [&](MojoHandle producer_client) {
RunTestClient("ReadAndCloseConsumer", [&](MojoHandle consumer_client) {
const std::string kMessage = "Hello, world!";
WriteMessageWithHandles(producer_client, kMessage, &p, 1);
WriteMessageWithHandles(consumer_client, kMessage, &c, 1);
WriteMessage(consumer_client, "quit");
});
WriteMessage(producer_client, "quit");
});
}
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite, DataPipeTest, h) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1,
kMultiprocessCapacity
};
MojoHandle p, c;
ASSERT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(&options, &p, &c));
const std::string kMessage = "Hello, world!";
WriteMessageWithHandles(h, kMessage, &c, 1);
uint32_t num_bytes = static_cast<uint32_t>(kMessage.size());
EXPECT_EQ(MOJO_RESULT_OK,
MojoWriteData(p, kMessage.data(), &num_bytes, nullptr));
EXPECT_EQ(num_bytes, static_cast<uint32_t>(kMessage.size()));
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
EXPECT_EQ("quit", ReadMessage(h));
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(h));
}
TEST_F(DataPipeTest, CreateInChild) {
RunTestClient("CreateAndWrite", [&](MojoHandle child) {
MojoHandle c;
std::string expected_message = ReadMessageWithHandles(child, &c, 1);
EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE));
uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
std::vector<char> bytes(expected_message.size());
EXPECT_EQ(MOJO_RESULT_OK,
MojoReadData(c, nullptr, bytes.data(), &num_bytes));
EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
std::string message(bytes.data(), bytes.size());
EXPECT_EQ(expected_message, message);
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
WriteMessage(child, "quit");
});
}
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(DataPipeStatusChangeInTransitClient,
DataPipeTest,
parent) {
MojoHandle handles[6];
EXPECT_EQ("o_O", ReadMessageWithHandles(parent, handles, 6));
base::span<MojoHandle> producers = handles;
base::span<MojoHandle> consumers =
base::span<MojoHandle>(handles).subspan(3u);
EXPECT_EQ(MOJO_RESULT_OK,
WaitForSignals(producers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED));
EXPECT_EQ(MOJO_RESULT_OK,
WaitForSignals(consumers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED));
base::test::SingleThreadTaskEnvironment task_environment;
{
base::RunLoop run_loop;
int count = 0;
auto callback = base::BindRepeating(
[](base::RunLoop* loop, int* count, MojoResult result) {
EXPECT_EQ(MOJO_RESULT_OK, result);
if (++*count == 2)
loop->Quit();
},
&run_loop, &count);
SimpleWatcher producer_watcher(
FROM_HERE, SimpleWatcher::ArmingPolicy::AUTOMATIC,
base::SequencedTaskRunner::GetCurrentDefault());
SimpleWatcher consumer_watcher(
FROM_HERE, SimpleWatcher::ArmingPolicy::AUTOMATIC,
base::SequencedTaskRunner::GetCurrentDefault());
producer_watcher.Watch(Handle(producers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
callback);
consumer_watcher.Watch(Handle(consumers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
callback);
run_loop.Run();
EXPECT_EQ(2, count);
}
MojoResult result;
do {
uint32_t num_bytes = 0;
result = MojoWriteData(producers[2], nullptr, &num_bytes, nullptr);
} while (result == MOJO_RESULT_OK);
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
do {
char byte;
uint32_t num_bytes = 1;
result = MojoReadData(consumers[2], nullptr, &byte, &num_bytes);
} while (result == MOJO_RESULT_SHOULD_WAIT);
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
for (size_t i = 0; i < 6; ++i)
CloseHandle(handles[i]);
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(parent));
}
TEST_F(DataPipeTest, StatusChangeInTransit) {
std::array<MojoHandle, 6> producers;
std::array<MojoHandle, 6> consumers;
for (size_t i = 0; i < 6; ++i)
CreateDataPipe(&producers[i], &consumers[i], 1);
RunTestClient("DataPipeStatusChangeInTransitClient", [&](MojoHandle child) {
MojoHandle handles[] = {producers[0], producers[1], producers[2],
consumers[3], consumers[4], consumers[5]};
WriteMessageWithHandles(child, "o_O", handles, 6);
for (size_t i = 0; i < 3; ++i)
CloseHandle(consumers[i]);
for (size_t i = 3; i < 6; ++i)
CloseHandle(producers[i]);
});
}
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateOversizedChild, DataPipeTest, h) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions,
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1,
kOversizedCapacity
};
MojoHandle p, c;
ASSERT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
MojoCreateDataPipe(&options, &p, &c));
WriteMessage(h, "success");
EXPECT_EQ("quit", ReadMessage(h));
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(h));
}
TEST_F(DataPipeTest, CreateOversizedInChild) {
if (IsMojoIpczEnabled()) {
GTEST_SKIP() << "Data pipes do not allocate dedicated capacity when "
<< "MojoIpcz is enabled, so capacity limits are not enforced "
<< "and therefore cannot be tested.";
}
RunTestClient("CreateOversizedChild", [&](MojoHandle child) {
std::string expected_message = ReadMessage(child);
EXPECT_EQ("success", expected_message);
WriteMessage(child, "quit");
});
}
class TestDataProducer {
public:
explicit TestDataProducer(ScopedDataPipeProducerHandle producer,
base::OnceClosure quit_closure,
uint32_t total_size,
uint32_t chunk_size)
: producer_(std::move(producer)),
quit_closure_(std::move(quit_closure)),
chunk_size_(chunk_size),
bytes_remaining_(total_size) {
watcher_.Watch(producer_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
base::BindRepeating(&TestDataProducer::ProduceMore,
base::Unretained(this)));
ProduceMore(MOJO_RESULT_OK);
}
~TestDataProducer() = default;
private:
void ProduceMore(MojoResult) {
void* data;
uint32_t num_bytes = std::min(chunk_size_, bytes_remaining_);
if (num_bytes == 0) {
producer_.reset();
std::move(quit_closure_).Run();
return;
}
MojoResult rv =
MojoBeginWriteData(producer_->value(), nullptr, &data, &num_bytes);
if (rv == MOJO_RESULT_SHOULD_WAIT) {
watcher_.ArmOrNotify();
return;
}
CHECK_EQ(rv, MOJO_RESULT_OK);
num_bytes = std::min(num_bytes, bytes_remaining_);
memset(data, 42, num_bytes);
CHECK_EQ(MOJO_RESULT_OK,
MojoEndWriteData(producer_->value(), num_bytes, nullptr));
bytes_remaining_ -= num_bytes;
ProduceMore(MOJO_RESULT_OK);
}
ScopedDataPipeProducerHandle producer_;
SimpleWatcher watcher_{FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL};
base::OnceClosure quit_closure_;
const uint32_t chunk_size_;
uint32_t bytes_remaining_;
};
class TestDataDrain {
public:
explicit TestDataDrain(ScopedDataPipeConsumerHandle consumer,
base::OnceClosure quit_closure)
: consumer_(std::move(consumer)), quit_closure_(std::move(quit_closure)) {
watcher_.Watch(
consumer_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
base::BindRepeating(&TestDataDrain::Notify, base::Unretained(this)));
Update();
}
~TestDataDrain() = default;
size_t num_bytes_drained() const { return num_bytes_drained_; }
private:
void Notify(MojoResult) {
auto state = consumer_->QuerySignalsState();
if (state.never_readable()) {
consumer_.reset();
std::move(quit_closure_).Run();
return;
} else if (!state.readable()) {
watcher_.ArmOrNotify();
return;
}
Update();
}
void Update() {
for (;;) {
watcher_.ArmOrNotify();
constexpr size_t kNumReadAttempts = 10;
const void* data;
uint32_t num_bytes;
MojoResult result;
for (size_t i = 0; i < kNumReadAttempts; ++i) {
result =
MojoBeginReadData(consumer_->value(), nullptr, &data, &num_bytes);
if (result == MOJO_RESULT_OK) {
const uint32_t num_bytes_read =
(i == kNumReadAttempts - 1) ? num_bytes : 0;
const uint8_t* bytes = static_cast<const uint8_t*>(data);
EXPECT_EQ(42u, bytes[0]);
EXPECT_EQ(42u, bytes[num_bytes - 1]);
result = MojoEndReadData(consumer_->value(), num_bytes_read, nullptr);
}
}
switch (result) {
case MOJO_RESULT_SHOULD_WAIT:
watcher_.ArmOrNotify();
return;
case MOJO_RESULT_OK:
num_bytes_drained_ += num_bytes;
break;
case MOJO_RESULT_FAILED_PRECONDITION:
Notify(MOJO_RESULT_FAILED_PRECONDITION);
return;
}
}
}
ScopedDataPipeConsumerHandle consumer_;
SimpleWatcher watcher_{FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL};
base::OnceClosure quit_closure_;
size_t num_bytes_drained_ = 0;
base::WeakPtrFactory<TestDataDrain> weak_ptr_factory_{this};
};
constexpr uint32_t kStressTestDataSize = 512 * 1024 * 1024;
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(StressTestRacyTrapsClient, DataPipeTest, h) {
base::test::TaskEnvironment task_environment;
constexpr uint32_t kChunkSize = 4096;
MojoHandle p;
EXPECT_EQ("sup", ReadMessageWithHandles(h, &p, 1));
base::RunLoop loop;
TestDataProducer producer(
ScopedDataPipeProducerHandle{DataPipeProducerHandle{p}},
loop.QuitClosure(), kStressTestDataSize, kChunkSize);
loop.Run();
WriteMessage(h, "bye");
EXPECT_EQ("bye", ReadMessage(h));
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(h));
}
TEST_F(DataPipeTest, DISABLED_StressTestRacyTraps) {
base::test::TaskEnvironment task_environment;
const MojoCreateDataPipeOptions options = {
sizeof(options),
MOJO_CREATE_DATA_PIPE_FLAG_NONE,
1,
128 * 1024,
};
MojoHandle p, c;
ASSERT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(&options, &p, &c));
RunTestClient("StressTestRacyTrapsClient", [&](MojoHandle child) {
WriteMessageWithHandles(child, "sup", &p, 1);
base::RunLoop loop;
TestDataDrain drain(ScopedDataPipeConsumerHandle{DataPipeConsumerHandle{c}},
loop.QuitClosure());
loop.Run();
EXPECT_EQ(kStressTestDataSize, drain.num_bytes_drained());
EXPECT_EQ("bye", ReadMessage(child));
WriteMessage(child, "bye");
});
}
#endif
}
}
}