############################################################################
# tools/pynuttx/tests/test_runtime_thread.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
# ASF licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
############################################################################
import unittest
import gdb
from nxgdb import utils
# The following test cases require running the program as
# we need to access the memory of the program
class TestThread(unittest.TestCase):
@classmethod
def setUpClass(cls):
pass
def test_info_threads(self):
gdb.execute("info nxthreads")
def test_thread(self):
gdb.execute("nxthread")
def test_thread_bad_args(self):
out = gdb.execute("nxthread apply", to_string=True)
self.assertEqual(out, "Please specify a thread ID list\n")
out = gdb.execute("nxthread apply bad", to_string=True)
self.assertEqual(out, "Please specify a command following the thread ID list\n")
out = gdb.execute("nxthread apply badid0 cmd", to_string=True)
self.assertEqual(out, "Please specify a thread ID list and command\n")
out = gdb.execute("nxthread badid", to_string=True)
self.assertEqual(out, "Invalid thread id badid\n")
out = gdb.execute("nxthread 256", to_string=True)
self.assertEqual(out, "Invalid thread id 256\n")
def test_thread_apply_all(self):
out = gdb.execute("nxthread apply all bt", to_string=True)
self.assertTrue("#0" in out and "Thread" in out, msg=f"Got: {out}")
def test_thread_apply_with_ids(self):
out = gdb.execute("nxthread apply 0 bt", to_string=True)
self.assertTrue("#0" in out and "Thread 0" in out, msg=f"Got: {out}")
out = gdb.execute("nxthread apply 0 1 info reg", to_string=True)
# info reg has format like
# reg xxx xxx
self.assertGreaterEqual(len(out.split("\n")[0].split()), 2, msg=f"Got: {out}")
def test_thread_with_id(self):
# This command suppose to switch the stack frames
# make sure we have at least two threads
if gdb.parse_and_eval("g_npidhash") < 2:
out = gdb.execute("nxthread 0", to_string=True)
self.assertEqual(out, "")
return
# Get the current running thread first, after testing we should switch back to it
# and then continue, otherwise, we might fail on an assertion. For example, we will
# fail to continue running a task wich is waiting for a mutex if the current running
# task was actually the idle task.
cur_thread_id = gdb.parse_and_eval(
f"g_running_tasks[{gdb.selected_thread().num - 1}]"
)["pid"]
gdb.execute("nxthread 0")
gdb.execute("frame 0")
cur_sp = utils.get_sp()
sps = []
for i in range(gdb.parse_and_eval("g_npidhash")):
gdb.execute(f"nxthread {i}")
# switch frame here
gdb.execute("frame 0")
new_sp = utils.get_sp()
sps.append(new_sp)
# We should have some different stack pointers
self.assertFalse(all([int(sp) == int(cur_sp) for sp in sps]))
gdb.execute(f"nxthread {cur_thread_id}")