"""
Test SBDebugger.InterruptRequested and SBCommandInterpreter.WasInterrupted.
"""
import lldb
import lldbsuite.test.lldbutil as lldbutil
from lldbsuite.test.lldbtest import *
import threading
import os
class TestDebuggerInterruption(TestBase):
"""This test runs a command that starts up, rendevous with the test thread
using threading barriers, then checks whether it has been interrupted.
The command's first argument is either 'interp' or 'debugger', to test
InterruptRequested and WasInterrupted respectively.
The command has two modes, interrupt and check, the former is the one that
waits for an interrupt. Then latter just returns whether an interrupt was
requested. We use the latter to make sure we took down the flag correctly."""
NO_DEBUG_INFO_TESTCASE = True
class CommandRunner(threading.Thread):
"""This class is for running a command, and for making a thread to run the command on.
It gets passed the test it is working on behalf of, and most of the important
objects come from the test."""
def __init__(self, test):
super().__init__()
self.test = test
def rendevous(self):
import interruptible
interruptible.local_data = interruptible.BarrierContainer(
self.test.before_interrupt_barrier,
self.test.after_interrupt_barrier,
self.test.event,
)
class DirectCommandRunner(CommandRunner):
""" "This version runs a single command using HandleCommand."""
def __init__(self, test, command):
super().__init__(test)
self.command = command
def run(self):
self.rendevous()
result = self.test.dbg.GetCommandInterpreter().HandleCommand(
self.command, self.test.result
)
if self.test.result_barrier:
self.test.result_barrier.wait()
class CommandInterpreterRunner(CommandRunner):
"""This version runs the CommandInterpreter and feeds the command to it."""
def __init__(self, test):
super().__init__(test)
def run(self):
self.rendevous()
test = self.test
with open(test.getBuildArtifact(test.in_filename), "w") as f:
f.write(f"{test.command}\n")
with open(test.out_filename, "w") as outf, open(
test.in_filename, "r"
) as inf:
outsbf = lldb.SBFile(outf.fileno(), "w", False)
orig_outf = test.dbg.GetOutputFile()
error = test.dbg.SetOutputFile(outsbf)
test.assertSuccess(error, "Could not set outfile")
insbf = lldb.SBFile(inf.fileno(), "r", False)
orig_inf = test.dbg.GetOutputFile()
error = test.dbg.SetInputFile(insbf)
test.assertSuccess(error, "Could not set infile")
options = lldb.SBCommandInterpreterRunOptions()
options.SetPrintResults(True)
options.SetEchoCommands(False)
test.dbg.RunCommandInterpreter(True, False, options, 0, False, False)
test.dbg.GetOutputFile().Flush()
error = test.dbg.SetOutputFile(orig_outf)
test.assertSuccess(error, "Restored outfile")
test.dbg.SetInputFile(orig_inf)
test.assertSuccess(error, "Restored infile")
def command_setup(self, args):
"""Insert our command, if needed. Then set up event and barriers if needed.
Then return the command to run."""
self.interp = self.dbg.GetCommandInterpreter()
self.command_name = "interruptible_command"
self.cmd_result = lldb.SBCommandReturnObject()
if not "check" in args:
self.event = threading.Event()
self.result_barrier = threading.Barrier(2, timeout=10)
self.before_interrupt_barrier = threading.Barrier(2, timeout=10)
self.after_interrupt_barrier = threading.Barrier(2, timeout=10)
else:
self.event = None
self.result_barrier = None
self.before_interrupt_barrier = None
self.after_interrupt_barrier = None
if not self.interp.UserCommandExists(self.command_name):
cmd_filename = "interruptible"
cmd_filename = os.path.join(self.getSourceDir(), "interruptible.py")
self.runCmd(f"command script import {cmd_filename}")
cmd_string = f"command script add {self.command_name} --class interruptible.WelcomeCommand"
self.runCmd(cmd_string)
if len(args) == 0:
command = self.command_name
else:
command = self.command_name + " " + args
return command
def run_single_command(self, command):
self.result.Clear()
self.runner = TestDebuggerInterruption.DirectCommandRunner(self, command)
self.runner.start()
def start_command_interp(self):
self.runner = TestDebuggerInterruption.CommandInterpreterRunner(self)
self.runner.start()
def check_text(self, result_text, interrupted):
if interrupted:
self.assertIn(
"Command was interrupted", result_text, "Got the interrupted message"
)
else:
self.assertIn(
"Command was not interrupted",
result_text,
"Got the not interrupted message",
)
def gather_output(self):
self.runner.join(10.0)
finished = not self.runner.is_alive()
if not finished:
self.event.set()
self.runner.join(10.0)
self.assertTrue(finished, "We did finish the command")
def check_result(self, interrupted=True):
self.gather_output()
self.check_text(self.result.GetOutput(), interrupted)
def check_result_output(self, interrupted=True):
self.gather_output()
buffer = ""
with open(self.out_filename, "r") as f:
buffer = f.read()
self.assertNotEqual(len(buffer), 0, "No command data")
self.check_text(buffer, interrupted)
def debugger_interrupt_test(self, use_interrupt_requested):
"""Test that debugger interruption interrupts a command
running directly through HandleCommand.
If use_interrupt_requested is true, we'll check that API,
otherwise we'll check WasInterrupted. They should both do
the same thing."""
if use_interrupt_requested:
command = self.command_setup("debugger")
else:
command = self.command_setup("interp")
self.result = lldb.SBCommandReturnObject()
self.run_single_command(command)
self.before_interrupt_barrier.wait()
self.dbg.RequestInterrupt()
self.dbg.RequestInterrupt()
def cleanup():
self.dbg.CancelInterruptRequest()
self.addTearDownHook(cleanup)
self.after_interrupt_barrier.wait()
self.result_barrier.wait()
self.assertTrue(self.result.Succeeded(), "Our command succeeded")
result_output = self.result.GetOutput()
self.check_result(True)
self.dbg.CancelInterruptRequest()
command = self.command_setup("debugger")
self.run_single_command(command)
self.result_barrier.wait()
self.assertFalse(self.result.Succeeded(), "Our command was not allowed to run")
error_output = self.result.GetError()
self.assertIn(
"... Interrupted", error_output, "Command was cut short by interrupt"
)
self.dbg.CancelInterruptRequest()
command = self.command_setup("debugger check")
self.run_single_command(command)
result_output = self.result.GetOutput()
self.check_result(False)
def test_debugger_interrupt_use_dbg(self):
self.debugger_interrupt_test(True)
def test_debugger_interrupt_use_interp(self):
self.debugger_interrupt_test(False)
def test_interp_doesnt_interrupt_debugger(self):
"""Test that interpreter interruption does not interrupt a command
running directly through HandleCommand.
If use_interrupt_requested is true, we'll check that API,
otherwise we'll check WasInterrupted. They should both do
the same thing."""
command = self.command_setup("debugger poll")
self.result = lldb.SBCommandReturnObject()
self.run_single_command(command)
self.before_interrupt_barrier.wait()
self.dbg.GetCommandInterpreter().InterruptCommand()
self.after_interrupt_barrier.wait()
self.result_barrier.wait()
self.assertTrue(self.result.Succeeded(), "Our command succeeded")
result_output = self.result.GetOutput()
self.check_result(False)
def interruptible_command_test(self, use_interrupt_requested):
"""Test that interpreter interruption interrupts a command
running in the RunCommandInterpreter loop.
If use_interrupt_requested is true, we'll check that API,
otherwise we'll check WasInterrupted. They should both do
the same thing."""
self.out_filename = self.getBuildArtifact("output")
self.in_filename = self.getBuildArtifact("input")
if os.path.exists(self.out_filename):
os.unlink(self.out_filename)
if use_interrupt_requested:
self.command = self.command_setup("debugger") + "\n"
else:
self.command = self.command_setup("interp") + "\n"
self.start_command_interp()
self.before_interrupt_barrier.wait()
sent_interrupt = self.dbg.GetCommandInterpreter().InterruptCommand()
self.assertTrue(sent_interrupt, "Did send command interrupt.")
self.after_interrupt_barrier.wait()
self.check_result_output(True)
os.unlink(self.out_filename)
self.command = self.command_setup("interp check") + "\n"
self.start_command_interp()
self.check_result_output(False)
def test_interruptible_command_check_dbg(self):
self.interruptible_command_test(True)
def test_interruptible_command_check_interp(self):
self.interruptible_command_test(False)
def test_debugger_doesnt_interrupt_command(self):
"""Test that debugger interruption doesn't interrupt a command
running in the RunCommandInterpreter loop."""
self.out_filename = self.getBuildArtifact("output")
self.in_filename = self.getBuildArtifact("input")
if os.path.exists(self.out_filename):
os.unlink(self.out_filename)
self.command = self.command_setup("interp poll") + "\n"
self.start_command_interp()
self.before_interrupt_barrier.wait()
self.dbg.RequestInterrupt()
def cleanup():
self.dbg.CancelInterruptRequest()
self.addTearDownHook(cleanup)
self.after_interrupt_barrier.wait()
self.check_result_output(False)
os.unlink(self.out_filename)