从零搭一套端到端 EEG 信号处理工具链 2025-06-07T16:00:00.000Z 2026-06-08T09:58:35.899Z China
起点
学完信号处理、MNE-Python、PyQt6 之后,知识点是散的。滤波是滤波,ICA 是 ICA,GUI 是 GUI。需要一个项目把它们串起来——不是跑通一个 tutorial,而是做一个自己会用、别人能跑 的工具。
目标:从原始 EEG 文件加载,一路走到解码结果输出,中间每一步都可配置、可替换。离线能批量跑,实时能流式看。
架构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 ┌──────────────────────────────────────────────────┐ │ GUI (PyQt6) │ │ ┌────────────┐ ┌────────────────────────┐ │ │ │ BatchTab │ │ StreamTab │ │ │ │ 离线分析 │ │ 实时流式 │ │ │ └─────┬──────┘ └─────┬──────────────────┘ │ │ ┌─────┴──────┐ ┌─────┴──────────────┐ │ │ │ BatchWorker │ │ StreamWorker │ │ │ │ (QThread) │ │ (QTimer+QObject) │ │ │ └─────┬──────┘ └─────┬──────────────┘ │ └────────┼──────────────────┼───────────────────────┘ │ │ ▼ ▼ ┌───────────┐ ┌──────────────────┐ │ BCIPipeline│ │ SlidingWindow │ │ 流式链式调用│ │ + OnlineProcessor│ │ │ │ + StreamSource │ └─────┬─────┘ │ + Decoder │ │ └────┬─────────────┘ ┌────┼────┬─────┬────┼─────┬────────┐ ▼ ▼ ▼ ▼ ▼ ▼ ▼ Loader Preproc Epocher Source Processor Decoder
两条路径共享同一套核心模块,区别只在处理器和调度方式:
离线 :BCIPipeline 链式调用,用 OfflineProcessor(零相位 filtfilt)
实时 :StreamWorker + SlidingWindow,用 OnlineProcessor(因果 lfilter,跨 chunk 保持状态)
逐层实现
1. 数据加载:4 种格式,一个接口
EEG 数据格式碎片化严重:EDF、FIF、EEGLAB(.set/.fdt)、BrainVision(.vhdr)。用 MNE 做底层,按后缀分发:
1 2 3 4 5 6 7 8 9 10 11 12 13 class DataLoader : SUPPORTED_FORMATS = ('.edf' , '.fif' , '.set' , '.vhdr' , '.fdt' ) def load (self, filepath, preload=True ): suffix = Path(filepath).suffix if suffix == '.edf' : self .raw = mne.io.read_raw_edf(filepath, preload=preload) elif suffix == '.fif' : self .raw = mne.io.read_raw_fif(filepath, preload=preload) elif suffix in ('.set' , '.fdt' ): self .raw = mne.io.read_raw_eeglab(filepath, preload=preload) elif suffix == '.vhdr' : self .raw = mne.io.read_raw_brainvision(filepath, preload=preload)
2. 预处理:链式调用
每个方法返回 self,可以一路点下去:
1 2 proc = Preprocessor(raw) proc.bandpass(1 , 40 ).notch([50 ]).set_reference('average' )
底层全是 MNE 的 raw.filter() / raw.notch_filter() / raw.set_eeg_reference(),但包了一层 fluent API 让调用更自然。ICA 去伪迹也包了:
1 2 ica = proc.apply_ica(n_components=15 )
3. 数据源:统一抽象,三种实现
这是整条工具链里我最满意的设计。
1 2 3 4 5 6 7 8 9 10 11 class DataSource (ABC ): @abstractmethod def open (self ): ... @abstractmethod def read_chunk (self, n_samples ): ... @abstractmethod def close (self ): ... @property @abstractmethod def is_stream (self ) -> bool : ...
三种实现:
FileSource
StreamSource
SessionSource
用途
离线分析
模拟实时流
多 run 合并流式
数据访问
全量 get_data(start, stop)
read_chunk(n) 逐块读
read_chunk(n) 跨 run 无缝读
is_stream
False
True
True
SessionSource 是实际最常用的:PhysioNet 的运动想象数据一个被试有 14 个 run(S001R01.edf ~ S001R14.edf),需要合并成一条连续流。find_session_runs() 用正则匹配同一被试的所有 run 文件,按编号排序,加载时逐 run 拼接。
4. 信号处理:离线 vs 在线,两种引擎
同一个概念——带通滤波,两种实现:
离线 :零相位 filtfilt,信号整体处理,没有相位畸变,精度最高。但必须拿到完整数据才能跑。
1 2 3 4 class OfflineProcessor : def bandpass (self, data, sfreq, l_freq, h_freq, order=4 ): b, a = signal.butter(order, [l_freq, h_freq], btype='band' , fs=sfreq) return signal.filtfilt(b, a, data)
在线 :因果 lfilter,逐 chunk 处理,每个 chunk 的滤波输出依赖前一个 chunk 的尾部状态。状态用 zi/zf 在 chunk 间传递:
1 2 3 4 5 6 7 8 class OnlineProcessor : def bandpass (self, data, l_freq, h_freq, order=4 ): b, a = signal.butter(order, [l_freq, h_freq], btype='band' , fs=self .sfreq) for ch in range (data.shape[0 ]): data[ch], self ._bandpass_zf[ch] = signal.lfilter( b, a, data[ch], zi=self ._bandpass_zf[ch] ) return data
归一化也是两套:离线用全局 z-score,在线用 EMA(指数移动平均)逐 chunk 更新统计量。
这不是过度设计——实时 BCI 必须用因果滤波 。filtfilt 要求完整信号,实时场景拿不到。两个引擎对应两种真实场景。
5. Epoch 提取:事件检测的兜底策略
MNE 提取事件有两种途径:stim 通道或 annotation。不一定哪个有数据,所以做兜底:
1 2 3 4 5 6 def find_events (self, stim_channel=None ): try : events = mne.find_events(self .raw, stim_channel=stim_channel) except ValueError: events, _ = mne.events_from_annotations(self .raw) return events
加了幅值剔除:reject_threshold 超过阈值的 epoch 自动丢弃,返回 EpochStats 告诉你扔了多少。
6. 解码器:插件注册,5 种可插拔
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @register('lda' ) class LDADecoder (Decoder ): ...@register('ssvep' ) class SSVEPDecoder (Decoder ): ...@register('fbcca' ) class FBCCADecoder (Decoder ): ...@register('cnn' ) class CNNDecoder (Decoder ): ...@register('transformer' ) class TransformerDecoder (Decoder ): ...
Decoder 基类统一 fit/predict/predict_proba/save/load 接口。decode() 顶层函数自动做 StratifiedKFold 交叉验证(SSVEP/FBCCA 除外,它们不需要训练)。
7. Pipeline:流式链式调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class BCIPipeline : def load (self, filepath ) -> 'BCIPipeline' : ... return self def preprocess (self ) -> 'BCIPipeline' : ... return self def create_epochs (self, events, event_id ) -> 'BCIPipeline' : ... return self def decode (self ) -> PipelineResult: ... return result
一行跑全链路:
1 result = BCIPipeline(config).load('S001R04.edf' ).preprocess().create_epochs(events, event_id).decode()
8. 实时流式:SlidingWindow + QTimer
这是最有趣的部分。实时 BCI 的核心问题:解码器需要固定长度的窗口,但数据是逐 chunk 来的 。
SlidingWindow 是一个环形缓冲区,解决这个问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class SlidingWindow : def push (self, chunk ): """推入一个 chunk 的数据""" n = chunk.shape[1 ] for i in range (self .n_channels): self ._buf[i, self ._pos : self ._pos + n] = chunk[i] self ._pos = (self ._pos + n) % self .window_size self ._total_pushed += n self ._since_last += n def ready (self ) -> bool : """缓冲区满 且 到了决策间隔""" return (self ._total_pushed >= self .window_size and self ._since_last >= self .decision_interval) def get_window (self ) -> np.ndarray: """从环形缓冲区重建时间顺序""" ...
GUI 的 StreamWorker 用 QTimer 按设定速度从 StreamSource 读 chunk,经过 OnlineProcessor 滤波,推入 SlidingWindow。ready() 时取窗口数据,喂给解码器,把预测结果发信号给 UI。
9. GUI:双 Tab,QThread 防卡死
PyQt6 最容易犯的错:在主线程做耗时操作,界面冻住。解决方案:所有 I/O 和计算都放 QThread。
LoadWorker(QThread):后台加载 EEG 文件,发 load_progress 信号更新进度条
BatchWorker(QThread):后台跑 Pipeline,发 progress 信号(0/20/50/70/100)
StreamWorker(QObject + QTimer):定时推数据,发 chunk_processed / prediction 信号
主线程只负责接收信号、更新控件。信号槽机制天然线程安全。
实时 Tab 的控件:速度调节(0.25×~100×)、滑窗参数、在线滤波参数、加载模型按钮。显示面板四个:波形、频谱、地形图、预测结果。
遇到的坑
ICA 在线不能用 :ICA 需要完整数据做分解,实时场景做不到。在线去伪迹只能用幅值剔除或回归
lfilter 相位延迟 :因果滤波比零相位滤波有延迟,实时系统的解码窗口要留余量
QTimer 精度 :Windows 下 QTimer 精度约 15ms,Linux 下好一些。速度设太高(>50×)实际达不到,chunk 之间会有堆积
EEG 格式陷阱 :EDF 文件的通道名不规范(“EEG Fp1-Ref” vs “Fp1”),加载后要做通道名清洗才能和 10-20 系统对齐
环形缓冲区的 off-by-one :_pos 的取模计算和 get_window() 的顺序重建容易写错边界,测试用例必须覆盖"刚好满"和"超过一圈"两种情况
最终效果
1 2 3 4 5 6 uv run bci data.edf --method mi uv run bci data.edf --method lda uv run bci --gui
支持 4 种格式、5 种解码器、离线+实时双模式、1400 行测试。从零到跑通用 8 周,但最花时间的不是写代码,是理清"离线和在线到底哪里不一样"——想清楚了这个,代码自然就出来了。