背景

RNA-seq 差异表达分析是一条长链路:从原始数据下载 → 差异分析 → 机器学习特征筛选 → 验证 → 富集分析 → 免疫浸润 → 调控网络 → 预后建模 → 自动生成报告,涉及 30+ 分析步骤、70+ 脚本。

主流方案是用 Snakemake 写 rule 来编排。但 Snakemake 的 rule 语法是静态的——rule 之间的依赖关系在 Snakefile 里写死,模块无法随意组合。分析需求千变万化:有人只做差异+富集,有人要做 WGCNA+ML+验证,有人要加单细胞。每次改流程都得手动调 rule,维护成本很高。

我的思路:不写 rule,用 Python 对象动态生成 rule

架构总览

1
2
3
4
5
6
7
8
9
10
11
12
13
YAML 配置 (DAG)


parse_node() ──→ networkx.DiGraph (拓扑排序)


RnaSeq(RuleSet) ──→ 按 topo order 实例化 30+ RuleSet


bound_workflow() ──→ 把 MyRule 树动态注册到 Snakemake Workflow


Snakemake 执行

核心设计三个层次:

  • MyRule:单条 rule 的抽象,声明脚本、参数占位符、输入输出
  • RuleSet:rule 的容器,多个 MyRule 组合为一个分析模块,模块间通过 _export 暴露数据接口
  • RnaSeq:顶层编排器,读 YAML 配置构建 DAG,按拓扑序实例化 RuleSet,最终绑定到 Snakemake

核心设计

1. MyRule:从 Python 对象到 Snakemake rule

传统写法:

1
2
3
4
5
6
7
8
9
10
11
rule diff:
input:
mat="result/input/mat.csv",
map="result/input/map.csv"
output:
gene="result/diff/gene.csv",
mat="result/diff/mat.csv"
params:
pval=0.05, method="limma"
script:
"scripts/diff.R"

我的写法:

1
2
3
4
5
6
7
8
9
class DiffRule(MyRule):
def __init__(self):
super().__init__()
self.script = "diff.R"
self.args = ["{input.mat}", "{input.map}", "{output.gene}", "{output.mat}",
"{params.pval}", "{params.method}"]
self._input = {"mat": ..., "map": ...}
self._output = {"gene": "gene.csv", "mat": "mat.csv"}
self._params = {"pval": 0.05, "method": "limma"}

看起来只是把声明式换成了对象式,但关键区别在于:args 里的 {input.mat} 不是字符串模板,而是声明式标记

add_args()__init_subclass__ 阶段自动解析 args,把 {input.mat} 注册到 _input_key,把 {output.gene} 注册到 _output_key。这样 RuleSet 的构造函数传入什么值,对应的 input/output 就是什么——参数来源由运行时决定,而不是写死

bound_workflow() 被调用时,每个 MyRule 被翻译成 Snakemake 的 RuleInfo 对象,注册到 Workflow 中:

1
2
3
4
5
6
7
def bound_workflow(self, workflow: Workflow):
for r in self.all_rules():
ruleinfo = RuleInfo(r.func())
ruleinfo.input = InOutput([], r.input(), ...)
ruleinfo.output = InOutput([], r.output(), ...)
ruleinfo.params = ([], r.kwparams())
workflow.rule(name=r.name)(ruleinfo)

到这里,Snakemake 看到的就是普通的 rule——但它是由 Python 代码动态生成的。

2. RuleSet + Parameter:模块间数据流解耦

一个分析模块(比如差异分析 DiffSets)包含多条 rule:差异分析、火山图、热图、染色体定位……它们组成一个 RuleSet。

模块间的数据传递通过 _exportParameter 实现:

1
2
3
4
5
6
7
8
9
class DiffSets(RuleSet):
def __init__(self, config, name, in_mat, in_map, in_gene, locate):
super().__init__(config, name)
# in_mat 是上游 Check 模块的 out_mat Parameter
# 直接传入,不需要知道上游的文件路径
diff = DiffRule(in_mat=in_mat, in_map=in_map, ...)
self.append(diff)
# 通过 _export 暴露接口给下游
# 下游模块通过 get_pre_file(config[name]["pre"]["gene"], "out_gene") 获取

