import array
import binascii
import logging
import re
import socket
import struct
import sys
import time
from ptsprojects.stack import get_stack
from pybtp import btp
from pybtp.types import BTPError
log = logging.debug
def l2cap_wid_hdl(wid, description, test_case_name):
log("%s, %r, %r, %s", l2cap_wid_hdl.__name__, wid, description,
test_case_name)
module = sys.modules[__name__]
try:
handler = getattr(module, "hdl_wid_%d" % wid)
return handler(description)
except AttributeError as e:
logging.exception(e.message)
def hdl_wid_14(desc):
"""
Implements: TSC_MMI_iut_disable_connection
:param desc: Initiate an L2CAP disconnection from the IUT to the PTS.
:return:
"""
btp.l2cap_disconn(0)
return True
def hdl_wid_15(desc):
"""
Implements: TSC_MMI_tester_enable_connection
:param desc: Action: Place the IUT in connectable mode.
:return:
"""
btp.gap_set_conn()
btp.gap_set_gendiscov()
btp.gap_adv_ind_on()
return True
def hdl_wid_22(desc):
"""
Implements: TSC_MMI_iut_disable_acl_connection
:param desc: Initiate an ACL disconnection from the IUT to the PTS.
:return:
"""
btp.gap_wait_for_connection()
btp.gap_disconn()
return True
def hdl_wid_36(desc):
"""
Implements: TSC_MMI_iut_send_le_flow_control_credit
:param desc: Command IUT to send LE Flow Control Credit to increase the LE data packet that PTS can send.
:return:
"""
return True
def hdl_wid_37(desc):
"""
Implements: TSC_MMI_upper_tester_confirm_LE_data
:param desc: Did the Upper Tester send first data frame of xxxx to the PTS.
Click Yes if it matched, otherwise click No.
:return:
"""
pattern = re.compile(r"frame\sof\s([0-9a-fA-F]+)")
data = pattern.findall(desc)
if not data:
logging.error("%s parsing error", hdl_wid_37.__name__)
return False
stack = get_stack()
tx_data = stack.l2cap.tx_data_get(0)
if tx_data is None or len(tx_data) < 1:
return False
return data[0] == tx_data[0]
def hdl_wid_39(desc):
"""
Implements: TSC_MMI_upper_tester_confirm_receive_command_not_understaood
:param desc: Did Implementation Under Test (IUT) receive L2CAP Reject with 'command not understood' error.
Click Yes if it is, otherwise click No.
:return:
"""
stack = get_stack()
return not stack.l2cap.is_connected(0)
def hdl_wid_40(desc):
"""
Implements: TSC_MMI_upper_tester_confirm_data_receive
:param desc: Please confirm the Upper Tester receive data
:return:
"""
stack = get_stack()
rx_data = stack.l2cap.rx_data_get_all(10)
return rx_data is not None
def hdl_wid_41(desc):
"""
Implements: TSC_MMI_iut_send_le_credit_based_connection_request
:param desc: Using the Implementation Under Test (IUT), send a LE Credit based connection request to PTS.
:return:
"""
stack = get_stack()
btp.l2cap_conn(None, None, stack.l2cap.psm)
return True
def hdl_wid_42(desc):
"""
Implements: TSC_MMI_upper_tester_confirm_receive_reject_psm
:param desc: Did Implementation Under Test (IUT) receive Request Reject with 'LE_PSM not supported' 0x0002 error.
Click Yes if it is, otherwise click No.
:return:
"""
stack = get_stack()
return not stack.l2cap.is_connected(0)
def hdl_wid_43(desc):
"""
Implements: TSC_MMI_upper_tester_send_le_data_packet_large
:param desc: Upper Tester command IUT to send multiple LE frame data packet to the PTS.
:return:
"""
stack = get_stack()
btp.l2cap_send_data(0, "FF", 40)
return True
def hdl_wid_48(desc):
"""
Implements: TSC_MMI_upper_tester_confirm_receive_reject_resources
:param desc: Did Implementation Under Test (IUT) receive Connection refused 'Insufficient Resources' 0x0004 error.
Click Yes if it is, otherwise click No.
:return:
"""
stack = get_stack()
return not stack.l2cap.is_connected(0)
def hdl_wid_51(desc):
"""
Implements: TSC_MMI_iut_enable_le_connection
:param desc: Initiate or create LE ACL connection to the PTS.
:return:
"""
btp.gap_conn()
return True
def hdl_wid_52(desc):
"""
Implements: TSC_MMI_upper_tester_confirm_receive_reject_invalid_source_cid
:param desc: Did Implementation Under Test (IUT) receive Connection refused 'Invalid Source CID' 0x0009 error.
And does not send anything over refuse LE data channel. Click Yes if it is, otherwise click No.
:return:
"""
stack = get_stack()
return not stack.l2cap.is_connected(0)
def hdl_wid_53(desc):
"""
Implements: TSC_MMI_upper_tester_confirm_receive_reject_source_cid_already_allocated
:param desc: Did Implementation Under Test (IUT) receive Connection refused 'Source CID Already Allocated' 0x000A
error. And does not send anything over refuse LE data channel. Click Yes if it is, otherwise click No.
:return:
"""
stack = get_stack()
return not stack.l2cap.is_connected(0)
def hdl_wid_54(desc):
"""
Implements: TSC_MMI_upper_tester_confirm_receive_reject_unacceptable_parameters
:param desc: Did Implementation Under Test (IUT) receive Connection refused 'Unacceptable Parameters' 0x000B error.
Click Yes if it is, otherwise click No.
:return:
"""
stack = get_stack()
return not stack.l2cap.is_connected(0)
def hdl_wid_55(desc):
"""
:param desc: Upper Tester command IUT to send LE data packet to the PTS with
larger or equal to the TSPX_tester_mps and smaller or equal to the TSPX_tester_mtu values.
:return:
"""
stack = get_stack()
l2cap = stack.l2cap
channel = l2cap._chan_lookup_id(0)
if not channel:
return False
btp.l2cap_send_data(0, '00' * channel.peer_mps)
return True
def hdl_wid_56(desc):
"""
Implements: TSC_MMI_tester_enable_connection
:param desc: Action: Place the IUT in connectable mode.
:return:
"""
btp.gap_set_conn()
btp.gap_set_gendiscov()
btp.gap_adv_ind_on()
return True
def hdl_wid_57(desc):
stack = get_stack()
l2cap = stack.l2cap
channel = l2cap._chan_lookup_id(0)
if not channel:
return False
btp.l2cap_send_data(0, '00' * (channel.peer_mps * 4))
return True
def hdl_wid_58(desc):
return True
def hdl_wid_59(desc):
btp.gap_conn_param_update(btp.pts_addr_get(), btp.pts_addr_type_get(),
720, 864, 0, 400)
return True
def hdl_wid_100(desc):
l2cap = get_stack().l2cap
for channel in l2cap.channels:
try:
while True:
btp.l2cap_send_data(channel.id, '00')
except BTPError:
pass
except socket.timeout:
pass
return True
def hdl_wid_101(desc):
btp.l2cap_disconn(0)
return True
def hdl_wid_102(desc):
return True
def hdl_wid_103(desc):
stack = get_stack()
btp.l2cap_reconfigure(None, None, 0,
[chan.id for chan in stack.l2cap.channels])
return True
def hdl_wid_104(desc):
stack = get_stack()
stack.l2cap.clear_data()
return True
def hdl_wid_105(desc):
logging.error("Updating MPS size is not supported.")
return False
def hdl_wid_106(desc):
stack = get_stack()
l2cap = stack.l2cap
btp.l2cap_le_listen(l2cap.psm, l2cap.initial_mtu,
l2cap.insufficient_authen)
return True
def hdl_wid_107(desc):
stack = get_stack()
l2cap = stack.l2cap
btp.l2cap_le_listen(l2cap.psm, l2cap.initial_mtu,
l2cap.insufficient_author)
return True
def hdl_wid_108(desc):
stack = get_stack()
l2cap = stack.l2cap
btp.l2cap_le_listen(l2cap.psm, l2cap.initial_mtu,
l2cap.insufficient_key_sz)
return True
def hdl_wid_111(desc):
pattern = re.compile(r"\s([0-9]+)\s")
data = pattern.findall(desc)
if not data:
logging.error("%s parsing error", hdl_wid_111.__name__)
return False
stack = get_stack()
l2cap = stack.l2cap
channels = l2cap.rx_data_get_all(10)
if not (len(channels) == 1):
return False
rx_data = channels[0]
if not (len(rx_data) == 1):
return False
data_packet = rx_data[0]
if not (len(data_packet) == int(data[0])):
return False
comp_data = bytearray(range(len(data_packet)))
if not (data_packet == comp_data):
return False
return True
def hdl_wid_112(desc):
stack = get_stack()
l2cap = stack.l2cap
channels = l2cap.rx_data_get_all(10)
if not (len(channels) == 2):
return False
data_0 = channels[0]
if not(len(data_0) == 1):
return False
data_0 = data_0[0]
data_1 = channels[1]
if not (len(data_1) == 1):
return False
data_1 = data_1[0]
expected = binascii.unhexlify(bytearray('aa' * len(data_0)))
if not (data_0 == expected):
return False
expected = binascii.unhexlify(bytearray('55' * len(data_1)))
if not (data_1 == expected):
return False
return True
def hdl_wid_255(desc):
stack = get_stack()
l2cap = stack.l2cap
btp.l2cap_conn(None, None, l2cap.psm, num=2)
return True
def hdl_wid_256(desc):
btp.gap_wait_for_connection()
return True
def hdl_wid_257(desc):
stack = get_stack()
l2cap = stack.l2cap
channel = l2cap._chan_lookup_id(0)
if not channel:
return False
for _ in range(2):
btp.l2cap_send_data(0, '00' * channel.peer_mtu)
return True
def hdl_wid_258(desc):
stack = get_stack()
l2cap = stack.l2cap
channel = l2cap._chan_lookup_id(0)
if not channel:
return False
for _ in range(2):
btp.l2cap_send_data(0, '00' * channel.peer_mtu)
return True
def hdl_wid_259(desc):
return True
def hdl_wid_260(desc):
stack = get_stack()
return not stack.l2cap.is_connected(0)
def hdl_wid_261(desc):
time.sleep(2)
stack = get_stack()
channels = stack.l2cap.rx_data_get_all(10)
chan = stack.l2cap._chan_lookup_id(0)
if not (len(channels) == 1):
return False
rx_data = channels[0]
size = [len(d) for d in rx_data]
return size == 2 * [chan.our_mtu]
def hdl_wid_262(desc):
stack = get_stack()
l2cap = stack.l2cap
channel = l2cap._chan_lookup_id(0)
if not channel:
return False
for _ in range(5):
btp.l2cap_send_data(0, '00' * channel.peer_mtu)
time.sleep(2)
return True
def hdl_wid_20001(desc):
btp.gap_set_conn()
btp.gap_set_gendiscov()
btp.gap_adv_ind_on()
return True
def hdl_wid_20100(desc):
btp.gap_conn()
return True