0. 写在开头#
作为 SO-101 基础实践的理论部分,本文将深入浅出地介绍 ACT(Action Chunking with Transformers)模型。
实践系统介绍#
主臂(Leader Arm):
- 由人控制;
- 记录关节姿态,称为“动作(Action)”,训练时模型需要预测该动作。
从臂(Follower Arm):
- 装有摄像头;
- 由程序控制:
- 数据采集时:使用 PID 跟随已记录的 Action;
- 测试时:使用 PID 跟随模型预测的 Action;
- 记录关节姿态与相机图像,统称为“观测(Observation)”,作为模型输入。
数据收集流程#
操作员通过主臂执行演示,系统以固定频率同步记录三类信息:
- 场景视觉——机载/外置摄像头的当前图像;
- 从臂状态——七自由度从臂的当前关节位置向量(含末端夹爪开合自由度);
- 主臂控制——同为七维的主臂关节目标(作为动作标签/ground truth)。
采集中,主从之间存在通信时延与 PID 跟踪误差,因此从臂在时刻 $t$ 的实际位置更接近于主臂在 $t-1$ 的目标。为提升预测稳定性,观测(Observation)选取“当前从臂关节 + 当前图像”,而非“上一帧主臂动作”,因为前者能直接反映已执行结果、首帧不缺失,且允许模型自适应学习主从系统性细微差异。
训练阶段以(图像、从臂关节)为输入、以主臂关节目标为动作标签;测试/推理阶段在相同观测输入上预测虚拟主臂动作,并下发该动作,由从臂通过 PID 跟随完成实际执行。
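下面用一段极简的 Python 结构示意单个时间步的记录内容(键名与维度均为示意性假设,实际字段请以 LeRobot 数据集格式为准):

```python
import numpy as np

# 单个时间步的记录示意(键名与维度均为假设,实际以 LeRobot 数据集的 schema 为准)
step_record = {
    # 观测(Observation):作为模型输入
    "observation.image": np.zeros((480, 640, 3), dtype=np.uint8),  # 当前相机图像
    "observation.state": np.zeros(7, dtype=np.float32),            # 从臂当前关节位置(沿用正文的 7 维设定)
    # 动作(Action):作为训练标签
    "action": np.zeros(7, dtype=np.float32),                       # 主臂关节目标(ground truth)
}

# 训练时,从时刻 t 起连续 K 条记录的 "action" 组成一个动作块,作为该观测的监督信号
```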
ACT 模型#
ACT 模型核心创新点#
Action Chunking Policy(动作分块策略)#
- ACT 的核心思想:缩短有效决策时域(reduce the effective horizon of a long trajectory)。
- Action Chunking Policy:在时刻 $t$ 观测一次后,一次性输出接下来 $K$ 个时间步的动作序列;系统每经过 $K$ 步才进行一次新的决策。
- 痛点:累计误差(Compounding Error)。在长时程、多步控制任务中,早期或连续的小偏差会相互叠加,最终放大为不可挽回的偏差,导致任务失败。
- 传统做法:
- 单步策略(Single-Step Policy)在每个时间步都重新观测并决策。一旦连续多次出现轻微预测偏差,误差会以高频方式累积。
- 为什么 Action Chunking Policy 能缓解累计误差?
- 降低决策频率:由“每步决策”降为“每 $K$ 步决策”,有效决策次数显著减少,误差触发与积累机会随之降低。
- 抑制连续小错:单次偏差通常可在下一次(块级)决策时被纠正;真正致命的是高频、连续的小错。减少决策频率可直接降低“连续错误链”出现概率。
- 提高轨迹稳定性:成块输出的动作在时间上更一致,抑制逐步预测抖动,降低轨迹漂移风险。
- 痛点:非马尔可夫环境(Non-Markovian)。在马尔可夫环境中,最优策略可仅依赖当前状态;但现实任务常含隐含时间依赖。以炒菜为例:倒油 → 等待加热一段时间(在达到目标油温前后外观几乎无明显变化)→ 投入食材。
- 传统做法:
- 单步策略(Single-Step Policy):由于关键信息无法从单帧观测直接辨识,难以判断“何时执行下一步”。
- 历史条件化策略(History-Conditioned Policy):通过引入过往观测与动作来弥补非马尔可夫信息,但其条件域从“当前观测”扩展到“较长历史”,容易产生因果误识(causal misidentification)。
- 因果误识(Causal Misidentification):指将与行动高度共现、却非因果的线索当作真正原因,从而学习到错误的决策规则。比如:刹车时刹车灯会亮,但“灯亮”并非“刹车原因”,若据此做决策即为因果误识。
- 为什么 Action Chunking Policy 能提高非马尔可夫环境中的表现?
- 将“倒油 → 等待加热 → 投入食材”封装为一个 action chunk,使模型能够显式/隐式地建模其中的等待时长与触发条件。
- 用面向未来的动作序列直接对时间依赖进行结构化约束,更容易学到真正的时序因果(如“等待时长”)而非表面共现。
Temporal Ensemble(时间集成)#

- 为避免 Action Chunking “每 $K$ 步一次决策”带来的卡顿执行,使用 Temporal Ensemble 在每个时间步都预测一个长度为 $K$ 的动作块。
- 每次执行到第 $t$ 步时,将所有历史时刻对“第 $t$ 步”的预测进行加权融合,得到最终动作,相当于一种滑动加权平均的平滑器。
- 权重通常对更早产生的预测赋予更大权重(更稳定、噪声更小),从而显著提升轨迹的连续性与顺滑度。
- 代价是需要额外的前向推理次数(计算量上升),但无需改动训练目标或系统结构。
ACT 伪代码与算法流程#
请先学习 VAE 和 CVAE 模型,再继续阅读。
记号约定:令 $a_{t:t+K}\!\triangleq\!(a_t,\dots,a_{t+K-1})$ 表示长度为 $K$ 的动作序列;$o_t$ 为时刻 $t$ 的观测,$\bar{o}_t$ 为去除图像模态后的观测;$z$ 为潜变量。
一、ACT 训练#
给定:演示数据集 $\mathcal{D}$、块大小(预测跨度)$K$、正则权重 $\beta$。
- 初始化编码器(后验)$q_\phi\!\left(z\,\middle|\,a_{t:t+K},\bar{o}_t\right)$。
- 初始化解码器(策略)$\pi_\theta\!\left(\hat{a}_{t:t+K}\,\middle|\,o_t,z\right)$。
- 对迭代轮次 $n=1,2,\ldots$ 重复:
- 从 $\mathcal{D}$ 采样样本对 $(o_t, a_{t:t+K})$;
- 从 $q_\phi\!\left(z\,\middle|\,a_{t:t+K},\bar{o}_t\right)$ 采样 $z$;
- 用 $\pi_\theta\!\left(\hat{a}_{t:t+K}\,\middle|\,o_t,z\right)$ 预测 $\hat{a}_{t:t+K}$;
- 计算重构损失 $$ \mathcal{L}_{\text{reconst}}=\mathrm{MSE}\!\left(\hat{a}_{t:t+K},\,a_{t:t+K}\right) $$
- 计算正则(KL)项 $$ \mathcal{L}_{\text{reg}}=D_{\mathrm{KL}}\!\bigl(q_\phi(z\,|\,a_{t:t+K},\bar{o}_t)\,\|\,\mathcal{N}(0,I)\bigr) $$
- 用 Adam 优化器更新 $\theta,\phi$,总损失 $$ \mathcal{L}=\mathcal{L}_{\text{reconst}}+\beta\,\mathcal{L}_{\text{reg}}\,. $$
说明:重构项对应最大似然思想——在给定观测 $o_t$ 与潜变量 $z$ 下,使真实动作序列 $a_{t:t+K}$ 的生成概率最大。工程上常用距离度量近似该原则,常见为 $MSE$ 或 $L_1$;经验上 $L_1$ 往往更稳健,许多工作报告 $L_1$ 优于 $MSE$。
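下面给出与上述训练伪代码对应的一个最小 PyTorch 训练步骤示意:用玩具级 MLP 充当编码器与解码器、用扁平向量代替图像观测,仅演示重参数化采样与“重构 + $\beta\cdot$KL”的损失构成,并非 ACT 的真实网络结构:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

K, act_dim, obs_dim, latent_dim, beta = 100, 7, 14, 32, 10.0

# 玩具级后验编码器 q_phi(z | a_{t:t+K}, o_t) 与策略解码器 pi_theta(a_hat | o_t, z)
encoder = nn.Linear(K * act_dim + obs_dim, 2 * latent_dim)          # 输出 (mu, log_var)
decoder = nn.Sequential(nn.Linear(obs_dim + latent_dim, 256), nn.ReLU(),
                        nn.Linear(256, K * act_dim))
optim = torch.optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=1e-4)

# 假设的一个 batch:观测与对应的动作块
obs = torch.randn(8, obs_dim)
actions = torch.randn(8, K * act_dim)

mu, log_var = encoder(torch.cat([actions, obs], dim=-1)).chunk(2, dim=-1)
z = mu + (0.5 * log_var).exp() * torch.randn_like(mu)               # 重参数化采样
actions_hat = decoder(torch.cat([obs, z], dim=-1))

recon = F.l1_loss(actions_hat, actions)                             # 重构项(这里用 L1,亦可换成 MSE)
kl = (-0.5 * (1 + log_var - mu.pow(2) - log_var.exp())).sum(-1).mean()  # KL(q || N(0, I))
loss = recon + beta * kl

optim.zero_grad()
loss.backward()
optim.step()
```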
二、ACT 推理#
给定:训练好的策略 $\pi_\theta$,轨迹长度(episode 长度)$T$,指数衰减系数 $m$。
- 初始化先入先出(FIFO)缓冲区 $\mathcal{B}[0{:}T]$,其中 $\mathcal{B}[t]$ 存储对时刻 $t$ 的多次候选动作预测(来自不同起点的重叠块)。
- 对 $t=1,2,\ldots,T$:
- 令 $z=\mathbf{0}$(使用先验均值进行推理),用 $\pi_\theta\!\left(\hat{a}_{t:t+K}\,\middle|\,o_t,z\right)$ 预测长度为 $K$ 的动作序列;
- 将 $\hat{a}_{t:t+K}$ 的各元素分别追加进对应槽位的缓冲区:$\mathcal{B}[t{:}t+K]$;
- 设 $\mathcal{B}[t]=\{A_t[i]\}_{i=0}^{N_t-1}$ 为当前步 $t$ 聚合到的候选动作集合($N_t$ 为该槽累计的候选数);
- 用指数加权平均得到最终执行动作
$$
a^{\text{exec}}_t=\frac{\sum_{i=0}^{N_t-1}w_i\,A_t[i]}{\sum_{i=0}^{N_t-1}w_i}\,,\quad
w_i=\exp(-m\cdot i)\,.
$$
注:$i$ 按加入缓冲区的先后次序编号,$i=0$ 表示最早预测,$m>0$ 控制对旧候选的衰减强度。
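以下是与上述推理伪代码对应的一个最小实现示意:按时间步聚合候选动作的缓冲区加指数加权平均,其中 predict_chunk 为假设的策略前向函数,这里用随机张量代替真实预测:

```python
import torch

T, K, act_dim, m = 20, 4, 7, 0.01
buffers = [[] for _ in range(T + K)]          # B[t]:对时刻 t 的候选动作列表(最早的预测排在最前)

def predict_chunk(obs_t):
    """假设的策略前向:返回 (K, act_dim) 的动作块;真实场景中由 pi_theta(o_t, z=0) 给出。"""
    return torch.randn(K, act_dim)

executed = []
for t in range(T):
    chunk = predict_chunk(obs_t=None)
    for i in range(K):                         # 将块内第 i 步写入对应的时间槽位
        buffers[t + i].append(chunk[i])
    candidates = torch.stack(buffers[t])       # (N_t, act_dim),index 0 为最早的预测
    w = torch.exp(-m * torch.arange(len(candidates), dtype=torch.float32)).unsqueeze(-1)
    executed.append((w * candidates).sum(0) / w.sum())   # 指数加权平均后作为本步执行动作
```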
三、损失函数#
- 重构损失:$\displaystyle \mathcal{L}_{\text{reconst}}=\mathrm{MSE}\!\left(\hat{a}_{t:t+K},\,a_{t:t+K}\right)$
- 正则损失:$\displaystyle \mathcal{L}_{\text{reg}}=D_{\mathrm{KL}}\!\bigl(q_\phi(z\,|\,a_{t:t+K},\bar{o}_t)\,\|\,\mathcal{N}(0,I)\bigr)$
- 总损失:$\displaystyle \mathcal{L}=\mathcal{L}_{\text{reconst}}+\beta\,\mathcal{L}_{\text{reg}}$。
ACT 模型架构图#
请先学习 VAE 和 CVAE 模型,再继续阅读。
ACT 训练#

总体思想与 CVAE 相似,但在 ACT 中,输入与输出均为动作序列(action sequence):将 CVAE 中的“图像”角色替换为“动作序列”。同时,我们在 encoder 与 decoder 中都使用 condition(条件),即 observation(观测),包含摄像头图像与从臂的关节信息。为加快训练,encoder 的条件中不使用图像,仅使用从臂关节信息。
为什么条件要包含从臂的关节信息?
- 让模型专注于增量(在已知关节状态基础上的变化),而非直接从图像回归绝对位姿。
- 让模型感知主臂与从臂之间细微而系统性的差异。
Step 1:数据与监督信号#
采集数据集。每个 batch(批次)的 input(输入)为 observation(观测,包含摄像头图像与从臂关节信息),ground-truth label(真实值标签)为 action sequence(动作序列)。该动作序列的维度等于“每步 7 维主臂动作”与 action chunk 大小 $K$ 的组合($7\times K$ 维)。
Step 2:潜变量后验(encoder)与采样#
- CLS token(分类标记):一个可学习的 512 维向量(可视作模型参数)。
- embedded joints(关节信息嵌入):从臂 7 个关节值,经线性层映射为 512 维向量。
- embedded action sequence(动作序列嵌入):将长度为 $K$ 的动作序列逐步经线性层映射为 512 维向量,并加上 Sinusoidal Positional Encoding(正弦位置编码),得到 $K$ 个 512 维 token。
因此,送入 Transformer Encoder 的共为 $K+2$ 个 512 维 token(含 1 个 CLS、1 个 joints、以及 $K$ 个 action 序列 token)。Encoder 通过 self-attention(自注意力)融合全局信息,我们取 CLS 对应的 512 维输出,经线性层得到 32 维高斯的均值与方差,随后用 reparameterization trick(重参数化技巧)采样潜变量 $z\in\mathbb{R}^{32}$。
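下面用一段不依赖 LeRobot 的简化代码核对 Step 2 的张量形状:拼出 $K+2$ 个 512 维 token,经 Transformer Encoder 后取 CLS 输出映射为 32 维高斯参数并重参数化采样(网络层均为示意性假设,仅用于核对形状,省略了位置编码):

```python
import torch
import torch.nn as nn

B, K, d_model, latent_dim = 8, 100, 512, 32

cls_token = nn.Parameter(torch.zeros(1, 1, d_model))          # 可学习 CLS
joint_proj = nn.Linear(7, d_model)                            # 从臂 7 维关节 -> 512
action_proj = nn.Linear(7, d_model)                           # 每步 7 维动作 -> 512
to_gauss = nn.Linear(d_model, 2 * latent_dim)                 # CLS 输出 -> (mu, log_var)
encoder = nn.TransformerEncoder(
    nn.TransformerEncoderLayer(d_model, nhead=8, batch_first=True), num_layers=4)

joints = torch.randn(B, 7)
actions = torch.randn(B, K, 7)
tokens = torch.cat([cls_token.expand(B, -1, -1),
                    joint_proj(joints).unsqueeze(1),
                    action_proj(actions)], dim=1)             # (B, K+2, 512)
out = encoder(tokens)                                         # self-attention 融合全局信息
mu, log_var = to_gauss(out[:, 0]).chunk(2, dim=-1)            # 取 CLS 位置的输出
z = mu + (0.5 * log_var).exp() * torch.randn_like(mu)         # 重参数化采样
print(tokens.shape, z.shape)                                  # torch.Size([8, 102, 512]) torch.Size([8, 32])
```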
Step 3:条件解码与动作预测(decoder)#
- 图像分支:4 个摄像头、每帧分辨率 480×640、RGB 三通道,经 ResNet-18(取 layer4 输出,下采样 32 倍)得到 15×20×512 的特征图,flatten 为 $300\times 512$,再经线性投影到 512 维,并加上 Sinusoidal Positional Encoding(正弦位置编码)。四路拼接共得到 $1200$ 个 512 维向量(cam1 ~ cam4 合计)。
- joints 分支:与 encoder 侧相同处理,经线性层得到 512 维向量。
- latent 分支:将潜变量 $z$ 经线性层映射为 512 维向量。
最终,Transformer Encoder 的输入 token 数为 $1200+2=1202$ 个 512 维向量(1200 个图像 token,加上 latent 与 joints 各 1 个)。其输出作为 key/value 供 Transformer Decoder 做 cross-attention(交叉注意力)。Transformer Decoder 接收 $K$ 个 query 并输出长度为 $K$ 的 predicted action sequence(预测动作序列)。
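同样可以用假设的特征张量核对 Step 3 中的 token 数量:每路相机的 15×20 特征图展平为 300 个 token,四路共 1200 个,再加上 latent 与 joints 两个 token 得到 1202 个 encoder 输入;decoder 侧则是 $K$ 个 query(以下仅核对形状,不包含真实的 ResNet 与注意力计算):

```python
import torch
from einops import rearrange

B, d_model, K = 8, 512, 100

# 假设每路相机经 ResNet-18 + 线性投影后得到 (B, 512, 15, 20) 的特征图
cam_feats = [torch.randn(B, d_model, 15, 20) for _ in range(4)]
img_tokens = torch.cat([rearrange(f, "b c h w -> b (h w) c") for f in cam_feats], dim=1)
print(img_tokens.shape)               # torch.Size([8, 1200, 512]):4 路 × 300 token

latent_token = torch.randn(B, 1, d_model)   # 潜变量 z 投影后的 token
joint_token = torch.randn(B, 1, d_model)    # 从臂关节投影后的 token
encoder_in = torch.cat([latent_token, joint_token, img_tokens], dim=1)
print(encoder_in.shape)               # torch.Size([8, 1202, 512])

queries = torch.zeros(B, K, d_model)        # DETR 风格:K 个查询对应 K 步动作
print(queries.shape)                  # torch.Size([8, 100, 512]),经 decoder 与动作头后映射为 (8, 100, 7)
```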
ACT 推理#

推理流程与训练阶段的 Step 3 类似,但此处不再使用 VAE encoder,而是直接将潜变量设为零向量 $\mathbf{0}$:在先验 $z\sim\mathcal{N}(0,I)$ 下,$\mathbf{0}$ 既是均值也是众数(实践中常用的推理近似),据此即可直接生成 predicted action sequence(预测动作序列)作为输出。
ACT 消融实验#

- Action Chunk(动作分块)的有效性:将“单步预测”扩展为“一次预测 $K$ 步”能显著提升成功率。随着 chunk size 从 1 增大,成功率先明显上升,在中等大小(论文中约 100 步)附近最佳;当 chunk 过长、接近整段轨迹(≈400 步)时有所回落,可能由于一次性输出过长、缺乏与环境的交互反馈。
- Temporal Ensemble(时间集成) 对相邻输出片段在重叠区加权平均,可带来稳定的小幅增益,使动作更连贯。
- CVAE 目标($KL$ 对齐)的作用
对潜变量 $z$ 施加 $KL$ 正则,使其与标准正态对齐:
- 脚本演示数据:去掉 $KL$ 影响较小,因数据更规整、噪声少。
- 人类演示数据:去掉 $KL$ 后性能显著下降,说明 $KL$ 正则对建模人类噪声/多样性至关重要。
- 遥操作控制频率与数据采集效率:提高示教/遥操作频率可显著缩短单次演示时长,例如从 5 Hz 提升到 50 Hz 后,采集同一任务的用时由约 30–40 s 降至约 20 s。作者认为高控制频率对高效数据采集至关重要。
实践#
标准 ACT 通常按单任务训练,跨任务泛化有限,且不内置语言接口;若需多任务或语言条件,可在更高层做扩展。
ACT 相对其他模仿学习方法,具有:
- 减少复合误差:通过预测动作块降低误差累积;
- 提高成功率:在精细操作任务上表现优异;
- 端到端训练:无需手工设计特征;
- 多模态融合:有效融合视觉与状态信息。
LeRobot ACT 代码阅读#
LeRobot 仓库变动频繁,请务必以官方教程和仓库代码为准;本文撰写时对应的版本为 d57d1aa1(2025-10-31)。
lerobot/src/lerobot/policies/act/
├── configuration_act.py
├── modeling_act.py
├── processor_act.py
└── README.md -> ../../../../docs/source/policy_act_README.md
LeRobot 的 ACT 由 configuration_act.py、modeling_act.py、processor_act.py 三个文件实现。
configuration_act.py:ACT 配置#
#!/usr/bin/env python
# Copyright 2024 Tony Z. Zhao and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# 以上是版权与开源许可声明,表明本代码遵循 Apache 2.0 许可证
from dataclasses import (
dataclass,
field,
) # dataclass 用于简化配置类的定义,field 可定义默认值/工厂
from lerobot.configs.policies import (
PreTrainedConfig,
) # 项目内基类:预训练策略的通用配置抽象
from lerobot.configs.types import (
NormalizationMode,
) # 枚举:输入/输出的归一化模式(如 MEAN_STD、MIN_MAX)
from lerobot.optim.optimizers import AdamWConfig # 优化器配置对象(AdamW 的超参集合)
# 使用注册器将该配置类注册为 "act" 类型,便于通过字符串查找/构造对应配置。
@PreTrainedConfig.register_subclass("act")
@dataclass # dataclass 会自动生成 __init__/__repr__/__eq__ 等,从字段定义中推导构造参数
class ACTConfig(PreTrainedConfig):
"""Configuration class for the Action Chunking Transformers policy.
Defaults are configured for training on bimanual Aloha tasks like "insertion" or "transfer".
The parameters you will most likely need to change are the ones which depend on the environment / sensors.
Those are: `input_shapes` and 'output_shapes`.
Notes on the inputs and outputs:
- Either:
- At least one key starting with "observation.image is required as an input.
AND/OR
- The key "observation.environment_state" is required as input.
- If there are multiple keys beginning with "observation.images." they are treated as multiple camera
views. Right now we only support all images having the same shape.
- May optionally work without an "observation.state" key for the proprioceptive robot state.
- "action" is required as an output key.
Args:
n_obs_steps: Number of environment steps worth of observations to pass to the policy (takes the
current step and additional steps going back).
chunk_size: The size of the action prediction "chunks" in units of environment steps.
n_action_steps: The number of action steps to run in the environment for one invocation of the policy.
This should be no greater than the chunk size. For example, if the chunk size size 100, you may
set this to 50. This would mean that the model predicts 100 steps worth of actions, runs 50 in the
environment, and throws the other 50 out.
input_shapes: A dictionary defining the shapes of the input data for the policy. The key represents
the input data name, and the value is a list indicating the dimensions of the corresponding data.
For example, "observation.image" refers to an input from a camera with dimensions [3, 96, 96],
indicating it has three color channels and 96x96 resolution. Importantly, `input_shapes` doesn't
include batch dimension or temporal dimension.
output_shapes: A dictionary defining the shapes of the output data for the policy. The key represents
the output data name, and the value is a list indicating the dimensions of the corresponding data.
For example, "action" refers to an output shape of [14], indicating 14-dimensional actions.
Importantly, `output_shapes` doesn't include batch dimension or temporal dimension.
input_normalization_modes: A dictionary with key representing the modality (e.g. "observation.state"),
and the value specifies the normalization mode to apply. The two available modes are "mean_std"
which subtracts the mean and divides by the standard deviation and "min_max" which rescale in a
[-1, 1] range.
output_normalization_modes: Similar dictionary as `normalize_input_modes`, but to unnormalize to the
original scale. Note that this is also used for normalizing the training targets.
vision_backbone: Name of the torchvision resnet backbone to use for encoding images.
pretrained_backbone_weights: Pretrained weights from torchvision to initialize the backbone.
`None` means no pretrained weights.
replace_final_stride_with_dilation: Whether to replace the ResNet's final 2x2 stride with a dilated
convolution.
pre_norm: Whether to use "pre-norm" in the transformer blocks.
dim_model: The transformer blocks' main hidden dimension.
n_heads: The number of heads to use in the transformer blocks' multi-head attention.
dim_feedforward: The dimension to expand the transformer's hidden dimension to in the feed-forward
layers.
feedforward_activation: The activation to use in the transformer block's feed-forward layers.
n_encoder_layers: The number of transformer layers to use for the transformer encoder.
n_decoder_layers: The number of transformer layers to use for the transformer decoder.
use_vae: Whether to use a variational objective during training. This introduces another transformer
which is used as the VAE's encoder (not to be confused with the transformer encoder - see
documentation in the policy class).
latent_dim: The VAE's latent dimension.
n_vae_encoder_layers: The number of transformer layers to use for the VAE's encoder.
temporal_ensemble_coeff: Coefficient for the exponential weighting scheme to apply for temporal
ensembling. Defaults to None which means temporal ensembling is not used. `n_action_steps` must be
1 when using this feature, as inference needs to happen at every step to form an ensemble. For
more information on how ensembling works, please see `ACTTemporalEnsembler`.
dropout: Dropout to use in the transformer layers (see code for details).
kl_weight: The weight to use for the KL-divergence component of the loss if the variational objective
is enabled. Loss is then calculated as: `reconstruction_loss + kl_weight * kld_loss`.
"""
# 以上三引号是类的文档字符串(docstring),用于解释该配置类的用途和各参数含义。
# 文档内提到的 `input_shapes` / `output_shapes` 等键,属于父类/策略使用的约定。
# Input / output structure.
# 观测/动作的时序结构配置
n_obs_steps: int = 1 # 传入策略的观测步数(时间维度),目前实现只支持 1(当前步)
chunk_size: int = 100 # 一次预测的“动作块”的长度(以环境步数计)
n_action_steps: int = (
100 # 每次调用策略实际执行到环境中的动作步数,不能超过 chunk_size
)
# 归一化模式的默认映射:按模态选择 NormalizationMode
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
"VISUAL": NormalizationMode.MEAN_STD, # 图像模态:减均值/除方差
"STATE": NormalizationMode.MEAN_STD, # 状态模态:减均值/除方差
"ACTION": NormalizationMode.MEAN_STD, # 动作模态:减均值/除方差(训练目标也会用到)
}
)
# Architecture.
# Vision backbone.
vision_backbone: str = (
"resnet18" # 图像编码使用的 ResNet 主干名称(需为 torchvision 的 resnet 变体)
)
pretrained_backbone_weights: str | None = (
"ResNet18_Weights.IMAGENET1K_V1" # 主干的预训练权重标识;None 表示不加载
)
replace_final_stride_with_dilation: int = False # 是否用空洞卷积替换 ResNet 最后一个 2x2 stride(类型标注为 int,但实际布尔使用)
# Transformer layers.
pre_norm: bool = False # Transformer 是否使用 pre-norm 结构(LayerNorm 在子层之前)
dim_model: int = 512 # Transformer 主通道隐藏维度 d_model
n_heads: int = 8 # 多头注意力的头数
dim_feedforward: int = 3200 # 前馈网络的扩展维度(通常为 d_model 的若干倍)
feedforward_activation: str = "relu" # 前馈网络的激活函数类型
n_encoder_layers: int = 4 # Transformer 编码器层数
# Note: Although the original ACT implementation has 7 for `n_decoder_layers`, there is a bug in the code
# that means only the first layer is used. Here we match the original implementation by setting this to 1.
# See this issue https://github.com/tonyzhaozh/act/issues/25#issue-2258740521.
n_decoder_layers: int = (
1 # Transformer 解码器层数(按原实现的实际效果设置为 1,以对齐行为)
)
# VAE.
use_vae: bool = (
True # 训练时是否使用 VAE 目标(引入额外 Transformer 作为 VAE 编码器)
)
latent_dim: int = 32 # VAE 潜变量维度
n_vae_encoder_layers: int = 4 # VAE 编码器的 Transformer 层数
# Inference.
# Note: the value used in ACT when temporal ensembling is enabled is 0.01.
temporal_ensemble_coeff: float | None = (
None # 时间集成(temporal ensembling)的指数加权系数;None 表示关闭
)
# Training and loss computation.
dropout: float = 0.1 # Transformer 层内的 dropout 比例(防止过拟合)
kl_weight: float = (
10.0 # 使用 VAE 时,KL 散度项的损失权重(总损失 = 重构损失 + kl_weight * KL)
)
# Training preset
# 训练预设:优化器相关超参数
optimizer_lr: float = 1e-5 # 主体学习率
optimizer_weight_decay: float = 1e-4 # 权重衰减(L2 正则)
optimizer_lr_backbone: float = (
1e-5 # 视觉主干的学习率(可能与主体不同,用于微调/冻结策略)
)
def __post_init__(self):
# dataclass 的钩子:在 __init__ 之后自动调用。
# 这里首先调用父类的 __post_init__ 来完成通用初始化(如解析输入/输出特征等)。
super().__post_init__()
"""Input validation (not exhaustive)."""
# ——以下是对配置进行基本校验的逻辑(非穷尽)——
# 校验视觉主干名称:必须是 ResNet 家族,否则抛出 ValueError
if not self.vision_backbone.startswith("resnet"):
raise ValueError(
f"`vision_backbone` must be one of the ResNet variants. Got {self.vision_backbone}."
)
# 若启用时间集成(temporal_ensemble_coeff 非 None),则 n_action_steps 必须为 1
# 原因:时间集成需要每一步都查询策略以形成集成
if self.temporal_ensemble_coeff is not None and self.n_action_steps > 1:
raise NotImplementedError(
"`n_action_steps` must be 1 when using temporal ensembling. This is "
"because the policy needs to be queried every step to compute the ensembled action."
)
# n_action_steps 不能超过 chunk_size(一次调用预测的最大可用步数上限)
if self.n_action_steps > self.chunk_size:
raise ValueError(
f"The chunk size is the upper bound for the number of action steps per model invocation. Got "
f"{self.n_action_steps} for `n_action_steps` and {self.chunk_size} for `chunk_size`."
)
# 目前实现不支持多观测步(时间窗口 > 1)
if self.n_obs_steps != 1:
raise ValueError(
f"Multiple observation steps not handled yet. Got `nobs_steps={self.n_obs_steps}`"
)
def get_optimizer_preset(self) -> AdamWConfig:
# 返回一个 AdamW 优化器的配置预设,供上层训练器构造实际优化器实例
return AdamWConfig(
lr=self.optimizer_lr,
weight_decay=self.optimizer_weight_decay,
)
def get_scheduler_preset(self) -> None:
# 返回 None 表示不使用学习率调度器(或由外部训练脚本自行指定)
return None
def validate_features(self) -> None:
# 检查特征输入是否满足最小要求:
# 必须至少提供一种图像特征(来自摄像头)或环境状态特征。
# 这些属性(image_features、env_state_feature)通常在父类 __post_init__ 中解析并赋值。
if not self.image_features and not self.env_state_feature:
raise ValueError(
"You must provide at least one image or the environment state among the inputs."
)
@property
def observation_delta_indices(self) -> None:
# 观测的“增量索引”定义(若用于计算时间差分等)。这里返回 None,表示不定义/不使用。
return None
@property
def action_delta_indices(self) -> list:
# 动作的“增量索引”定义:这里返回 [0, 1, ..., chunk_size-1]
# 常见用途:指示哪些时间步上的动作需要被预测/计算,或用于构建目标序列的索引。
return list(range(self.chunk_size))
@property
def reward_delta_indices(self) -> None:
# 奖励的“增量索引”定义:此处不使用奖励差分(返回 None)
return None
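在进入下一个文件之前,这里给出一个最小的用法示意(假设环境中已安装 lerobot 且版本与上文接近,打印的默认值以实际版本为准):

```python
from lerobot.policies.act.configuration_act import ACTConfig

# 自定义动作块大小与执行步数;n_action_steps 不得超过 chunk_size,否则 __post_init__ 会抛出 ValueError
cfg = ACTConfig(chunk_size=50, n_action_steps=50)
print(cfg.vision_backbone, cfg.dim_model, cfg.kl_weight)   # 默认为 resnet18 / 512 / 10.0

# 启用时间集成时必须逐步查询策略,因此 n_action_steps 必须为 1
cfg_ens = ACTConfig(chunk_size=100, n_action_steps=1, temporal_ensemble_coeff=0.01)
print(cfg_ens.action_delta_indices[:5])                    # [0, 1, 2, 3, 4]
```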
processor_act.py:ACT 流水线#
#!/usr/bin/env python
# Copyright 2024 Tony Z. Zhao and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# 以上是版权与开源许可声明,表明本代码遵循 Apache 2.0 许可证
# 小结:
# - 本文件的核心函数 make_act_pre_post_processors 会根据 ACTConfig 和可选的数据集统计,构造两个流水线对象:
# 1) 前处理流水线(PRE):重命名 -> 加 batch 维 -> 移到设备 -> 归一化
# 2) 后处理流水线(POST):反归一化 -> 移回 CPU
# - 这样做的好处:将数据工程与模型推理解耦,保证输入输出的形状、设备与数值尺度都符合模型与下游使用方的预期
# - features 与 norm_map 的一致性非常重要:保证前处理与后处理的变换可逆且匹配
# limitations under the License.
from typing import (
Any,
) # 从 typing 导入 Any,表示“任意类型”,常用于类型提示中表示通用容器
import torch # 导入 PyTorch,用于张量(Tensor)及设备(device)管理
from lerobot.policies.act.configuration_act import ACTConfig # 导入 ACT 策略的配置类
# 从 lerobot.processor 导入一系列“处理步骤(ProcessorStep)”与管道(Pipeline)相关类
from lerobot.processor import (
AddBatchDimensionProcessorStep, # 处理步骤:为输入添加 batch 维度(例如从 [C,H,W] 变为 [B,C,H,W])
DeviceProcessorStep, # 处理步骤:将数据移动到指定设备(如 "cuda:0" 或 "cpu")
NormalizerProcessorStep, # 处理步骤:对特征做归一化(根据统计量,如均值/方差)
PolicyAction, # 策略输出动作的数据结构(类型别名/封装)
PolicyProcessorPipeline, # 通用的策略处理“流水线”定义,包含一系列有序步骤
RenameObservationsProcessorStep, # 处理步骤:重命名观测字典中的键(key),以适配模型预期的输入名
UnnormalizerProcessorStep, # 处理步骤:把归一化后的输出反归一化回原始尺度
)
from lerobot.processor.converters import (
policy_action_to_transition, # 转换函数:将策略动作结构转为“transition”结构(过渡/样本格式)
transition_to_policy_action, # 转换函数:与上相反,将 transition 转回策略动作结构
)
from lerobot.utils.constants import (
POLICY_POSTPROCESSOR_DEFAULT_NAME, # 常量:后处理流水线的默认命名
POLICY_PREPROCESSOR_DEFAULT_NAME, # 常量:前处理流水线的默认命名
)
def make_act_pre_post_processors(
config: ACTConfig, # ACT 策略配置对象,内含设备、特征配置、归一化映射关系等信息
dataset_stats: dict[str, dict[str, torch.Tensor]]
| None = None, # 数据集统计信息(如 mean/std),按特征名组织;可为 None
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
"""Creates the pre- and post-processing pipelines for the ACT policy.
The pre-processing pipeline handles normalization, batching, and device placement for the model inputs.
The post-processing pipeline handles unnormalization and moves the model outputs back to the CPU.
Args:
config (ACTConfig): The ACT policy configuration object.
dataset_stats (dict[str, dict[str, torch.Tensor]] | None): A dictionary containing dataset
statistics (e.g., mean and std) used for normalization. Defaults to None.
Returns:
tuple[PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], PolicyProcessorPipeline[PolicyAction, PolicyAction]]: A tuple containing the
pre-processor pipeline and the post-processor pipeline.
"""
# 上面的英文文档说明:
# - 本函数构造并返回“前处理(pre)”与“后处理(post)”两个流水线,用于 ACT 策略的输入和输出数据处理。
# - 前处理:将原始观测做重命名、补齐 batch 维度、移动到指定设备、并按数据集统计进行归一化,使之适配模型输入。
# - 后处理:对模型输出做反归一化(还原到原尺度),并移动回 CPU(便于后续使用或与非 GPU 代码交互)。
# - dataset_stats:通常包含每个特征的 mean/std,用于 Normalizer/Unnormalizer;为 None 时可能使用默认策略或跳过部分操作。
# - 返回值是一个元组:(前处理流水线, 后处理流水线)
# 定义前处理流水线中包含的“步骤”列表(按顺序执行)
input_steps = [
RenameObservationsProcessorStep(
rename_map={}
), # 重命名观测键的步骤:这里给了空映射,表示当前不需要改名
AddBatchDimensionProcessorStep(), # 添加 batch 维:当输入是单样本时,变为批大小为 1 的张量,便于模型统一处理
DeviceProcessorStep(
device=config.device
), # 设备迁移:将(可能包含张量的)输入移动到 config.device(如 "cuda" 或 "cpu")
NormalizerProcessorStep( # 归一化步骤:对输入/输出特征集合按 norm_map 和 stats 做标准化/归一化
features={
**config.input_features,
**config.output_features,
}, # 指定需要归一化的特征集合:将输入与输出特征合并
norm_map=config.normalization_mapping, # 指定特征名到“归一化配置/方式”的映射(例如使用哪组统计量)
stats=dataset_stats, # 数据集统计(如 mean/std),用于归一化参数
device=config.device, # 将统计量和运算放在相同设备上,避免跨设备拷贝/错误
),
]
# 以上前处理的意图:
# 1) RenameObservationsProcessorStep:有些数据集的键名与模型期望不一致,通过重命名统一接口(此处为空映射,意味着保持原样)
# 2) AddBatchDimensionProcessorStep:即便是单条数据也要添加 batch 维度,满足大多数深度学习模型形状要求
# 3) DeviceProcessorStep:统一把数据移动到 config 指定的设备(GPU/CPU),确保后续张量运算在同一设备上
# 4) NormalizerProcessorStep:将输入(甚至包含模型要预测的目标特征)进行标准化,使训练/推理更稳定
# 定义后处理流水线步骤:将模型输出从标准化空间映射回原空间,并迁移到 CPU
output_steps = [
UnnormalizerProcessorStep( # 反归一化:把模型输出(先前按统计量标准化过)还原到原始数值范围
features=config.output_features, # 仅对输出相关的特征进行反归一化(不会动输入特征)
norm_map=config.normalization_mapping, # 使用与前处理一致的归一化映射表,保证前后处理对齐
stats=dataset_stats, # 使用相同的数据集统计参数进行反变换
),
DeviceProcessorStep(
device="cpu"
), # 将最终结果统一移回 CPU:便于日志记录、与非 GPU 组件交互或序列化
]
# 返回前/后处理两个 PolicyProcessorPipeline 实例:
# - 对于前处理流水线:输入与输出都是字典(键到任意类型),因为输入通常是多模态观测的字典结构
# - 对于后处理流水线:输入与输出是 PolicyAction(策略动作)结构/对象
return (
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
steps=input_steps, # 指定流水线包含的步骤序列
name=POLICY_PREPROCESSOR_DEFAULT_NAME, # 使用默认的“前处理”名称,便于日志或调试
),
PolicyProcessorPipeline[PolicyAction, PolicyAction](
steps=output_steps, # 指定后处理步骤序列
name=POLICY_POSTPROCESSOR_DEFAULT_NAME, # 使用默认的“后处理”名称
to_transition=policy_action_to_transition, # 指定如何把 PolicyAction 转换为 transition(内部可能用于统一接口)
to_output=transition_to_policy_action, # 指定如何把 transition 转回 PolicyAction(与上相反的方向)
),
)
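为直观理解前/后处理的对称性,下面用纯 PyTorch 写一个 mean_std 归一化与反归一化的最小示意;它只对应 NormalizerProcessorStep / UnnormalizerProcessorStep 所做的事情,并非其真实实现,统计量也是假设的:

```python
import torch

# 假设的数据集统计(真实场景由 LeRobotDataset 的统计信息给出)
stats = {
    "observation.state": {"mean": torch.zeros(7), "std": torch.ones(7) * 0.5},
    "action": {"mean": torch.zeros(7), "std": torch.ones(7) * 0.5},
}

def normalize(x, s):     # 前处理:减均值、除以标准差
    return (x - s["mean"]) / (s["std"] + 1e-8)

def unnormalize(x, s):   # 后处理:还原到原始尺度
    return x * s["std"] + s["mean"]

obs = torch.randn(1, 7)                                     # 已加 batch 维的观测
model_in = normalize(obs, stats["observation.state"])       # 送入模型前归一化
pred_action = torch.randn(1, 7)                             # 假设这是模型在归一化空间的输出
robot_action = unnormalize(pred_action, stats["action"])    # 反归一化后才能下发给机器人
print(model_in.shape, robot_action.shape)
```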
modeling_act.py:ACT 模型#
#!/usr/bin/env python
# Copyright 2024 Tony Z. Zhao and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 以上是版权与开源许可声明,表明本代码遵循 Apache 2.0 许可证
"""Action Chunking Transformer Policy
As per Learning Fine-Grained Bimanual Manipulation with Low-Cost Hardware (https://huggingface.co/papers/2304.13705).
The majority of changes here involve removing unused code, unifying naming, and adding helpful comments.
"""
# 说明:
# 本文件实现 ACT(Action Chunking Transformer)策略与其底层神经网络。ACT 旨在在机器人强化学习/模仿学习中,
# 一次性预测一段连续的动作序列(“动作块”/chunk),以减少每步都要前向一次模型的开销,并改善时序一致性。
# 代码支持两种模式:
# 1) 纯 Transformer 预测动作序列;
# 2) 可选 VAE(变分自编码器)训练目标:使用一个 VAE encoder 产生隐变量,再用 Transformer(此时相当于 VAE 的解码器)预测动作序列。
# 另外支持可选的“时间集成(Temporal Ensembling)”在推理时对多步动作进行指数加权平均,从而平滑并提升鲁棒性。
# 模块结构总览:
# - ACTPolicy:策略封装(选择动作、训练前向、优化参数分组、时间集成/动作队列)
# - ACTTemporalEnsembler:在线指数加权时间集成器
# - ACT:核心模型(视觉骨干网络 + Transformer 编码器/解码器 + 各种投影/位置编码 + 动作回归头)
# - ACTEncoder / ACTDecoder / 对应 Layer:标准 Transformer 层(支持 pre-norm/post-norm)
# - 位置编码(1D/2D 正弦位置编码)
# - 实用函数:get_activation_fn、create_sinusoidal_pos_embedding
#
# 术语与张量形状约定:
# - B: batch size
# - S: 序列长度(这里多指 chunk_size,即要预测的动作步数)
# - D: 隐藏维度(dim_model)
# - L: 潜变量维度(latent_dim)
# - action_dim: 动作维度
# - 图片特征:从视觉骨干网络输出的 feature map (B, C, H, W),随后会被重排为序列。
# - Transformer 采用 PyTorch 标准接口,序列维度在最前(Seq, Batch, Channel)。
#
# 限制:仅添加注释,不修改任何原始代码逻辑或接口。
import math
from collections import deque
from collections.abc import Callable
from itertools import chain
import einops # 张量重排工具库,便于通道/维度变换
import numpy as np
import torch
import torch.nn.functional as F # noqa: N812
import torchvision
from lerobot.policies.act.configuration_act import (
ACTConfig,
) # 配置对象,集中管理所有超参数
from lerobot.policies.pretrained import PreTrainedPolicy # 通用策略基类
from lerobot.utils.constants import (
ACTION,
OBS_ENV_STATE,
OBS_IMAGES,
OBS_STATE,
) # 约定的数据字典键名
from torch import Tensor, nn
from torchvision.models._utils import (
IntermediateLayerGetter,
) # 从骨干网络中提取中间层输出
from torchvision.ops.misc import (
FrozenBatchNorm2d,
) # 冻结的 BN,常用于迁移学习避免数值漂移
class ACTPolicy(PreTrainedPolicy):
"""
Action Chunking Transformer Policy as per Learning Fine-Grained Bimanual Manipulation with Low-Cost
Hardware (paper: https://huggingface.co/papers/2304.13705, code: https://github.com/tonyzhaozh/act)
"""
config_class = ACTConfig
name = "act"
def __init__(
self,
config: ACTConfig,
):
"""
Args:
config: Policy configuration class instance or None, in which case the default instantiation of
the configuration class is used.
"""
super().__init__(config)
config.validate_features() # 校验特征配置是否一致/可用(例如是否提供所需的键)
self.config = config
self.model = ACT(config) # 核心模型:视觉 + Transformer
if config.temporal_ensemble_coeff is not None:
# 如果配置了时间集成,就用指数加权的在线方法平滑动作序列
self.temporal_ensembler = ACTTemporalEnsembler(
config.temporal_ensemble_coeff, config.chunk_size
)
self.reset() # 初始化动作队列或时间集成器状态
def get_optim_params(self) -> dict:
# TODO(aliberts, rcadene): As of now, lr_backbone == lr
# Should we remove this and just `return self.parameters()`?
# 将参数分组以设置不同学习率(例如对视觉骨干设置较小 LR)
return [
{
"params": [
p
for n, p in self.named_parameters()
if not n.startswith("model.backbone") and p.requires_grad
]
},
{
"params": [
p
for n, p in self.named_parameters()
if n.startswith("model.backbone") and p.requires_grad
],
"lr": self.config.optimizer_lr_backbone,
},
]
def reset(self):
"""This should be called whenever the environment is reset."""
# 环境重置时需清空时间集成器或动作队列,以免使用旧状态
if self.config.temporal_ensemble_coeff is not None:
self.temporal_ensembler.reset()
else:
# 无时间集成时,使用一个定长队列缓存已预测的动作块,逐步弹出
self._action_queue = deque([], maxlen=self.config.n_action_steps)
@torch.no_grad()
def select_action(self, batch: dict[str, Tensor]) -> Tensor:
"""Select a single action given environment observations.
This method wraps `select_actions` in order to return one action at a time for execution in the
environment. It works by managing the actions in a queue and only calling `select_actions` when the
queue is empty.
"""
# 选择动作时确保 eval 模式(禁用 Dropout/BN 统计更新)
self.eval() # keeping the policy in eval mode as it could be set to train mode while queue is consumed
if self.config.temporal_ensemble_coeff is not None:
# 使用时间集成:每次对最新的动作块进行在线融合,并返回当前应执行的单步动作
actions = self.predict_action_chunk(batch)
action = self.temporal_ensembler.update(actions)
return action
# 无时间集成:维护一个动作队列(n_action_steps 个),当队列为空时,再预测新的动作块填充
if len(self._action_queue) == 0:
actions = self.predict_action_chunk(batch)[:, : self.config.n_action_steps]
# 模型输出形状为 (B, n_action_steps, action_dim),而队列按时间步推进,等价 (n_action_steps, B, *)
# 因此需要转置再按时间步扩展到队列
self._action_queue.extend(actions.transpose(0, 1))
return self._action_queue.popleft()
@torch.no_grad()
def predict_action_chunk(self, batch: dict[str, Tensor]) -> Tensor:
"""Predict a chunk of actions given environment observations."""
self.eval()
if self.config.image_features:
# 若配置了图像特征,将用户提供的多路图像拼到统一键 OBS_IMAGES 下(浅拷贝避免改动原 batch)
batch = dict(
batch
) # shallow copy so that adding a key doesn't modify the original
batch[OBS_IMAGES] = [batch[key] for key in self.config.image_features]
actions = self.model(batch)[0] # 仅取预测的动作(忽略 VAE 参数返回)
return actions
def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict]:
"""Run the batch through the model and compute the loss for training or validation."""
# 训练/验证前向:输出预测动作与损失
if self.config.image_features:
batch = dict(
batch
) # shallow copy so that adding a key doesn't modify the original
batch[OBS_IMAGES] = [batch[key] for key in self.config.image_features]
actions_hat, (mu_hat, log_sigma_x2_hat) = self.model(batch)
# L1 行为克隆损失(对 padding 掩码为 True 的时间步不计入损失)
l1_loss = (
F.l1_loss(batch[ACTION], actions_hat, reduction="none")
* ~batch["action_is_pad"].unsqueeze(-1)
).mean()
loss_dict = {"l1_loss": l1_loss.item()}
if self.config.use_vae:
# 当使用 VAE 目标时,额外计算 KL 散度(对潜变量逐维求和,再对 batch 求均值)
# log_sigma_x2 是 2*log(sigma),保持与原实现一致
mean_kld = (
(
-0.5
* (1 + log_sigma_x2_hat - mu_hat.pow(2) - (log_sigma_x2_hat).exp())
)
.sum(-1)
.mean()
)
loss_dict["kld_loss"] = mean_kld.item()
loss = l1_loss + mean_kld * self.config.kl_weight
else:
loss = l1_loss
return loss, loss_dict
class ACTTemporalEnsembler:
def __init__(self, temporal_ensemble_coeff: float, chunk_size: int) -> None:
"""Temporal ensembling as described in Algorithm 2 of https://huggingface.co/papers/2304.13705.
The weights are calculated as wᵢ = exp(-temporal_ensemble_coeff * i) where w₀ is the oldest action.
They are then normalized to sum to 1 by dividing by Σwᵢ. Here's some intuition around how the
coefficient works:
- Setting it to 0 uniformly weighs all actions.
- Setting it positive gives more weight to older actions.
- Setting it negative gives more weight to newer actions.
NOTE: The default value for `temporal_ensemble_coeff` used by the original ACT work is 0.01. This
results in older actions being weighed more highly than newer actions (the experiments documented in
https://github.com/huggingface/lerobot/pull/319 hint at why highly weighing new actions might be
detrimental: doing so aggressively may diminish the benefits of action chunking).
Here we use an online method for computing the average rather than caching a history of actions in
order to compute the average offline. For a simple 1D sequence it looks something like:
```
import torch
seq = torch.linspace(8, 8.5, 100)
print(seq)
m = 0.01
exp_weights = torch.exp(-m * torch.arange(len(seq)))
print(exp_weights)
# Calculate offline
avg = (exp_weights * seq).sum() / exp_weights.sum()
print("offline", avg)
# Calculate online
for i, item in enumerate(seq):
if i == 0:
avg = item
continue
avg *= exp_weights[:i].sum()
avg += item * exp_weights[i]
avg /= exp_weights[: i + 1].sum()
print("online", avg)
```
"""
# 中文补充:时间集成器对一个动作块内的每个时间步位置 i(0 为最旧)分配权重 w_i = exp(-m * i)。
# 在线更新避免缓存历史所有动作,大幅节省内存/计算,适合推理时逐步滑动窗口融合。
self.chunk_size = chunk_size
self.ensemble_weights = torch.exp(
-temporal_ensemble_coeff * torch.arange(chunk_size)
)
# 累积和用于在线“归一化”更新
self.ensemble_weights_cumsum = torch.cumsum(self.ensemble_weights, dim=0)
self.reset()
def reset(self):
"""Resets the online computation variables."""
# 清零内部缓存:当前融合的动作序列与对应的计数(每个时间步融合了多少次)
self.ensembled_actions = None
# (chunk_size,) count of how many actions are in the ensemble for each time step in the sequence.
self.ensembled_actions_count = None
def update(self, actions: Tensor) -> Tensor:
"""
Takes a (batch, chunk_size, action_dim) sequence of actions, update the temporal ensemble for all
time steps, and pop/return the next batch of actions in the sequence.
"""
# 将权重张量放到与输入相同的 device 上(CPU/GPU 兼容)
self.ensemble_weights = self.ensemble_weights.to(device=actions.device)
self.ensemble_weights_cumsum = self.ensemble_weights_cumsum.to(
device=actions.device
)
if self.ensembled_actions is None:
# 第一次调用:直接把预测的动作块克隆为当前融合序列
self.ensembled_actions = actions.clone()
# 记录每个时间步目前的“融合次数”=1(形状对齐为 (S,1),便于广播)
self.ensembled_actions_count = torch.ones(
(self.chunk_size, 1),
dtype=torch.long,
device=self.ensembled_actions.device,
)
else:
# 对已有的融合序列(除了最后一个时间步)进行在线更新:
# old_avg * sum(w[:i]) + new * w[i] 再除以 sum(w[:i+1]),形如“带权平均”的递推公式
self.ensembled_actions *= self.ensemble_weights_cumsum[
self.ensembled_actions_count - 1
]
self.ensembled_actions += (
actions[:, :-1] * self.ensemble_weights[self.ensembled_actions_count]
)
self.ensembled_actions /= self.ensemble_weights_cumsum[
self.ensembled_actions_count
]
# 融合计数自增,封顶为 chunk_size
self.ensembled_actions_count = torch.clamp(
self.ensembled_actions_count + 1, max=self.chunk_size
)
# 将“最新一步”的原始动作直接拼到末尾(该位置没有历史平均)
self.ensembled_actions = torch.cat(
[self.ensembled_actions, actions[:, -1:]], dim=1
)
# 对应的计数也拼接 1
self.ensembled_actions_count = torch.cat(
[
self.ensembled_actions_count,
torch.ones_like(self.ensembled_actions_count[-1:]),
]
)
# 消费/弹出融合序列的第一个动作(当前要执行的动作),并滑动窗口
action, self.ensembled_actions, self.ensembled_actions_count = (
self.ensembled_actions[:, 0],
self.ensembled_actions[:, 1:],
self.ensembled_actions_count[1:],
)
return action
class ACT(nn.Module):
"""Action Chunking Transformer: The underlying neural network for ACTPolicy.
Note: In this code we use the terms `vae_encoder`, 'encoder', `decoder`. The meanings are as follows.
- The `vae_encoder` is, as per the literature around variational auto-encoders (VAE), the part of the
model that encodes the target data (a sequence of actions), and the condition (the robot
joint-space).
- A transformer with an `encoder` (not the VAE encoder) and `decoder` (not the VAE decoder) with
cross-attention is used as the VAE decoder. For these terms, we drop the `vae_` prefix because we
have an option to train this model without the variational objective (in which case we drop the
`vae_encoder` altogether, and nothing about this model has anything to do with a VAE).
Transformer
Used alone for inference
(acts as VAE decoder
during training)
┌───────────────────────┐
│ Outputs │
│ ▲ │
│ ┌─────►┌───────┐ │
┌──────┐ │ │ │Transf.│ │
│ │ │ ├─────►│decoder│ │
┌────┴────┐ │ │ │ │ │ │
│ │ │ │ ┌───┴───┬─►│ │ │
│ VAE │ │ │ │ │ └───────┘ │
│ encoder │ │ │ │Transf.│ │
│ │ │ │ │encoder│ │
└───▲─────┘ │ │ │ │ │
│ │ │ └▲──▲─▲─┘ │
│ │ │ │ │ │ │
inputs └─────┼──┘ │ image emb. │
│ state emb. │
└───────────────────────┘
"""
def __init__(self, config: ACTConfig):
# BERT 风格的 VAE 编码器输入: [CLS, 机器人当前关节状态(可选), 动作序列]。
# CLS token 经过投影后输出潜变量分布参数(mean 与 log_sigma_x2)。
super().__init__()
self.config = config
if self.config.use_vae:
# VAE 编码器(仅在使用 VAE 目标且训练阶段时启用)
self.vae_encoder = ACTEncoder(config, is_vae_encoder=True)
self.vae_encoder_cls_embed = nn.Embedding(1, config.dim_model)
# 机器人关节状态投影到 Transformer 隐藏维度
if self.config.robot_state_feature:
self.vae_encoder_robot_state_input_proj = nn.Linear(
self.config.robot_state_feature.shape[0], config.dim_model
)
# 动作(目标关节位姿/速度等)投影到隐藏维度
self.vae_encoder_action_input_proj = nn.Linear(
self.config.action_feature.shape[0],
config.dim_model,
)
# 将 VAE 编码器的 CLS 输出映射为潜变量分布参数(均值和对数方差 * 2)
self.vae_encoder_latent_output_proj = nn.Linear(
config.dim_model, config.latent_dim * 2
)
# 固定正弦位置编码(1D),长度 = 1(CLS) + S(动作步) + [1(关节状态,可选)]
num_input_token_encoder = 1 + config.chunk_size
if self.config.robot_state_feature:
num_input_token_encoder += 1
self.register_buffer(
"vae_encoder_pos_enc",
create_sinusoidal_pos_embedding(
num_input_token_encoder, config.dim_model
).unsqueeze(0),
)
# 视觉骨干网络(例如 ResNet),用于提取图像特征
if self.config.image_features:
backbone_model = getattr(torchvision.models, config.vision_backbone)(
replace_stride_with_dilation=[
False,
False,
config.replace_final_stride_with_dilation,
],
weights=config.pretrained_backbone_weights,
norm_layer=FrozenBatchNorm2d,
)
# 使用 IntermediateLayerGetter 从指定层(这里为 layer4)获取特征图
# 输出为字典 {"feature_map": output}
self.backbone = IntermediateLayerGetter(
backbone_model, return_layers={"layer4": "feature_map"}
)
# Transformer:在使用 VAE 时相当于解码器(cross-attend 到条件输入)
self.encoder = ACTEncoder(config)
self.decoder = ACTDecoder(config)
# Transformer 编码器的输入投影与位置编码:
# token 顺序:[latent, (robot_state), (env_state), (image_feature_map_pixels...)]
if self.config.robot_state_feature:
self.encoder_robot_state_input_proj = nn.Linear(
self.config.robot_state_feature.shape[0], config.dim_model
)
if self.config.env_state_feature:
self.encoder_env_state_input_proj = nn.Linear(
self.config.env_state_feature.shape[0], config.dim_model
)
self.encoder_latent_input_proj = nn.Linear(config.latent_dim, config.dim_model)
if self.config.image_features:
# 将骨干网络的通道维投影到 dim_model(1x1 卷积作为线性投影)
self.encoder_img_feat_input_proj = nn.Conv2d(
backbone_model.fc.in_features, config.dim_model, kernel_size=1
)
# 1D token(latent/robot_state/env_state)的可学习位置嵌入
n_1d_tokens = 1 # latent
if self.config.robot_state_feature:
n_1d_tokens += 1
if self.config.env_state_feature:
n_1d_tokens += 1
self.encoder_1d_feature_pos_embed = nn.Embedding(n_1d_tokens, config.dim_model)
if self.config.image_features:
# 2D 正弦位置编码(对 feature map 的每个像素提供位置信息)
self.encoder_cam_feat_pos_embed = ACTSinusoidalPositionEmbedding2d(
config.dim_model // 2
)
# Transformer 解码器:为 S 个要预测的时间步提供可学习查询(DETR 风格)
self.decoder_pos_embed = nn.Embedding(config.chunk_size, config.dim_model)
# 最终线性头:将 decoder 输出映射为动作维度
self.action_head = nn.Linear(
config.dim_model, self.config.action_feature.shape[0]
)
self._reset_parameters()
def _reset_parameters(self):
"""Xavier-uniform initialization of the transformer parameters as in the original code."""
# 对 Transformer 层进行 Xavier 均匀初始化,提升训练稳定性
for p in chain(self.encoder.parameters(), self.decoder.parameters()):
if p.dim() > 1:
nn.init.xavier_uniform_(p)
def forward(
self, batch: dict[str, Tensor]
) -> tuple[Tensor, tuple[Tensor, Tensor] | tuple[None, None]]:
"""A forward pass through the Action Chunking Transformer (with optional VAE encoder).
`batch` should have the following structure:
{
[robot_state_feature] (optional): (B, state_dim) batch of robot states.
[image_features]: (B, n_cameras, C, H, W) batch of images.
AND/OR
[env_state_feature]: (B, env_dim) batch of environment states.
[action_feature] (optional, only if training with VAE): (B, chunk_size, action dim) batch of actions.
}
Returns:
(B, chunk_size, action_dim) batch of action sequences
Tuple containing the latent PDF's parameters (mean, log(σ²)) both as (B, L) tensors where L is the
latent dimension.
"""
# 当使用 VAE + 训练模式时,要求 batch 中必须包含监督的动作序列 ACTION
if self.config.use_vae and self.training:
assert ACTION in batch, (
"actions must be provided when using the variational objective in training mode."
)
# 估计 batch 大小:优先从图像,若无图像则从环境状态推断
batch_size = (
batch[OBS_IMAGES][0].shape[0]
if OBS_IMAGES in batch
else batch[OBS_ENV_STATE].shape[0]
)
# 1) 准备潜变量(latent)
if self.config.use_vae and ACTION in batch and self.training:
# 训练 + VAE:通过 VAE encoder 从 [CLS, 关节状态(可选), 动作序列] 推断潜变量分布参数
cls_embed = einops.repeat(
self.vae_encoder_cls_embed.weight, "1 d -> b 1 d", b=batch_size
) # (B, 1, D)
if self.config.robot_state_feature:
robot_state_embed = self.vae_encoder_robot_state_input_proj(
batch[OBS_STATE]
)
robot_state_embed = robot_state_embed.unsqueeze(1) # (B, 1, D)
action_embed = self.vae_encoder_action_input_proj(
batch[ACTION]
) # (B, S, D)
if self.config.robot_state_feature:
vae_encoder_input = [
cls_embed,
robot_state_embed,
action_embed,
] # (B, S+2, D)
else:
vae_encoder_input = [cls_embed, action_embed]
vae_encoder_input = torch.cat(vae_encoder_input, axis=1)
# 固定位置编码(与原实现保持一致,使用 clone().detach())
pos_embed = self.vae_encoder_pos_enc.clone().detach() # (1, S+2, D)
# key_padding_mask:前面 CLS 和关节状态不是 padding,后面根据 action_is_pad 指示
# False 表示不是 pad;形状 (B, S+1 或 S+2)
cls_joint_is_pad = torch.full(
(batch_size, 2 if self.config.robot_state_feature else 1),
False,
device=batch[OBS_STATE].device,
)
key_padding_mask = torch.cat(
[cls_joint_is_pad, batch["action_is_pad"]], axis=1
) # (bs, seq+1 or 2)
# 送入 VAE 编码器,取 CLS 位置的输出(包含全序列信息),再映射得到 (mu, log_sigma_x2)
cls_token_out = self.vae_encoder(
vae_encoder_input.permute(1, 0, 2),
pos_embed=pos_embed.permute(1, 0, 2),
key_padding_mask=key_padding_mask,
)[0] # select the class token, with shape (B, D)
latent_pdf_params = self.vae_encoder_latent_output_proj(cls_token_out)
mu = latent_pdf_params[:, : self.config.latent_dim]
# 注意:这里返回的是 2*log(sigma),与原实现一致
log_sigma_x2 = latent_pdf_params[:, self.config.latent_dim :]
# 重参数化采样 latent: z = mu + sigma * eps
latent_sample = mu + log_sigma_x2.div(2).exp() * torch.randn_like(mu)
else:
# 推理或未使用 VAE:latent 设为全零(代表“无信息”的先验)
mu = log_sigma_x2 = None
# TODO(rcadene, alexander-soare): remove call to `.to` to speedup forward ; precompute and use buffer
latent_sample = torch.zeros(
[batch_size, self.config.latent_dim], dtype=torch.float32
).to(batch[OBS_STATE].device)
# 2) 准备 Transformer 编码器输入 token 与位置编码
encoder_in_tokens = [self.encoder_latent_input_proj(latent_sample)]
# 1D token 的可学习位置嵌入(先堆为 list,后面与图像 token 一起 stack)
encoder_in_pos_embed = list(
self.encoder_1d_feature_pos_embed.weight.unsqueeze(1)
)
# 机器人关节状态 token
if self.config.robot_state_feature:
encoder_in_tokens.append(
self.encoder_robot_state_input_proj(batch[OBS_STATE])
)
# 环境状态 token
if self.config.env_state_feature:
encoder_in_tokens.append(
self.encoder_env_state_input_proj(batch[OBS_ENV_STATE])
)
if self.config.image_features:
# 多相机图像:各自经过骨干提特征,再通过 1x1 conv 投影至 dim_model
# 注意:对 MPS 设备做过数值稳定性注意(保持与原实现的注释一致)
for img in batch[OBS_IMAGES]:
cam_features = self.backbone(img)[
"feature_map"
] # (B, C_backbone, H, W)
cam_pos_embed = self.encoder_cam_feat_pos_embed(cam_features).to(
dtype=cam_features.dtype
)
cam_features = self.encoder_img_feat_input_proj(
cam_features
) # -> (B, D, H, W)
# 重排为 (Seq, B, D),其中 Seq = H*W
cam_features = einops.rearrange(cam_features, "b c h w -> (h w) b c")
cam_pos_embed = einops.rearrange(cam_pos_embed, "b c h w -> (h w) b c")
# 直接 extend(列表形式)以避免先积累后再 concat 的额外开销
encoder_in_tokens.extend(list(cam_features))
encoder_in_pos_embed.extend(list(cam_pos_embed))
# 将所有 token 按序列维 stack 成张量:(ES, B, D)
encoder_in_tokens = torch.stack(encoder_in_tokens, axis=0)
encoder_in_pos_embed = torch.stack(encoder_in_pos_embed, axis=0)
# 3) 经过 Transformer 编码器/解码器
encoder_out = self.encoder(encoder_in_tokens, pos_embed=encoder_in_pos_embed)
# 解码器输入初始化为全零(S, B, D),再加上可学习的 decoder_pos_embed 作为查询
# TODO(rcadene, alexander-soare): remove call to `device` ; precompute and use buffer
decoder_in = torch.zeros(
(self.config.chunk_size, batch_size, self.config.dim_model),
dtype=encoder_in_pos_embed.dtype,
device=encoder_in_pos_embed.device,
)
decoder_out = self.decoder(
decoder_in,
encoder_out,
encoder_pos_embed=encoder_in_pos_embed,
decoder_pos_embed=self.decoder_pos_embed.weight.unsqueeze(1),
)
# (S, B, D) -> (B, S, D)
decoder_out = decoder_out.transpose(0, 1)
# 动作回归头:(B, S, D) -> (B, S, action_dim)
actions = self.action_head(decoder_out)
# 返回动作与(可选)VAE 参数(训练使用)
return actions, (mu, log_sigma_x2)
class ACTEncoder(nn.Module):
"""Convenience module for running multiple encoder layers, maybe followed by normalization."""
# 一个封装的 Transformer Encoder 堆叠模块,支持 pre-norm 配置,便于复用(包括作为 VAE encoder)
def __init__(self, config: ACTConfig, is_vae_encoder: bool = False):
super().__init__()
self.is_vae_encoder = is_vae_encoder
num_layers = (
config.n_vae_encoder_layers
if self.is_vae_encoder
else config.n_encoder_layers
)
self.layers = nn.ModuleList(
[ACTEncoderLayer(config) for _ in range(num_layers)]
)
self.norm = nn.LayerNorm(config.dim_model) if config.pre_norm else nn.Identity()
def forward(
self,
x: Tensor,
pos_embed: Tensor | None = None,
key_padding_mask: Tensor | None = None,
) -> Tensor:
# 逐层前向;支持外部传入位置编码与 padding 掩码
for layer in self.layers:
x = layer(x, pos_embed=pos_embed, key_padding_mask=key_padding_mask)
x = self.norm(x)
return x
class ACTEncoderLayer(nn.Module):
def __init__(self, config: ACTConfig):
super().__init__()
self.self_attn = nn.MultiheadAttention(
config.dim_model, config.n_heads, dropout=config.dropout
)
# 前馈网络 FFN:Linear -> 激活 -> Dropout -> Linear
self.linear1 = nn.Linear(config.dim_model, config.dim_feedforward)
self.dropout = nn.Dropout(config.dropout)
self.linear2 = nn.Linear(config.dim_feedforward, config.dim_model)
# 残差层归一化(支持 pre-norm 或 post-norm)
self.norm1 = nn.LayerNorm(config.dim_model)
self.norm2 = nn.LayerNorm(config.dim_model)
self.dropout1 = nn.Dropout(config.dropout)
self.dropout2 = nn.Dropout(config.dropout)
self.activation = get_activation_fn(config.feedforward_activation)
self.pre_norm = config.pre_norm
def forward(
self, x, pos_embed: Tensor | None = None, key_padding_mask: Tensor | None = None
) -> Tensor:
# 自注意力子层
skip = x
if self.pre_norm:
x = self.norm1(x)
q = k = x if pos_embed is None else x + pos_embed
x = self.self_attn(q, k, value=x, key_padding_mask=key_padding_mask)
x = x[0] # note: [0] to select just the output, not the attention weights
x = skip + self.dropout1(x) # 残差
# 前馈子层
if self.pre_norm:
skip = x
x = self.norm2(x)
else:
x = self.norm1(x)
skip = x
x = self.linear2(self.dropout(self.activation(self.linear1(x))))
x = skip + self.dropout2(x)
if not self.pre_norm:
x = self.norm2(x)
return x
class ACTDecoder(nn.Module):
def __init__(self, config: ACTConfig):
"""Convenience module for running multiple decoder layers followed by normalization."""
super().__init__()
self.layers = nn.ModuleList(
[ACTDecoderLayer(config) for _ in range(config.n_decoder_layers)]
)
self.norm = nn.LayerNorm(config.dim_model)
def forward(
self,
x: Tensor,
encoder_out: Tensor,
decoder_pos_embed: Tensor | None = None,
encoder_pos_embed: Tensor | None = None,
) -> Tensor:
# 逐层 Decoder,包含自注意力与跨注意力(对 encoder_out 进行 cross-attention)
for layer in self.layers:
x = layer(
x,
encoder_out,
decoder_pos_embed=decoder_pos_embed,
encoder_pos_embed=encoder_pos_embed,
)
if self.norm is not None:
x = self.norm(x)
return x
class ACTDecoderLayer(nn.Module):
def __init__(self, config: ACTConfig):
super().__init__()
self.self_attn = nn.MultiheadAttention(
config.dim_model, config.n_heads, dropout=config.dropout
)
self.multihead_attn = nn.MultiheadAttention(
config.dim_model, config.n_heads, dropout=config.dropout
)
# FFN
self.linear1 = nn.Linear(config.dim_model, config.dim_feedforward)
self.dropout = nn.Dropout(config.dropout)
self.linear2 = nn.Linear(config.dim_feedforward, config.dim_model)
# 三个归一化/Dropout 对应三处残差连接
self.norm1 = nn.LayerNorm(config.dim_model)
self.norm2 = nn.LayerNorm(config.dim_model)
self.norm3 = nn.LayerNorm(config.dim_model)
self.dropout1 = nn.Dropout(config.dropout)
self.dropout2 = nn.Dropout(config.dropout)
self.dropout3 = nn.Dropout(config.dropout)
self.activation = get_activation_fn(config.feedforward_activation)
self.pre_norm = config.pre_norm
def maybe_add_pos_embed(self, tensor: Tensor, pos_embed: Tensor | None) -> Tensor:
# 若提供了位置编码,则与输入相加(Transformer 常见用法)
return tensor if pos_embed is None else tensor + pos_embed
def forward(
self,
x: Tensor,
encoder_out: Tensor,
decoder_pos_embed: Tensor | None = None,
encoder_pos_embed: Tensor | None = None,
) -> Tensor:
"""
Args:
x: (Decoder Sequence, Batch, Channel) tensor of input tokens.
encoder_out: (Encoder Sequence, B, C) output features from the last layer of the encoder we are
cross-attending with.
encoder_pos_embed: (ES, 1, C) positional embedding for keys (from the encoder).
decoder_pos_embed: (DS, 1, C) positional embedding for the queries (from the decoder).
Returns:
(DS, B, C) tensor of decoder output features.
"""
# 1) 自注意力(Decoder 内部 token 之间交互)
skip = x
if self.pre_norm:
x = self.norm1(x)
q = k = self.maybe_add_pos_embed(x, decoder_pos_embed)
x = self.self_attn(q, k, value=x)[
0
] # select just the output, not the attention weights
x = skip + self.dropout1(x)
# 2) 跨注意力(对 Encoder 输出进行查询)
if self.pre_norm:
skip = x
x = self.norm2(x)
else:
x = self.norm1(x)
skip = x
x = self.multihead_attn(
query=self.maybe_add_pos_embed(x, decoder_pos_embed),
key=self.maybe_add_pos_embed(encoder_out, encoder_pos_embed),
value=encoder_out,
)[0] # select just the output, not the attention weights
x = skip + self.dropout2(x)
# 3) FFN
if self.pre_norm:
skip = x
x = self.norm3(x)
else:
x = self.norm2(x)
skip = x
x = self.linear2(self.dropout(self.activation(self.linear1(x))))
x = skip + self.dropout3(x)
if not self.pre_norm:
x = self.norm3(x)
return x
def create_sinusoidal_pos_embedding(num_positions: int, dimension: int) -> Tensor:
"""1D sinusoidal positional embeddings as in Attention is All You Need.
Args:
num_positions: Number of token positions required.
Returns: (num_positions, dimension) position embeddings (the first dimension is the batch dimension).
"""
# 标准的 1D 正弦/余弦位置编码实现,频率按几何级数变化(温度=10000),偶数维使用正弦,奇数维使用余弦。
def get_position_angle_vec(position):
return [
position / np.power(10000, 2 * (hid_j // 2) / dimension)
for hid_j in range(dimension)
]
sinusoid_table = np.array(
[get_position_angle_vec(pos_i) for pos_i in range(num_positions)]
)
sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2]) # dim 2i
sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2]) # dim 2i+1
return torch.from_numpy(sinusoid_table).float()
class ACTSinusoidalPositionEmbedding2d(nn.Module):
"""2D sinusoidal positional embeddings similar to what's presented in Attention Is All You Need.
The variation is that the position indices are normalized in [0, 2π] (not quite: the lower bound is 1/H
for the vertical direction, and 1/W for the horizontal direction.
"""
# 为 2D 特征图(H,W)生成二维正弦位置编码。与常见实现不同,位置索引被缩放到 [0, 2π] 区间(近似),
# 然后同样以几何级数频率生成正/余弦分量,最后在通道维上拼接(y 分量在前,x 分量在后)。
def __init__(self, dimension: int):
"""
Args:
dimension: The desired dimension of the embeddings.
"""
super().__init__()
self.dimension = dimension
self._two_pi = 2 * math.pi
self._eps = 1e-6
# 频率几何级数的“温度”(与 1D 的 10000 一致)
self._temperature = 10000
def forward(self, x: Tensor) -> Tensor:
"""
Args:
x: A (B, C, H, W) batch of 2D feature map to generate the embeddings for.
Returns:
A (1, C, H, W) batch of corresponding sinusoidal positional embeddings.
"""
# 仅需 H、W 形状,因此构造一个形状 (1, H, W) 的“非 mask”
not_mask = torch.ones_like(x[0, :1]) # (1, H, W)
# y/x 方向的累计和相当于 1..H 与 1..W(原实现从 1 开始,而不是 0)
y_range = not_mask.cumsum(1, dtype=torch.float32)
x_range = not_mask.cumsum(2, dtype=torch.float32)
# 归一化到 [0, 2π](加入 eps 避免分母为 0)
y_range = y_range / (y_range[:, -1:, :] + self._eps) * self._two_pi
x_range = x_range / (x_range[:, :, -1:] + self._eps) * self._two_pi
# 频率几何序列(偶数/奇数通道分别对应 sin/cos)
inverse_frequency = self._temperature ** (
2
* (torch.arange(self.dimension, dtype=torch.float32, device=x.device) // 2)
/ self.dimension
)
# 扩展最后一维以按通道除以频率:(1, H, W, 1)
x_range = x_range.unsqueeze(-1) / inverse_frequency # (1, H, W, 1)
y_range = y_range.unsqueeze(-1) / inverse_frequency # (1, H, W, 1)
# 交错堆叠 sin/cos,并在通道维上展平:(1, H, W, C//2)
pos_embed_x = torch.stack(
(x_range[..., 0::2].sin(), x_range[..., 1::2].cos()), dim=-1
).flatten(3)
pos_embed_y = torch.stack(
(y_range[..., 0::2].sin(), y_range[..., 1::2].cos()), dim=-1
).flatten(3)
pos_embed = torch.cat((pos_embed_y, pos_embed_x), dim=3).permute(
0, 3, 1, 2
) # (1, C, H, W)
return pos_embed
def get_activation_fn(activation: str) -> Callable:
"""Return an activation function given a string."""
# 将字符串名称映射到对应的激活函数实现
if activation == "relu":
return F.relu
if activation == "gelu":
return F.gelu
if activation == "glu":
return F.glu
raise RuntimeError(f"activation should be relu/gelu/glu, not {activation}.")
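读完源码后,可以用下面的小片段快速核对两个位置编码实现的输出形状(假设环境中已安装 lerobot,可直接从 modeling_act 导入;数值本身不重要,关注形状即可):

```python
import torch
from lerobot.policies.act.modeling_act import (
    ACTSinusoidalPositionEmbedding2d,
    create_sinusoidal_pos_embedding,
)

# 1D 正弦位置编码:为 VAE encoder 的 1(CLS)+ 1(joints)+ K 个 token 提供位置信息
pe_1d = create_sinusoidal_pos_embedding(num_positions=102, dimension=512)
print(pe_1d.shape)                           # torch.Size([102, 512])

# 2D 正弦位置编码:dimension 取 dim_model // 2,y/x 两个分量在通道维拼接后恢复为 dim_model
pe_2d = ACTSinusoidalPositionEmbedding2d(dimension=256)
feature_map = torch.zeros(1, 512, 15, 20)    # 假设的 ResNet 特征图 (B, C, H, W)
print(pe_2d(feature_map).shape)              # torch.Size([1, 512, 15, 20])
```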
ACT 模型微调#
关键参数#
详见 configuration_act.py。
常用项如下(括号内为配置项名称):
- 每次预测的动作数量(`chunk_size`):通常 50–100;
  - 快速任务:10–30
  - 中等任务:50–100
  - 慢速任务:100–200
  - 一般建议从 50 起步
- 实际执行的动作步数(`n_action_steps`):必须满足 `n_action_steps ≤ chunk_size`,推荐二者相同(如均为 100)。
- 历史观测步数/上下文长度(`n_obs_steps`):当前 ACT 实现仅支持 1。
- Transformer 维度(`dim_model`、`dim_feedforward`)、层数(`n_encoder_layers`、`n_decoder_layers`)、注意力头数(`n_heads`)。
- 视觉主干网络(`vision_backbone`,如 `resnet18`)。
参考命令#
- 多摄像头配置

```bash
# 针对多摄像头设置的 ACT 训练
lerobot-train \
  --policy.type act \
  --dataset.repo_id ${HF_USER}/your_dataset \
  --batch_size 4 \
  --steps 100000 \
  --output_dir outputs/train/act_multicam \
  --job_name act_multicam_training \
  --policy.device cuda \
  --policy.chunk_size 100 \
  --policy.n_action_steps 100 \
  --policy.n_obs_steps 1 \
  --policy.vision_backbone resnet18 \
  --policy.dim_model 512 \
  --policy.dim_feedforward 3200 \
  --policy.n_encoder_layers 4 \
  --policy.n_decoder_layers 1 \
  --policy.n_heads 8 \
  --policy.optimizer_lr 1e-5 \
  --policy.optimizer_weight_decay 1e-4 \
  --policy.push_to_hub false \
  --save_checkpoint true \
  --wandb.enable true
```

- 内存优化配置

```bash
# 针对显存较小的 GPU
lerobot-train \
  --policy.type act \
  --dataset.repo_id io-ai-data/lerobot_data \
  --batch_size 2 \
  --steps 75000 \
  --output_dir outputs/train/act_memory_opt \
  --job_name act_memory_optimized \
  --policy.device cuda \
  --policy.chunk_size 100 \
  --policy.n_action_steps 100 \
  --policy.n_obs_steps 1 \
  --policy.vision_backbone resnet18 \
  --policy.dim_model 256 \
  --policy.optimizer_lr 1e-5 \
  --policy.use_amp true \
  --num_workers 2 \
  --policy.push_to_hub false \
  --save_checkpoint true \
  --wandb.enable true
```
结语#
本文对 ACT 模型的原理、架构、训练与推理流程、消融实验、代码实现与调参实践进行了梳理,期望对你的应用落地有所帮助。

