# 案例分析一：追踪带噪声的正弦信号

本 Notebook 是文章《深入浅出卡尔曼滤波》的配套实践案例一。

我们将从零开始，为一个带噪声的正弦波构建并实现一个完整的卡尔曼滤波器。通过这个过程，您将学习到：

1.  如何为特定问题（正弦波）推导并建立状态空间模型。
2.  如何使用 `pykalman` 库快速实现滤波与平滑。
3.  如何手动实现卡尔曼滤波的核心五个公式，以加深理解。
4.  如何将滤波、平滑和预测的结果进行动态可视化，直观地感受滤波效果。

## 环境与依赖项设置

在运行此 Notebook 之前，请确保您已在终端中激活相应的 Python 环境，并安装了必要的库。

```bash
# 激活您的 conda 环境 (例如 KF-test)
conda activate KF-test

# 安装核心依赖
conda install -y numpy matplotlib

# 安装 pykalman
conda install -y -c conda-forge pykalman

# 安装 ffmpeg
conda install ffmpeg -c conda-forge
```

In [None]:
# 导入依赖
import os
import unicodedata
try:
    import numpy as np
    import matplotlib
    import matplotlib.pyplot as plt
    from matplotlib.animation import FuncAnimation
    from matplotlib import font_manager as fm
    from pykalman import KalmanFilter
    from IPython.display import Video
    print(f"numpy version: {np.__version__}")
    print(f"matplotlib version: {matplotlib.__version__}")
    print("pykalman library is available.")
except ImportError as e:
    print(f"依赖导入失败，请确认已正确安装 numpy, matplotlib, 和 pykalman: {e}")
    raise

In [None]:
# 中文字体设置，用于在图表中正确显示中文
# 请将 _font_path 修改为您系统中SimHei或其他支持中文的字体文件路径
_font_path = r'./simhei.ttf' # 假设 simhei.ttf 在当前目录下
try:
    if os.path.exists(_font_path):
        fm.fontManager.addfont(_font_path)
        plt.rcParams['font.sans-serif'] = ['SimHei']
        plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
        print(f'中文字体 SimHei 已从路径注册: {_font_path}')
    else:
        print(f'警告: 未在指定路径找到字体文件: {_font_path}，图表中的中文可能无法正常显示。')
except Exception as _e:
    print(f'注册自定义字体时发生错误: {_e}')

In [None]:
# 验证中文字体是否设置成功
plt.figure(figsize=(4, 2))
plt.text(0.5, 0.65, '中文文本测试：平滑/滤波/预测', ha='center', fontsize=12)
plt.text(0.5, 0.35, '负号测试：-1.0, -2.5, -3.8', ha='center', fontsize=12)
plt.axis('off')
plt.tight_layout()
plt.show()

## 步骤一：生成并可视化模拟数据

我们首先需要创建一组模拟数据。这包括一个“完美的”纯净正弦波作为我们的真实状态（Ground Truth），以及一个在此基础上添加了高斯噪声的版本，用以模拟我们从传感器获得的实际观测值（Observations）。

In [None]:
# --- 数据生成参数 ---
rng = np.random.default_rng(0)  # 固定随机种子以保证结果可复现
N = 500                         # 数据点总数
signal_freq = 1.0/50.0          # 信号频率 (周期约为50个采样点)
w = 2.0 * np.pi * signal_freq   # 数字角频率
noise_std = 0.40                # 观测噪声的标准差

# --- 生成数据 ---
x_axis = np.arange(N)
y_true = np.sin(w * x_axis) # 真实信号 (Ground Truth)
y_obs = y_true + rng.normal(0.0, noise_std, size=N) # 带噪声的观测值

print(f"数据已生成: N={N}, 频率={signal_freq:.3f}, 噪声标准差={noise_std:.2f}")

# --- 可视化生成的数据 ---
plt.figure(figsize=(12, 5))
plt.title("真实信号与带噪声的观测数据", fontsize=14)

# 绘制真实信号 (作为参考)
plt.plot(x_axis, y_true, color='k', linestyle='--', linewidth=2.0, label="真实信号 (Ground Truth)")

# 绘制带噪声的观测值 (散点)
plt.plot(x_axis, y_obs, 'o', color='#348ABD', markersize=4, alpha=0.6, label="带噪声的观测值")

plt.xlabel("时间步 (t)", fontsize=12)
plt.ylabel("信号幅值 (y)", fontsize=12)
plt.ylim(-2.0, 2.0)
plt.legend(loc="upper right", fontsize=11)
plt.grid(True, linestyle=':', alpha=0.7)
plt.tight_layout()
plt.show()


