"""
Test case for testing the gdbremote protocol.
Tests run against debugserver and lldb-server (llgs).
lldb-server tests run where the lldb-server exe is
available.
This class will be broken into smaller test case classes by
gdb remote packet functional areas. For now it contains
the initial set of tests implemented.
"""
import binascii
import itertools
import struct
import gdbremote_testcase
import lldbgdbserverutils
from lldbsuite.support import seven
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test.lldbdwarf import *
from lldbsuite.test import lldbutil, lldbplatformutil
# gdb-remote protocol tests run against the debug monitor under test.
# The class derives from the shared gdb-remote test case base and from
# DwarfOpcodeParser.
class LldbGdbServerTestCase(
gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser
):
def test_thread_suffix_supported(self):
    """Expect an OK reply to the QThreadSuffixSupported packet."""
    monitor = self.connect_to_debug_monitor()
    self.assertIsNotNone(monitor)

    self.do_handshake()

    expected_exchange = [
        "lldb-server < 26> read packet: $QThreadSuffixSupported#e4",
        "lldb-server < 6> send packet: $OK#9a",
    ]
    self.test_sequence.add_log_lines(expected_exchange, True)
    self.expect_gdbremote_sequence()
def test_list_threads_in_stop_reply_supported(self):
    """Expect an OK reply to the QListThreadsInStopReply packet."""
    monitor = self.connect_to_debug_monitor()
    self.assertIsNotNone(monitor)

    self.do_handshake()

    expected_exchange = [
        "lldb-server < 27> read packet: $QListThreadsInStopReply#21",
        "lldb-server < 6> send packet: $OK#9a",
    ]
    self.test_sequence.add_log_lines(expected_exchange, True)
    self.expect_gdbremote_sequence()
def test_c_packet_works(self):
    """Send a $c packet and expect the $W00 reply."""
    self.build()
    self.prep_debug_monitor_and_inferior()

    continue_then_exit = ["read packet: $c#63", "send packet: $W00#00"]
    self.test_sequence.add_log_lines(continue_then_exit, True)
    self.expect_gdbremote_sequence()
@skipIfWindows
def test_inferior_print_exit(self):
    """Continue an inferior given "hello, world" and match its output and exit."""
    self.build()
    self.prep_debug_monitor_and_inferior(inferior_args=["hello, world"])

    greeting_output = {
        "type": "output_match",
        "regex": self.maybe_strict_output_regex(r"hello, world\r\n"),
    }
    self.test_sequence.add_log_lines(
        ["read packet: $vCont;c#a8", greeting_output, "send packet: $W00#00"],
        True,
    )

    result = self.expect_gdbremote_sequence()
    self.assertIsNotNone(result)
def test_first_launch_stop_reply_thread_matches_first_qC(self):
    """Compare the thread id from the $qC reply with the one in the $? stop reply."""
    self.build()
    self.prep_debug_monitor_and_inferior()

    qc_reply = {
        "direction": "send",
        "regex": r"^\$QC([0-9a-fA-F]+)#",
        "capture": {1: "thread_id_QC"},
    }
    stop_reply = {
        "direction": "send",
        "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)",
        "capture": {1: "thread_id_?"},
    }
    self.test_sequence.add_log_lines(
        ["read packet: $qC#00", qc_reply, "read packet: $?#00", stop_reply],
        True,
    )

    captured = self.expect_gdbremote_sequence()
    qc_thread = captured.get("thread_id_QC")
    stop_thread = captured.get("thread_id_?")
    self.assertEqual(qc_thread, stop_thread)
def test_attach_commandline_continue_app_exits(self):
    """Attach to the inferior, continue it, and check that it is gone afterwards."""
    self.build()
    self.set_inferior_startup_attach()
    launched = self.prep_debug_monitor_and_inferior()

    continue_then_exit = ["read packet: $vCont;c#a8", "send packet: $W00#00"]
    self.test_sequence.add_log_lines(continue_then_exit, True)
    self.expect_gdbremote_sequence()

    # Pause briefly before inspecting the inferior process.
    time.sleep(1)

    # The poll() check is skipped when running against a remote platform.
    if not lldb.remote_platform:
        exit_status = launched["inferior"].poll()
        self.assertIsNotNone(exit_status)

    inferior_pid = launched["inferior"].pid
    still_running = lldbgdbserverutils.process_is_running(inferior_pid, False)
    self.assertFalse(still_running)
