不满 Snakemake 的静态 rule?我用 Python 对象动态构建了一条 RNA-seq 流水线 2025-06-07T16:00:00.000Z 2026-06-08T08:10:13.865Z China
背景
RNA-seq 差异表达分析是一条长链路:从原始数据下载 → 差异分析 → 机器学习特征筛选 → 验证 → 富集分析 → 免疫浸润 → 调控网络 → 预后建模 → 自动生成报告,涉及 30+ 分析步骤、70+ 脚本。
主流方案是用 Snakemake 写 rule 来编排。但 Snakemake 的 rule 语法是静态的——rule 之间的依赖关系在 Snakefile 里写死,模块无法随意组合。分析需求千变万化:有人只做差异+富集,有人要做 WGCNA+ML+验证,有人要加单细胞。每次改流程都得手动调 rule,维护成本很高。
我的思路:不写 rule,用 Python 对象动态生成 rule 。
架构总览
1 2 3 4 5 6 7 8 9 10 11 12 13 YAML 配置 (DAG) │ ▼ parse_node() ──→ networkx.DiGraph (拓扑排序) │ ▼ RnaSeq(RuleSet) ──→ 按 topo order 实例化 30+ RuleSet │ ▼ bound_workflow() ──→ 把 MyRule 树动态注册到 Snakemake Workflow │ ▼ Snakemake 执行
核心设计三个层次:
MyRule :单条 rule 的抽象,声明脚本、参数占位符、输入输出
RuleSet :rule 的容器,多个 MyRule 组合为一个分析模块,模块间通过 _export 暴露数据接口
RnaSeq :顶层编排器,读 YAML 配置构建 DAG,按拓扑序实例化 RuleSet,最终绑定到 Snakemake
核心设计
1. MyRule:从 Python 对象到 Snakemake rule
传统写法:
1 2 3 4 5 6 7 8 9 10 11 rule diff: input : mat="result/input/mat.csv" , map ="result/input/map.csv" output: gene="result/diff/gene.csv" , mat="result/diff/mat.csv" params: pval=0.05 , method="limma" script: "scripts/diff.R"
我的写法:
1 2 3 4 5 6 7 8 9 class DiffRule (MyRule ): def __init__ (self ): super ().__init__() self .script = "diff.R" self .args = ["{input.mat}" , "{input.map}" , "{output.gene}" , "{output.mat}" , "{params.pval}" , "{params.method}" ] self ._input = {"mat" : ..., "map" : ...} self ._output = {"gene" : "gene.csv" , "mat" : "mat.csv" } self ._params = {"pval" : 0.05 , "method" : "limma" }
看起来只是把声明式换成了对象式,但关键区别在于:args 里的 {input.mat} 不是字符串模板,而是声明式标记 。
add_args() 在 __init_subclass__ 阶段自动解析 args,把 {input.mat} 注册到 _input_key,把 {output.gene} 注册到 _output_key。这样 RuleSet 的构造函数传入什么值,对应的 input/output 就是什么——参数来源由运行时决定,而不是写死 。
当 bound_workflow() 被调用时,每个 MyRule 被翻译成 Snakemake 的 RuleInfo 对象,注册到 Workflow 中:
1 2 3 4 5 6 7 def bound_workflow (self, workflow: Workflow ): for r in self .all_rules(): ruleinfo = RuleInfo(r.func()) ruleinfo.input = InOutput([], r.input (), ...) ruleinfo.output = InOutput([], r.output(), ...) ruleinfo.params = ([], r.kwparams()) workflow.rule(name=r.name)(ruleinfo)
到这里,Snakemake 看到的就是普通的 rule——但它是由 Python 代码动态生成的。
2. RuleSet + Parameter:模块间数据流解耦
一个分析模块(比如差异分析 DiffSets)包含多条 rule:差异分析、火山图、热图、染色体定位……它们组成一个 RuleSet。
模块间的数据传递通过 _export 和 Parameter 实现:
1 2 3 4 5 6 7 8 9 class DiffSets (RuleSet ): def __init__ (self, config, name, in_mat, in_map, in_gene, locate ): super ().__init__(config, name) diff = DiffRule(in_mat=in_mat, in_map=in_map, ...) self .append(diff)
Parameter 是带类型的值包装器,支持三种值类型:
value:普通值(数字、字符串)
path:文件路径(在脚本实例化时自动计算相对路径)
json:复杂结构(序列化为 JSON 传给 R 脚本)
1 2 3 4 5 6 7 8 9 class Parameter : def __init__ (self, value, value_type="value" , param_type="param" , name=None ): if isinstance (value, Parameter): self .value = value.value self .value_type = value_type or value.value_type else : self .value = value self .value_type = value_type
这意味着上游模块只需 _export 一个 Parameter,下游模块直接引用——模块间只通过接口耦合,不通过路径字符串耦合 。
3. YAML 配置驱动 DAG
这是整个框架的灵魂。用户通过一个 YAML 文件定义分析流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 step: ALL GSE12345: analysis: input gse: GSE12345 platform: GPL570 Diff: analysis: Diffanalysis pre: input: GSE12345 gene: null locate: 1 pval: 0.05 GO: analysis: GOKEGG pre: gene: Diff IMMUNE: analysis: IMMUNE pre: mat: Diff map: Diff gene: Diff immune: cibersort
parse_node() 把这个 YAML 解析成 networkx.DiGraph,自动拓扑排序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def parse_node (config: dict ) -> nx.DiGraph: dag = nx.DiGraph() for key, value in config.items(): if key == "step" or key == "global" : continue dag.add_node(key, **value) pre = value.get("pre" ) if pre: ... if not nx.is_directed_acyclic_graph(dag): raise ValueError(f"流程中存在环依赖,无法拓扑排序" ) for v, name in enumerate (nx.topological_sort(dag)): dag.nodes[name]["index" ] = v + 1 return dag
拓扑序决定了模块实例化顺序和输出目录编号(1-GSE12345/、3-Diff/、4-GO/)。
想改流程?改一行 YAML 就行。 要加 WGCNA,加个节点指个 pre;要去掉免疫分析,删掉那个节点。不需要动任何 Python 代码。
4. 脚本实例化:自文档化的输出
每个 rule 执行前,write_src() 会把 R 脚本复制到输出目录,并把所有 {input.xxx} / {output.xxx} / {params.xxx} 替换为实际解析后的路径和值:
1 2 3 4 5 6 7 8 9 10 11 def write_src (self ): cmd = self .shellcmd() parts = shlex.split(cmd) filename = os.path.basename(parts[0 ]) out = Path(self .out_prefix, f"{self.index} -{filename} " ) with open (parts[0 ], "r" ) as f: src = f.read() for i, arg in enumerate (resolved_args): src = src.replace(f"args[{i+1 } ]" , json.dumps(str (arg))) with open (out, "w" ) as f: f.write(src)
这意味着每个输出目录里都有一份完整的、带实际参数的脚本副本 。几个月后回来看结果,打开脚本就知道当时跑了什么参数——不需要翻配置文件,不需要猜。
Snakefile:30 行入口
最终的 Snakefile 只有 30 行:
1 2 3 4 5 6 7 8 9 10 11 12 from rules.index import RnaSeqcontainer: "env/rnaseq.sif" config_file = Path("config" , "config.yaml" ) rnaseq = RnaSeq(config_file) rnaseq.bound_workflow(workflow) rule all : input : rnaseq.target_out(), default_target: True
读配置 → 构建对象树 → 绑定 Snakemake → 执行。整个流程的复杂度被封装在 RnaSeq 和 RuleSet 里,Snakefile 只做胶水。
收获
配置驱动的组合能力 :30+ 分析模块通过 YAML DAG 任意组合,新增模块只需写一个 RuleSet 子类 + 注册到 ALL 字典
自文档化输出 :脚本实例化让每个结果目录都是自包含的,可审计、可复现
动态与静态的平衡 :Snakemake 负责执行和依赖管理(它的强项),Python 负责规则构建和模块编排(Snakemake 的弱项)
教训 :框架的抽象层次需要克制。RuleSet 暴露的 _export 接口如果设计不好,下游模块就会反向依赖上游内部细节,失去组合的灵活性
这个思路不只适用于生信——任何需要"模块可组合、流程可配置"的场景,都可以用类似的"Python 对象动态构建 rule"模式。