## 步骤二：定义并初始化卡尔曼滤波器

在这一步，我们将根据文章中推导出的模型，定义卡尔曼滤波器所需的所有矩阵和初始参数。我们将使用 `pykalman` 库来构建滤波器对象，这大大简化了实现过程。

In [None]:
# --- 模型参数定义 ---
# 状态向量 x_k = [y_k, y_{k-1}]^T
# 状态转移方程: x_k = F @ x_{k-1} + w_{k-1}
# 观测方程: z_k = H @ x_k + v_k

# 状态转移矩阵 F
F = np.array([[2 * np.cos(w), -1.0],
              [1.0,            0.0 ]], dtype=float)

# 观测矩阵 H
H = np.array([[1.0, 0.0]], dtype=float)

# --- 噪声协方差矩阵 (超参数) ---
# 过程噪声协方差 Q: 反映了模型预测的不确定性。值越小，代表我们越信任模型的预测能力。
process_noise_var = 1e-5
Q = np.eye(2) * process_noise_var

# 测量噪声协方差 R: 反映了观测值的不确定性。值越小，代表我们越信任观测数据。
# 理论上它应该约等于观测噪声的方差 (noise_std**2)，即 0.4**2 = 0.16。
observation_noise_var = 0.16
R = np.array([[observation_noise_var]])

# --- 初始状态 ---
# 使用前两个观测值来初始化状态向量 [y_0, y_{-1}]。假设 y_{-1} 近似等于 y_0。
initial_state_mean = np.array([y_obs[0], y_obs[0] if N <= 1 else y_obs[1]])

# 初始误差协方差矩阵 P_0: 代表初始状态的不确定性。通常设为一个较大的对角矩阵。
initial_state_covariance = np.eye(2) * 1.0

# --- 使用 pykalman 构建滤波器 ---
kf = KalmanFilter(
    transition_matrices=F,
    observation_matrices=H,
    transition_covariance=Q,
    observation_covariance=R,
    initial_state_mean=initial_state_mean,
    initial_state_covariance=initial_state_covariance
)

# --- 执行滤波与平滑 ---
# .filter() 返回每个时间点的后验状态估计（仅使用直到当前时间的数据）
filtered_state_means, filtered_state_covariances = kf.filter(y_obs)

# .smooth() 返回每个时间点的后验状态估计（使用所有时间的数据）
smoothed_state_means, smoothed_state_covariances = kf.smooth(y_obs)

# 从状态向量中提取我们关心的 y_t
y_filtered = filtered_state_means[:, 0]    # 滤波结果
y_smoothed = smoothed_state_means[:, 0]  # 平滑结果

print("卡尔曼滤波与平滑计算完成。")

## 步骤三：实现多步前向预测

卡尔曼滤波的一个强大功能是预测。在任意时刻 $t$，我们可以利用该时刻的滤波后验状态 $\hat{\mathbf{x}}_t$，通过状态转移矩阵 $\mathbf{F}$ 来预测未来多个时间步的状态。

In [None]:
def predict_future(state_vector, F_matrix, steps):
    """从给定的状态向量开始，向前预测指定步数。"""
    x_current = state_vector.copy()
    predictions = []
    for _ in range(steps):
        x_current = F_matrix @ x_current
        predictions.append(float(x_current[0])) # 提取 y_t
    return np.array(predictions)

K_future = 10  # 定义要向前预测的步数
y_predictions_future = np.zeros((N, K_future), dtype=float)

# 在每个时间点 t，都基于当时的滤波状态进行一次未来 K_future 步的预测
for t in range(N):
    y_predictions_future[t] = predict_future(filtered_state_means[t], F, K_future)

print(f"在每个时间点的多步预测已准备完成，预测未来步数 K_future = {K_future}")

## 附录：卡尔曼滤波核心五公式的手动实现

为了更深入地理解滤波过程，下面的代码单元格手动实现了卡尔曼滤波的“预测-更新”循环。这可以帮助我们将理论公式与 `pykalman` 库的内部工作联系起来。我们将演示前几个时间步的详细计算过程。

In [None]:
# 工具函数，用于优化矩阵输出
def get_display_width(text: str) -> int:
    """计算字符串在终端中的显示宽度。"""
    width = 0
    for char in text:
        if unicodedata.east_asian_width(char) in ("F", "W", "A"):
            width += 2
        else:
            width += 1
    return width