def test_qRegisterInfo_returns_one_valid_result(self):
    """Request qRegisterInfo0 and validate the parsed register description."""
    self.build()
    self.prep_debug_monitor_and_inferior()

    reginfo_reply = {
        "direction": "send",
        "regex": r"^\$(.+);#[0-9A-Fa-f]{2}",
        "capture": {1: "reginfo_0"},
    }
    self.test_sequence.add_log_lines(
        ["read packet: $qRegisterInfo0#00", reginfo_reply], True
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    raw_reg_info = captured.get("reginfo_0")
    self.assertIsNotNone(raw_reg_info)

    parsed = lldbgdbserverutils.parse_reg_info_response(raw_reg_info)
    self.assert_valid_reg_info(parsed)
def test_qRegisterInfo_returns_all_valid_results(self):
    """Collect every register info packet and validate each parsed entry."""
    self.build()
    self.prep_debug_monitor_and_inferior()
    self.add_register_info_collection_packets()

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    all_reg_infos = self.parse_register_info_packets(captured)
    for entry in all_reg_infos:
        self.assert_valid_reg_info(entry)
def test_qRegisterInfo_contains_required_generics_debugserver(self):
    """Check that the pc, fp, sp and flags generic registers are reported.

    The fp check is skipped when the architecture is powerpc64le.
    """
    self.build()
    self.prep_debug_monitor_and_inferior()
    self.add_register_info_collection_packets()

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    # Gather the distinct "generic" names advertised by the registers.
    generic_names = {
        entry["generic"]
        for entry in self.parse_register_info_packets(captured)
        if "generic" in entry
    }

    self.assertIn("pc", generic_names)
    if self.getArchitecture() != "powerpc64le":
        self.assertIn("fp", generic_names)
    self.assertIn("sp", generic_names)
    self.assertIn("flags", generic_names)
def test_qRegisterInfo_contains_at_least_one_register_set(self):
    """Check that at least one distinct register set name is reported."""
    self.build()
    self.prep_debug_monitor_and_inferior()
    self.add_register_info_collection_packets()

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    set_names = {
        entry["set"]
        for entry in self.parse_register_info_packets(captured)
        if "set" in entry
    }
    self.assertGreaterEqual(len(set_names), 1)
def targetHasAVX(self):
    """Return whether the target is expected to report AVX registers.

    For any platform triple that does not match ``.*-.*-linux`` this
    returns True without further checks. For Linux triples it reads
    /proc/cpuinfo (fetched through ``platform get-file`` into a local
    "cpuinfo" file when running against a remote platform) and returns
    whether the text contains " avx ".
    """
    triple = self.dbg.GetSelectedPlatform().GetTriple()

    # Only Linux targets are actually inspected.
    if not re.match(".*-.*-linux", triple):
        return True

    if lldb.remote_platform:
        self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"')
        cpuinfo_path = "cpuinfo"
        # Remove the fetched copy once the test is torn down.
        self.addTearDownHook(lambda: os.unlink("cpuinfo"))
    else:
        cpuinfo_path = "/proc/cpuinfo"

    # The context manager closes the file even if read() raises.
    with open(cpuinfo_path, "r") as cpuinfo_file:
        cpuinfo = cpuinfo_file.read()

    return " avx " in cpuinfo
@expectedFailureAll(oslist=["windows"])
@skipIf(archs=no_match(["amd64", "i386", "x86_64"]))
@add_test_categories(["llgs"])
def test_qRegisterInfo_contains_avx_registers(self):
    """Check that the AVX register set is reported exactly when targetHasAVX() says so."""
    self.build()
    self.prep_debug_monitor_and_inferior()
    self.add_register_info_collection_packets()

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    set_names = {
        entry["set"]
        for entry in self.parse_register_info_packets(captured)
        if "set" in entry
    }

    expect_avx = self.targetHasAVX()
    reports_avx = "Advanced Vector Extensions" in set_names
    self.assertEqual(expect_avx, reports_avx)
def qThreadInfo_contains_thread(self):
    """Helper: collect thread info and expect exactly one thread."""
    self.prep_debug_monitor_and_inferior()
    self.add_threadinfo_collection_packets()

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    thread_ids = self.parse_threadinfo_packets(captured)
    self.assertIsNotNone(thread_ids)
    self.assertEqual(len(thread_ids), 1)
def test_qThreadInfo_contains_thread_launch(self):
    """Run qThreadInfo_contains_thread with the inferior started by launch."""
    self.build()
    self.set_inferior_startup_launch()

    self.qThreadInfo_contains_thread()
@expectedFailureAll(oslist=["windows"])
def test_qThreadInfo_contains_thread_attach(self):
    """Run qThreadInfo_contains_thread with the inferior started by attach."""
    self.build()
    self.set_inferior_startup_attach()

    self.qThreadInfo_contains_thread()
def qThreadInfo_matches_qC(self):
    """Helper: the single thread from thread info must equal the $qC thread id."""
    self.prep_debug_monitor_and_inferior()
    self.add_threadinfo_collection_packets()

    qc_reply = {
        "direction": "send",
        "regex": r"^\$QC([0-9a-fA-F]+)#",
        "capture": {1: "thread_id"},
    }
    self.test_sequence.add_log_lines(["read packet: $qC#00", qc_reply], True)

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    thread_ids = self.parse_threadinfo_packets(captured)
    self.assertIsNotNone(thread_ids)
    self.assertEqual(len(thread_ids), 1)

    qc_thread_hex = captured.get("thread_id")
    self.assertIsNotNone(qc_thread_hex)

    # The $qC reply carries the thread id in hex.
    self.assertEqual(thread_ids[0], int(qc_thread_hex, 16))
def test_qThreadInfo_matches_qC_launch(self):
    """Run qThreadInfo_matches_qC with the inferior started by launch."""
    self.build()
    self.set_inferior_startup_launch()

    self.qThreadInfo_matches_qC()
@expectedFailureAll(oslist=["windows"])
def test_qThreadInfo_matches_qC_attach(self):
    """Run qThreadInfo_matches_qC with the inferior started by attach."""
    self.build()
    self.set_inferior_startup_attach()

    self.qThreadInfo_matches_qC()
def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self):
    """Read each register with $p and check the reply length against its bitsize.

    Registers without a "set" entry are skipped, as are registers whose
    $p reply is a three-character "E.." response. For the rest, the reply
    must contain two hex digits per byte of the register's bitsize.
    """
    self.build()
    self.set_inferior_startup_launch()
    self.prep_debug_monitor_and_inferior()
    self.add_register_info_collection_packets()

    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    reg_infos = self.parse_register_info_packets(context)
    self.assertIsNotNone(reg_infos)
    self.assertGreater(len(reg_infos), 0)

    byte_order = self.get_target_byte_order()

    # enumerate() keeps the index sent in $p in lockstep with the register
    # being checked. A manually incremented counter fell behind whenever an
    # iteration bailed out early via `continue`.
    for reg_index, reg_info in enumerate(reg_infos):
        if "set" not in reg_info:
            continue

        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: $p{0:x}#00".format(reg_index),
                {
                    "direction": "send",
                    "regex": r"^\$([0-9a-fA-F]+)#",
                    "capture": {1: "p_response"},
                },
            ],
            True,
        )
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        p_response = context.get("p_response")
        self.assertIsNotNone(p_response)

        # A three-character reply starting with "E" is treated as an error
        # response and the register is skipped.
        if p_response.startswith("E") and len(p_response) == 3:
            continue

        if "dynamic_size_dwarf_expr_bytes" in reg_info:
            self.updateRegInfoBitsize(reg_info, byte_order)

        # Integer division keeps the expected length an int.
        expected_hex_digits = 2 * int(reg_info["bitsize"]) // 8
        self.assertEqual(len(p_response), expected_hex_digits, reg_info)
