Stream 模块架构
1. 模块概述
- 功能介绍:Stream 模块负责管理任务队列,实现异步执行和同步机制。通过 SQ(Submission Queue)和 CQ(Completion Queue)机制与硬件交互,支持多种流类型(普通流、控制流、协处理器流、David流、XPU流)。
- 设计目标:
- 提供高效的异步任务执行机制
- 支持多种流类型适配不同场景
- 实现任务队列管理和同步机制
- 支持 SQ/CQ 资源管理和复用
- 支持流捕获功能(ACL Graph)
- 支持自动切分 SQ 模式
2. 使用场景与对外接口
2.1 使用场景
-
场景一:创建流并提交任务
aclrtStream stream; aclrtCreateStream(&stream); // 创建流 aclrtLaunchKernel(stream, ...); // 提交内核任务 aclrtSynchronizeStream(stream); // 等待完成 -
场景二:多流并行执行
aclrtStream stream1, stream2; aclrtCreateStream(&stream1); aclrtCreateStream(&stream2); // 在不同流上并行执行任务 aclrtLaunchKernel(stream1, kernel1, ...); aclrtLaunchKernel(stream2, kernel2, ...); -
场景三:事件同步
aclrtRecordEvent(event, stream1); // 在 stream1 记录事件 aclrtStreamWaitEvent(stream2, event); // stream2 等待事件 -
场景四:带配置创建流
aclrtStream stream; aclrtCreateStreamWithConfig(&stream, 0, ACL_STREAM_FAST_SYNC); // 创建快速同步流
2.2 对外接口
核心接口
| 接口 | 头文件位置 | 说明 |
|---|---|---|
aclrtCreateStream() |
include/external/acl/acl_rt.h |
创建流 |
aclrtCreateStreamWithConfig() |
include/external/acl/acl_rt.h |
带优先级和标志创建流 |
aclrtDestroyStream() |
include/external/acl/acl_rt.h |
销毁流 |
aclrtDestroyStreamForce() |
include/external/acl/acl_rt.h |
强制销毁流 |
aclrtSynchronizeStream() |
include/external/acl/acl_rt.h |
同步流 |
aclrtStreamQuery() |
include/external/acl/acl_rt.h |
查询流状态 |
aclrtStreamWaitEvent() |
include/external/acl/acl_rt.h |
流等待事件 |
扩展接口
| 接口 | 头文件位置 | 说明 |
|---|---|---|
aclrtSynchronizeStreamWithTimeout() |
include/external/acl/acl_rt.h |
带超时同步流 |
aclrtStreamGetPriority() |
include/external/acl/acl_rt.h |
获取流优先级 |
aclrtStreamGetFlags() |
include/external/acl/acl_rt.h |
获取流标志 |
aclrtStreamAbort() |
include/external/acl/acl_rt.h |
终止流执行 |
aclrtStreamGetId() |
include/external/acl/acl_rt.h |
获取流 ID |
aclrtGetStreamAvailableNum() |
include/external/acl/acl_rt.h |
获取可用流数量 |
aclrtStreamWaitEventWithTimeout() |
include/external/acl/acl_rt.h |
流等待事件带超时 |
流属性接口
| 接口 | 头文件位置 | 说明 |
|---|---|---|
aclrtSetStreamAttribute() |
include/external/acl/acl_rt.h |
设置流属性 |
aclrtGetStreamAttribute() |
include/external/acl/acl_rt.h |
获取流属性 |
aclrtSetStreamFailureMode() |
include/external/acl/acl_rt.h |
设置流失败模式 |
aclrtSetStreamOverflowSwitch() |
include/external/acl/acl_rt.h |
设置流溢出开关 |
aclrtGetStreamOverflowSwitch() |
include/external/acl/acl_rt.h |
获取流溢出开关 |
流控制接口
| 接口 | 头文件位置 | 说明 |
|---|---|---|
aclrtActiveStream() |
include/external/acl/acl_rt.h |
激活流 |
aclrtSwitchStream() |
include/external/acl/acl_rt.h |
切换流 |
aclrtStreamStop() |
include/external/acl/acl_rt.h |
停止流 |
aclrtPersistentTaskClean() |
include/external/acl/acl_rt.h |
清理持久任务 |
流捕获接口(aclmdlRI)
| 接口 | 头文件位置 | 说明 |
|---|---|---|
aclmdlRICaptureBegin() |
include/external/acl/acl_rt.h |
开始流捕获 |
aclmdlRICaptureEnd() |
include/external/acl/acl_rt.h |
结束流捕获 |
aclmdlRICaptureGetInfo() |
include/external/acl/acl_rt.h |
获取捕获信息 |
aclmdlRICaptureTaskGrpBegin() |
include/external/acl/acl_rt.h |
开始任务组捕获 |
aclmdlRICaptureTaskGrpEnd() |
include/external/acl/acl_rt.h |
结束任务组捕获 |
aclmdlRICaptureTaskUpdateBegin() |
include/external/acl/acl_rt.h |
开始任务更新捕获 |
aclmdlRICaptureTaskUpdateEnd() |
include/external/acl/acl_rt.h |
结束任务更新捕获 |
流配置接口
| 接口 | 头文件位置 | 说明 |
|---|---|---|
aclrtCreateStreamConfigHandle() |
include/external/acl/acl_rt.h |
创建流配置句柄 |
aclrtDestroyStreamConfigHandle() |
include/external/acl/acl_rt.h |
销毁流配置句柄 |
aclrtSetStreamConfigOpt() |
include/external/acl/acl_rt.h |
设置流配置选项 |
aclrtCreateStreamV2() |
include/external/acl/acl_rt.h |
带配置创建流 |
3. 架构总览
整体设计思路
Stream 采用 SQ/CQ 异步机制实现任务执行:任务通过 SQ 提交到硬件,硬件执行完成后通过 CQ 返回结果。Stream 维护任务队列(taskPosHead/taskPosTail),支持任务分配、回收和同步。继承自 NoCopy 基类防止拷贝。
架构分层图
graph TB
subgraph ACL_API["ACL 接口层"]
aclrtCreateStream["aclrtCreateStream()"]
aclrtSynchronizeStream["aclrtSynchronizeStream()"]
end
subgraph ACL_Impl["ACL 实现层"]
ACL_Impl_Layer["aclrt_impl/stream.cpp"]
end
subgraph RT_API["RT 接口层"]
rtStreamCreate["rtStreamCreate()"]
rtStreamSynchronize["rtStreamSynchronize()"]
end
subgraph StreamCore["Stream 核心层"]
StreamNode["Stream<br/>流管理"]
StreamFactory["StreamFactory<br/>流工厂"]
end
subgraph Derivatives["Stream 派生类"]
CtrlStream["CtrlStream<br/>控制流"]
CoprocessorStream["CoprocessorStream<br/>协处理器流"]
DavidStream["DavidStream<br/>David架构流"]
XpuStream["XpuStream<br/>XPU架构流"]
end
subgraph Resource["资源管理层"]
SqCqManage["StreamSqCqManage<br/>SQ/CQ 管理"]
TaskAllocator["TaskAllocator<br/>任务分配器"]
TaskResManage["TaskResManage<br/>任务资源管理"]
DvppGrp["DvppGrp<br/>DVPP组管理"]
end
subgraph DriverLayer["驱动层"]
DriverNode["Driver"]
HAL["HAL 驱动"]
end
aclrtCreateStream --> ACL_Impl_Layer
aclrtSynchronizeStream --> ACL_Impl_Layer
ACL_Impl_Layer --> rtStreamCreate
ACL_Impl_Layer --> rtStreamSynchronize
rtStreamCreate --> StreamFactory
rtStreamSynchronize --> StreamNode
StreamFactory --> StreamNode
StreamFactory --> CtrlStream
StreamFactory --> CoprocessorStream
StreamFactory --> DavidStream
StreamFactory --> XpuStream
StreamNode --> SqCqManage
StreamNode --> TaskAllocator
StreamNode --> TaskResManage
StreamNode --> DvppGrp
SqCqManage --> DriverNode --> HAL
核心模块交互图
sequenceDiagram
participant App as 应用程序
participant ACL as aclrt API
participant ACLImpl as aclrt_impl
participant RT as rtStream API
participant Factory as StreamFactory
participant Stream as Stream
participant SqCqManage as StreamSqCqManage
participant Driver as Driver/HAL
participant HW as 硬件
App->>ACL: aclrtCreateStream()
ACL->>ACLImpl: aclrtCreateStreamImpl()
ACLImpl->>RT: rtStreamCreate()
RT->>Factory: CreateStream()
Factory->>Stream: new Stream(dev, prio)
Stream->>SqCqManage: AllocStreamSqCq()
SqCqManage->>Driver: NormalSqCqAllocate()
Driver-->>SqCqManage: sqId, cqId
Stream->>Stream: Setup()
Stream->>Driver: StreamIdAlloc()
App->>ACL: aclrtLaunchKernel()
ACL->>Stream: AllocTask()
Stream->>Stream: 填充 TaskInfo
Stream->>Driver: 发送 SQE
Driver->>HW: 提交任务
App->>ACL: aclrtSynchronizeStream()
ACL->>ACLImpl: aclrtSynchronizeStreamImpl()
ACLImpl->>RT: rtStreamSynchronize()
RT->>Stream: Synchronize()
Stream->>Stream: 等待 CQ 完成
Stream-->>App: 返回结果
4. 详细设计
4.1 核心流程
流创建流程
flowchart TD
A[aclrtCreateStream] --> B[aclrtCreateStreamImpl]
B --> C[rtStreamCreate]
C --> D[StreamFactory::CreateStream]
D --> E[CreateStreamAndGet<br/>版本分发]
E --> F{判断标志}
F -->|ACL_STREAM_DEVICE_USE_ONLY| G[创建 CoprocessorStream]
F -->|其他| H[创建 DavidStream/Stream]
G --> I[Setup]
H --> I
I --> J[AllocStreamSqCq]
J --> K[分配 sqId/cqId]
K --> L[AllocLogicCq]
L --> M[CreateStreamTaskRes]
M --> N[流就绪]
关键代码:
// 文件位置:src/acl/aclrt_impl/stream.cpp:33-50
aclError aclrtCreateStreamImpl(aclrtStream *stream) {
rtStream_t rtStream = nullptr;
const rtError_t rtErr = rtStreamCreate(&rtStream,
static_cast<int32_t>(RT_STREAM_PRIORITY_DEFAULT));
if (rtErr != RT_ERROR_NONE) {
return ACL_GET_ERRCODE_RTS(rtErr);
}
*stream = static_cast<aclrtStream>(rtStream);
return ACL_SUCCESS;
}
// 文件位置:src/runtime/core/src/stream/stream.cc:606-788
rtError_t Stream::Setup() {
// 设置 SQ 深度
const uint32_t rtsqDepth =
(((flags_ & RT_STREAM_HUGE) != 0U) &&
(device_->GetDevProperties().maxTaskNumPerHugeStream != 0)) ?
device_->GetDevProperties().maxTaskNumPerHugeStream :
device_->GetDevProperties().rtsqDepth;
SetSqDepth(rtsqDepth);
// 分配 streamId
error = device_->Driver_()->StreamIdAlloc(&streamId_, device_->Id_(),
device_->DevGetTsId(), priority_);
// 分配 SQ/CQ
error = stmSqCqManage->AllocStreamSqCq(this, priority_, 0U, tmpSqId, tmpCqId);
sqId_ = tmpSqId;
cqId_ = tmpCqId;
// 分配逻辑 CQ
error = AllocLogicCq(isDisableThread, starsFlag, stmSqCqManage);
// 创建任务资源
CreateStreamTaskRes();
error = CreateStreamArgRes();
return RT_ERROR_NONE;
}
任务提交流程
flowchart TD
A[AllocTask] --> B{任务组更新?}
B -->|是| C[UpdateTask]
B -->|否| D{捕获模式?}
D -->|是| E[AllocCaptureTask]
D -->|否| F[TaskFactory::Alloc]
F --> G[设置任务归属]
G --> H[填充 TaskInfo]
H --> I[填充 SQE]
I --> J[发送到 SQ]
J --> K[更新 sqTailPos]
关键代码:
// 文件位置:src/runtime/core/src/stream/stream.cc:4544-4579
TaskInfo* Stream::AllocTask(TaskInfo* pTask, tsTaskType_t taskType,
rtError_t& errorReason, uint32_t sqeNum,
UpdateTaskFlag flag) {
// 任务组更新场景
if (IsTaskGroupUpdate()) {
TaskInfo* updateTask = nullptr;
if (flag == UpdateTaskFlag::SUPPORT) {
errorReason = UpdateTask(&updateTask);
return updateTask;
}
}
// 捕获模式场景
if (GetCaptureStatus() != RT_STREAM_CAPTURE_STATUS_NONE) {
TaskInfo* captureTask = pTask;
const rtError_t error = AllocCaptureTask(taskType, sqeNum, &captureTask);
if (error != RT_ERROR_STREAM_CAPTURE_EXIT) {
errorReason = error;
return error == RT_ERROR_NONE ? captureTask : nullptr;
}
}
// 正常分配
if (taskResMang_ == nullptr) {
return device_->GetTaskFactory()->Alloc(this, taskType, errorReason);
} else {
pTask->stream = this;
return pTask;
}
}
流同步流程
flowchart TD
A[aclrtSynchronizeStream] --> B[aclrtSynchronizeStreamImpl]
B --> C[rtStreamSynchronize]
C --> D{STARS 平台?}
D -->|是| E{分离发送回收?}
E -->|否| F[StarsWaitForTask]
E -->|是| G[SynchronizeImpl]
D -->|否| H{禁用线程?}
H -->|是| I[WaitForTask]
H -->|否| J[创建 Event]
J --> K[Record Event]
K --> L[Event::Synchronize]
L --> M[返回结果]
关键代码:
// 文件位置:src/acl/aclrt_impl/stream.cpp:132-149
aclError aclrtSynchronizeStreamImpl(aclrtStream stream) {
const rtError_t rtErr = rtStreamSynchronize(static_cast<rtStream_t>(stream));
if (rtErr != RT_ERROR_NONE) {
return ACL_GET_ERRCODE_RTS(rtErr);
}
return ACL_SUCCESS;
}
// 文件位置:src/runtime/core/src/stream/stream.cc:1927-1982
rtError_t Stream::Synchronize(const bool isNeedWaitSyncCq, int32_t timeout) {
// STARS 平台路径
if (device_->IsStarsPlatform()) {
if (!IsSeparateSendAndRecycle() || GetBindFlag()) {
error = StarsWaitForTask(lastTaskId_, isNeedWaitSyncCq, timeout);
} else {
if (!IsSyncFinished()) {
error = SynchronizeImpl(lastTaskId_, latestConcernedTaskId.Value(), timeout);
}
}
return GetSynchronizeError(error);
}
// 禁用线程路径
if (Runtime::Instance()->GetDisableThread()) {
error = WaitForTask(lastTaskId_, isNeedWaitSyncCq, timeout);
return GetSynchronizeError(error);
}
// 正常路径:通过 Event 同步
Event *event = new Event(device_, RT_EVENT_DEFAULT, Context_(), true);
error = event->Record(this);
error = event->Synchronize(timeout);
return error;
}
4.2 核心机制详解
SQ/CQ 异步机制
设计思想:通过 SQ(提交队列)和 CQ(完成队列)实现异步任务执行,任务提交到 SQ 后立即返回,硬件执行完成后通过 CQ 通知。
// 文件位置:src/runtime/core/src/stream/stream.hpp
class Stream : public NoCopy {
protected:
uint32_t sqId_; // SQ ID
uint32_t cqId_; // CQ ID
uint32_t sqTailPos_; // SQ 尾位置
uint32_t sqHeadPos_; // SQ 头位置
uint64_t sqRegVirtualAddr_; // SQ 寄存器虚拟地址
private:
uint64_t sqAddr_; // SQ 基地址 (最大 2M)
uint32_t sqDepth_; // SQ 深度
uint8_t* sqeBuffer_; // SQE 缓冲区指针
uint32_t sqeBufferSize_; // SQE 缓冲区大小
Atomic<uint32_t> taskPosHead_; // 任务位置头 (Stars)
Atomic<uint32_t> taskPosTail_; // 任务位置尾 (Stars)
};
多类型流支持
设计思想:支持多种流类型适配不同硬件架构和使用场景。
classDiagram
class NoCopy {
<<interface>>
}
class Stream {
+Setup() rtError_t
+Synchronize() rtError_t
+AllocTask() TaskInfo*
+GetSqId() uint32_t
+GetCqId() uint32_t
}
class CtrlStream {
+Setup() rtError_t
+GetHeadPosFromCtrlSq() rtError_t
-sqDepth = 1022
}
class CoprocessorStream {
+Setup() rtError_t
}
class DavidStream {
+Setup() rtError_t
+StarsAddTaskToStream() rtError_t
+DavidUpdatePublicQueue() rtError_t
}
class XpuStream {
+Setup() rtError_t
+StarsAddTaskToStream() rtError_t
}
class DvppGrp {
+Setup() rtError_t
+getLogicCqId() uint32_t
}
NoCopy <|-- Stream
NoCopy <|-- DvppGrp
Stream <|-- CtrlStream
Stream <|-- CoprocessorStream
Stream <|-- DavidStream
Stream <|-- XpuStream
Stream --> DvppGrp : 关联
派生类特点:
| 类名 | 继承关系 | 特点 | 关键标志 |
|---|---|---|---|
| CtrlStream | Stream | 控制流,固定SQ深度1022 | isCtrlStream_=true |
| CoprocessorStream | Stream | 协处理器流,远程处理 | TSDRV_FLAG_REMOTE_ID |
| DavidStream | Stream | David/Stars架构流 | 重写大量方法 |
| XpuStream | Stream | XPU架构流 | 重写核心方法 |
| DvppGrp | NoCopy | DVPP组管理(非Stream派生) | 提供logicCq |
自动切分 SQ 模式
设计思想:当单个流的 SQ 深度达到上限时,自动创建 slave stream 扩展 SQ 容量,实现主从模式。
// 文件位置:src/runtime/core/src/stream/stream.hpp:146-151
struct AutoSplitSqContext {
Stream *masterStream{nullptr}; // slave stream 指向 master
std::vector<Stream *> slaveStreams; // master 的 slave 列表
int32_t exposedStreamId{-1}; // 对外暴露的 streamId
uint32_t curStreamSqeCount{0U}; // 当前已分配的 SQE 数量
};
关键代码:
// 文件位置:src/runtime/core/src/stream/stream.cc:1003-1048
rtError_t Stream::SetupForAutoSplit() {
error = InitAutoSplitBasicParams();
error = AllocPosToTaskIdMap();
error = CreateStreamArgRes();
error = AllocStreamIdForAutoSplit();
error = AllocAutoSplitContext(); // 创建 AutoSplitSqContext
error = AllocSqeBufferForAutoSplit();
error = AllocSqCqForAutoSplitWithRetry();
SetMaxTaskId(isDisableThread);
return RT_ERROR_NONE;
}
4.3 模块职责划分
| 模块 | 职责 | 位置 |
|---|---|---|
| aclrt API | 对外 ACL 接口 | include/external/acl/acl_rt.h |
| aclrt_impl | ACL 接口实现 | src/acl/aclrt_impl/stream.cpp |
| rt API | 内部 RT 接口 | src/runtime/api/api_c_stream.cc |
| Stream | 流管理核心类,继承自 NoCopy | stream/stream.hpp |
| StreamFactory | 静态流创建工厂,版本分发 | stream/stream_factory.hpp |
| StreamSqCqManage | SQ/CQ ID 管理与复用 | stream/stream_sqcq_manage.hpp |
| TaskAllocator | 任务对象分配器 | task/task_allocator.hpp |
| TaskResManage | 任务资源管理 | task/task_res_manage/ |
| EngineStreamObserver | 流状态观察者 | stream/engine_stream_observer.hpp |
| DvppGrp | DVPP 组管理(非流类型) | stream/dvpp_grp.hpp |
4.4 核心数据结构
classDiagram
class Stream {
-int32_t streamId_
-uint32_t sqId_
-uint32_t cqId_
-uint32_t poolId_
-uint32_t logicalCqId_
-uint32_t sqTailPos_
-uint32_t sqHeadPos_
-uint64_t sqAddr_
-uint32_t sqDepth_
-uint8_t* sqeBuffer_
-Device* device_
-Context* context_
-uint32_t priority_
-uint32_t flags_
-Atomic~uint32_t~ taskPosHead_
-Atomic~uint32_t~ taskPosTail_
-Atomic~uint32_t~ pendingNum_
-DvppGrp* dvppGrp_
-AutoSplitSqContext* autoSplitCtx_
-bool isAutoSplitSq_
-bool isSlaveStream_
+Setup() rtError_t
+Synchronize() rtError_t
+AllocTask() TaskInfo*
+Id_() int32_t
+GetSqId() uint32_t
+GetCqId() uint32_t
+IsAutoSplitSq() bool
}
class StreamSqCqManage {
-mutex streamMapLock_
-map~uint32_t,uint32_t~ streamIdToSqIdMap_
-map~uint32_t,uint32_t~ streamIdToCqIdMap_
-map~uint32_t,uint32_t~ sqIdRefMap_
-map~uint32_t,Stream*~ streams_
-uint32_t normalCq_
+AllocStreamSqCq() rtError_t
+DeAllocStreamSqCq() rtError_t
+AllocLogicCq() rtError_t
+FreeLogicCq() rtError_t
}
class AutoSplitSqContext {
+Stream* masterStream
+vector~Stream*~ slaveStreams
+int32_t exposedStreamId
+uint32_t curStreamSqeCount
}
class rtLogicCqInfo_t {
+uint32_t logicCqId
+uint32_t remoteFlag
+bool isFastCq
+bool isDefaultCq
}
Stream *-- StreamSqCqManage
Stream *-- AutoSplitSqContext
StreamSqCqManage --> rtLogicCqInfo_t
5. 关键文件索引
| 模块 | 文件路径 | 核心内容 |
|---|---|---|
| ACL 头文件 | include/external/acl/acl_rt.h |
aclrt* 外部接口声明 |
| ACL 实现 | src/acl/aclrt_impl/stream.cpp |
aclrt 接口实现(466行) |
| RT 接口 | src/runtime/api/api_c_stream.cc |
rtStream* 内部接口 |
| Stream 核心类 | src/runtime/core/src/stream/stream.hpp |
Stream 类定义(1700+行) |
| Stream 实现 | src/runtime/core/src/stream/stream.cc |
Setup/Synchronize/AllocTask 实现(6000+行) |
| Stream 工厂 | src/runtime/core/src/stream/stream_factory.hpp |
静态 CreateStream 工厂方法 |
| SQ/CQ 管理 | src/runtime/core/src/stream/stream_sqcq_manage.hpp |
SQ/CQ ID 分配与复用 |
| 控制流 | src/runtime/core/src/stream/ctrl_stream.hpp |
CtrlStream 定义 |
| 协处理器流 | src/runtime/core/src/stream/coprocessor_stream.hpp |
CoprocessorStream |
| David流 | src/runtime/core/src/stream/stream_david.hpp |
DavidStream,Stars架构 |
| XPU流 | src/runtime/feature/xpu/stream_xpu.hpp |
XpuStream |
| DVPP组 | src/runtime/core/inc/stream/dvpp_grp.hpp |
DvppGrp(非Stream派生) |
| 流创建分发 | src/runtime/core/src/stream/v200/stream_creator_c.cc |
CreateStreamAndGet 实现 |
6. 性能优化策略
- 任务预分配:TaskAllocator 预分配任务对象,减少分配开销
- SQ/CQ 池化复用:多 Stream 共享 SQ/CQ 资源,CQ 复用机制减少资源开销
- 原子状态管理:taskPosHead/taskPosTail/pendingNum 使用原子操作,减少锁开销
- 异步回收:支持异步任务回收,不阻塞任务提交
- 自动切分 SQ:大任务量场景自动扩展 SQ 容量(主从模式)
- 快速同步模式:ACL_STREAM_FAST_SYNC 标志支持快速同步
- 引用计数:sqIdRefMap_ 管理 SQ ID 复用,避免频繁分配释放
- 版本分发工厂:根据硬件版本(v100/v200/v201)自动选择最优 Stream 类型
本模块文档基于源码分析,已验证所有 ACL 接口来自 include/external/acl/acl_rt.h,实现来自 src/acl/aclrt_impl/stream.cpp。