transfer_engine Python API Guide
1. Build Python Wheel
Option A: one-click script
./build.sh
Option B: manual
python3 -m pip install wheel
python3 setup.py bdist_wheel
Wheel output:
dist/*.whl
2. Python Package
from yr.datasystem import TransferEngine, Result, ErrorCode
3. API Reference
TransferEngine:
engine = TransferEngine()
Methods:
initialize(local_hostname: str, protocol: str, device_name: str) -> Resultprotocolis currently not validated.device_namemust matchnpu:${device_id}.register_memory(buffer_addr_regisrterch: int, length: int) -> Resultbatch_register_memory(buffer_addrs: list[int], lengths: list[int]) -> Resultunregister_memory(buffer_addr_regisrterch: int) -> Resultbatch_unregister_memory(buffer_addrs: list[int]) -> Resulttransfer_sync_read(target_hostname: str, buffer: int, peer_buffer_address: int, length: int) -> Resultbatch_transfer_sync_read(target_hostname: str, buffers: list[int], peer_buffer_addresses: list[int], lengths: list[int]) -> Resultfinalize() -> Result
Result:
is_ok() -> boolis_error() -> boolget_code() -> ErrorCodeget_msg() -> strto_string() -> str
ErrorCode:
kOkkInvalidkNotFoundkRuntimeErrorkNotReadykNotAuthorizedkNotSupported
4. Quick Example (single process)
import torch
import torch_npu
from yr.datasystem import TransferEngine
owner = TransferEngine()
requester = TransferEngine()
owner_device_id = 0
requester_device_id = 1
owner.initialize("127.0.0.1:60551", "ascend", f"npu:{owner_device_id}")
requester.initialize("127.0.0.1:60552", "ascend", f"npu:{requester_device_id}")
size = 64
src = torch.arange(size, dtype=torch.uint8, device=f"npu:{owner_device_id}")
dst = torch.zeros(size, dtype=torch.uint8, device=f"npu:{requester_device_id}")
src_addr = src.data_ptr()
dst_addr = dst.data_ptr()
owner.register_memory(src_addr, size)
rc = requester.transfer_sync_read("127.0.0.1:60551", dst_addr, src_addr, size)
print(rc.to_string())
print("equal:", torch.equal(src.cpu(), dst.cpu()))
requester.finalize()
owner.finalize()
5. Cross-node Smoke Example (owner/requester)
Smoke script:
tests/python/smoke/test_python_api_smoke.py
5.1 Start owner (Node A)
PYTHONPATH=.:python python3 tests/python/smoke/test_python_api_smoke.py \
--role owner \
--local-hostname 10.10.10.1:18481 \
--device-id 0 \
--size 4096 \
--register-count 4 \
--hold-seconds 600
Owner will print:
[OWNER_READY] ... remote_addrs=...[OWNER_READY_FOR_REQUESTER] --peer-hostname ... --peer-device-id ... --remote-addrs ...
5.2 Run requester (Node B)
Use the printed values from owner:
PYTHONPATH=.:python python3 tests/python/smoke/test_python_api_smoke.py \
--role requester \
--local-hostname 10.10.10.2:18482 \
--device-id 1 \
--size 4096 \
--peer-hostname 10.10.10.1:18481 \
--peer-device-id 0 \
--remote-addrs 0x1234,0x5678,0x9abc,0xdef0 \
--auto-verify-data
6. ST Test
ST case file:
tests/python/st/test_python_api_st.py
Run:
PYTHONPATH=.:python python3 -m unittest tests.python.st.test_python_api_st -v
Notes:
- ST currently requires
torch+torch_npu. - ST expects at least 2 NPUs on one node (same-node, different
device_id).