def Hg_switches_to_3_threads(self, pass_pid=False):
    """Helper: select each thread with $Hg and confirm it through $qC.

    Args:
        pass_pid: when true, the thread id in the $Hg packet is prefixed
            with "p<pid>." using the inferior's process id.
    """
    _, threads = self.launch_with_threads(3)

    pid_str = ""
    if pass_pid:
        # No process handle is available in this helper, so ask the stub
        # for the inferior's pid through the process-info query.
        self.reset_test_sequence()
        self.add_process_info_collection_packets()
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        process_info = self.parse_process_info_response(context)
        # NOTE(review): assumes the process info reply exposes the pid as
        # a hex string under "pid" — confirm against the parser.
        pid_hex = process_info.get("pid")
        self.assertIsNotNone(pid_hex)
        pid_str = "p{0:x}.".format(int(pid_hex, 16))

    for thread in threads:
        # Switch to the thread, then read back the current thread id.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: $Hg{0}{1:x}#00".format(pid_str, thread),
                "send packet: $OK#00",
                "read packet: $qC#00",
                {
                    "direction": "send",
                    "regex": r"^\$QC([0-9a-fA-F]+)#",
                    "capture": {1: "thread_id"},
                },
            ],
            True,
        )

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        self.assertIsNotNone(context.get("thread_id"))
        self.assertEqual(int(context.get("thread_id"), 16), thread)
@skipIf(compiler="clang", compiler_version=["<", "11.0"])
def test_Hg_switches_to_3_threads_launch(self):
    """Run Hg_switches_to_3_threads with the inferior started by launch."""
    self.build()
    self.set_inferior_startup_launch()

    self.Hg_switches_to_3_threads()
def Hg_fails_on_pid(self, pass_pid):
    """Helper: expect $Eff when $Hg names the given pid.

    Args:
        pass_pid: pid placed in the "p<pid>." prefix; -1 is sent literally
            as "p-1.", any other value is formatted as hex.
    """
    _, threads = self.launch_with_threads(2)

    pid_prefix = "p-1." if pass_pid == -1 else "p{0:x}.".format(pass_pid)
    target_thread = threads[1]

    select_thread = "read packet: $Hg{0}{1:x}#00".format(pid_prefix, target_thread)
    self.test_sequence.add_log_lines(
        [select_thread, "send packet: $Eff#00"], True
    )
    self.expect_gdbremote_sequence()
@add_test_categories(["llgs"])
def test_Hg_fails_on_another_pid(self):
    """Run Hg_fails_on_pid with pid 1."""
    self.build()
    self.set_inferior_startup_launch()

    self.Hg_fails_on_pid(1)
@add_test_categories(["llgs"])
def test_Hg_fails_on_zero_pid(self):
    """Run Hg_fails_on_pid with pid 0."""
    self.build()
    self.set_inferior_startup_launch()

    self.Hg_fails_on_pid(0)
@add_test_categories(["llgs"])
def test_Hg_fails_on_minus_one_pid(self):
    """Run Hg_fails_on_pid with pid -1."""
    self.build()
    self.set_inferior_startup_launch()

    self.Hg_fails_on_pid(-1)
