task_dispatch — Task-Level Execution Framework
Overview
task_dispatch is the task-level counterpart to action_dispatch.
| Dimension | action_dispatch | task_dispatch |
|---|---|---|
| Granularity | Joint-level streaming (100Hz) | Task-level sequencing (event-driven) |
| Use case | ACT/VLA imitation learning | MoveIt planners (VoxPoser, visual_grasp, etc.) |
| Execution | Topic streaming | Sequential action execution |
┌──────────────────────┐ ┌──────────────────────┐
│ action_dispatch │ │ task_dispatch │
│ │ │ │
│ ACT/VLA → 100Hz │ │ Planner → TaskPlan │
│ stream → Position │ │ steps → MoveIt │
│ Controller │ │ Gateway → Trajectory │
│ │ │ Controller │
└──────────────────────┘ └──────────────────────┘
Architecture
visual_grasp_planner ─┐
├─→ ExecuteTaskPlan action ─→ TaskExecutorNode
VoxPoser planner ─────┘ │
┌───────────┼────────────┐
│ │ │
MOVE_TO_POSE GRIPPER WAIT
│ │ │
MoveToPose FollowJoint sleep()
service Trajectory
(sync call) action
│ │
moveit_gateway gripper_trajectory
│ _controller
IK + plan + exec │
│ │
└─────┬─────┘
│
ros2_control
hardware
TaskExecutorNode provides the following execution capabilities:
| Step Type | Constant | Description | Underlying Call |
|---|---|---|---|
MOVE_TO_POSE |
0 |
Move arm to target pose | /moveit_gateway/move_to_pose service |
GRIPPER |
1 |
Control gripper open/close | /{gripper_controller}/follow_joint_trajectory action |
WAIT |
2 |
Wait for specified duration | Cancellable time.sleep() |
ROS Interfaces
Action Server
| Interface | Type | Description |
|---|---|---|
~/execute_task_plan |
ibrobot_msgs/action/ExecuteTaskPlan |
Main entry point for planners |
Upstream Dependencies
| Interface | Type | Description |
|---|---|---|
/moveit_gateway/move_to_pose |
ibrobot_msgs/srv/MoveToPose (Service) |
Arm motion, provided by moveit_gateway |
/{gripper_controller}/follow_joint_trajectory |
control_msgs/action/FollowJointTrajectory (Action) |
Gripper control, provided by ros2_control |
/joint_states |
sensor_msgs/msg/JointState (Topic) |
Joint state feedback |
Message Definitions
TaskStep.msg — Task step:
uint8 MOVE_TO_POSE = 0 # Move to target pose
uint8 GRIPPER = 1 # Gripper control
uint8 WAIT = 2 # Wait
uint8 type # Step type
string label # Human-readable label (for logging and feedback)
geometry_msgs/Pose target_pose # MOVE_TO_POSE: target pose
float64 velocity_scaling # MOVE_TO_POSE: velocity ratio (0.0~1.0, 0.0=default)
float64 gripper_position # GRIPPER: position (0.0=closed, 1.0=fully open)
float64 wait_duration_s # WAIT: duration in seconds
ExecuteTaskPlan.action:
# Goal
ibrobot_msgs/TaskStep[] steps # Task step sequence
string task_id # Unique task identifier
string task_description # Task description
---
# Result
bool success
string message
int32 steps_completed # Number of completed steps
float64 total_duration_s # Total duration
---
# Feedback
int32 current_step # Current step index (0-indexed)
int32 total_steps
string current_label
string status # "executing" | "completed" | "failed"
float32 progress # 0.0 ~ 1.0
Quick Start
1. Build
colcon build --packages-select ibrobot_msgs robot_config robot_moveit task_dispatch
source install/setup.bash
2. Launch
Task executor is automatically launched when control_mode contains "moveit":
export ROS_DOMAIN_ID=<your_id>
# Simulation mode
ros2 launch robot_config robot.launch.py \
robot_config:=so101_single_arm \
use_sim:=true \
control_mode:=moveit_planning
# Real hardware
ros2 launch robot_config robot.launch.py \
robot_config:=so101_single_arm \
use_sim:=false \
control_mode:=moveit_planning
Verify the node is running:
ros2 node list | grep task_executor
# Output: /task_executor
ros2 action list | grep execute_task_plan
# Output: /task_executor/execute_task_plan
3. CLI Verification
Test gripper open/close (no MoveIt motion planning required, gripper + wait only):
ros2 action send_goal /task_executor/execute_task_plan ibrobot_msgs/action/ExecuteTaskPlan "{steps: [{type: 2, label: 'wait_1s', wait_duration_s: 1.0}, {type: 1, label: 'open_gripper', gripper_position: 0.5}, {type: 2, label: 'wait_0.5s', wait_duration_s: 0.5}, {type: 1, label: 'close_gripper', gripper_position: 0.0}], task_id: 'test_001', task_description: 'gripper cycle test'}" --feedback
Expected output:
Result:
success: true
message: All steps completed successfully
steps_completed: 4
total_duration_s: 3.6s
Goal finished with status: SUCCEEDED
Test full pick sequence (includes MOVE_TO_POSE, requires moveit_gateway to be ready):
ros2 action send_goal /task_executor/execute_task_plan ibrobot_msgs/action/ExecuteTaskPlan "{steps: [{type: 1, label: 'open_gripper', gripper_position: 1.0}, {type: 0, label: 'move_above', target_pose: {position: {x: 0.2, y: 0.0, z: 0.3}, orientation: {w: 1.0}}, velocity_scaling: 0.3}, {type: 0, label: 'move_to_grasp', target_pose: {position: {x: 0.2, y: 0.0, z: 0.1}, orientation: {w: 1.0}}, velocity_scaling: 0.2}, {type: 1, label: 'close_gripper', gripper_position: 0.0}, {type: 2, label: 'hold', wait_duration_s: 1.0}, {type: 0, label: 'lift', target_pose: {position: {x: 0.2, y: 0.0, z: 0.4}, orientation: {w: 1.0}}, velocity_scaling: 0.3}], task_id: 'pick_001', task_description: 'pick and place'}" --feedback
Python API Usage
Basic: Sending a Task Plan
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from geometry_msgs.msg import Pose
from ibrobot_msgs.action import ExecuteTaskPlan
from ibrobot_msgs.msg import TaskStep
class MyPlanner(Node):
def __init__(self):
super().__init__('my_planner')
self._client = ActionClient(
self, ExecuteTaskPlan, '/task_executor/execute_task_plan'
)
def send_pick_task(self):
self._client.wait_for_server()
goal = ExecuteTaskPlan.Goal()
goal.task_id = 'pick_001'
goal.task_description = 'pick up object'
# Step 1: Open gripper
step1 = TaskStep()
step1.type = TaskStep.GRIPPER
step1.label = 'open'
step1.gripper_position = 1.0
# Step 2: Move above target
step2 = TaskStep()
step2.type = TaskStep.MOVE_TO_POSE
step2.label = 'approach'
step2.target_pose = Pose()
step2.target_pose.position.x = 0.2
step2.target_pose.position.y = 0.0
step2.target_pose.position.z = 0.3
step2.target_pose.orientation.w = 1.0
step2.velocity_scaling = 0.3
# Step 3: Close gripper
step3 = TaskStep()
step3.type = TaskStep.GRIPPER
step3.label = 'grasp'
step3.gripper_position = 0.0
# Step 4: Wait for settling
step4 = TaskStep()
step4.type = TaskStep.WAIT
step4.label = 'settle'
step4.wait_duration_s = 0.5
goal.steps = [step1, step2, step3, step4]
future = self._client.send_goal_async(
goal, feedback_callback=self._feedback_cb
)
future.add_done_callback(self._goal_response_cb)
def _goal_response_cb(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().warn('Goal rejected')
return
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self._result_cb)
def _feedback_cb(self, feedback_msg):
fb = feedback_msg.feedback
self.get_logger().info(
f'[{fb.current_step + 1}/{fb.total_steps}] '
f'{fb.current_label} — {fb.status} ({fb.progress:.0%})'
)
def _result_cb(self, future):
result = future.result().result
self.get_logger().info(
f'Done: success={result.success}, '
f'steps={result.steps_completed}, '
f'time={result.total_duration_s:.1f}s'
)
Configuration
All parameters are loaded from robot_config YAML (SSOT) automatically.
Gripper joint name resolved from joints.gripper:
joints:
gripper:
- "6" # Gripper joint name (SO-101 uses numeric IDs)
Gripper controller matched by gripper + trajectory keyword in control_modes:
control_modes:
moveit_planning:
controllers:
- joint_state_broadcaster
- arm_trajectory_controller
- gripper_trajectory_controller # → /gripper_trajectory_controller/follow_joint_trajectory
Auto-launch conditions (launch_builders/task_execution.py):
Task executor is automatically added to launch when any of:
control_modecontainsmoveitcontrol_modecontainsvisual_graspcontrol_modecontainsvoxposerexecutor.task_dispatch: true
Integrating Existing Planners
visual_grasp_planner
Remove the original executor.py and instead:
- Compute grasp poses (YOLO + SAM + PCA + calibration)
- Build a
TaskStep[]sequence - Send as
ExecuteTaskPlanaction goal
VoxPoser
- LLM generates waypoints
- Convert waypoints to
TaskStep[]sequence - Send as
ExecuteTaskPlanaction goal
Custom Planners
Any new planner follows the same pattern:
- Perception/Reasoning → produces target poses
- Build
TaskStep[]sequence - Send
ExecuteTaskPlanaction goal - Monitor feedback for execution progress