Parameter 是带类型的值包装器,支持三种值类型:

  • value:普通值(数字、字符串)
  • path:文件路径(在脚本实例化时自动计算相对路径)
  • json:复杂结构(序列化为 JSON 传给 R 脚本)
1
2
3
4
5
6
7
8
9
class Parameter:
def __init__(self, value, value_type="value", param_type="param", name=None):
# 如果传入的是已有 Parameter,继承其属性
if isinstance(value, Parameter):
self.value = value.value
self.value_type = value_type or value.value_type
else:
self.value = value
self.value_type = value_type

这意味着上游模块只需 _export 一个 Parameter,下游模块直接引用——模块间只通过接口耦合,不通过路径字符串耦合

3. YAML 配置驱动 DAG

这是整个框架的灵魂。用户通过一个 YAML 文件定义分析流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
step: ALL

GSE12345:
analysis: input
gse: GSE12345
platform: GPL570

Diff:
analysis: Diffanalysis
pre:
input: GSE12345
gene: null
locate: 1
pval: 0.05

GO:
analysis: GOKEGG
pre:
gene: Diff

IMMUNE:
analysis: IMMUNE
pre:
mat: Diff
map: Diff
gene: Diff
immune: cibersort

parse_node() 把这个 YAML 解析成 networkx.DiGraph,自动拓扑排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def parse_node(config: dict) -> nx.DiGraph:
dag = nx.DiGraph()
for key, value in config.items():
if key == "step" or key == "global": continue
dag.add_node(key, **value)
pre = value.get("pre")
if pre:
# 根据 pre 字段建边
...
if not nx.is_directed_acyclic_graph(dag):
raise ValueError(f"流程中存在环依赖,无法拓扑排序")
for v, name in enumerate(nx.topological_sort(dag)):
dag.nodes[name]["index"] = v + 1 # 作为输出目录前缀
return dag

拓扑序决定了模块实例化顺序和输出目录编号(1-GSE12345/3-Diff/4-GO/)。

想改流程?改一行 YAML 就行。 要加 WGCNA,加个节点指个 pre;要去掉免疫分析,删掉那个节点。不需要动任何 Python 代码。

4. 脚本实例化:自文档化的输出

每个 rule 执行前,write_src() 会把 R 脚本复制到输出目录,并把所有 {input.xxx} / {output.xxx} / {params.xxx} 替换为实际解析后的路径和值:

1
2
3
4
5
6
7
8
9
10
11
def write_src(self):
cmd = self.shellcmd()
parts = shlex.split(cmd)
filename = os.path.basename(parts[0])
out = Path(self.out_prefix, f"{self.index}-{filename}")
with open(parts[0], "r") as f:
src = f.read()
for i, arg in enumerate(resolved_args):
src = src.replace(f"args[{i+1}]", json.dumps(str(arg)))
with open(out, "w") as f:
f.write(src)

这意味着每个输出目录里都有一份完整的、带实际参数的脚本副本。几个月后回来看结果,打开脚本就知道当时跑了什么参数——不需要翻配置文件,不需要猜。

Snakefile:30 行入口

最终的 Snakefile 只有 30 行:

1
2
3
4
5
6
7
8
9
10
11
12
from rules.index import RnaSeq

container: "env/rnaseq.sif"

config_file = Path("config", "config.yaml")
rnaseq = RnaSeq(config_file)
rnaseq.bound_workflow(workflow)

rule all:
input:
rnaseq.target_out(),
default_target: True

读配置 → 构建对象树 → 绑定 Snakemake → 执行。整个流程的复杂度被封装在 RnaSeqRuleSet 里,Snakefile 只做胶水。

收获

  1. 配置驱动的组合能力:30+ 分析模块通过 YAML DAG 任意组合,新增模块只需写一个 RuleSet 子类 + 注册到 ALL 字典
  2. 自文档化输出:脚本实例化让每个结果目录都是自包含的,可审计、可复现
  3. 动态与静态的平衡:Snakemake 负责执行和依赖管理(它的强项),Python 负责规则构建和模块编排(Snakemake 的弱项)
  4. 教训:框架的抽象层次需要克制。RuleSet 暴露的 _export 接口如果设计不好,下游模块就会反向依赖上游内部细节,失去组合的灵活性

这个思路不只适用于生信——任何需要"模块可组合、流程可配置"的场景,都可以用类似的"Python 对象动态构建 rule"模式。