def Hc_then_Csignal_signals_correct_thread(self, segfault_signo):
    """Helper: after each segfault stop, $Hc the stopped thread and $C it SIGUSR1.

    Args:
        segfault_signo: signal number expected in each stop reply.
    """
    NUM_THREADS = 3

    inferior_args = ["thread:segfault"]
    inferior_args.extend("thread:new" for _ in range(NUM_THREADS - 1))
    inferior_args.append("sleep:10")

    self.prep_debug_monitor_and_inferior(inferior_args=inferior_args)

    self.test_sequence.add_log_lines(["read packet: $c#63"], True)
    self.expect_gdbremote_sequence()

    # Thread ids seen so far, used to check that each one shows up once.
    stopped_tids = set()
    printed_tids = set()

    for _ in range(NUM_THREADS - 1):
        # Wait for the next stop reply and pull out signal and thread id.
        stop_reply = {
            "direction": "send",
            "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
            "capture": {1: "signo", 2: "thread_id"},
        }
        self.reset_test_sequence()
        self.test_sequence.add_log_lines([stop_reply], True)

        captured = self.expect_gdbremote_sequence()
        self.assertIsNotNone(captured)
        self.assertEqual(int(captured.get("signo"), 16), segfault_signo)

        stopped_tid = int(captured.get("thread_id"), 16)
        self.assertNotIn(stopped_tid, stopped_tids)
        stopped_tids.add(stopped_tid)

        # Select that thread and continue it with SIGUSR1.
        signal_output = {
            "type": "output_match",
            "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n",
            "capture": {1: "print_thread_id", 2: "post_handle_thread_id"},
        }
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: $Hc{0:x}#00".format(stopped_tid),
                "send packet: $OK#00",
                "read packet: $C{0:x}#00".format(
                    lldbutil.get_signal_number("SIGUSR1")
                ),
                signal_output,
            ],
            True,
        )

        captured = self.expect_gdbremote_sequence()
        self.assertIsNotNone(captured)

        printed_hex = captured.get("print_thread_id")
        self.assertIsNotNone(printed_hex)
        printed_tid = int(printed_hex, 16)
        self.assertNotIn(printed_tid, printed_tids)
        printed_tids.add(printed_tid)

        post_handle_hex = captured.get("post_handle_thread_id")
        self.assertIsNotNone(post_handle_hex)
        self.assertEqual(int(post_handle_hex, 16), printed_tid)
@expectedFailureDarwin
@skipIfWindows
@expectedFailureNetBSD
def test_Hc_then_Csignal_signals_correct_thread_launch(self):
    """Run Hc_then_Csignal_signals_correct_thread with the platform's segfault signal."""
    self.build()
    self.set_inferior_startup_launch()

    # Darwin uses TARGET_EXC_BAD_ACCESS; everything else uses SIGSEGV.
    if not self.platformIsDarwin():
        segfault_signo = lldbutil.get_signal_number("SIGSEGV")
    else:
        segfault_signo = self.TARGET_EXC_BAD_ACCESS

    self.Hc_then_Csignal_signals_correct_thread(segfault_signo)
@skipIfWindows
def test_m_packet_reads_memory(self):
    """Read the inferior's g_message with $m and compare it to what was set."""
    self.build()
    self.set_inferior_startup_launch()

    MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz"
    self.prep_debug_monitor_and_inferior(
        inferior_args=[
            "set-message:%s" % MEMORY_CONTENTS,
            "get-data-address-hex:g_message",
            "sleep:5",
        ]
    )

    # Continue until the address is printed, then interrupt with 0x03.
    address_output = {
        "type": "output_match",
        "regex": self.maybe_strict_output_regex(
            r"data address: 0x([0-9a-fA-F]+)\r\n"
        ),
        "capture": {1: "message_address"},
    }
    stop_reply = {
        "direction": "send",
        "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
        "capture": {1: "stop_signo", 2: "stop_thread_id"},
    }
    self.test_sequence.add_log_lines(
        [
            "read packet: $c#63",
            address_output,
            "read packet: {}".format(chr(3)),
            stop_reply,
        ],
        True,
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)
    self.assertIsNotNone(captured.get("message_address"))
    message_address = int(captured.get("message_address"), 16)

    # Read back as many bytes as the message is long.
    memory_reply = {
        "direction": "send",
        "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
        "capture": {1: "read_contents"},
    }
    self.reset_test_sequence()
    self.test_sequence.add_log_lines(
        [
            "read packet: $m{0:x},{1:x}#00".format(
                message_address, len(MEMORY_CONTENTS)
            ),
            memory_reply,
        ],
        True,
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)
    self.assertIsNotNone(captured.get("read_contents"))

    decoded = seven.unhexlify(captured.get("read_contents"))
    self.assertEqual(decoded, MEMORY_CONTENTS)
def test_qMemoryRegionInfo_is_supported(self):
    """Expect an OK reply to a bare qMemoryRegionInfo packet."""
    self.build()
    self.set_inferior_startup_launch()
    self.prep_debug_monitor_and_inferior()

    query_and_ok = ["read packet: $qMemoryRegionInfo#00", "send packet: $OK#00"]
    self.test_sequence.add_log_lines(query_and_ok, True)
    self.expect_gdbremote_sequence()
@skipIfWindows
def test_qMemoryRegionInfo_reports_code_address_as_executable(self):
    """Query the region holding a code address and expect r and x permissions."""
    self.build()
    self.set_inferior_startup_launch()
    self.prep_debug_monitor_and_inferior(
        inferior_args=["get-code-address-hex:hello", "sleep:5"]
    )

    # Continue until the address is printed, then interrupt with 0x03.
    address_output = {
        "type": "output_match",
        "regex": self.maybe_strict_output_regex(
            r"code address: 0x([0-9a-fA-F]+)\r\n"
        ),
        "capture": {1: "code_address"},
    }
    stop_reply = {
        "direction": "send",
        "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
        "capture": {1: "stop_signo", 2: "stop_thread_id"},
    }
    self.test_sequence.add_log_lines(
        [
            "read packet: $c#63",
            address_output,
            "read packet: {}".format(chr(3)),
            stop_reply,
        ],
        True,
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)
    self.assertIsNotNone(captured.get("code_address"))
    code_address = int(captured.get("code_address"), 16)

    self.reset_test_sequence()
    self.add_query_memory_region_packets(code_address)

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)
    region = self.parse_memory_region_packet(captured)

    self.assertNotIn("error", region)
    self.assertIn("permissions", region)
    for flag in ("r", "x"):
        self.assertIn(flag, region["permissions"])

    self.assert_address_within_memory_region(code_address, region)