def print_matrix(
    name: str, matrix, offset: int = 4, precision: int = 6
):
    """格式化打印矩阵，确保对齐并正确显示小数值。"""
    offset_str = " " * offset
    array_str_lines = np.array2string(
        np.asarray(matrix),
        precision=precision,
        suppress_small=False,
        separator=", ",
    ).split("\n")

    # 计算后续行的缩进
    indent_width = offset + get_display_width(name) - 1
    indent_str = " " * indent_width

    # 打印
    print(f"{offset_str}{name}{array_str_lines[0]}")
    for i in range(1, len(array_str_lines)):
        line_content = array_str_lines[i].strip()
        print(f"{indent_str}{line_content}")


In [None]:
# --- 1) 核心函数 ---
np.set_printoptions(precision=6, suppress=True)

def kf_step(x_prev, P_prev, z_t, F, H, Q, R):
    """
    执行一次完整的卡尔曼滤波“预测-更新”步骤。
    该函数严格遵循文章中总结的5个核心公式。
    """
    # === 预测 ===
    # 公式(1): 预测状态
    x_prior = F @ x_prev
    # 公式(2): 预测先验误差协方差
    P_prior = F @ P_prev @ F.T + Q

    # === 更新 ===
    # 创新 (Innovation) 或测量残差
    y_residual = z_t - (H @ x_prior)
    # 创新协方差
    S = H @ P_prior @ H.T + R

    # 公式(3): 计算卡尔曼增益
    K = P_prior @ H.T @ np.linalg.inv(S)

    # 公式(4): 更新状态估计
    x_post = x_prior + K @ y_residual

    # 公式(5): 更新后验误差协方差
    I = np.eye(P_prior.shape[0])  # noqa: E741
    P_post = (I - K @ H) @ P_prior

    return x_post, P_post

# --- 2) 数据演示 ---
print_matrix("状态转移矩阵 F =", F)
print_matrix("过程噪声协方差 Q =", Q)
print_matrix("测量噪声协方差 R =", R)

# [t=0] 初始状态
print("\n--- [t=0] 初始状态 ---")
x0 = initial_state_mean.copy()
P0 = initial_state_covariance.copy()
print(f"    初始状态 x_0 = {x0}")
print_matrix("初始协方差 P_0 =", P0)

# 注意：pykalman在t=0时会先进行一次更新，我们这里为简化，直接从t=1的预测开始
x_post_prev, P_post_prev = x0, P0

# 循环演示前几个步骤
for t in range(1, 4):
    print(f"\n--- [t={t}] 预测与更新 ---")
    z_t = y_obs[t]
    print(f"    当前观测值 z_{t} = {z_t:.4f}")

    x_post, P_post = kf_step(x_post_prev, P_post_prev, z_t, F, H, Q, R)

    print(f"    后验状态 x_{t} = {x_post}")
    print_matrix(f"后验协方差 P_{t} =", P_post)

    # 为下一次迭代准备
    x_post_prev, P_post_prev = x_post, P_post

## 步骤四：结果可视化与动画生成

最后，我们将所有计算结果——滤波、平滑和预测——都绘制在动态图表中。动画可以非常直观地展示卡尔曼滤波器是如何在每个时间步利用新的观测值来修正其状态估计的。

In [None]:
# --- 动画准备 ---
plt.close("all")
fig, axes = plt.subplots(2, 2, figsize=(14, 9), sharex=True, sharey=True, dpi=96)
ax_all, ax_smooth, ax_filt, ax_pred = axes.flatten()  # <--- 修正行
fig.suptitle("卡尔曼滤波、平滑与预测的可视化"
        + "Q="
        + str(Q[0])
        + "R="
        + str(R[0])
        + "\n                        "
        + str(Q[1]), fontsize=16)

# 统一设置所有子图的样式
for ax, title in [
    (ax_all, "综合视图"),
    (ax_smooth, "平滑 (Smoothing)"),
    (ax_filt, "滤波 (Filtering)"),
    (ax_pred, "预测 (Prediction)"),
]:
    ax.set_title(title, fontsize=12)
    ax.set_xlim(-10, N + K_future + 10)
    ax.set_ylim(-2.2, 2.2)
    ax.set_xlabel("时间步 (t)")
    ax.set_ylabel("信号幅值 (y)")
    ax.grid(True, linestyle=":", alpha=0.6)
    ax.plot(
        x_axis,
        y_true,
        color="#AAAAAA",
        linestyle="--",
        linewidth=1.5,
        zorder=0,
        label="真实信号",
    )

