"""PTS test case python implementation"""
import shlex
import os
import subprocess
import re
import sys
import time
import logging
from threading import Thread
import Queue
from utils import exec_iut_cmd
import ptstypes
log = logging.debug
class MmiParser(object):
""""Interface to parsing arguments from description of MMI
It is assumed that all arguments in description are enclosed in single
quotes.
"""
min_arg = 1
max_arg = 10
arg_name_prefix = "arg_"
arg_value_prefix = "MMI_arg_value_"
def __init__(self):
"""Constructor of the parser"""
self.pattern = re.compile(r"(?:'|=\s+)([0-9-xA-Fa-f]+)")
self.args = []
for i in range(self.min_arg, self.max_arg):
index = str(i)
mmi_arg_name = self.arg_name_prefix + index
mmi_arg_value = self.arg_value_prefix + index
setattr(self, mmi_arg_name, mmi_arg_value)
def parse_description(self, description):
"""Parse PTS MMI description text for argument values.
It is necessary to do it for now, but in future PTS will provide API to
get the values
An example of MMI that requires parsing is listed below. For that MMI
00D3 should be converted to hexadecimal 0xD3 and size to int 45
project_name: GATT
wid: 69
test_case_name: TC_GAC_CL_BV_01_C
Please send prepare write request with handle = '00D3'O and size = '45'
to the PTS.
Description: Verify that the Implementation Under Test (IUT) can send
data according to negotiate MTU size."
"""
log("%s %r", self.parse_description.__name__, description)
self.args = self.pattern.findall(description)
log("Parse result: %r", self.args)
def reset(self):
"""Resets the args
To be used when parsed values are not needed anymore
"""
self.args = []
def process_args(self, args):
"""Replaces the MMI keywords arguments (e.g. MMI.arg_1) with the
respective argument values from MMI description
"""
log("%s: %s", self.process_args.__name__, args)
log("MMI.args now %r", self.args)
args_list = list(args)
for arg_index, arg in enumerate(args):
if not isinstance(arg, basestring):
continue
if arg.startswith(MMI.arg_value_prefix):
mmi_index = int(arg[arg.rfind("_") + 1:])
args_list[arg_index] = self.args[mmi_index - 1]
out_args = tuple(args_list)
log("returning %r", out_args)
return out_args
MMI = MmiParser()
class TestCmd:
"""A command ran in IUT during test case execution"""
def __init__(self, command, start_wid=None, stop_wid=None):
"""Constructor
stop_wid -- some test cases require the child process (this test
command) to be termintated (Ctrl-C on terminal) in response
to dialog with this wid
"""
self.command = command
self.start_wid = start_wid
self.stop_wid = stop_wid
self.process = None
self.__started = False
def start(self):
"""Starts the command"""
if self.__started:
return
self.__started = True
log("starting child process %s" % self)
self.process = exec_iut_cmd(self.command)
def stop(self):
"""Stops the command"""
if not self.__started:
return
log("stopping child process %s" % self)
self.process.kill()
def __str__(self):
"""Returns string representation"""
return "%s %s %s" % (self.command, self.start_wid, self.stop_wid)
class TestFunc:
"""A wrapper around test functions"""
def __init__(self, func, *args, **kwds):
"""Constructor of TestFunc
MMI.arg_X -- Passing these keywords in args would enable parsing the
description of PTS MMI for values. Each of the keywords
has the value of the description. For example: for
description "Please send prepare write request with handle
= '00D3'O and size = '45' to the PTS" MMI.arg_1 will have
the value 0xD3 and MMI.arg_2 will have the value 45
start_wid -- wid to start TestFunc
stop_wid -- not used by TestFunc, because function stopping is not easy
to implement. Provided only for compatibility with TestCmd.
post_wid -- start TestFunc on the next MMI after MMI with this wid
skip_call -- a tuple of integers of func call numbers to skip.
Starting from one so first call is 1.
start_wid and stop_wid must be passed in as keyword arguments. This is
because all other arguments will be passed to the func. For example:
TestFunc(my_function, arg1, arg2, kwd1=5, start_wid=117)
"""
self.func = func
self.__set_attrs(kwds)
self.args = args
self.kwds = kwds
self.call_count = 0
self.desc_parsing_needed = False
for arg in args:
if isinstance(arg, str) and arg.startswith(MMI.arg_value_prefix):
self.desc_parsing_needed = True
break
def __set_attrs(self, kwds):
"""Read attributes from arbitrary keyword argument dictionary.
Attributes are not specified in the constructor as normal arguments
cause they are not always used and when not used they would be
consuming function (func) arguments (args).
These attributes are used by this class and not passed to the func,
hence they are removed from kwds.
Note: with test functions stop_wid is only there to be compatible with
TestCmd interface. But since functions cannot be stopped, stop_wid is
useless.
kwds -- arbitrary keyword argument dictionary
"""
attr_names = ["start_wid", "stop_wid", "post_wid", "skip_call"]
for attr_name in attr_names:
if attr_name in kwds:
attr_value = kwds.pop(attr_name)
else:
attr_value = None
setattr(self, attr_name, attr_value)
def start(self):
"""Starts the function"""
self.call_count += 1
log("Starting test function: %s" % str(self))
if isinstance(self.skip_call, tuple):
if self.call_count in self.skip_call:
log("Skipping starting test function")
return
if self.desc_parsing_needed:
args = MMI.process_args(self.args)
else:
args = self.args
log("Test function parameters: args=%r, kwds=%r", args, self.kwds)
self.func(*args, **self.kwds)
def stop(self):
"""Does nothing, since not easy job to stop a function"""
pass
def __str__(self):
"""Returns string representation"""
return ("class=%s, func=%s start_wid=%s stop_wid=%s post_wid=%s "
"skip_call=%s call_count=%s args=%s kwds=%s" %
(self.__class__, self.func, self.start_wid, self.stop_wid,
self.post_wid, self.skip_call, self.call_count, self.args,
self.kwds))
class TestFuncCleanUp(TestFunc):
"""Clean-up function that is invoked after running test case in PTS."""
pass
def is_cleanup_func(func):
"""'Retruns True if func is an in an instance of TestFuncCleanUp"""
return isinstance(func, TestFuncCleanUp)
class AbstractMethodException(Exception):
"""Exception raised if an abstract method is called."""
pass
class PTSCallback(object):
"""Base class for PTS callback implementors"""
def __init__(self):
pass
def log(self, log_type, logtype_string, log_time, log_message):
"""Implements:
interface IPTSControlClientLogger : IUnknown {
HRESULT _stdcall Log(
[in] _PTS_LOGTYPE logType,
[in] LPWSTR szLogType,
[in] LPWSTR szTime,
[in] LPWSTR pszMessage);
};
"""
raise AbstractMethodException()
def on_implicit_send(self, project_name, wid, test_case_name, description,
style):
"""Implements:
interface IPTSImplicitSendCallbackEx : IUnknown {
HRESULT _stdcall OnImplicitSend(
[in] LPWSTR pszProjectName,
[in] unsigned short wID,
[in] LPWSTR pszTestCase,
[in] LPWSTR pszDescription,
[in] unsigned long style,
[in, out] LPWSTR pszResponse,
[in] unsigned long responseSize,
[in, out] long* pbResponseIsPresent);
};
return -- response as a python string
"""
raise AbstractMethodException()
class TestCase(PTSCallback):
"""A PTS test case"""
def copy(self):
"""Copy constructor"""
return TestCase(self.project_name, self.name, self.cmds,
self.ptsproject_name, self.no_wid, self.edit1_wids,
self.verify_wids, self.ok_cancel_wids,
self.generic_wid_hdl)
def __init__(self, project_name, test_case_name, cmds=[],
ptsproject_name=None, no_wid=None, edit1_wids=None,
verify_wids=None, ok_cancel_wids=None,
generic_wid_hdl=None):
"""TestCase constructor
cmds -- a list of TestCmd and TestFunc or single instance of them
no_wid -- a wid (tag) to respond No to
edit1_wids -- A dictionary of wids as keys and string or callable as
values. The string value or the string returned from the
callable value is send to PTS in response to
MMI_Style_Edit1 style prompts with matching wid.
verify_wids -- A dictionary of wids as keys and a tuple of strings as
values or a callable as value. The strings are used with
MMI_Style_Yes_No1 to confirm/verify that the MMI
description contains all of the strings in the tuple.
All the case-based characters are uppercased before
verification to avoid fake verification errors.
If the value is callable it will be passed PTS MMI
description as a parameter. It is expected to return
boolean True for the Yes and False for the No response
of MMI_Style_Yes_No1.
ok_cancel_wids -- A dictionary of wids as keys and bool or callable as
values. The bool value or the bool returned from the
callable value is converted to OK, Cancel and send to
PTS in response to MMI_Style_Ok_Cancel1 and
MMI_Style_Ok_Cancel2 style prompts with matching wid.
generic_wid_hdl -- A instance of general wid handler used for every wid
that came to test case.
"""
self.project_name = project_name
self.name = test_case_name
self.status = "init"
self.state = None
if isinstance(cmds, list):
self.cmds = list(cmds)
else:
self.cmds = [cmds]
if no_wid:
assert isinstance(no_wid, int), \
"no_wid should be int, and not %r" % no_wid
if edit1_wids:
assert isinstance(edit1_wids, dict), \
"edit1_wids should be dict, and not %r" % edit1_wids
if verify_wids:
assert isinstance(verify_wids, dict), \
"verify_wids should be dict, and not %r" % verify_wids
if ok_cancel_wids:
assert isinstance(ok_cancel_wids, dict), \
"ok_cancel_wids should be dict, and not %r" % ok_cancel_wids
self.no_wid = no_wid
self.edit1_wids = edit1_wids
self.verify_wids = verify_wids
self.ok_cancel_wids = ok_cancel_wids
self.generic_wid_hdl = generic_wid_hdl
self.post_wid_queue = []
self.post_wid_thread = None
self.thread_exception = Queue.Queue()
self.ptsproject_name = ptsproject_name
self.tc_subproc = None
self.lf_subproc = None
def __str__(self):
"""Returns string representation"""
return "%s %s" % (self.project_name, self.name)
def log(self, log_type, logtype_string, log_time, log_message):
"""Overrides PTSCallback method. Handles
PTSControl.IPTSControlClientLogger.Log"""
new_status = None
if log_type == ptstypes.PTS_LOGTYPE_START_TEST:
new_status = "Started"
elif log_type == ptstypes.PTS_LOGTYPE_FINAL_VERDICT and \
logtype_string.lower() == "final verdict":
if "PASS" in log_message:
new_status = "PASS"
elif "INCONC" in log_message:
new_status = "INCONC"
elif "FAIL" in log_message:
new_status = "FAIL"
else:
new_status = "UNKNOWN VERDICT: %s" % log_message.strip()
if new_status:
self.status = new_status
log("New status %s - %s", str(self), new_status)
def handle_mmi_style_yes_no1(self, wid, description):
"""Implements implicit send handling for MMI_Style_Yes_No1"""
log("%s, %r %r", self.handle_mmi_style_yes_no1.__name__,
wid, description)
log("no_wid=%r verify_wids=%r", self.no_wid, self.verify_wids)
yes_response = "Yes"
no_response = "No"
my_response = ""
bool2rsp = {True: yes_response, False: no_response}
if self.no_wid and wid == self.no_wid:
my_response = no_response
elif self.verify_wids and wid in self.verify_wids:
log("Starting verification of: %r", self.verify_wids)
data = self.verify_wids[wid]
if callable(data):
bool_rsp = data(description)
my_response = bool2rsp[bool_rsp]
elif isinstance(data, tuple) and callable(data[0]):
bool_rsp = data[0](description, *data[1:])
my_response = bool2rsp[bool_rsp]
else:
for verify in self.verify_wids[wid]:
log("Verifying: %r", verify)
if isinstance(verify, list):
for x in verify:
if x.upper() not in description.upper():
my_response = no_response
log("%r not found, skipping...", x)
break
else:
my_response = yes_response
if my_response is yes_response:
break
else:
if verify.upper() not in description.upper():
my_response = no_response
break
else:
my_response = yes_response
if my_response is yes_response:
log("All verifications passed")
else:
log("Verification failed: not in description")
else:
my_response = yes_response
if not self.verify_wids or wid not in self.verify_wids:
search_strings = ["confirm", "verify"]
if any(str in description.lower() for str in search_strings):
logging.warning("Verification missing for: %r", description)
return my_response
def handle_mmi_style_edit1(self, wid, description):
"""Implements implicit send handling for MMI_Style_Edit1"""
log("%s, %r edit1_wids=%r", self.handle_mmi_style_edit1.__name__, wid,
self.edit1_wids)
my_response = ""
if self.edit1_wids and wid in self.edit1_wids.keys():
response = self.edit1_wids[wid]
if callable(response):
my_response = response(description)
elif isinstance(response, tuple) and callable(response[0]):
my_response = response[0](description, *response[1:])
else:
my_response = response
return my_response
def handle_mmi_style_ok_cancel(self, wid, description):
"""Implements implicit send handling for MMI_Style_Ok_Cancel1 and
MMI_Style_Ok_Cancel2"""
log("%s, %r ok_cancel_wids=%r",
self.handle_mmi_style_ok_cancel.__name__, wid, self.ok_cancel_wids)
my_response = ""
if self.ok_cancel_wids and wid in self.ok_cancel_wids.keys():
response = self.ok_cancel_wids[wid]
if callable(response):
my_response = response(description)
elif isinstance(response, tuple) and callable(response[0]):
my_response = response[0](description, *response[1:])
else:
my_response = response
else:
my_response = True
response = {True: "OK", False: "Cancel"}[my_response]
return response
def start_stop_cmds_by_wid(self, wid, description):
"""Starts/stops commands
The commands started/stopped are the ones that have the same start_wid
or stop_wid as the argument
"""
for cmd in self.cmds:
if cmd.post_wid == wid:
self.post_wid_queue.append(cmd)
if cmd.start_wid == wid:
if cmd.desc_parsing_needed:
MMI.parse_description(description)
cmd.start()
if cmd.desc_parsing_needed:
MMI.reset()
if cmd.stop_wid == wid:
cmd.stop()
def run_post_wid_cmds(self):
"""Run post wid commands in a thread"""
log("%s %s", self, self.run_post_wid_cmds.__name__)
for index, cmd in enumerate(self.post_wid_queue):
log("%d) %s", index, cmd)
for cmd in self.post_wid_queue:
try:
cmd.start()
except Exception as e:
self.thread_exception.put(sys.exc_info()[1])
log("Caught exception in post_wid_thread %r", e)
break
del self.post_wid_queue[:]
def join_post_wid_thread(self):
"""Join post_wid_thread. Re-raise exceptions it discovered."""
log("%s %s", self, self.join_post_wid_thread.__name__)
log("self.post_wid_thread %r", self.post_wid_thread)
if self.post_wid_thread:
log("self.post_wid_thread.is_alive() %r",
self.post_wid_thread.is_alive())
try:
exc = self.thread_exception.get_nowait()
except Queue.Empty:
pass
else:
log("Re-raising exception sent from thread %r", exc)
self.thread_exception.task_done()
raise exc
if self.post_wid_thread and self.post_wid_thread.is_alive():
log("Waiting post wid thread to finish...")
self.post_wid_thread.join()
def handle_mmi_generic(self, wid, description, style, test_case_name):
response = self.generic_wid_hdl(wid, description, test_case_name)
if response == "WAIT":
return response
if style == ptstypes.MMI_Style_Edit1 \
or style == ptstypes.MMI_Style_Edit2:
return str(response)
if style == ptstypes.MMI_Style_Ok_Cancel1 \
or style == ptstypes.MMI_Style_Ok_Cancel2:
return "OK" if response else "Cancel"
if style == ptstypes.MMI_Style_Yes_No1:
return "Yes" if response else "No"
if style == ptstypes.MMI_Style_Yes_No_Cancel1:
if response is None:
return "Cancel"
elif response:
return "Yes"
else:
return "No"
if style == ptstypes.MMI_Style_Ok:
return "Ok"
if style == ptstypes.MMI_Style_Abort_Retry1:
return "Retry" if response else "Abort"
def on_implicit_send(self, project_name, wid, test_case_name, description,
style):
"""Overrides PTSCallback method. Handles
PTSControl.IPTSImplicitSendCallbackEx.OnImplicitSend"""
log("%s %s", self, self.on_implicit_send.__name__)
self.join_post_wid_thread()
my_response = ""
self.start_stop_cmds_by_wid(wid, description)
if self.generic_wid_hdl is not None:
my_response = self.handle_mmi_generic(wid, description, style,
test_case_name)
else:
if style == ptstypes.MMI_Style_Yes_No1:
my_response = self.handle_mmi_style_yes_no1(wid, description)
elif style == ptstypes.MMI_Style_Edit1:
my_response = self.handle_mmi_style_edit1(wid, description)
else:
my_response = self.handle_mmi_style_ok_cancel(wid, description)
if len(self.post_wid_queue):
log("Running post_wid test functions")
self.post_wid_thread = Thread(None, self.run_post_wid_cmds)
self.post_wid_thread.start()
log("Sending response %r", my_response)
return my_response
def pre_run(self):
"""Method called before test case is run in PTS"""
log("%s %s %s" % (self.pre_run.__name__, self.project_name, self.name))
log("About to run test case %s %s with commands:" %
(self.project_name, self.name))
for index, cmd in enumerate(self.cmds):
log("%d) %s", index, cmd)
subproc_dir = (os.path.dirname(os.path.realpath(__file__)) + "/" +
self.ptsproject_name + "/")
subproc_path = subproc_dir + "pre_tc.py"
if os.path.exists(subproc_path):
log("%s, run pre test case script" % self.post_run.__name__)
self.lf_subproc = open(subproc_dir + "sp_pre_stdout.log", "w")
subproc_cmd = " ".join(
[subproc_path, self.project_name, self.name])
self.tc_subproc = subprocess.Popen(shlex.split(subproc_cmd),
shell=False,
stdin=subprocess.PIPE,
stdout=self.lf_subproc,
stderr=self.lf_subproc)
for cmd in self.cmds:
if cmd.start_wid is None and cmd.post_wid is None and \
not is_cleanup_func(cmd):
cmd.start()
def post_run(self, error_code):
"""Method called after test case is run in PTS
error_code -- String code of an error that occured during test run
"""
log("%s %s %s %s" % (self.post_run.__name__, self.project_name,
self.name, error_code))
if error_code in ptstypes.PTSCONTROL_E_STRING.values():
self.status = error_code
elif error_code:
raise Exception("Unknown error code %r!" % error_code)
for cmd in self.cmds:
if is_cleanup_func(cmd):
cmd.start()
time.sleep(3)
for cmd in self.cmds:
cmd.stop()
if self.tc_subproc is not None:
log("%s, cleanup running pre test case script" %
self.post_run.__name__)
self.tc_subproc.communicate(input='#close\n')
self.lf_subproc.close()
subproc_dir = (os.path.dirname(os.path.realpath(__file__)) + "/" +
self.ptsproject_name + "/")
subproc_path = subproc_dir + "post_tc.py"
if os.path.exists(subproc_path):
log("%s, run post test case script" % self.post_run.__name__)
self.lf_subproc = open(subproc_dir + "sp_post_stdout.log", "w")
subproc_cmd = " ".join(
[subproc_path, self.project_name, self.name])
self.tc_subproc = subprocess.Popen(shlex.split(subproc_cmd),
shell=False,
stdin=subprocess.PIPE,
stdout=self.lf_subproc,
stderr=self.lf_subproc)
self.tc_subproc.communicate(input='#close\n')
self.lf_subproc.close()
class TestCaseLT1(TestCase):
def copy(self):
"""Copy constructor"""
test_case = super(TestCaseLT1, self).copy()
test_case.name_lt2 = self.name_lt2
return test_case
def __init__(self, *args, **kwargs):
name_lt2 = kwargs.pop('lt2', None)
super(TestCaseLT1, self).__init__(*args, **kwargs)
self.name_lt2 = name_lt2
class TestCaseLT2(TestCase):
pass
def get_max_test_case_desc(test_cases):
"""Takes a list of test cases and return a tuple of longest project name
and test case name."""
max_project_name = 0
max_test_case_name = 0
for test_case in test_cases:
project_name_len = len(test_case.project_name)
test_case_name_len = len(test_case.name)
if project_name_len > max_project_name:
max_project_name = project_name_len
if test_case_name_len > max_test_case_name:
max_test_case_name = test_case_name_len
return (max_project_name, max_test_case_name)