import numpy as np
import dataflow as df
from udf_py.udf_control import UserFunc0
from udf_py.udf_add import UserFunc1
# Demo script: build a two-node DataFlow graph, feed it twice and print what
# is fetched from each of the graph's two outputs.

# Options handed to df.init(): the device id and the path to the deploy-info
# JSON file.
options = {
    "ge.exec.deviceId": "0",
    "ge.experiment.data_flow_deploy_info_path": "./config/data_flow_deploy_info.json",
}
df.init(options)

# Everything after a successful init runs under try/finally so that
# df.finalize() is still called when graph construction, feeding or fetching
# raises; previously an exception skipped finalization entirely.
try:
    # Graph topology:
    #
    #   FlowData
    #      |
    #   FlowNode (ProcessPoint that controls activation of the multi-func node)
    #      |
    #   FlowNode (ProcessPoint with multiple funcs)
    data0 = df.FlowData()

    # First node: 1 input, 4 outputs, backed by UserFunc0.
    pp0 = df.FuncProcessPoint(py_func=UserFunc0, workspace_dir="./py_add_ws")
    flow_node0 = df.FlowNode(input_num=1, output_num=4)
    flow_node0.add_process_point(pp0)

    # Second node: 4 inputs, 2 outputs, backed by UserFunc1.
    pp1 = df.FuncProcessPoint(py_func=UserFunc1, workspace_dir="./py_add_ws")
    flow_node1 = df.FlowNode(input_num=4, output_num=2)
    flow_node1.add_process_point(pp1)

    # Wire the nodes: all outputs of node 0 become the inputs of node 1, and
    # both outputs of node 1 are the graph outputs.
    flow_node0_out = flow_node0(data0)
    flow_node1_out0, flow_node1_out1 = flow_node1(*flow_node0_out)
    dag = df.FlowGraph([flow_node1_out0, flow_node1_out1])

    # The same FlowInfo (start_time=0, end_time=5) accompanies every feed.
    flow_info = df.FlowInfo()
    flow_info.start_time = 0
    flow_info.end_time = 5

    # Feed the int32 value `selector`, then fetch only graph output index
    # `selector` and print it under the matching label.
    # NOTE(review): the fed value presumably tells UserFunc0 which func of the
    # second node to activate -- confirm against udf_py.udf_control.
    for selector, label in ((0, "TEST-OUTPUT0:"), (1, "TEST-OUTPUT1:")):
        feed_data0 = np.array([selector], dtype=np.int32)
        dag.feed_data({data0: feed_data0}, flow_info)
        result = dag.fetch_data([selector])
        print(label, result)
finally:
    df.finalize()