"""TTP c to python api."""
import weakref
from functools import wraps
from typing import Callable
import ttp_logger
if __package__ or "." in __name__:
from . import _ttp_c2python_api
else:
import _ttp_c2python_api
try:
import builtins as __builtin__
except ImportError:
import __builtin__
RET_ERROR = 1
ReportState_RS_NORMAL = _ttp_c2python_api.ReportState_RS_NORMAL
ReportState_RS_RETRY = _ttp_c2python_api.ReportState_RS_RETRY
ReportState_RS_UCE = _ttp_c2python_api.ReportState_RS_UCE
ReportState_RS_UCE_CORRUPTED = _ttp_c2python_api.ReportState_RS_UCE_CORRUPTED
ReportState_RS_HCCL_FAILED = _ttp_c2python_api.ReportState_RS_HCCL_FAILED
ReportState_RS_INIT_FINISH = _ttp_c2python_api.ReportState_RS_INIT_FINISH
ReportState_RS_PREREPAIR_FINISH = _ttp_c2python_api.ReportState_RS_PREREPAIR_FINISH
ReportState_RS_STEP_FINISH = _ttp_c2python_api.ReportState_RS_STEP_FINISH
ReportState_RS_UNKNOWN = _ttp_c2python_api.ReportState_RS_UNKNOWN
RepairType_RT_SEND = _ttp_c2python_api.RepairType_RT_SEND
RepairType_RT_UCE_HIGHLEVEL = _ttp_c2python_api.RepairType_RT_UCE_HIGHLEVEL
RepairType_RT_UCE_LOWLEVEL = _ttp_c2python_api.RepairType_RT_UCE_LOWLEVEL
RepairType_RT_RECV_REPAIR = _ttp_c2python_api.RepairType_RT_RECV_REPAIR
RepairType_RT_ROLLBACK = _ttp_c2python_api.RepairType_RT_ROLLBACK
RepairType_RT_LOAD_CKPT = _ttp_c2python_api.RepairType_RT_LOAD_CKPT
RepairType_RT_LOAD_REBUILD = _ttp_c2python_api.RepairType_RT_LOAD_REBUILD
def _swig_repr(self):
try:
strthis = "proxy of " + self.this.__repr__()
except __builtin__.Exception:
strthis = ""
return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)
class SwigPyIterator(object):
thisown = property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc="The membership flag")
__repr__ = _swig_repr
__swig_destroy__ = _ttp_c2python_api.delete_SwigPyIterator
def __init__(self, *args, **kwargs):
raise AttributeError("No constructor defined - class is abstract")
def __next__(self):
return _ttp_c2python_api.SwigPyIterator___next__(self)
def __eq__(self, x):
return _ttp_c2python_api.SwigPyIterator___eq__(self, x)
def __ne__(self, x):
return _ttp_c2python_api.SwigPyIterator___ne__(self, x)
def __iadd__(self, n):
return _ttp_c2python_api.SwigPyIterator___iadd__(self, n)
def __isub__(self, n):
return _ttp_c2python_api.SwigPyIterator___isub__(self, n)
def __add__(self, n):
return _ttp_c2python_api.SwigPyIterator___add__(self, n)
def __sub__(self, *args):
return _ttp_c2python_api.SwigPyIterator___sub__(self, *args)
def __iter__(self):
return self
def value(self):
return _ttp_c2python_api.SwigPyIterator_value(self)
def incr(self, n=1):
return _ttp_c2python_api.SwigPyIterator_incr(self, n)
def decr(self, n=1):
return _ttp_c2python_api.SwigPyIterator_decr(self, n)
def distance(self, x):
return _ttp_c2python_api.SwigPyIterator_distance(self, x)
def equal(self, x):
return _ttp_c2python_api.SwigPyIterator_equal(self, x)
def copy(self):
return _ttp_c2python_api.SwigPyIterator_copy(self)
def next(self):
return _ttp_c2python_api.SwigPyIterator_next(self)
def previous(self):
return _ttp_c2python_api.SwigPyIterator_previous(self)
def advance(self, n):
return _ttp_c2python_api.SwigPyIterator_advance(self, n)
_ttp_c2python_api.SwigPyIterator_swigregister(SwigPyIterator)
def convert(func: Callable):
@wraps(func)
def wrapper(self):
return list(func(self))
return wrapper
class RepairContext(object):
thisown = property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc="The membership flag")
__repr__ = _swig_repr
__swig_destroy__ = _ttp_c2python_api.delete_RepairContext
def __init__(self, *args, **kwargs):
raise AttributeError("No constructor defined")
type = property(
_ttp_c2python_api.RepairContext_type_get,
_ttp_c2python_api.RepairContext_type_set
)
src_rank = property(
convert(_ttp_c2python_api.RepairContext_srcRank_get),
_ttp_c2python_api.RepairContext_srcRank_set
)
dst_rank = property(
convert(_ttp_c2python_api.RepairContext_dstRank_get),
_ttp_c2python_api.RepairContext_dstRank_set
)
replica_idx = property(
convert(_ttp_c2python_api.RepairContext_replicaIdx_get),
_ttp_c2python_api.RepairContext_replicaIdx_set
)
group_idx = property(
convert(_ttp_c2python_api.RepairContext_groupIdx_get),
_ttp_c2python_api.RepairContext_groupIdx_set
)
repair_id = property(
_ttp_c2python_api.RepairContext_repairId_get,
_ttp_c2python_api.RepairContext_repairId_set
)
step = property(
_ttp_c2python_api.RepairContext_step_get,
_ttp_c2python_api.RepairContext_step_set
)
rank_list = property(
convert(_ttp_c2python_api.RepairContext_ranks_get),
_ttp_c2python_api.RepairContext_ranks_set
)
zit_param = property(
_ttp_c2python_api.RepairContext_zitParam_get,
_ttp_c2python_api.RepairContext_zitParam_set
)
_ttp_c2python_api.RepairContext_swigregister(RepairContext)
class CallbackReceiver(object):
thisown = property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc="The membership flag")
__repr__ = _swig_repr
__swig_destroy__ = _ttp_c2python_api.delete_callback_receiver
def __init__(self):
if self.__class__ == CallbackReceiver:
_self = None
else:
_self = self
_ttp_c2python_api.callback_receiver_swiginit(self, _ttp_c2python_api.new_callback_receiver(_self, ))
def __disown__(self):
self.this.disown()
_ttp_c2python_api.disown_callback_receiver(self)
return weakref.proxy(self)
def ckpt_callback(self, step, repair_id, group_idx, ranks):
return _ttp_c2python_api.callback_receiver_ckpt_callback(self, step, repair_id, group_idx, ranks)
def downgrade_rebuild_callback(self, comm_group_idx, comm_groups, repair_id, zit_param):
return _ttp_c2python_api.callback_receiver_downgrade_rebuild_callback(self, comm_group_idx, comm_groups,
repair_id,
zit_param)
def upgrade_rebuild_callback(self, rank_list, repair_id, zit_param):
return _ttp_c2python_api.callback_receiver_upgrade_rebuild_callback(self, rank_list, repair_id, zit_param)
def upgrade_repair_callback(self, context):
return _ttp_c2python_api.callback_receiver_upgrade_repair_callback(self, context)
def upgrade_rollback_callback(self, context):
return _ttp_c2python_api.callback_receiver_upgrade_rollback_callback(self, context)
def repair_callback(self, context):
return _ttp_c2python_api.callback_receiver_repair_callback(self, context)
def rollback_callback(self, context):
return _ttp_c2python_api.callback_receiver_rollback_callback(self, context)
def pause_callback(self, pause_step, hot_switch):
return _ttp_c2python_api.callback_receiver_pause_callback(self, pause_step, hot_switch)
def continue_callback(self):
return _ttp_c2python_api.callback_receiver_continue_callback(self)
def rebuild_callback(self, dp_group, dp_group_size, world_group, world_group_size):
return _ttp_c2python_api.callback_receiver_rebuild_callback(
self, dp_group, dp_group_size, world_group, world_group_size)
def report_stop_complete_callback(self, code, msg, error_info_map):
return _ttp_c2python_api.callback_receiver_report_stop_complete_callback(self, code, msg, error_info_map)
def report_strategies_callback(self, error_info_map, strategies):
return _ttp_c2python_api.callback_receiver_report_strategies_callback(self, error_info_map, strategies)
def report_result_callback(self, code, msg, error_info_map, strategy):
return _ttp_c2python_api.callback_receiver_report_result_callback(
self, code, msg, error_info_map, strategy)
def report_fault_ranks_callback(self, error_info_map, error_code_map):
return _ttp_c2python_api.callback_receiver_report_fault_ranks_callback(self, error_info_map, error_code_map)
def rename_callback(self):
return _ttp_c2python_api.callback_receiver_rename_callback(self)
def exit_callback(self):
return _ttp_c2python_api.callback_receiver_exit_callback(self)
def device_stop_callback(self):
return _ttp_c2python_api.callback_receiver_device_stop_callback(self)
def device_clean_callback(self):
return _ttp_c2python_api.callback_receiver_device_clean_callback(self)
def register_callback(self):
return _ttp_c2python_api.callback_receiver_register_callback(self)
def communication_operate_callback(self, rank_list, repair_id):
return _ttp_c2python_api.callback_receiver_communication_operate_callback(self, rank_list, repair_id)
def launch_tcp_store_client_callback(self, url, rank, world_size):
return _ttp_c2python_api.callback_receiver_launch_tcp_store_client_callback(self, url, rank, world_size)
def launch_tcp_store_server_callback(self, url, world_size):
return _ttp_c2python_api.callback_receiver_launch_tcp_store_server_callback(self, url, world_size)
def decrypt_callback(self, cipher_text):
return _ttp_c2python_api.decrypt_callback(self, cipher_text)
_ttp_c2python_api.callback_receiver_swigregister(CallbackReceiver)
class TTPCallbackReceiver(CallbackReceiver):
def __init__(self):
super().__init__()
handler_ = None
def set_callback_receiver(handler):
global handler_
handler_ = handler
_ttp_c2python_api.set_callback_receiver(handler)
set_callback_receiver(TTPCallbackReceiver())
def convert_tuple_to_list(func: Callable):
def convert_list(args):
if not args:
return args
args_ = tuple()
for i, arg in enumerate(args):
if isinstance(arg, tuple):
arg = convert_list(arg)
args_ += (list(arg),)
else:
args_ += args[i:i + 1]
return args_
@wraps(func)
def wrapper(*args, **kwargs):
args_ = convert_list(args)
return func(*args_, **kwargs)
return wrapper
def init_controller(rank, world_size, enable_local_copy, enable_arf, enable_zit):
"""init controller"""
return _ttp_c2python_api.init_controller(rank, world_size, enable_local_copy, enable_arf, enable_zit)
def start_controller(master_ip, port, enable_ssl, tls_info):
"""start controller"""
return _ttp_c2python_api.start_controller(master_ip, port, enable_ssl, tls_info)
def destroy_controller():
"""destroy controller"""
return _ttp_c2python_api.destroy_controller()
def init_processor(rank, world_size, enable_local_copy, enable_ssl, tls_info, enable_uce, enable_arf, enable_zit):
"""init processor"""
return _ttp_c2python_api.init_processor(rank, world_size, enable_local_copy, enable_ssl,
tls_info, enable_uce, enable_arf, enable_zit)
def start_processor(master_ip, port, local_ip):
"""start processor"""
return _ttp_c2python_api.start_processor(master_ip, port, local_ip)
def get_repair_id():
"""get repair id"""
return _ttp_c2python_api.get_repair_id()
def get_hot_switch():
"""get hot switch"""
return _ttp_c2python_api.get_hot_switch()
def set_optimizer_replica(rank_list, replica_cnt, replica_shift):
"""set optimizer replica info"""
return _ttp_c2python_api.set_optimizer_replica(rank_list, replica_cnt, replica_shift)
def set_dp_group_info(rank_list):
"""set origin dp list info"""
return _ttp_c2python_api.set_dp_group_info(rank_list)
def set_ckpt_callback(callback):
"""set callback function use to exec checkpoint"""
if callback is None:
ttp_logger.LOGGER.error("set ckpt callback failed, callback is none")
return RET_ERROR
handler_.ckpt_callback = convert_tuple_to_list(callback)
return _ttp_c2python_api.set_ckpt_callback()
def set_zit_downgrade_rebuild_callback(callback):
"""set callback function use to zit degrade rebuild"""
if callback is None:
ttp_logger.LOGGER.error("set degrade rebuild callback failed, callback is none")
return RET_ERROR
handler_.downgrade_rebuild_callback = convert_tuple_to_list(callback)
return _ttp_c2python_api.set_zit_downgrade_rebuild_callback()
def set_zit_upgrade_rebuild_callback(callback):
"""set callback function use to zit upgrade rebuild"""
if callback is None:
ttp_logger.LOGGER.error("set upgrade rebuild callback failed, callback is none")
return RET_ERROR
handler_.upgrade_rebuild_callback = convert_tuple_to_list(callback)
return _ttp_c2python_api.set_zit_upgrade_rebuild_callback()
def set_zit_upgrade_repair_callback(callback):
"""set callback func use to zit upgrade repair callback"""
if callback is None:
ttp_logger.LOGGER.error("set upgrade repair callback failed, callback is none")
return RET_ERROR
handler_.upgrade_repair_callback = callback
return _ttp_c2python_api.set_zit_upgrade_repair_callback()
def set_zit_upgrade_rollback_callback(callback):
"""set callback func use to zit upgrade rollback rollback"""
if callback is None:
ttp_logger.LOGGER.error("set upgrade rollback callback failed, callback is none")
return RET_ERROR
handler_.upgrade_rollback_callback = callback
return _ttp_c2python_api.set_zit_upgrade_rollback_callback()
def start_copying():
"""set begin to local copy optimizer state"""
return _ttp_c2python_api.start_copying()
def start_updating(backup_step):
"""set begin to update optimizer state"""
return _ttp_c2python_api.start_updating(backup_step)
def reset_limit_step():
"""reset limit step before stop return"""
return _ttp_c2python_api.reset_limit_step()
def end_updating(step):
"""finished to update optimizer state"""
return _ttp_c2python_api.end_updating(step)
def set_rename_callback(callback):
"""set callback function used to exec rename"""
if callback is None:
ttp_logger.LOGGER.error("set rename callback failed, callback is none")
return RET_ERROR
handler_.rename_callback = callback
return _ttp_c2python_api.set_rename_callback()
def set_exit_callback(callback):
"""set callback function used to notify process exit"""
if callback is None:
ttp_logger.LOGGER.error("set exit callback failed, callback is none")
return RET_ERROR
handler_.exit_callback = callback
return _ttp_c2python_api.set_exit_callback()
def set_stop_device_callback(callback):
"""set callback func use to exec device stop"""
if callback is None:
ttp_logger.LOGGER.error("set stop device callback failed, callback is none")
return RET_ERROR
handler_.device_stop_callback = callback
return _ttp_c2python_api.set_stop_device_callback()
def set_clean_device_callback(callback):
"""set callback func use to exec device clean"""
if callback is None:
ttp_logger.LOGGER.error("set clean device callback failed, callback is none")
return RET_ERROR
handler_.device_clean_callback = callback
return _ttp_c2python_api.set_clean_device_callback()
def set_repair_callback(callback):
"""set callback func use to exec repair"""
if callback is None:
ttp_logger.LOGGER.error("set repair callback failed, callback is none")
return RET_ERROR
handler_.repair_callback = callback
return _ttp_c2python_api.set_repair_callback()
def set_rollback_callback(callback):
"""set callback func use to exec rollback"""
if callback is None:
ttp_logger.LOGGER.error("set rollback callback failed, callback is none")
return RET_ERROR
handler_.rollback_callback = callback
return _ttp_c2python_api.set_rollback_callback()
def set_pause_callback(callback):
"""set callback func use to exec rollback"""
if callback is None:
ttp_logger.LOGGER.error("set pause callback failed, callback is none")
return RET_ERROR
handler_.pause_callback = callback
return _ttp_c2python_api.set_pause_callback()
def set_continue_callback(callback):
"""set callback func use to exec rollback"""
if callback is None:
ttp_logger.LOGGER.error("set rollback callback failed, callback is none")
return RET_ERROR
handler_.continue_callback = callback
return _ttp_c2python_api.set_continue_callback()
def set_register_check_callback(callback):
"""set callback func use to exec register to mindx"""
if callback is None:
ttp_logger.LOGGER.error("set register check callback failed, callback is none")
return RET_ERROR
handler_.register_callback = callback
return _ttp_c2python_api.set_register_check_callback()
def set_report_stop_complete_callback(callback):
"""set callback func use to exec report stop complete"""
if callback is None:
ttp_logger.LOGGER.error("set report stop complete callback failed, callback is none")
return RET_ERROR
handler_.report_stop_complete_callback = callback
return _ttp_c2python_api.set_report_stop_complete_callback()
def set_report_strategies_callback(callback):
"""set callback func use to exec report recover strategy"""
if callback is None:
ttp_logger.LOGGER.error("set report recover strategy callback failed, callback is none")
return RET_ERROR
handler_.report_strategies_callback = convert_tuple_to_list(callback)
return _ttp_c2python_api.set_report_strategies_callback()
def set_report_result_callback(callback):
"""set callback func use to exec report recover status"""
if callback is None:
ttp_logger.LOGGER.error("set report recover status callback failed, callback is none")
return RET_ERROR
handler_.report_result_callback = callback
return _ttp_c2python_api.set_report_result_callback()
def set_report_fault_ranks_callback(callback):
"""set callback func use to exec report process fault"""
if callback is None:
ttp_logger.LOGGER.error("set report process fault callback failed, callback is none")
return RET_ERROR
handler_.report_fault_ranks_callback = callback
return _ttp_c2python_api.set_report_fault_ranks_callback()
def mindx_stop_train_callback(rank_list):
"""set callback func use to notify stop train"""
return _ttp_c2python_api.mindx_stop_train_callback(rank_list)
def mindx_pause_train_callback(timeout):
"""set callback func use to notify pause train"""
return _ttp_c2python_api.mindx_pause_train_callback(timeout)
def mindx_prepare_action_callback(action, fault_ranks):
"""set callback func use to prepare action"""
return _ttp_c2python_api.mindx_prepare_action_callback(action, fault_ranks)
def mindx_change_strategy_callback(strategy, params):
"""set callback func use to change strategy"""
return _ttp_c2python_api.mindx_change_strategy_callback(strategy, params)
def mindx_query_high_availability_switch():
"""query high availability switch status"""
return _ttp_c2python_api.mindx_query_high_availability_switch()
def mindx_notify_fault_callback(rank_list, time):
"""set callback func use to notify fault ranks"""
return _ttp_c2python_api.mindx_notify_fault_callback(rank_list, time)
def mindx_notify_dump_callback():
"""set callback func use to notify do dump"""
return _ttp_c2python_api.mindx_notify_dump_callback()
def set_communication_operate_callback(callback):
"""set callback func use to communication operate"""
if callback is None:
ttp_logger.LOGGER.error("set communication operate callback failed, callback is none")
return RET_ERROR
handler_.communication_operate_callback = convert_tuple_to_list(callback)
return _ttp_c2python_api.set_communication_operate_callback()
def set_launch_tcp_store_client_callback(callback):
"""set callback func use to exec launch tcp store client"""
if callback is None:
ttp_logger.LOGGER.error("set launch tcp store client callback failed, callback is none")
return RET_ERROR
handler_.launch_tcp_store_client_callback = callback
return _ttp_c2python_api.set_launch_tcp_store_client_callback()
def set_launch_tcp_store_server_callback(callback):
"""set backup controller callback func to launch tcp store server"""
if callback is None:
ttp_logger.LOGGER.error("set launch tcp store server callback failed, callback is none")
return RET_ERROR
handler_.launch_tcp_store_server_callback = callback
return _ttp_c2python_api.set_launch_tcp_store_server_callback()
def set_rebuild_callback(callback):
"""set callback func use to exec downgrade rebuild"""
if callback is None:
ttp_logger.LOGGER.error("set rebuild callback failed, callback is none")
return RET_ERROR
handler_.rebuild_callback = convert_tuple_to_list(callback)
return _ttp_c2python_api.set_rebuild_callback()
def set_dump_status(result):
"""set dump result code"""
return _ttp_c2python_api.set_dump_status(result)
def destroy_processor():
"""destroy processor"""
return _ttp_c2python_api.destroy_processor()
def report_status(state, error_code=''):
"""report status"""
return _ttp_c2python_api.report_status(state, error_code)
def wait_next_action():
"""wait next action"""
return _ttp_c2python_api.wait_next_action()
def wait_repair_action():
"""wait repair action"""
return _ttp_c2python_api.wait_repair_action()
def get_repair_type():
"""get repair type"""
return _ttp_c2python_api.get_repair_type()
def report_load_ckpt_step(load_ckpt_step):
"""report step while load from ckpt"""
return _ttp_c2python_api.report_load_ckpt_step(load_ckpt_step)
def log(level, msg):
"""ttp c2python log api"""
return _ttp_c2python_api.log(level, msg)
def set_decrypt_callback(callback):
"""set callback function use to decrypt cipher text"""
if callback is None:
ttp_logger.LOGGER.error("set decrypt callback failed, callback is none")
return RET_ERROR
handler_.decrypt_callback = callback
return _ttp_c2python_api.set_decrypt_callback()
cvar = _ttp_c2python_api.cvar