# --- 初始化所有绘图线条对象 ---
# 综合视图
(line_all_obs,) = ax_all.plot(
    [], [], "o", color="#CCCCCC", markersize=3, label="观测点"
)
(line_all_smooth,) = ax_all.plot(
    [], [], color="#d62728", linestyle="-", linewidth=2.5, label="平滑结果"
)
(line_all_filt,) = ax_all.plot(
    [], [], color="#1f77b4", linestyle="-", linewidth=2.0, label="滤波结果"
)
(line_all_pred,) = ax_all.plot(
    [], [], color="#2ca02c", linestyle="-", linewidth=2.0, label="未来预测"
)

# 平滑子图
(line_sm_obs,) = ax_smooth.plot(
    [], [], "o", color="#CCCCCC", markersize=3, label="观测点"
)
(line_sm_smooth,) = ax_smooth.plot(
    [], [], color="#d62728", linestyle="-", linewidth=2.5, label="平滑结果"
)

# 滤波子图
(line_fi_obs,) = ax_filt.plot(
    [], [], "o", color="#CCCCCC", markersize=3, label="观测点"
)
(line_fi_filt,) = ax_filt.plot(
    [], [], color="#1f77b4", linestyle="-", linewidth=2.0, label="滤波结果"
)

# 预测子图
(line_pr_obs,) = ax_pred.plot(
    [], [], "o", color="#CCCCCC", markersize=3, label="观测点"
)
(line_pr_pred,) = ax_pred.plot(
    [], [], color="#2ca02c", linestyle="-", linewidth=2.0, label="未来预测"
)

for ax in [ax_all, ax_smooth, ax_filt, ax_pred]:
    ax.legend(loc="upper right")

fig.tight_layout(rect=[0, 0, 1, 0.96])  # 调整布局为总标题留出空间


# --- 动画更新函数 ---
def update_animation(frame):
    t = int(frame)
    if t == 0:  # 避免 t-1 索引为负
        return (
            line_all_obs,
            line_all_smooth,
            line_all_filt,
            line_all_pred,
            line_sm_obs,
            line_sm_smooth,
            line_fi_obs,
            line_fi_filt,
            line_pr_obs,
            line_pr_pred,
        )

    x_history = x_axis[: t + 1]
    x_future = np.arange(t, t + K_future)

    # 综合视图
    line_all_obs.set_data(x_history, y_obs[: t + 1])
    line_all_smooth.set_data(x_history, y_smoothed[: t + 1])
    line_all_filt.set_data(x_history, y_filtered[: t + 1])
    line_all_pred.set_data(x_future, y_predictions_future[t - 1])

    # 平滑子图
    line_sm_obs.set_data(x_history, y_obs[: t + 1])
    line_sm_smooth.set_data(x_history, y_smoothed[: t + 1])

    # 滤波子图
    line_fi_obs.set_data(x_history, y_obs[: t + 1])
    line_fi_filt.set_data(x_history, y_filtered[: t + 1])

    # 预测子图
    line_pr_obs.set_data(x_history, y_obs[: t + 1])
    line_pr_pred.set_data(x_future, y_predictions_future[t - 1])

    return (
        line_all_obs,
        line_all_smooth,
        line_all_filt,
        line_all_pred,
        line_sm_obs,
        line_sm_smooth,
        line_fi_obs,
        line_fi_filt,
        line_pr_obs,
        line_pr_pred,
    )


# --- 创建并保存动画 ---
anim = FuncAnimation(fig, update_animation, frames=N, interval=50, blit=True)
output_filename = f"01-DEMO-Q={process_noise_var:.1e}-R={observation_noise_var:.2f}.mp4"

print(f"正在将动画保存为 '{output_filename}'，此过程可能需要一些时间...")
anim.save(output_filename, writer="ffmpeg", dpi=120, fps=20)
print("动画保存完毕！")
plt.close(fig)


In [None]:
# 在 Notebook 中显示生成的视频
Video(output_filename)

## 总结与探索

至此，我们完成了对带噪声正弦波的卡尔曼滤波全过程。动画清晰地展示了：

- **滤波 (Filtering)** 是对当前状态的实时最优估计，它会随着新的观测值而实时调整。
- **平滑 (Smoothing)** 是对过去状态的“事后”最优估计。因为它利用了全局信息，所以其轨迹通常比滤波结果更平滑、更贴近真实信号。
- **预测 (Prediction)** 展示了模型基于当前状态推演未来趋势的能力。

现在，您可以返回到“步骤二：定义并初始化卡尔曼滤波器”单元格，尝试调整过程噪声协方差 `Q` 和测量噪声协方差 `R` 的值，然后重新运行所有单元格，观察它们如何影响最终的滤波效果。

- 增大 `Q` 会发生什么？
- 增大 `R` 又会发生什么？