@skipIfWindows
def test_qMemoryRegionInfo_reports_stack_address_as_rw(self):
    """Query the region holding a stack address and expect r and w permissions."""
    self.build()
    self.set_inferior_startup_launch()
    self.prep_debug_monitor_and_inferior(
        inferior_args=["get-stack-address-hex:", "sleep:5"]
    )

    # Continue until the address is printed, then interrupt with 0x03.
    address_output = {
        "type": "output_match",
        "regex": self.maybe_strict_output_regex(
            r"stack address: 0x([0-9a-fA-F]+)\r\n"
        ),
        "capture": {1: "stack_address"},
    }
    stop_reply = {
        "direction": "send",
        "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
        "capture": {1: "stop_signo", 2: "stop_thread_id"},
    }
    self.test_sequence.add_log_lines(
        [
            "read packet: $c#63",
            address_output,
            "read packet: {}".format(chr(3)),
            stop_reply,
        ],
        True,
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)
    self.assertIsNotNone(captured.get("stack_address"))
    stack_address = int(captured.get("stack_address"), 16)

    self.reset_test_sequence()
    self.add_query_memory_region_packets(stack_address)

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)
    region = self.parse_memory_region_packet(captured)

    self.assertNotIn("error", region)
    self.assertIn("permissions", region)
    for flag in ("r", "w"):
        self.assertIn(flag, region["permissions"])

    self.assert_address_within_memory_region(stack_address, region)
@skipIfWindows
def test_qMemoryRegionInfo_reports_heap_address_as_rw(self):
    """Query the region holding a heap address and expect r and w permissions."""
    self.build()
    self.set_inferior_startup_launch()
    self.prep_debug_monitor_and_inferior(
        inferior_args=["get-heap-address-hex:", "sleep:5"]
    )

    # Continue until the address is printed, then interrupt with 0x03.
    address_output = {
        "type": "output_match",
        "regex": self.maybe_strict_output_regex(
            r"heap address: 0x([0-9a-fA-F]+)\r\n"
        ),
        "capture": {1: "heap_address"},
    }
    stop_reply = {
        "direction": "send",
        "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
        "capture": {1: "stop_signo", 2: "stop_thread_id"},
    }
    self.test_sequence.add_log_lines(
        [
            "read packet: $c#63",
            address_output,
            "read packet: {}".format(chr(3)),
            stop_reply,
        ],
        True,
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)
    self.assertIsNotNone(captured.get("heap_address"))
    heap_address = int(captured.get("heap_address"), 16)

    self.reset_test_sequence()
    self.add_query_memory_region_packets(heap_address)

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)
    region = self.parse_memory_region_packet(captured)

    self.assertNotIn("error", region)
    self.assertIn("permissions", region)
    for flag in ("r", "w"):
        self.assertIn(flag, region["permissions"])

    self.assert_address_within_memory_region(heap_address, region)
