* Copyright (c) 2016-2023 Google, Inc. All rights reserved.
* **********************************************************/
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of Google, Inc. nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL GOOGLE, INC. OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*/
#define NOMINMAX
#undef NDEBUG
#include <assert.h>
#include <cstring>
#include <iostream>
#include <thread>
#include <vector>
#include "dr_api.h"
#include "scheduler.h"
#ifdef HAS_ZIP
# include "zipfile_istream.h"
# include "zipfile_ostream.h"
#endif
namespace dynamorio {
namespace drmemtrace {
using ::dynamorio::drmemtrace::memtrace_stream_t;
class mock_reader_t : public reader_t {
public:
mock_reader_t() = default;
explicit mock_reader_t(const std::vector<trace_entry_t> &trace)
: trace_(trace)
{
verbosity_ = 3;
}
bool
init() override
{
at_eof_ = false;
++*this;
return true;
}
trace_entry_t *
read_next_entry() override
{
trace_entry_t *entry = read_queued_entry();
if (entry != nullptr)
return entry;
++index_;
if (index_ >= static_cast<int>(trace_.size())) {
at_eof_ = true;
return nullptr;
}
return &trace_[index_];
}
std::string
get_stream_name() const override
{
return "";
}
private:
std::vector<trace_entry_t> trace_;
int index_ = -1;
};
#if (defined(X86_64) || defined(ARM_64)) && defined(HAS_ZIP)
struct trace_position_t {
trace_position_t(uint64_t record, uint64_t instr, uint64_t timestamp)
: record_ordinal(record)
, instruction_ordinal(instr)
, last_timestamp(timestamp)
{
}
bool
operator==(const trace_position_t &rhs)
{
return record_ordinal == rhs.record_ordinal &&
instruction_ordinal == rhs.instruction_ordinal &&
last_timestamp == rhs.last_timestamp;
}
uint64_t record_ordinal;
uint64_t instruction_ordinal;
uint64_t last_timestamp;
};
struct context_switch_t {
context_switch_t(memref_tid_t prev_tid, memref_tid_t new_tid, trace_position_t output,
trace_position_t prev, trace_position_t next)
: prev_tid(prev_tid)
, new_tid(new_tid)
, output_position(output)
, prev_input_position(prev)
, new_input_position(next)
{
}
bool
operator==(const context_switch_t &rhs)
{
return prev_tid == rhs.prev_tid && new_tid == rhs.new_tid &&
output_position == rhs.output_position &&
prev_input_position == rhs.prev_input_position &&
new_input_position == rhs.new_input_position;
}
memref_tid_t prev_tid;
memref_tid_t new_tid;
trace_position_t output_position;
trace_position_t prev_input_position;
trace_position_t new_input_position;
};
static std::ostream &
operator<<(std::ostream &o, const trace_position_t &tp)
{
return o << "<" << tp.record_ordinal << "," << tp.instruction_ordinal << ","
<< tp.last_timestamp << ">";
}
static std::ostream &
operator<<(std::ostream &o, const context_switch_t &cs)
{
if (cs.prev_tid == INVALID_THREAD_ID) {
return o << cs.new_tid;
}
return o << cs.prev_tid << " => " << cs.new_tid << " @ " << cs.output_position << " ("
<< cs.prev_input_position << " => " << cs.new_input_position << ")";
}
#endif
static trace_entry_t
make_instr(addr_t pc, trace_type_t type = TRACE_TYPE_INSTR)
{
trace_entry_t entry;
entry.type = static_cast<unsigned short>(type);
entry.size = 1;
entry.addr = pc;
return entry;
}
static trace_entry_t
make_exit(memref_tid_t tid)
{
trace_entry_t entry;
entry.type = TRACE_TYPE_THREAD_EXIT;
entry.addr = static_cast<addr_t>(tid);
return entry;
}
static trace_entry_t
make_footer()
{
trace_entry_t entry;
entry.type = TRACE_TYPE_FOOTER;
return entry;
}
static trace_entry_t
make_version(int version)
{
trace_entry_t entry;
entry.type = TRACE_TYPE_MARKER;
entry.size = TRACE_MARKER_TYPE_VERSION;
entry.addr = version;
return entry;
}
static trace_entry_t
make_thread(memref_tid_t tid)
{
trace_entry_t entry;
entry.type = TRACE_TYPE_THREAD;
entry.addr = static_cast<addr_t>(tid);
return entry;
}
static trace_entry_t
make_pid(memref_pid_t pid)
{
trace_entry_t entry;
entry.type = TRACE_TYPE_PID;
entry.addr = static_cast<addr_t>(pid);
return entry;
}
static trace_entry_t
make_timestamp(uint64_t timestamp)
{
trace_entry_t entry;
entry.type = TRACE_TYPE_MARKER;
entry.size = TRACE_MARKER_TYPE_TIMESTAMP;
entry.addr = static_cast<addr_t>(timestamp);
return entry;
}
static trace_entry_t
make_marker(trace_marker_type_t type, uintptr_t value)
{
trace_entry_t entry;
entry.type = TRACE_TYPE_MARKER;
entry.size = static_cast<unsigned short>(type);
entry.addr = value;
return entry;
}
static bool
memref_is_nop_instr(memref_t &record)
{
if (!type_is_instr(record.instr.type))
return false;
instr_noalloc_t noalloc;
instr_noalloc_init(GLOBAL_DCONTEXT, &noalloc);
instr_t *instr = instr_from_noalloc(&noalloc);
app_pc pc =
decode(GLOBAL_DCONTEXT, reinterpret_cast<app_pc>(record.instr.encoding), instr);
return pc != nullptr && instr_is_nop(instr);
}
static void
test_serial()
{
std::cerr << "\n----------------\nTesting serial\n";
static constexpr memref_tid_t TID_A = 42;
static constexpr memref_tid_t TID_B = 99;
std::vector<trace_entry_t> refs_A = {
make_thread(TID_A),
make_pid(1),
make_version(4),
make_timestamp(10),
make_instr(10),
make_timestamp(30),
make_instr(30),
make_timestamp(50),
make_instr(50),
make_exit(TID_A),
};
std::vector<trace_entry_t> refs_B = {
make_thread(TID_B),
make_pid(1),
make_version(4),
make_timestamp(20),
make_instr(20),
make_timestamp(40),
make_instr(40),
make_timestamp(60),
make_instr(60),
make_exit(TID_B),
};
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
if (scheduler.init(sched_inputs, 1,
scheduler_t::make_scheduler_serial_options(4)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
auto *stream = scheduler.get_stream(0);
memref_t memref;
uint64_t last_timestamp = 0;
memref_tid_t last_timestamp_tid = INVALID_THREAD_ID;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
assert(stream->get_input_workload_ordinal() == 0);
if (memref.marker.type == TRACE_TYPE_MARKER &&
memref.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP) {
assert(memref.marker.marker_value > last_timestamp);
last_timestamp = memref.marker.marker_value;
assert(last_timestamp_tid != memref.marker.tid);
last_timestamp_tid = memref.marker.tid;
}
}
}
static void
test_parallel()
{
std::cerr << "\n----------------\nTesting parallel\n";
std::vector<trace_entry_t> input_sequence = {
make_thread(1),
make_pid(1),
make_instr(42),
make_exit(1),
};
static constexpr int NUM_INPUTS = 3;
static constexpr int NUM_OUTPUTS = 2;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = 100 + i;
inputs[i] = input_sequence;
for (auto &record : inputs[i]) {
if (record.type == TRACE_TYPE_THREAD || record.type == TRACE_TYPE_THREAD_EXIT)
record.addr = static_cast<addr_t>(tid);
}
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS,
scheduler_t::make_scheduler_parallel_options(4)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::unordered_map<memref_tid_t, int> tid2stream;
int count = 0;
for (int i = 0; i < NUM_OUTPUTS; i++) {
auto *stream = scheduler.get_stream(i);
memref_t memref;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
++count;
if (tid2stream.find(memref.instr.tid) == tid2stream.end())
tid2stream[memref.instr.tid] = i;
else
assert(tid2stream[memref.instr.tid] == i);
assert(
stream->get_record_ordinal() ==
scheduler.get_input_stream_interface(stream->get_input_stream_ordinal())
->get_record_ordinal());
assert(
stream->get_instruction_ordinal() ==
scheduler.get_input_stream_interface(stream->get_input_stream_ordinal())
->get_instruction_ordinal());
}
}
assert(count == 2 * NUM_INPUTS);
}
static void
test_param_checks()
{
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t()),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), 1);
std::vector<scheduler_t::range_t> regions;
regions.emplace_back(0, 2);
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
sched_inputs[0].thread_modifiers.push_back(scheduler_t::input_thread_info_t(regions));
assert(
scheduler.init(sched_inputs, 1, scheduler_t::make_scheduler_serial_options()) ==
scheduler_t::STATUS_ERROR_INVALID_PARAMETER);
sched_inputs[0].thread_modifiers[0].regions_of_interest[0].start_instruction = 2;
sched_inputs[0].thread_modifiers[0].regions_of_interest[0].stop_instruction = 1;
assert(
scheduler.init(sched_inputs, 1, scheduler_t::make_scheduler_serial_options()) ==
scheduler_t::STATUS_ERROR_INVALID_PARAMETER);
sched_inputs[0].thread_modifiers[0].regions_of_interest[0].start_instruction = 2;
sched_inputs[0].thread_modifiers[0].regions_of_interest[0].stop_instruction = 10;
sched_inputs[0].thread_modifiers[0].regions_of_interest.emplace_back(10, 20);
assert(
scheduler.init(sched_inputs, 1, scheduler_t::make_scheduler_serial_options()) ==
scheduler_t::STATUS_ERROR_INVALID_PARAMETER);
sched_inputs[0].thread_modifiers[0].regions_of_interest[0].start_instruction = 2;
sched_inputs[0].thread_modifiers[0].regions_of_interest[0].stop_instruction = 10;
sched_inputs[0].thread_modifiers[0].regions_of_interest[1].start_instruction = 4;
sched_inputs[0].thread_modifiers[0].regions_of_interest[1].stop_instruction = 12;
assert(
scheduler.init(sched_inputs, 1, scheduler_t::make_scheduler_serial_options()) ==
scheduler_t::STATUS_ERROR_INVALID_PARAMETER);
}
static void
test_regions_bare()
{
std::cerr << "\n----------------\nTesting bare regions\n";
std::vector<trace_entry_t> memrefs = {
make_thread(1),
make_pid(1),
make_marker(TRACE_MARKER_TYPE_CACHE_LINE_SIZE, 64),
make_instr(1),
make_instr(2),
make_instr(3),
make_instr(4),
make_instr(5),
make_instr(6),
make_instr(7),
make_instr(8),
make_instr(9),
make_instr(10),
make_exit(1),
};
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(memrefs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), 1);
std::vector<scheduler_t::range_t> regions;
regions.emplace_back(2, 2);
regions.emplace_back(4, 4);
regions.emplace_back(5, 5);
regions.emplace_back(8, 9);
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
sched_inputs[0].thread_modifiers.push_back(scheduler_t::input_thread_info_t(regions));
if (scheduler.init(sched_inputs, 1,
scheduler_t::scheduler_options_t(
scheduler_t::MAP_TO_ANY_OUTPUT, scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
4)) != scheduler_t::STATUS_SUCCESS)
assert(false);
int ordinal = 0;
auto *stream = scheduler.get_stream(0);
memref_t memref;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
switch (ordinal) {
case 0:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 2);
break;
case 1:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_WINDOW_ID);
assert(memref.marker.marker_value == 1);
break;
case 2:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 4);
break;
case 3:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_WINDOW_ID);
assert(memref.marker.marker_value == 2);
break;
case 4:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 5);
break;
case 5:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_WINDOW_ID);
assert(memref.marker.marker_value == 3);
break;
case 6:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 8);
break;
case 7:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 9);
break;
default: assert(ordinal == 8); assert(memref.exit.type == TRACE_TYPE_THREAD_EXIT);
}
++ordinal;
}
assert(ordinal == 9);
}
static void
test_regions_bare_no_marker()
{
std::cerr << "\n----------------\nTesting bare regions with no marker\n";
std::vector<trace_entry_t> memrefs = {
make_thread(1),
make_pid(1),
make_instr(1),
make_instr(2),
make_instr(3),
make_instr(4),
make_exit(1),
};
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(memrefs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), 1);
std::vector<scheduler_t::range_t> regions;
regions.emplace_back(2, 0);
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
sched_inputs[0].thread_modifiers.push_back(scheduler_t::input_thread_info_t(regions));
if (scheduler.init(sched_inputs, 1,
scheduler_t::scheduler_options_t(
scheduler_t::MAP_TO_ANY_OUTPUT, scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
4)) != scheduler_t::STATUS_SUCCESS)
assert(false);
int ordinal = 0;
auto *stream = scheduler.get_stream(0);
memref_t memref;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
switch (ordinal) {
case 0:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 2);
break;
case 1:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 3);
break;
case 2:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 4);
break;
default: assert(ordinal == 3); assert(memref.exit.type == TRACE_TYPE_THREAD_EXIT);
}
++ordinal;
}
assert(ordinal == 4);
}
static void
test_regions_timestamps()
{
std::cerr << "\n----------------\nTesting regions\n";
std::vector<trace_entry_t> memrefs = {
make_thread(1),
make_pid(1),
make_marker(TRACE_MARKER_TYPE_PAGE_SIZE, 4096),
make_timestamp(10),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 1),
make_instr(1),
make_instr(2),
make_instr(3),
make_timestamp(20),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 2),
make_timestamp(30),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 3),
make_instr(4),
make_timestamp(40),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 4),
make_instr(5),
make_instr(6),
make_timestamp(50),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 5),
make_instr(7),
make_instr(8),
make_exit(1),
};
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(memrefs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), 1);
std::vector<scheduler_t::range_t> regions;
regions.emplace_back(2, 2);
regions.emplace_back(6, 7);
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
sched_inputs[0].thread_modifiers.push_back(scheduler_t::input_thread_info_t(regions));
if (scheduler.init(sched_inputs, 1,
scheduler_t::make_scheduler_serial_options(4)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
int ordinal = 0;
auto *stream = scheduler.get_stream(0);
memref_t memref;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
switch (ordinal) {
case 0:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP);
assert(memref.marker.marker_value == 10);
break;
case 1:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID);
assert(memref.marker.marker_value == 1);
break;
case 2:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 2);
break;
case 3:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_WINDOW_ID);
assert(memref.marker.marker_value == 1);
break;
case 4:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP);
assert(memref.marker.marker_value == 40);
break;
case 5:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID);
assert(memref.marker.marker_value == 4);
break;
case 6:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 6);
break;
case 7:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP);
assert(memref.marker.marker_value == 50);
break;
case 8:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID);
assert(memref.marker.marker_value == 5);
break;
case 9:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 7);
break;
default:
assert(ordinal == 10);
assert(memref.exit.type == TRACE_TYPE_THREAD_EXIT);
}
++ordinal;
}
assert(ordinal == 11);
}
static void
test_regions_start()
{
std::cerr << "\n----------------\nTesting region at start\n";
std::vector<trace_entry_t> memrefs = {
make_thread(1),
make_pid(1),
make_marker(TRACE_MARKER_TYPE_PAGE_SIZE, 4096),
make_timestamp(10),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 1),
make_instr(1),
make_instr(2),
make_exit(1),
};
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(memrefs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), 1);
std::vector<scheduler_t::range_t> regions;
regions.emplace_back(1, 0);
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
sched_inputs[0].thread_modifiers.push_back(scheduler_t::input_thread_info_t(regions));
if (scheduler.init(sched_inputs, 1,
scheduler_t::make_scheduler_serial_options(5)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
int ordinal = 0;
auto *stream = scheduler.get_stream(0);
memref_t memref;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
switch (ordinal) {
case 0:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP);
break;
case 1:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID);
break;
case 2:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 1);
break;
case 3:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 2);
break;
default: assert(ordinal == 4); assert(memref.exit.type == TRACE_TYPE_THREAD_EXIT);
}
++ordinal;
}
assert(ordinal == 5);
}
static void
test_regions_too_far()
{
std::cerr << "\n----------------\nTesting region going too far\n";
std::vector<trace_entry_t> memrefs = {
make_thread(1),
make_pid(1),
make_marker(TRACE_MARKER_TYPE_PAGE_SIZE, 4096),
make_timestamp(10),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 1),
make_instr(1),
make_instr(2),
make_exit(1),
make_footer(),
};
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(memrefs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), 1);
std::vector<scheduler_t::range_t> regions;
regions.emplace_back(3, 0);
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
sched_inputs[0].thread_modifiers.push_back(scheduler_t::input_thread_info_t(regions));
if (scheduler.init(sched_inputs, 1,
scheduler_t::make_scheduler_serial_options(4)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
auto *stream = scheduler.get_stream(0);
memref_t memref;
scheduler_t::stream_status_t status = stream->next_record(memref);
assert(status == scheduler_t::STATUS_REGION_INVALID);
}
static void
test_regions()
{
test_regions_timestamps();
test_regions_bare();
test_regions_bare_no_marker();
test_regions_start();
test_regions_too_far();
}
static void
test_only_threads()
{
std::cerr << "\n----------------\nTesting thread filters\n";
static constexpr memref_tid_t TID_A = 42;
static constexpr memref_tid_t TID_B = 99;
static constexpr memref_tid_t TID_C = 7;
std::vector<trace_entry_t> refs_A = {
make_thread(TID_A),
make_pid(1),
make_instr(50),
make_exit(TID_A),
};
std::vector<trace_entry_t> refs_B = {
make_thread(TID_B),
make_pid(1),
make_instr(60),
make_exit(TID_B),
};
std::vector<trace_entry_t> refs_C = {
make_thread(TID_C),
make_pid(1),
make_instr(60),
make_exit(TID_C),
};
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_C)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_C);
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
sched_inputs[0].only_threads.insert(TID_B);
if (scheduler.init(sched_inputs, 1,
scheduler_t::make_scheduler_serial_options(4)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
auto *stream = scheduler.get_stream(0);
memref_t memref;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
assert(memref.instr.tid == TID_B);
}
}
static void
test_real_file_queries_and_filters(const char *testdir)
{
std::cerr << "\n----------------\nTesting real files\n";
#if (defined(X86_64) || defined(ARM_64)) && defined(HAS_ZIP) && defined(HAS_SNAPPY)
std::string trace1 = std::string(testdir) + "/drmemtrace.chase-snappy.x64.tracedir";
static constexpr memref_tid_t TID_1_A = 23699;
std::string trace2 = std::string(testdir) + "/drmemtrace.threadsig.x64.tracedir";
static constexpr memref_tid_t TID_2_A = 1257604;
static constexpr memref_tid_t TID_2_B = 1257602;
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(trace1);
sched_inputs[0].only_threads.insert(TID_1_A);
sched_inputs.emplace_back(trace2);
sched_inputs[1].only_threads.insert(TID_2_A);
sched_inputs[1].only_threads.insert(TID_2_B);
if (scheduler.init(sched_inputs, 1,
scheduler_t::make_scheduler_serial_options(1)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
auto *stream = scheduler.get_stream(0);
memref_t memref;
int max_workload_index = 0;
int max_input_index = 0;
std::set<memref_tid_t> tids_seen;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
assert(memref.instr.tid == TID_1_A || memref.instr.tid == TID_2_A ||
memref.instr.tid == TID_2_B);
tids_seen.insert(memref.instr.tid);
if (stream->get_input_workload_ordinal() > max_workload_index)
max_workload_index = stream->get_input_workload_ordinal();
if (stream->get_input_stream_ordinal() > max_input_index)
max_input_index = stream->get_input_stream_ordinal();
if (stream->get_input_stream_ordinal() == 0)
assert(stream->get_input_workload_ordinal() == 0);
else
assert(stream->get_input_workload_ordinal() == 1);
}
assert(max_workload_index == 1);
assert(max_input_index == 2);
assert(scheduler.get_input_stream_count() == 3);
assert(scheduler.get_input_stream_name(0) ==
"chase.20190225.185346.23699.memtrace.sz");
assert(scheduler.get_input_stream_name(1) ==
"drmemtrace.threadsig.1257604.1983.trace.zip" ||
scheduler.get_input_stream_name(1) ==
"drmemtrace.threadsig.1257602.1021.trace.zip");
assert(scheduler.get_input_stream_name(2) ==
"drmemtrace.threadsig.1257604.1983.trace.zip" ||
scheduler.get_input_stream_name(2) ==
"drmemtrace.threadsig.1257602.1021.trace.zip");
assert(tids_seen.size() == 3);
assert(tids_seen.find(TID_1_A) != tids_seen.end() &&
tids_seen.find(TID_2_A) != tids_seen.end() &&
tids_seen.find(TID_2_B) != tids_seen.end());
#endif
}
static std::vector<std::string>
run_lockstep_simulation(scheduler_t &scheduler, int num_outputs, memref_tid_t tid_base,
bool send_time = false)
{
std::vector<scheduler_t::stream_t *> outputs(num_outputs, nullptr);
std::vector<bool> eof(num_outputs, false);
for (int i = 0; i < num_outputs; i++)
outputs[i] = scheduler.get_stream(i);
int num_eof = 0;
std::vector<std::string> sched_as_string(num_outputs);
while (num_eof < num_outputs) {
for (int i = 0; i < num_outputs; i++) {
if (eof[i])
continue;
memref_t memref;
scheduler_t::stream_status_t status;
if (send_time) {
status = outputs[i]->next_record(
memref, outputs[i]->get_instruction_ordinal() + 1);
} else {
status = outputs[i]->next_record(memref);
}
if (status == scheduler_t::STATUS_EOF) {
++num_eof;
eof[i] = true;
continue;
}
if (status == scheduler_t::STATUS_WAIT) {
sched_as_string[i] += '-';
continue;
}
assert(status == scheduler_t::STATUS_OK);
if (type_is_instr(memref.instr.type)) {
sched_as_string[i] +=
'A' + static_cast<char>(memref.instr.tid - tid_base);
} else {
sched_as_string[i] += '.';
}
}
}
size_t max_size = 0;
for (int i = 0; i < num_outputs; ++i)
max_size = std::max(max_size, sched_as_string[i].size());
for (int step = 0; step < static_cast<int>(max_size); ++step) {
std::set<char> inputs;
for (int out = 0; out < num_outputs; ++out) {
if (static_cast<int>(sched_as_string[out].size()) <= step)
continue;
if (sched_as_string[out][step] < 'A' || sched_as_string[out][step] > 'Z')
continue;
assert(inputs.find(sched_as_string[out][step]) == inputs.end());
inputs.insert(sched_as_string[out][step]);
}
}
return sched_as_string;
}
static void
test_synthetic()
{
std::cerr << "\n----------------\nTesting synthetic\n";
static constexpr int NUM_INPUTS = 7;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr int QUANTUM_DURATION = 3;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
inputs[i].push_back(make_thread(tid));
inputs[i].push_back(make_pid(1));
for (int j = 0; j < NUM_INSTRS; j++)
inputs[i].push_back(make_instr(42 + j * 4));
inputs[i].push_back(make_exit(tid));
}
static const char *const CORE0_SCHED_STRING = "AAACCCEEEGGGBBBDDDFFFAAA.CCC.EEE.GGG.";
static const char *const CORE1_SCHED_STRING = "BBBDDDFFFAAACCCEEEGGGBBB.DDD.FFF.";
{
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_BASE + i);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
3);
sched_ops.quantum_duration = QUANTUM_DURATION;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
assert(sched_as_string[1] == CORE1_SCHED_STRING);
}
{
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_BASE + i);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
3);
sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME;
sched_ops.quantum_duration = QUANTUM_DURATION;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE, true);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
assert(sched_as_string[1] == CORE1_SCHED_STRING);
}
}
static void
test_synthetic_time_quanta()
{
std::cerr << "\n----------------\nTesting time quanta\n";
#ifdef HAS_ZIP
static constexpr memref_tid_t TID_BASE = 42;
static constexpr memref_tid_t TID_A = TID_BASE;
static constexpr memref_tid_t TID_B = TID_A + 1;
static constexpr memref_tid_t TID_C = TID_A + 2;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INPUTS = 3;
std::vector<trace_entry_t> refs[NUM_INPUTS];
for (int i = 0; i < NUM_INPUTS; ++i) {
refs[i].push_back(make_thread(TID_BASE + i));
refs[i].push_back(make_pid(1));
refs[i].push_back(make_version(TRACE_ENTRY_VERSION));
refs[i].push_back(make_timestamp(10));
refs[i].push_back(make_instr(10));
refs[i].push_back(make_instr(30));
refs[i].push_back(make_instr(50));
refs[i].push_back(make_exit(TID_BASE + i));
}
std::string record_fname = "tmp_test_replay_time.zip";
{
std::vector<scheduler_t::input_reader_t> readers;
for (int i = 0; i < NUM_INPUTS; ++i) {
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(refs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_BASE + i);
}
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
4);
sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME;
sched_ops.quantum_duration = 3;
zipfile_ostream_t outfile(record_fname);
sched_ops.schedule_record_ostream = &outfile;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
auto check_next = [](scheduler_t::stream_t *stream, uint64_t time,
scheduler_t::stream_status_t expect_status,
memref_tid_t expect_tid = INVALID_THREAD_ID,
trace_type_t expect_type = TRACE_TYPE_READ) {
memref_t memref;
scheduler_t::stream_status_t status = stream->next_record(memref, time);
if (status != expect_status) {
std::cerr << "Expected status " << expect_status << " != " << status
<< "\n";
assert(false);
}
if (status == scheduler_t::STATUS_OK) {
if (memref.marker.tid != expect_tid) {
std::cerr << "Expected tid " << expect_tid
<< " != " << memref.marker.tid << "\n";
assert(false);
}
if (memref.marker.type != expect_type) {
std::cerr << "Expected type " << expect_type
<< " != " << memref.marker.type << "\n";
assert(false);
}
}
};
uint64_t time = 1;
auto *cpu0 = scheduler.get_stream(0);
auto *cpu1 = scheduler.get_stream(1);
check_next(cpu0, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER);
check_next(cpu0, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER);
check_next(cpu0, ++time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR);
check_next(cpu1, time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_MARKER);
check_next(cpu1, time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_MARKER);
check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR);
check_next(cpu0, ++time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_MARKER);
check_next(cpu0, time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_MARKER);
check_next(cpu0, ++time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_INSTR);
check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR);
check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR);
check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_THREAD_EXIT);
check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR);
check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR);
check_next(cpu0, ++time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_INSTR);
check_next(cpu0, ++time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_INSTR);
check_next(cpu0, time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_THREAD_EXIT);
check_next(cpu0, time, scheduler_t::STATUS_EOF);
check_next(cpu1, time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_THREAD_EXIT);
check_next(cpu1, time, scheduler_t::STATUS_EOF);
if (scheduler.write_recorded_schedule() != scheduler_t::STATUS_SUCCESS)
assert(false);
}
{
std::vector<scheduler_t::input_reader_t> readers;
for (int i = 0; i < NUM_INPUTS; ++i) {
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(refs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_BASE + i);
}
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_AS_PREVIOUSLY,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
4);
zipfile_istream_t infile(record_fname);
sched_ops.schedule_replay_istream = &infile;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_A);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == "..A..CCC.");
assert(sched_as_string[1] == "..BAA.BB.");
}
#endif
}
static void
test_synthetic_with_timestamps()
{
std::cerr << "\n----------------\nTesting synthetic with timestamps\n";
static constexpr int NUM_WORKLOADS = 3;
static constexpr int NUM_INPUTS_PER_WORKLOAD = 3;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int workload_idx = 0; workload_idx < NUM_WORKLOADS; workload_idx++) {
std::vector<scheduler_t::input_reader_t> readers;
for (int input_idx = 0; input_idx < NUM_INPUTS_PER_WORKLOAD; input_idx++) {
memref_tid_t tid =
TID_BASE + workload_idx * NUM_INPUTS_PER_WORKLOAD + input_idx;
std::vector<trace_entry_t> inputs;
inputs.push_back(make_thread(tid));
inputs.push_back(make_pid(1));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) {
if (instr_idx % 2 == 0) {
inputs.push_back(make_timestamp(
1000 * workload_idx +
100 * (NUM_INPUTS_PER_WORKLOAD - input_idx) + 10 * instr_idx));
}
inputs.push_back(make_instr(42 + instr_idx * 4));
}
inputs.push_back(make_exit(tid));
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
}
sched_inputs.emplace_back(std::move(readers));
}
memref_tid_t tid = TID_BASE + NUM_WORKLOADS * NUM_INPUTS_PER_WORKLOAD;
std::vector<trace_entry_t> inputs;
inputs.push_back(make_thread(tid));
inputs.push_back(make_pid(1));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) {
if (instr_idx % 2 == 0)
inputs.push_back(make_timestamp(1 + instr_idx));
inputs.push_back(make_instr(42 + instr_idx * 4));
}
inputs.push_back(make_exit(tid));
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
3);
sched_ops.quantum_duration = 3;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] ==
".CC.C.II.IC.CC.F.FF.I.II.FF.F..BB.B.HH.HE.EE.BB.B.HH.H..DD.DA.AA.G.GG.DD.D.");
assert(sched_as_string[1] ==
".FF.F.JJ.JJ.JJ.JJ.J.CC.C.II.I..EE.EB.BB.H.HH.EE.E..AA.A.GG.GD.DD.AA.A.GG.G.");
}
static void
test_synthetic_with_priorities()
{
std::cerr << "\n----------------\nTesting synthetic with priorities\n";
static constexpr int NUM_WORKLOADS = 3;
static constexpr int NUM_INPUTS_PER_WORKLOAD = 3;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<scheduler_t::input_workload_t> sched_inputs;
auto get_tid = [&](int workload_idx, int input_idx) {
return TID_BASE + workload_idx * NUM_INPUTS_PER_WORKLOAD + input_idx;
};
for (int workload_idx = 0; workload_idx < NUM_WORKLOADS; workload_idx++) {
std::vector<scheduler_t::input_reader_t> readers;
for (int input_idx = 0; input_idx < NUM_INPUTS_PER_WORKLOAD; input_idx++) {
memref_tid_t tid = get_tid(workload_idx, input_idx);
std::vector<trace_entry_t> inputs;
inputs.push_back(make_thread(tid));
inputs.push_back(make_pid(1));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) {
if (instr_idx % 2 == 0) {
inputs.push_back(make_timestamp(
1000 * workload_idx +
100 * (NUM_INPUTS_PER_WORKLOAD - input_idx) + 10 * instr_idx));
}
inputs.push_back(make_instr(42 + instr_idx * 4));
}
inputs.push_back(make_exit(tid));
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
}
sched_inputs.emplace_back(std::move(readers));
sched_inputs.back().thread_modifiers.emplace_back(
get_tid(workload_idx, 1), 1);
}
memref_tid_t tid = TID_BASE + NUM_WORKLOADS * NUM_INPUTS_PER_WORKLOAD;
std::vector<trace_entry_t> inputs;
inputs.push_back(make_thread(tid));
inputs.push_back(make_pid(1));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) {
if (instr_idx % 2 == 0)
inputs.push_back(make_timestamp(1 + instr_idx));
inputs.push_back(make_instr(42 + instr_idx * 4));
}
inputs.push_back(make_exit(tid));
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
3);
sched_ops.quantum_duration = 3;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] ==
".BB.B.HH.HE.EE.BB.B.HH.H..FF.F.JJ.JJ.JJ.JJ.J.CC.C.II.I..DD.DA.AA.G.GG.DD.D.");
assert(sched_as_string[1] ==
".EE.EB.BB.H.HH.EE.E..CC.C.II.IC.CC.F.FF.I.II.FF.F..AA.A.GG.GD.DD.AA.A.GG.G.");
}
static void
test_synthetic_with_bindings()
{
std::cerr << "\n----------------\nTesting synthetic with bindings\n";
static constexpr int NUM_WORKLOADS = 3;
static constexpr int NUM_INPUTS_PER_WORKLOAD = 3;
static constexpr int NUM_OUTPUTS = 5;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<scheduler_t::input_workload_t> sched_inputs;
auto get_tid = [&](int workload_idx, int input_idx) {
return TID_BASE + workload_idx * NUM_INPUTS_PER_WORKLOAD + input_idx;
};
for (int workload_idx = 0; workload_idx < NUM_WORKLOADS; workload_idx++) {
std::vector<scheduler_t::input_reader_t> readers;
for (int input_idx = 0; input_idx < NUM_INPUTS_PER_WORKLOAD; input_idx++) {
memref_tid_t tid = get_tid(workload_idx, input_idx);
std::vector<trace_entry_t> inputs;
inputs.push_back(make_thread(tid));
inputs.push_back(make_pid(1));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) {
if (instr_idx % 2 == 0) {
inputs.push_back(make_timestamp(10 * (instr_idx + 1)));
}
inputs.push_back(make_instr(42 + instr_idx * 4));
}
inputs.push_back(make_exit(tid));
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
}
sched_inputs.emplace_back(std::move(readers));
std::set<scheduler_t::output_ordinal_t> cores;
switch (workload_idx) {
case 0: cores.insert({ 2, 4 }); break;
case 1: cores.insert({ 0, 1 }); break;
case 2: cores.insert({ 1, 2, 3 }); break;
default: assert(false);
}
sched_inputs.back().thread_modifiers.emplace_back(cores);
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
3);
sched_ops.quantum_duration = 3;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == ".DD.D.FF.FD.DD.F.FF.DD.D.FF.F.");
assert(sched_as_string[1] == ".EE.E.HH.HE.EE.I.II.EE.E.");
assert(sched_as_string[2] == ".AA.A.CC.CG.GG.C.CC.HH.H.CC.C.");
assert(sched_as_string[3] == ".GG.G.II.IH.HH.GG.G.II.I.");
assert(sched_as_string[4] == ".BB.BA.AA.B.BB.AA.A.BB.B.");
}
static void
test_synthetic_with_bindings_weighted()
{
std::cerr << "\n----------------\nTesting synthetic with bindings and diff stamps\n";
static constexpr int NUM_WORKLOADS = 3;
static constexpr int NUM_INPUTS_PER_WORKLOAD = 3;
static constexpr int NUM_OUTPUTS = 5;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<scheduler_t::input_workload_t> sched_inputs;
auto get_tid = [&](int workload_idx, int input_idx) {
return TID_BASE + workload_idx * NUM_INPUTS_PER_WORKLOAD + input_idx;
};
for (int workload_idx = 0; workload_idx < NUM_WORKLOADS; workload_idx++) {
std::vector<scheduler_t::input_reader_t> readers;
for (int input_idx = 0; input_idx < NUM_INPUTS_PER_WORKLOAD; input_idx++) {
memref_tid_t tid = get_tid(workload_idx, input_idx);
std::vector<trace_entry_t> inputs;
inputs.push_back(make_thread(tid));
inputs.push_back(make_pid(1));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) {
if (instr_idx % 2 == 0) {
inputs.push_back(make_timestamp(
1000 * workload_idx +
100 * (NUM_INPUTS_PER_WORKLOAD - input_idx) + 10 * instr_idx));
}
inputs.push_back(make_instr(42 + instr_idx * 4));
}
inputs.push_back(make_exit(tid));
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
}
sched_inputs.emplace_back(std::move(readers));
std::set<scheduler_t::output_ordinal_t> cores;
switch (workload_idx) {
case 0: cores.insert({ 2, 4 }); break;
case 1: cores.insert({ 0, 1 }); break;
case 2: cores.insert({ 1, 2, 3 }); break;
default: assert(false);
}
sched_inputs.back().thread_modifiers.emplace_back(cores);
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
3);
sched_ops.quantum_duration = 3;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == ".FF.FF.FF.FF.F..EE.EE.EE.EE.E.");
assert(sched_as_string[1] == ".II.II.II.II.I..DD.DD.DD.DD.D.");
assert(sched_as_string[2] == ".CC.CC.CC.CC.C..AA.AA.AA.AA.A.");
assert(sched_as_string[3] == ".HH.HH.HH.HH.H..GG.GG.GG.GG.G.");
assert(sched_as_string[4] == ".BB.BB.BB.BB.B.");
}
static void
test_synthetic_with_syscalls_multiple()
{
std::cerr << "\n----------------\nTesting synthetic with blocking syscalls\n";
static constexpr int NUM_WORKLOADS = 3;
static constexpr int NUM_INPUTS_PER_WORKLOAD = 3;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<scheduler_t::input_workload_t> sched_inputs;
auto get_tid = [&](int workload_idx, int input_idx) {
return TID_BASE + workload_idx * NUM_INPUTS_PER_WORKLOAD + input_idx;
};
for (int workload_idx = 0; workload_idx < NUM_WORKLOADS; workload_idx++) {
std::vector<scheduler_t::input_reader_t> readers;
for (int input_idx = 0; input_idx < NUM_INPUTS_PER_WORKLOAD; input_idx++) {
memref_tid_t tid = get_tid(workload_idx, input_idx);
std::vector<trace_entry_t> inputs;
inputs.push_back(make_thread(tid));
inputs.push_back(make_pid(1));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) {
if (instr_idx % 2 == 0) {
inputs.push_back(make_timestamp(
1000 * workload_idx +
100 * (NUM_INPUTS_PER_WORKLOAD - input_idx) + 10 * instr_idx));
}
inputs.push_back(make_instr(42 + instr_idx * 4));
if (input_idx == 1 && instr_idx % (workload_idx + 1) == workload_idx) {
inputs.push_back(
make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0));
}
}
inputs.push_back(make_exit(tid));
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
}
sched_inputs.emplace_back(std::move(readers));
sched_inputs.back().thread_modifiers.emplace_back(
get_tid(workload_idx, 1), 1);
}
memref_tid_t tid = TID_BASE + NUM_WORKLOADS * NUM_INPUTS_PER_WORKLOAD;
std::vector<trace_entry_t> inputs;
inputs.push_back(make_thread(tid));
inputs.push_back(make_pid(1));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) {
if (instr_idx % 2 == 0)
inputs.push_back(make_timestamp(1 + instr_idx));
inputs.push_back(make_instr(42 + instr_idx * 4));
}
inputs.push_back(make_exit(tid));
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
3);
sched_ops.quantum_duration = 3;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] ==
".B..HH.H.B.H.HH..B.HH.H..B.E.B...II.I.JJ.JJ.JJ.JJ.J.CC.C.II.I..DD.DA.AA.G.GG."
"DD.D.");
assert(sched_as_string[1] ==
".EE..B..EE..B..EE..B..EE...CC.C.FF.FB..C.CC.F.FF.I.II.FF.F..AA.A.GG.GD.DD.AA."
"A.GG.G.");
}
static void
test_synthetic_with_syscalls_single()
{
std::cerr
<< "\n----------------\nTesting synthetic single-input with blocking syscalls\n";
static constexpr int NUM_WORKLOADS = 1;
static constexpr int NUM_INPUTS_PER_WORKLOAD = 1;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<scheduler_t::input_workload_t> sched_inputs;
auto get_tid = [&](int workload_idx, int input_idx) {
return TID_BASE + workload_idx * NUM_INPUTS_PER_WORKLOAD + input_idx;
};
for (int workload_idx = 0; workload_idx < NUM_WORKLOADS; workload_idx++) {
std::vector<scheduler_t::input_reader_t> readers;
for (int input_idx = 0; input_idx < NUM_INPUTS_PER_WORKLOAD; input_idx++) {
memref_tid_t tid = get_tid(workload_idx, input_idx);
std::vector<trace_entry_t> inputs;
inputs.push_back(make_thread(tid));
inputs.push_back(make_pid(1));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) {
if (instr_idx % 2 == 0) {
inputs.push_back(make_timestamp(
1000 * workload_idx +
100 * (NUM_INPUTS_PER_WORKLOAD - input_idx) + 10 * instr_idx));
}
inputs.push_back(make_instr(42 + instr_idx * 4));
if (instr_idx % 3 == 1) {
inputs.push_back(
make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0));
}
}
inputs.push_back(make_exit(tid));
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
}
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
4);
sched_ops.quantum_duration = 3;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == ".AA..AA.A.A.AA..A.");
assert(sched_as_string[1] == "");
}
static bool
check_ref(std::vector<memref_t> &refs, int &idx, memref_tid_t expected_tid,
trace_type_t expected_type,
trace_marker_type_t expected_marker = TRACE_MARKER_TYPE_RESERVED_END)
{
if (expected_tid != refs[idx].instr.tid || expected_type != refs[idx].instr.type) {
std::cerr << "Record " << idx << " has tid " << refs[idx].instr.tid
<< " and type " << refs[idx].instr.type << " != expected tid "
<< expected_tid << " and expected type " << expected_type << "\n";
return false;
}
if (expected_type == TRACE_TYPE_MARKER &&
expected_marker != refs[idx].marker.marker_type) {
std::cerr << "Record " << idx << " has marker type "
<< refs[idx].marker.marker_type << " but expected " << expected_marker
<< "\n";
return false;
}
++idx;
return true;
}
static void
test_synthetic_with_syscalls_precise()
{
std::cerr << "\n----------------\nTesting blocking syscall precise switch points\n";
static constexpr memref_tid_t TID_A = 42;
static constexpr memref_tid_t TID_B = 99;
static constexpr int SYSNUM = 202;
std::vector<trace_entry_t> refs_A = {
make_thread(TID_A),
make_pid(1),
make_version(TRACE_ENTRY_VERSION),
make_timestamp(20),
make_instr(10),
make_marker(TRACE_MARKER_TYPE_SYSCALL, SYSNUM),
make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0),
make_marker(TRACE_MARKER_TYPE_FUNC_ID, 100),
make_marker(TRACE_MARKER_TYPE_FUNC_ARG, 42),
make_timestamp(50),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 1),
make_marker(TRACE_MARKER_TYPE_FUNC_ID, 100),
make_marker(TRACE_MARKER_TYPE_FUNC_RETVAL, 0),
make_instr(12),
make_exit(TID_A),
};
std::vector<trace_entry_t> refs_B = {
make_thread(TID_B),
make_pid(1),
make_version(TRACE_ENTRY_VERSION),
make_timestamp(120),
make_instr(20),
make_instr(21),
make_exit(TID_B),
};
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
4);
scheduler_t scheduler;
if (scheduler.init(sched_inputs, 1, sched_ops) != scheduler_t::STATUS_SUCCESS)
assert(false);
auto *stream = scheduler.get_stream(0);
memref_t memref;
std::vector<memref_t> refs;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
if (status == scheduler_t::STATUS_WAIT)
continue;
assert(status == scheduler_t::STATUS_OK);
refs.push_back(memref);
}
std::vector<trace_entry_t> entries;
int idx = 0;
bool res = true;
res = res &&
check_ref(refs, idx, TID_A, TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_VERSION) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_TIMESTAMP) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_INSTR) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_SYSCALL) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_MARKER,
TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_FUNC_ID) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_FUNC_ARG) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_TIMESTAMP) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_CPU_ID) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_FUNC_ID) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_FUNC_RETVAL) &&
check_ref(refs, idx, TID_B, TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_VERSION) &&
check_ref(refs, idx, TID_B, TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_TIMESTAMP) &&
check_ref(refs, idx, TID_B, TRACE_TYPE_INSTR) &&
check_ref(refs, idx, TID_B, TRACE_TYPE_INSTR) &&
check_ref(refs, idx, TID_B, TRACE_TYPE_THREAD_EXIT) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_INSTR) &&
check_ref(refs, idx, TID_A, TRACE_TYPE_THREAD_EXIT);
assert(res);
}
static void
test_synthetic_with_syscalls()
{
test_synthetic_with_syscalls_multiple();
test_synthetic_with_syscalls_single();
test_synthetic_with_syscalls_precise();
}
#if (defined(X86_64) || defined(ARM_64)) && defined(HAS_ZIP)
static void
simulate_core(scheduler_t::stream_t *stream)
{
memref_t record;
for (scheduler_t::stream_status_t status = stream->next_record(record);
status != scheduler_t::STATUS_EOF; status = stream->next_record(record)) {
if (status == scheduler_t::STATUS_WAIT) {
std::this_thread::yield();
continue;
}
assert(status == scheduler_t::STATUS_OK);
}
}
#endif
static void
test_synthetic_multi_threaded(const char *testdir)
{
std::cerr << "\n----------------\nTesting synthetic multi-threaded\n";
#if (defined(X86_64) || defined(ARM_64)) && defined(HAS_ZIP)
std::string path = std::string(testdir) + "/drmemtrace.threadsig.x64.tracedir";
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(path);
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
2);
static constexpr int NUM_OUTPUTS = 4;
static constexpr int QUANTUM_DURATION = 2000;
sched_ops.quantum_duration = QUANTUM_DURATION;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::thread> threads;
threads.reserve(NUM_OUTPUTS);
for (int i = 0; i < NUM_OUTPUTS; ++i) {
threads.emplace_back(std::thread(&simulate_core, scheduler.get_stream(i)));
}
for (std::thread &thread : threads)
thread.join();
#endif
}
static void
test_speculation()
{
std::cerr << "\n----------------\nTesting speculation\n";
std::vector<trace_entry_t> memrefs = {
make_thread(1),
make_pid(1),
make_marker(TRACE_MARKER_TYPE_PAGE_SIZE, 4096),
make_timestamp(10),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 1),
make_instr(1, TRACE_TYPE_INSTR_CONDITIONAL_JUMP),
make_instr(2),
make_instr(3, TRACE_TYPE_INSTR_CONDITIONAL_JUMP),
make_instr(4),
make_instr(5),
make_exit(1),
};
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(memrefs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), 1);
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops =
scheduler_t::make_scheduler_serial_options(4);
sched_ops.flags = static_cast<scheduler_t::scheduler_flags_t>(
static_cast<int>(sched_ops.flags) |
static_cast<int>(scheduler_t::SCHEDULER_SPECULATE_NOPS));
if (scheduler.init(sched_inputs, 1, sched_ops) != scheduler_t::STATUS_SUCCESS)
assert(false);
int ordinal = 0;
auto *stream = scheduler.get_stream(0);
memref_t memref;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
switch (ordinal) {
case 0:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_PAGE_SIZE);
break;
case 1:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP);
break;
case 2:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID);
break;
case 3:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 1);
break;
case 4:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 2);
stream->start_speculation(100, true);
assert(stream->unread_last_record() == scheduler_t::STATUS_INVALID);
break;
case 5:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 100);
assert(memref_is_nop_instr(memref));
break;
case 6:
assert(type_is_instr(memref.instr.type));
assert(memref_is_nop_instr(memref));
#ifdef AARCH64
assert(memref.instr.addr == 104);
#elif defined(X86_64) || defined(X86_32)
assert(memref.instr.addr == 101);
#elif defined(ARM)
assert(memref.instr.addr == 102 || memref.instr.addr == 104);
#endif
stream->stop_speculation();
break;
case 7:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 2);
break;
case 8:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 3);
break;
case 9:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 4);
stream->start_speculation(200, false);
break;
case 10:
assert(type_is_instr(memref.instr.type));
assert(memref_is_nop_instr(memref));
assert(memref.instr.addr == 200);
stream->start_speculation(300, false);
assert(stream->unread_last_record() == scheduler_t::STATUS_INVALID);
break;
case 11:
assert(type_is_instr(memref.instr.type));
assert(memref_is_nop_instr(memref));
assert(memref.instr.addr == 300);
stream->stop_speculation();
break;
case 12:
assert(type_is_instr(memref.instr.type));
assert(memref_is_nop_instr(memref));
#ifdef AARCH64
assert(memref.instr.addr == 204);
#elif defined(X86_64) || defined(X86_32)
assert(memref.instr.addr == 201);
#elif defined(ARM)
assert(memref.instr.addr == 202 || memref.instr.addr == 204);
#endif
stream->start_speculation(400, true);
break;
case 13:
assert(type_is_instr(memref.instr.type));
assert(memref_is_nop_instr(memref));
assert(memref.instr.addr == 400);
stream->stop_speculation();
break;
case 14:
assert(type_is_instr(memref.instr.type));
assert(memref_is_nop_instr(memref));
#ifdef AARCH64
assert(memref.instr.addr == 204);
#elif defined(X86_64) || defined(X86_32)
assert(memref.instr.addr == 201);
#elif defined(ARM)
assert(memref.instr.addr == 202 || memref.instr.addr == 204);
#endif
stream->stop_speculation();
break;
case 15:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 5);
break;
default:
assert(ordinal == 16);
assert(memref.exit.type == TRACE_TYPE_THREAD_EXIT);
}
++ordinal;
}
assert(ordinal == 17);
}
static void
test_replay()
{
#ifdef HAS_ZIP
std::cerr << "\n----------------\nTesting replay\n";
static constexpr int NUM_INPUTS = 7;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr int QUANTUM_INSTRS = 3;
static const char *const CORE0_SCHED_STRING = "AAACCCEEEGGGBBBDDDFFFAAA.CCC.EEE.GGG.";
static const char *const CORE1_SCHED_STRING = "BBBDDDFFFAAACCCEEEGGGBBB.DDD.FFF.";
static constexpr memref_tid_t TID_BASE = 100;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
inputs[i].push_back(make_thread(tid));
inputs[i].push_back(make_pid(1));
for (int j = 0; j < NUM_INSTRS; j++)
inputs[i].push_back(make_instr(42 + j * 4));
inputs[i].push_back(make_exit(tid));
}
std::string record_fname = "tmp_test_replay_record.zip";
{
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
2);
sched_ops.quantum_duration = QUANTUM_INSTRS;
zipfile_ostream_t outfile(record_fname);
sched_ops.schedule_record_ostream = &outfile;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
assert(sched_as_string[1] == CORE1_SCHED_STRING);
if (scheduler.write_recorded_schedule() != scheduler_t::STATUS_SUCCESS)
assert(false);
}
for (int outer = 0; outer < 5; ++outer) {
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_AS_PREVIOUSLY,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
2);
zipfile_istream_t infile(record_fname);
sched_ops.schedule_replay_istream = &infile;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
assert(sched_as_string[1] == CORE1_SCHED_STRING);
}
#endif
}
#if (defined(X86_64) || defined(ARM_64)) && defined(HAS_ZIP)
static void
simulate_core_and_record_schedule(scheduler_t::stream_t *stream,
const scheduler_t &scheduler,
std::vector<context_switch_t> &thread_sequence)
{
memref_t record;
memref_tid_t prev_tid = INVALID_THREAD_ID;
memtrace_stream_t *prev_stream = nullptr;
for (scheduler_t::stream_status_t status = stream->next_record(record);
status != scheduler_t::STATUS_EOF; status = stream->next_record(record)) {
if (status == scheduler_t::STATUS_WAIT) {
std::this_thread::yield();
continue;
}
assert(status == scheduler_t::STATUS_OK);
if (record.instr.tid != prev_tid && prev_tid != INVALID_THREAD_ID) {
auto *new_stream =
scheduler.get_input_stream_interface(stream->get_input_stream_ordinal());
assert(new_stream != nullptr);
assert(prev_stream != nullptr);
thread_sequence.emplace_back(
prev_tid, record.instr.tid,
trace_position_t(stream->get_record_ordinal(),
stream->get_instruction_ordinal(),
stream->get_last_timestamp()),
trace_position_t(prev_stream->get_record_ordinal(),
prev_stream->get_instruction_ordinal(),
prev_stream->get_last_timestamp()),
trace_position_t(new_stream->get_record_ordinal(),
new_stream->get_instruction_ordinal(),
new_stream->get_last_timestamp()));
}
prev_tid = record.instr.tid;
prev_stream =
scheduler.get_input_stream_interface(stream->get_input_stream_ordinal());
}
if (thread_sequence.empty()) {
thread_sequence.emplace_back(INVALID_THREAD_ID, prev_tid,
trace_position_t(0, 0, 0), trace_position_t(0, 0, 0),
trace_position_t(0, 0, 0));
}
}
#endif
static void
test_replay_multi_threaded(const char *testdir)
{
std::cerr << "\n----------------\nTesting synthetic multi-threaded replay\n";
#if (defined(X86_64) || defined(ARM_64)) && defined(HAS_ZIP)
std::string path = std::string(testdir) + "/drmemtrace.threadsig.x64.tracedir";
std::string record_fname = "tmp_test_replay_multi_record.zip";
static constexpr int NUM_OUTPUTS = 4;
std::vector<std::vector<context_switch_t>> thread_sequence(NUM_OUTPUTS);
{
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(path);
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
1);
zipfile_ostream_t outfile(record_fname);
sched_ops.schedule_record_ostream = &outfile;
static constexpr int QUANTUM_DURATION = 2000;
sched_ops.quantum_duration = QUANTUM_DURATION;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::thread> threads;
threads.reserve(NUM_OUTPUTS);
for (int i = 0; i < NUM_OUTPUTS; ++i) {
threads.emplace_back(std::thread(&simulate_core_and_record_schedule,
scheduler.get_stream(i), std::ref(scheduler),
std::ref(thread_sequence[i])));
}
for (std::thread &thread : threads)
thread.join();
if (scheduler.write_recorded_schedule() != scheduler_t::STATUS_SUCCESS)
assert(false);
}
{
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(path);
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_AS_PREVIOUSLY,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
1);
zipfile_istream_t infile(record_fname);
sched_ops.schedule_replay_istream = &infile;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::vector<context_switch_t>> replay_sequence(NUM_OUTPUTS);
std::vector<std::thread> threads;
threads.reserve(NUM_OUTPUTS);
for (int i = 0; i < NUM_OUTPUTS; ++i) {
threads.emplace_back(std::thread(&simulate_core_and_record_schedule,
scheduler.get_stream(i), std::ref(scheduler),
std::ref(replay_sequence[i])));
}
for (std::thread &thread : threads)
thread.join();
std::cerr << "Recorded:\n";
for (int i = 0; i < NUM_OUTPUTS; ++i) {
std::cerr << "Core #" << i << ": ";
for (const auto &cs : thread_sequence[i])
std::cerr << " " << cs << "\n";
std::cerr << "\n";
}
std::cerr << "Replayed:\n";
for (int i = 0; i < NUM_OUTPUTS; ++i) {
std::cerr << "Core #" << i << ": ";
for (const auto &cs : replay_sequence[i])
std::cerr << " " << cs << "\n";
std::cerr << "\n";
}
for (int i = 0; i < NUM_OUTPUTS; ++i) {
assert(thread_sequence[i].size() == replay_sequence[i].size());
for (size_t j = 0; j < thread_sequence[i].size(); ++j) {
assert(thread_sequence[i][j] == replay_sequence[i][j]);
}
}
}
#endif
}
#ifdef HAS_ZIP
class test_scheduler_t : public scheduler_t {
public:
void
write_test_schedule(std::string record_fname)
{
scheduler_t scheduler;
std::vector<schedule_record_t> sched0;
sched0.emplace_back(scheduler_t::schedule_record_t::VERSION, 0, 0, 0, 0);
sched0.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 0, 4, 11);
sched0.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 7,
0xffffffffffffffffUL, 91);
sched0.emplace_back(scheduler_t::schedule_record_t::FOOTER, 0, 0, 0, 0);
std::vector<schedule_record_t> sched1;
sched1.emplace_back(scheduler_t::schedule_record_t::VERSION, 0, 0, 0, 0);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 0, 4, 10);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 0, 4, 20);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 2, 4, 7, 60);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 0, 4, 30);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 4, 7, 40);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 4, 7, 50);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 4, 7, 70);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 0, 7,
0xffffffffffffffffUL, 80);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 1, 7,
0xffffffffffffffffUL, 90);
sched1.emplace_back(scheduler_t::schedule_record_t::DEFAULT, 3, 7,
0xffffffffffffffffUL, 110);
sched1.emplace_back(scheduler_t::schedule_record_t::FOOTER, 0, 0, 0, 0);
zipfile_ostream_t outfile(record_fname);
std::string err = outfile.open_new_component(recorded_schedule_component_name(0));
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched0.data()),
sched0.size() * sizeof(sched0[0])))
assert(false);
err = outfile.open_new_component(recorded_schedule_component_name(1));
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched1.data()),
sched1.size() * sizeof(sched1[0])))
assert(false);
}
};
#endif
static void
test_replay_timestamps()
{
#ifdef HAS_ZIP
std::cerr << "\n----------------\nTesting replay timestamp ordering\n";
static constexpr int NUM_INPUTS = 4;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
inputs[i].push_back(make_thread(tid));
inputs[i].push_back(make_pid(1));
inputs[i].push_back(make_timestamp(10 + i));
for (int j = 0; j < NUM_INSTRS; j++)
inputs[i].push_back(make_instr(42 + j * 4));
inputs[i].push_back(make_exit(tid));
}
static const char *const CORE0_SCHED_STRING = ".AAA-------------------------CCC.";
static const char *const CORE1_SCHED_STRING = ".BBB.CCCCCC.DDDAAABBBDDDAAA.BBB.DDD.";
std::string record_fname = "tmp_test_replay_timestamp.zip";
test_scheduler_t test_scheduler;
test_scheduler.write_test_schedule(record_fname);
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_AS_PREVIOUSLY,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
4);
zipfile_istream_t infile(record_fname);
sched_ops.schedule_replay_istream = &infile;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
assert(sched_as_string[1] == CORE1_SCHED_STRING);
#endif
}
static void
test_replay_skip()
{
#ifdef HAS_ZIP
std::cerr << "\n----------------\nTesting replay of skips\n";
std::vector<trace_entry_t> memrefs = {
make_thread(1),
make_pid(1),
make_marker(TRACE_MARKER_TYPE_PAGE_SIZE, 4096),
make_timestamp(10),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 1),
make_instr(1),
make_instr(2),
make_instr(3),
make_timestamp(20),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 2),
make_timestamp(30),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 3),
make_instr(4),
make_timestamp(40),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 4),
make_instr(5),
make_instr(6),
make_timestamp(50),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 5),
make_instr(7),
make_instr(8),
make_exit(1),
};
std::vector<scheduler_t::range_t> regions;
regions.emplace_back(2, 2);
regions.emplace_back(6, 7);
std::string record_fname = "tmp_test_replay_skip.zip";
{
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(memrefs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), 1);
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
sched_inputs[0].thread_modifiers.push_back(
scheduler_t::input_thread_info_t(regions));
scheduler_t scheduler;
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
4);
zipfile_ostream_t outfile(record_fname);
sched_ops.schedule_record_ostream = &outfile;
if (scheduler.init(sched_inputs, 1, sched_ops) != scheduler_t::STATUS_SUCCESS)
assert(false);
auto *stream = scheduler.get_stream(0);
memref_t memref;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
}
if (scheduler.write_recorded_schedule() != scheduler_t::STATUS_SUCCESS)
assert(false);
}
{
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(memrefs)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), 1);
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
sched_inputs[0].thread_modifiers.push_back(
scheduler_t::input_thread_info_t(regions));
scheduler_t scheduler;
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_AS_PREVIOUSLY,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
4);
zipfile_istream_t infile(record_fname);
sched_ops.schedule_replay_istream = &infile;
if (scheduler.init(sched_inputs, 1, sched_ops) != scheduler_t::STATUS_SUCCESS)
assert(false);
int ordinal = 0;
auto *stream = scheduler.get_stream(0);
memref_t memref;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
switch (ordinal) {
case 0:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP);
assert(memref.marker.marker_value == 10);
break;
case 1:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID);
assert(memref.marker.marker_value == 1);
break;
case 2:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 2);
break;
case 3:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_WINDOW_ID);
assert(memref.marker.marker_value == 1);
break;
case 4:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP);
assert(memref.marker.marker_value == 40);
break;
case 5:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID);
assert(memref.marker.marker_value == 4);
break;
case 6:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 6);
break;
case 7:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP);
assert(memref.marker.marker_value == 50);
break;
case 8:
assert(memref.marker.type == TRACE_TYPE_MARKER);
assert(memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID);
assert(memref.marker.marker_value == 5);
break;
case 9:
assert(type_is_instr(memref.instr.type));
assert(memref.instr.addr == 7);
break;
default:
assert(ordinal == 10);
assert(memref.exit.type == TRACE_TYPE_THREAD_EXIT);
}
++ordinal;
}
assert(ordinal == 11);
}
#endif
}
static void
test_replay_as_traced()
{
#ifdef HAS_ZIP
std::cerr << "\n----------------\nTesting replay as-traced\n";
static constexpr int NUM_INPUTS = 5;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
static constexpr int CPU0 = 6;
static constexpr int CPU1 = 9;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
inputs[i].push_back(make_thread(tid));
inputs[i].push_back(make_pid(1));
for (int j = 0; j < (i == NUM_INPUTS - 1 ? 6 : NUM_INSTRS); j++)
inputs[i].push_back(make_instr(42 + j * 4));
inputs[i].push_back(make_exit(tid));
}
std::string cpu_fname = "tmp_test_cpu_as_traced.zip";
static const char *const CORE0_SCHED_STRING = "EEE-AAA-CCCAAACCCBBB.DDD.";
static const char *const CORE1_SCHED_STRING = "---EEE.BBBDDDBBBDDDAAA.CCC.";
{
std::vector<schedule_entry_t> sched0;
sched0.emplace_back(TID_BASE + 4, 10, CPU0, 0);
sched0.emplace_back(TID_BASE, 101, CPU0, 0);
sched0.emplace_back(TID_BASE + 2, 103, CPU0, 0);
sched0.emplace_back(TID_BASE, 105, CPU0, 4);
sched0.emplace_back(TID_BASE + 2, 107, CPU0, 4);
sched0.emplace_back(TID_BASE + 1, 109, CPU0, 7);
sched0.emplace_back(TID_BASE + 3, 111, CPU0, 7);
std::vector<schedule_entry_t> sched1;
sched1.emplace_back(TID_BASE + 4, 20, CPU1, 4);
sched1.emplace_back(TID_BASE + 1, 102, CPU1, 0);
sched1.emplace_back(TID_BASE + 3, 104, CPU1, 0);
sched1.emplace_back(TID_BASE + 1, 106, CPU1, 4);
sched1.emplace_back(TID_BASE + 3, 108, CPU1, 4);
sched1.emplace_back(TID_BASE, 110, CPU1, 7);
sched1.emplace_back(TID_BASE + 2, 112, CPU1, 7);
std::ostringstream cpu0_string;
cpu0_string << CPU0;
std::ostringstream cpu1_string;
cpu1_string << CPU1;
zipfile_ostream_t outfile(cpu_fname);
std::string err = outfile.open_new_component(cpu0_string.str());
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched0.data()),
sched0.size() * sizeof(sched0[0])))
assert(false);
err = outfile.open_new_component(cpu1_string.str());
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched1.data()),
sched1.size() * sizeof(sched1[0])))
assert(false);
}
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_RECORDED_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
3);
zipfile_istream_t infile(cpu_fname);
sched_ops.replay_as_traced_istream = &infile;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
for (int i = 0; i < NUM_OUTPUTS; i++) {
int64_t cpu = scheduler.get_stream(i)->get_output_cpuid();
assert(cpu >= 0);
if (i == 0)
assert(cpu == CPU0);
else
assert(cpu == CPU1);
}
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
assert(sched_as_string[1] == CORE1_SCHED_STRING);
#endif
}
static void
test_replay_as_traced_i6107_workaround()
{
#ifdef HAS_ZIP
std::cerr << "\n----------------\nTesting replay as-traced i#6107 workaround\n";
static constexpr int NUM_INPUTS = 2;
static constexpr int NUM_OUTPUTS = 1;
static constexpr int CHUNK_NUM_INSTRS = 10 * 1000 * 1000;
static constexpr int SCHED_STEP_INSTRS = 1000 * 1000;
static constexpr memref_tid_t TID_BASE = 100;
static constexpr uint64_t TIMESTAMP_BASE = 100;
static constexpr int CPU = 6;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int input_idx = 0; input_idx < NUM_INPUTS; input_idx++) {
memref_tid_t tid = TID_BASE + input_idx;
inputs[input_idx].push_back(make_thread(tid));
inputs[input_idx].push_back(make_pid(1));
for (int step_idx = 0; step_idx <= CHUNK_NUM_INSTRS / SCHED_STEP_INSTRS;
++step_idx) {
inputs[input_idx].push_back(make_timestamp(101 + step_idx));
for (int instr_idx = 0; instr_idx < SCHED_STEP_INSTRS; ++instr_idx) {
inputs[input_idx].push_back(make_instr(42 + instr_idx));
}
}
inputs[input_idx].push_back(make_exit(tid));
}
std::string cpu_fname = "tmp_test_cpu_i6107.zip";
{
std::vector<schedule_entry_t> sched;
for (int step_idx = 0; step_idx <= CHUNK_NUM_INSTRS / SCHED_STEP_INSTRS;
++step_idx) {
for (int input_idx = 0; input_idx < NUM_INPUTS; input_idx++) {
sched.emplace_back(TID_BASE + input_idx, TIMESTAMP_BASE + step_idx, CPU,
step_idx * SCHED_STEP_INSTRS % CHUNK_NUM_INSTRS);
}
}
std::ostringstream cpu_string;
cpu_string << CPU;
zipfile_ostream_t outfile(cpu_fname);
std::string err = outfile.open_new_component(cpu_string.str());
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched.data()),
sched.size() * sizeof(sched[0])))
assert(false);
}
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int input_idx = 0; input_idx < NUM_INPUTS; input_idx++) {
memref_tid_t tid = TID_BASE + input_idx;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(
std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[input_idx])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_RECORDED_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
2);
zipfile_istream_t infile(cpu_fname);
sched_ops.replay_as_traced_istream = &infile;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
auto *stream = scheduler.get_stream(0);
memref_t memref;
for (scheduler_t::stream_status_t status = stream->next_record(memref);
status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
assert(status == scheduler_t::STATUS_OK);
}
#endif
}
static void
test_replay_as_traced_from_file(const char *testdir)
{
#if (defined(X86_64) || defined(ARM_64)) && defined(HAS_ZIP)
std::cerr << "\n----------------\nTesting replay as-traced from a file\n";
std::string path = std::string(testdir) + "/drmemtrace.threadsig.x64.tracedir";
std::string cpu_file =
std::string(testdir) + "/drmemtrace.threadsig.x64.tracedir/cpu_schedule.bin.zip";
static const char *const SCHED_STRING =
"Core #0: 1257598 \nCore #1: 1257603 \nCore #2: 1257601 \n"
"Core #3: 1257599 => 1257604 @ <366987,87875,13331862029895453> "
"(<366986,87875,13331862029895453> => <1,0,0>) \n"
"Core #4: 1257600 \nCore #5: 1257596 \nCore #6: 1257602 \n";
static constexpr int NUM_OUTPUTS = 7;
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(path);
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_RECORDED_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
2);
std::cerr << "Reading cpu file " << cpu_file << "\n";
zipfile_istream_t infile(cpu_file);
sched_ops.replay_as_traced_istream = &infile;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::vector<context_switch_t>> replay_sequence(NUM_OUTPUTS);
std::vector<std::thread> threads;
threads.reserve(NUM_OUTPUTS);
for (int i = 0; i < NUM_OUTPUTS; ++i) {
threads.emplace_back(std::thread(&simulate_core_and_record_schedule,
scheduler.get_stream(i), std::ref(scheduler),
std::ref(replay_sequence[i])));
}
for (std::thread &thread : threads)
thread.join();
std::ostringstream replay_string;
for (int i = 0; i < NUM_OUTPUTS; ++i) {
replay_string << "Core #" << i << ": ";
for (const auto &cs : replay_sequence[i])
replay_string << cs << " ";
replay_string << "\n";
}
std::cerr << "As-traced from file:\n"
<< SCHED_STRING << "Versus replay:\n"
<< replay_string.str();
assert(replay_string.str() == SCHED_STRING);
#endif
}
static void
test_inactive()
{
#ifdef HAS_ZIP
std::cerr << "\n----------------\nTesting inactive cores\n";
static constexpr memref_tid_t TID_A = 42;
static constexpr memref_tid_t TID_B = TID_A + 1;
static constexpr int NUM_OUTPUTS = 2;
std::vector<trace_entry_t> refs_A = {
make_thread(TID_A),
make_pid(1),
make_version(TRACE_ENTRY_VERSION),
make_timestamp(10),
make_instr(10),
make_instr(30),
make_instr(50),
make_exit(TID_A),
};
std::vector<trace_entry_t> refs_B = {
make_thread(TID_B),
make_pid(1),
make_version(TRACE_ENTRY_VERSION),
make_timestamp(20),
make_instr(20),
make_instr(40),
make_instr(60),
make_instr(80),
make_exit(TID_B),
};
std::string record_fname = "tmp_test_replay_inactive.zip";
{
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
4);
sched_ops.quantum_duration = 2;
zipfile_ostream_t outfile(record_fname);
sched_ops.schedule_record_ostream = &outfile;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
auto *stream0 = scheduler.get_stream(0);
auto *stream1 = scheduler.get_stream(1);
auto check_next = [](scheduler_t::stream_t *stream,
scheduler_t::stream_status_t expect_status,
memref_tid_t expect_tid = INVALID_THREAD_ID,
trace_type_t expect_type = TRACE_TYPE_READ) {
memref_t memref;
scheduler_t::stream_status_t status = stream->next_record(memref);
assert(status == expect_status);
if (status == scheduler_t::STATUS_OK) {
if (memref.marker.tid != expect_tid) {
std::cerr << "Expected tid " << expect_tid
<< " != " << memref.marker.tid << "\n";
assert(false);
}
if (memref.marker.type != expect_type) {
std::cerr << "Expected type " << expect_type
<< " != " << memref.marker.type << "\n";
assert(false);
}
}
};
scheduler_t::stream_status_t status;
status = stream0->unread_last_record();
assert(status == scheduler_t::STATUS_INVALID);
check_next(stream0, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER);
check_next(stream0, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER);
check_next(stream0, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR);
uint64_t ref_ord = stream0->get_record_ordinal();
uint64_t instr_ord = stream0->get_instruction_ordinal();
status = stream0->unread_last_record();
assert(status == scheduler_t::STATUS_OK);
assert(stream0->get_record_ordinal() == ref_ord - 1);
assert(stream0->get_instruction_ordinal() == instr_ord - 1);
assert(stream0->start_speculation(300, true) == scheduler_t::STATUS_INVALID);
check_next(stream0, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR);
assert(stream0->get_record_ordinal() == ref_ord);
assert(stream0->get_instruction_ordinal() == instr_ord);
check_next(stream1, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_MARKER);
check_next(stream1, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_MARKER);
check_next(stream1, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR);
check_next(stream1, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR);
status = stream1->unread_last_record();
assert(status == scheduler_t::STATUS_OK);
status = stream1->unread_last_record();
assert(status == scheduler_t::STATUS_INVALID);
status = stream1->set_active(false);
assert(status == scheduler_t::STATUS_OK);
check_next(stream1, scheduler_t::STATUS_WAIT);
status = stream1->set_active(false);
assert(status == scheduler_t::STATUS_OK);
check_next(stream1, scheduler_t::STATUS_WAIT);
check_next(stream0, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR);
check_next(stream0, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR);
check_next(stream0, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR);
check_next(stream0, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR);
status = stream0->set_active(false);
assert(status == scheduler_t::STATUS_OK);
check_next(stream0, scheduler_t::STATUS_WAIT);
status = stream1->set_active(true);
assert(status == scheduler_t::STATUS_OK);
check_next(stream1, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR);
check_next(stream1, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_THREAD_EXIT);
check_next(stream1, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_THREAD_EXIT);
check_next(stream1, scheduler_t::STATUS_EOF);
if (scheduler.write_recorded_schedule() != scheduler_t::STATUS_SUCCESS)
assert(false);
}
{
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_AS_PREVIOUSLY,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
4);
zipfile_istream_t infile(record_fname);
sched_ops.schedule_replay_istream = &infile;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_A);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == "..AABBA.");
assert(sched_as_string[1] == "..B---B.");
}
#endif
}
int
test_main(int argc, const char *argv[])
{
assert(argc == 2);
dr_standalone_init();
test_serial();
test_parallel();
test_param_checks();
test_regions();
test_only_threads();
test_real_file_queries_and_filters(argv[1]);
test_synthetic();
test_synthetic_time_quanta();
test_synthetic_with_timestamps();
test_synthetic_with_priorities();
test_synthetic_with_bindings();
test_synthetic_with_bindings_weighted();
test_synthetic_with_syscalls();
test_synthetic_multi_threaded(argv[1]);
test_speculation();
test_replay();
test_replay_multi_threaded(argv[1]);
test_replay_timestamps();
test_replay_skip();
test_replay_as_traced_from_file(argv[1]);
test_replay_as_traced();
test_replay_as_traced_i6107_workaround();
test_inactive();
dr_standalone_exit();
return 0;
}
}
}