def breakpoint_set_and_remove_work(self, want_hardware):
    """Helper: set a breakpoint on hello, hit it, remove it, and run to exit.

    Args:
        want_hardware: when true the breakpoint packets use type 1,
            otherwise type 0.
    """
    self.prep_debug_monitor_and_inferior(
        inferior_args=[
            "get-code-address-hex:hello",
            "sleep:1",
            "call-function:hello",
        ]
    )

    self.add_register_info_collection_packets()
    self.add_process_info_collection_packets()

    # Continue until the function address is printed, then interrupt.
    address_output = {
        "type": "output_match",
        "regex": self.maybe_strict_output_regex(
            r"code address: 0x([0-9a-fA-F]+)\r\n"
        ),
        "capture": {1: "function_address"},
    }
    stop_reply = {
        "direction": "send",
        "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
        "capture": {1: "stop_signo", 2: "stop_thread_id"},
    }
    self.test_sequence.add_log_lines(
        [
            "read packet: $c#63",
            address_output,
            "read packet: {}".format(chr(3)),
            stop_reply,
        ],
        True,
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    endian = self.parse_process_info_response(captured).get("endian")
    self.assertIsNotNone(endian)

    reg_infos = self.parse_register_info_packets(captured)
    pc_reg_index, pc_reg_info = self.find_pc_reg_info(reg_infos)
    self.assertIsNotNone(pc_reg_index)
    self.assertIsNotNone(pc_reg_info)

    self.assertIsNotNone(captured.get("function_address"))
    function_address = int(captured.get("function_address"), 16)

    # Breakpoint kind is 4 on the ARM family and 1 everywhere else.
    is_arm_family = self.getArchitecture() in ["arm", "arm64", "aarch64"]
    breakpoint_kind = 4 if is_arm_family else 1

    z_packet_type = 1 if want_hardware else 0

    # Set the breakpoint and continue into it.
    self.reset_test_sequence()
    self.add_set_breakpoint_packets(
        function_address,
        z_packet_type,
        do_continue=True,
        breakpoint_kind=breakpoint_kind,
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    stop_signo = captured.get("stop_signo")
    self.assertIsNotNone(stop_signo)
    self.assertEqual(int(stop_signo, 16), lldbutil.get_signal_number("SIGTRAP"))

    # No inferior output is expected before the breakpoint is hit.
    self.assertEqual(len(captured["O_content"]), 0)

    # Read the pc and check that it sits at the breakpoint address.
    pc_reply = {
        "direction": "send",
        "regex": r"^\$([0-9a-fA-F]+)#",
        "capture": {1: "p_response"},
    }
    self.reset_test_sequence()
    self.test_sequence.add_log_lines(
        ["read packet: $p{0:x}#00".format(pc_reg_index), pc_reply], True
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    p_response = captured.get("p_response")
    self.assertIsNotNone(p_response)

    pc_value = lldbgdbserverutils.unpack_register_hex_unsigned(endian, p_response)
    self.assertEqual(pc_value, function_address)

    # Remove the breakpoint and run to completion.
    self.reset_test_sequence()
    self.add_remove_breakpoint_packets(
        function_address, z_packet_type, breakpoint_kind=breakpoint_kind
    )
    self.test_sequence.add_log_lines(
        [
            "read packet: $c#63",
            {"type": "output_match", "regex": r"^hello, world\r\n$"},
            {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"},
        ],
        True,
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)
@skipIfWindows
def test_software_breakpoint_set_and_remove_work(self):
    """Run breakpoint_set_and_remove_work with want_hardware=False.

    On "arm" the inferior is built with CFLAGS_EXTRAS set to "-marm".
    """
    build_kwargs = {}
    if self.getArchitecture() == "arm":
        build_kwargs = {"dictionary": {"CFLAGS_EXTRAS": "-marm"}}
    self.build(**build_kwargs)

    self.set_inferior_startup_launch()
    self.breakpoint_set_and_remove_work(want_hardware=False)
@skipUnlessPlatform(oslist=["linux"])
@skipIf(archs=no_match(["arm", "aarch64"]))
def test_hardware_breakpoint_set_and_remove_work(self):
    """Run breakpoint_set_and_remove_work with want_hardware=True.

    On "arm" the inferior is built with CFLAGS_EXTRAS set to "-marm".
    """
    build_kwargs = {}
    if self.getArchitecture() == "arm":
        build_kwargs = {"dictionary": {"CFLAGS_EXTRAS": "-marm"}}
    self.build(**build_kwargs)

    self.set_inferior_startup_launch()
    self.breakpoint_set_and_remove_work(want_hardware=True)
def get_qSupported_dict(self, features=None):
    """Build and launch the inferior, then return the parsed qSupported reply.

    Args:
        features: optional list of feature strings passed to
            add_qSupported_packets; defaults to an empty list.

    Returns:
        Whatever parse_qSupported_response produces for the captured reply.
    """
    # Avoid a shared mutable default: create a fresh list per call.
    if features is None:
        features = []

    self.build()
    self.set_inferior_startup_launch()

    self.prep_debug_monitor_and_inferior()
    self.add_qSupported_packets(features)

    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    return self.parse_qSupported_response(context)
def test_qSupported_returns_known_stub_features(self):
    """Check that the qSupported reply parses to a non-empty result."""
    features = self.get_qSupported_dict()

    self.assertIsNotNone(features)
    self.assertGreater(len(features), 0)
def test_qSupported_auvx(self):
    """Expect qXfer:auxv:read to be "+" on freebsd, linux and netbsd, else "-"."""
    on_supporting_platform = lldbplatformutil.getPlatform() in [
        "freebsd",
        "linux",
        "netbsd",
    ]
    expected = "+" if on_supporting_platform else "-"

    features = self.get_qSupported_dict()
    actual = features.get("qXfer:auxv:read", "-")
    self.assertEqual(actual, expected)
def test_qSupported_libraries_svr4(self):
    """Expect qXfer:libraries-svr4:read to be "+" on freebsd, linux and netbsd, else "-"."""
    on_supporting_platform = lldbplatformutil.getPlatform() in [
        "freebsd",
        "linux",
        "netbsd",
    ]
    expected = "+" if on_supporting_platform else "-"

    features = self.get_qSupported_dict()
    actual = features.get("qXfer:libraries-svr4:read", "-")
    self.assertEqual(actual, expected)
def test_qSupported_siginfo_read(self):
    """Expect qXfer:siginfo:read to be "+" on freebsd and linux, else "-"."""
    on_supporting_platform = lldbplatformutil.getPlatform() in ["freebsd", "linux"]
    expected = "+" if on_supporting_platform else "-"

    features = self.get_qSupported_dict()
    actual = features.get("qXfer:siginfo:read", "-")
    self.assertEqual(actual, expected)
def test_qSupported_QPassSignals(self):
    """Expect QPassSignals to be "+" on freebsd, linux and netbsd, else "-"."""
    on_supporting_platform = lldbplatformutil.getPlatform() in [
        "freebsd",
        "linux",
        "netbsd",
    ]
    expected = "+" if on_supporting_platform else "-"

    features = self.get_qSupported_dict()
    actual = features.get("QPassSignals", "-")
    self.assertEqual(actual, expected)
@add_test_categories(["fork"])
def test_qSupported_fork_events(self):
    """Request multiprocess+ and fork-events+; only those two should be "+"."""
    features = self.get_qSupported_dict(["multiprocess+", "fork-events+"])

    expectations = (
        ("multiprocess", "+"),
        ("fork-events", "+"),
        ("vfork-events", "-"),
    )
    for feature, state in expectations:
        self.assertEqual(features.get(feature, "-"), state)
@add_test_categories(["fork"])
def test_qSupported_fork_events_without_multiprocess(self):
    """Request only fork-events+; all three features should be "-"."""
    features = self.get_qSupported_dict(["fork-events+"])

    expectations = (
        ("multiprocess", "-"),
        ("fork-events", "-"),
        ("vfork-events", "-"),
    )
    for feature, state in expectations:
        self.assertEqual(features.get(feature, "-"), state)
@add_test_categories(["fork"])
def test_qSupported_vfork_events(self):
    """Request multiprocess+ and vfork-events+; only those two should be "+"."""
    features = self.get_qSupported_dict(["multiprocess+", "vfork-events+"])

    expectations = (
        ("multiprocess", "+"),
        ("fork-events", "-"),
        ("vfork-events", "+"),
    )
    for feature, state in expectations:
        self.assertEqual(features.get(feature, "-"), state)
@add_test_categories(["fork"])
def test_qSupported_vfork_events_without_multiprocess(self):
    """Request only vfork-events+; all three features should be "-"."""
    features = self.get_qSupported_dict(["vfork-events+"])

    expectations = (
        ("multiprocess", "-"),
        ("fork-events", "-"),
        ("vfork-events", "-"),
    )
    for feature, state in expectations:
        self.assertEqual(features.get(feature, "-"), state)
@skipIfRemote
def test_qSupported_memory_tagging(self):
    """Expect memory-tagging to be "+" exactly when isAArch64MTE() is true."""
    features = self.get_qSupported_dict()

    actual = features.get("memory-tagging", "-")
    if self.isAArch64MTE():
        expected = "+"
    else:
        expected = "-"
    self.assertEqual(actual, expected)
@skipIfWindows
def test_written_M_content_reads_back_correctly(self):
    """Write a message with $M, read it back with $m and $x, then let the inferior print it."""
    self.build()
    self.set_inferior_startup_launch()

    TEST_MESSAGE = "Hello, memory"

    self.prep_debug_monitor_and_inferior(
        inferior_args=[
            "set-message:xxxxxxxxxxxxxX",
            "get-data-address-hex:g_message",
            "sleep:1",
            "print-message:",
        ]
    )

    # Continue until the address is printed, then interrupt with 0x03.
    address_output = {
        "type": "output_match",
        "regex": self.maybe_strict_output_regex(
            r"data address: 0x([0-9a-fA-F]+)\r\n"
        ),
        "capture": {1: "message_address"},
    }
    stop_reply = {
        "direction": "send",
        "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
        "capture": {1: "stop_signo", 2: "stop_thread_id"},
    }
    self.test_sequence.add_log_lines(
        [
            "read packet: $c#63",
            address_output,
            "read packet: {}".format(chr(3)),
            stop_reply,
        ],
        True,
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)
    self.assertIsNotNone(captured.get("message_address"))
    message_address = int(captured.get("message_address"), 16)

    hex_encoded_message = seven.hexlify(TEST_MESSAGE)
    message_length = len(TEST_MESSAGE)

    printed_output = {
        "type": "output_match",
        "regex": r"^message: (.+)\r\n$",
        "capture": {1: "printed_message"},
    }

    self.reset_test_sequence()
    self.test_sequence.add_log_lines(
        [
            # Write the whole message.
            "read packet: $M{0:x},{1:x}:{2}#00".format(
                message_address, message_length, hex_encoded_message
            ),
            "send packet: $OK#00",
            # Full-length reads through $m and $x.
            "read packet: $m{0:x},{1:x}#00".format(message_address, message_length),
            "send packet: ${0}#00".format(hex_encoded_message),
            "read packet: $x{0:x},{1:x}#00".format(message_address, message_length),
            "send packet: ${0}#00".format(TEST_MESSAGE),
            # Four-byte reads through $m and $x.
            "read packet: $m{0:x},4#00".format(message_address),
            "send packet: ${0}#00".format(hex_encoded_message[0:8]),
            "read packet: $x{0:x},4#00".format(message_address),
            "send packet: ${0}#00".format(TEST_MESSAGE[0:4]),
            # Resume so the inferior prints the message and exits.
            "read packet: $c#63",
            printed_output,
            "send packet: $W00#00",
        ],
        True,
    )

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    printed_message = captured.get("printed_message")
    self.assertIsNotNone(printed_message)
    # The trailing "X" of the original message lies past the written bytes.
    self.assertEqual(printed_message, TEST_MESSAGE + "X")
def test_P_writes_all_gpr_registers(self):
    """Flip the bits of every bit-flippable register and expect at least one successful write."""
    self.build()
    self.set_inferior_startup_launch()

    self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"])
    self.add_register_info_collection_packets()
    self.add_process_info_collection_packets()

    captured = self.expect_gdbremote_sequence()
    self.assertIsNotNone(captured)

    reg_infos = self.parse_register_info_packets(captured)
    self.assertIsNotNone(reg_infos)
    self.add_lldb_register_index(reg_infos)

    endian = self.parse_process_info_response(captured).get("endian")
    self.assertIsNotNone(endian)

    flippable = list(filter(self.is_bit_flippable_register, reg_infos))
    self.assertGreater(len(flippable), 0)

    write_counts = self.flip_all_bits_in_each_register_value(flippable, endian)
    (successful_writes, failed_writes) = write_counts
    self.trace(
        "successful writes: {}, failed writes: {}".format(
            successful_writes, failed_writes
        )
    )
    self.assertGreater(successful_writes, 0)
@skipIfWindows
def test_P_and_p_thread_suffix_work(self):
    """Write a distinct register value per thread with $P and read each back with $p.

    Both packets carry a ";thread:<tid>" suffix. The first value written
    is the register's current value in the first thread; each following
    thread gets the previous value plus one.
    """
    self.build()
    self.set_inferior_startup_launch()

    _, threads = self.launch_with_threads(3)

    self.reset_test_sequence()
    self.add_thread_suffix_request_packets()
    self.add_register_info_collection_packets()
    self.add_process_info_collection_packets()

    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    process_info = self.parse_process_info_response(context)
    self.assertIsNotNone(process_info)
    endian = process_info.get("endian")
    self.assertIsNotNone(endian)

    reg_infos = self.parse_register_info_packets(context)
    self.assertIsNotNone(reg_infos)
    self.add_lldb_register_index(reg_infos)

    reg_index = self.select_modifiable_register(reg_infos)
    self.assertIsNotNone(reg_index)
    reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8
    self.assertGreater(reg_byte_size, 0)

    expected_reg_values = []
    register_increment = 1
    next_value = None

    # Phase 1: write a different value into the register of each thread.
    for thread in threads:
        # Compare against None explicitly: 0 is a legitimate register
        # value and must not trigger another seed read.
        if next_value is None:
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    "read packet: $p{0:x};thread:{1:x}#00".format(
                        reg_index, thread
                    ),
                    {
                        "direction": "send",
                        "regex": r"^\$([0-9a-fA-F]+)#",
                        "capture": {1: "p_response"},
                    },
                ],
                True,
            )
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            next_value = lldbgdbserverutils.unpack_register_hex_unsigned(
                endian, p_response
            )

        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: $P{0:x}={1};thread:{2:x}#00".format(
                    reg_index,
                    lldbgdbserverutils.pack_register_hex(
                        endian, next_value, byte_size=reg_byte_size
                    ),
                    thread,
                ),
                "send packet: $OK#00",
            ],
            True,
        )
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Remember what was written so it can be verified below.
        expected_reg_values.append(next_value)
        next_value += register_increment

    # Phase 2: read every thread's register back and compare.
    for thread, expected_value in zip(threads, expected_reg_values):
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
                {
                    "direction": "send",
                    "regex": r"^\$([0-9a-fA-F]+)#",
                    "capture": {1: "p_response"},
                },
            ],
            True,
        )
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        p_response = context.get("p_response")
        self.assertIsNotNone(p_response)
        read_value = lldbgdbserverutils.unpack_register_hex_unsigned(
            endian, p_response
        )
        self.assertEqual(read_value, expected_value)
@skipUnlessPlatform(oslist=["freebsd", "linux"])
@add_test_categories(["llgs"])
def test_qXfer_siginfo_read(self):
    """Read siginfo for a segfaulting thread via qXfer:siginfo:read and check its fields.

    The decoded structure is expected to carry the SIGSEGV signal number,
    an errno of 0, a code equal to SEGV_MAPERR and an address of 0.
    """
    self.build()
    self.set_inferior_startup_launch()
    self.prep_debug_monitor_and_inferior(
        inferior_args=["thread:segfault", "thread:new", "sleep:10"]
    )
    self.test_sequence.add_log_lines(["read packet: $c#63"], True)
    self.expect_gdbremote_sequence()

    # Wait for the stop reply and capture the signal and thread id.
    self.reset_test_sequence()
    self.test_sequence.add_log_lines(
        [
            {
                "direction": "send",
                "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                "capture": {1: "signo", 2: "thread_id"},
            }
        ],
        True,
    )
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)
    self.assertEqual(
        int(context["signo"], 16), lldbutil.get_signal_number("SIGSEGV")
    )
    crashing_thread = int(context["thread_id"], 16)

    # Select the crashing thread and request its siginfo.
    self.reset_test_sequence()
    self.add_process_info_collection_packets()
    self.test_sequence.add_log_lines(
        [
            "read packet: $Hg{:x}#00".format(crashing_thread),
            "send packet: $OK#00",
            "read packet: $qXfer:siginfo:read::0,80:#00",
            {
                "direction": "send",
                "regex": re.compile(
                    r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re.MULTILINE | re.DOTALL
                ),
                "capture": {1: "response_type", 2: "content_raw"},
            },
        ],
        True,
    )

    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    # The reply is expected to start with "l".
    self.assertEqual(context.get("response_type"), "l")

    content_raw = context.get("content_raw")
    self.assertIsNotNone(content_raw)
    content = self.decode_gdbremote_binary(content_raw).encode("latin1")

    process_info = self.parse_process_info_response(context)

    # An extra int is inserted before the pointer when ptrsize is "8".
    pad = ""
    if process_info["ptrsize"] == "8":
        pad = "i"

    signo_idx = 0
    errno_idx = 1
    code_idx = 2
    addr_idx = -1
    SEGV_MAPERR = 1

    ostype = process_info["ostype"]
    if ostype == "linux":
        format_str = "iii{}P".format(pad)
    elif ostype.startswith("freebsd"):
        format_str = "iiiiiiP"
    elif ostype.startswith("netbsd"):
        # netbsd swaps the positions of the code and errno fields.
        format_str = "iii{}P".format(pad)
        errno_idx = 2
        code_idx = 1
    else:
        # self.fail is not stripped under -O, unlike a bare assert.
        self.fail("unknown ostype: {}".format(ostype))

    decoder = struct.Struct(format_str)
    decoded = decoder.unpack(content[: decoder.size])
    self.assertEqual(decoded[signo_idx], lldbutil.get_signal_number("SIGSEGV"))
    self.assertEqual(decoded[errno_idx], 0)
    self.assertEqual(decoded[code_idx], SEGV_MAPERR)
    self.assertEqual(decoded[addr_idx], 0)