Python Pandas数据清洗:表达矩阵、差异结果、样本注释

2116 字
11 分钟
Python Pandas数据清洗:表达矩阵、差异结果、样本注释

生信数据清洗常见场景:表达矩阵过滤、差异结果筛选与注释、样本元数据合并。用 Pandas 替代 Excel 可将处理时间从小时级压缩到分钟级。本文通过三个生信场景实操讲解 Pandas:缺失值处理、分组聚合、多表连接,附带 5 个踩坑记录。

实测环境:Debian 13,Python 3.10,Pandas 2.1.4,NumPy 1.26。

1. 环境与数据准备#

Terminal window
# 安装
mamba install -c conda-forge pandas numpy openpyxl -y
import pandas as pd
import numpy as np
from pathlib import Path
# 查看版本
print(f"Pandas: {pd.__version__}")
print(f"NumPy: {np.__version__}")

模拟三份数据#

# 1. 表达矩阵(基因 × 样本,TPM 值)
np.random.seed(42)
genes = [f"GENE{i:04d}" for i in range(2000)]
samples = [f"S{str(i).zfill(2)}" for i in range(1, 13)]
expr_data = np.random.lognormal(mean=2, sigma=1.5, size=(2000, 12))
expr_matrix = pd.DataFrame(
expr_data,
index=genes,
columns=samples
)
# 故意加一些 0 和缺失值
expr_matrix.iloc[0:5, 0:2] = 0
expr_matrix.iloc[10:15, 3] = np.nan
expr_matrix.iloc[100, 5] = np.nan
print(f"Expression matrix shape: {expr_matrix.shape}")
print(expr_matrix.head())
# 2. 差异表达结果
deg_data = {
'gene_id': np.random.choice(genes, 1500, replace=False),
'log2FC': np.random.normal(0, 1.5, 1500),
'pvalue': np.random.uniform(0, 0.5, 1500),
'baseMean': np.random.lognormal(3, 1, 1500)
}
deg_df = pd.DataFrame(deg_data)
deg_df['padj'] = deg_df['pvalue'].apply(lambda x: min(x * len(deg_df), 1.0))
# 故意制造一些混乱:gene_id 有重复、缺失
deg_df.loc[0:10, 'gene_id'] = 'GENE????' # 脏数据
deg_df.loc[20:25, 'padj'] = np.nan
print(f"DEG results shape: {deg_df.shape}")
print(deg_df.head())
# 3. 样本注释表
sample_info = pd.DataFrame({
'sample_id': samples,
'group': ['Control'] * 6 + ['Treatment'] * 6,
'batch': ['A', 'A', 'B', 'B', 'C', 'C'] * 2,
'age': np.random.randint(25, 70, 12),
'sex': np.random.choice(['M', 'F'], 12),
'rin': np.round(np.random.uniform(6.0, 10.0, 12), 1)
})
# 故意制造问题:sample_id 写成大写
sample_info.loc[0, 'sample_id'] = 'S01' # 应该是 S01 但后面要匹配小写...
sample_info.loc[5, 'rin'] = np.nan
print(sample_info)

2. 场景一:表达矩阵清洗#

2.1 缺失值处理——定位和填充#

# 查看缺失值分布
missing_stats = expr_matrix.isnull().sum()
print("Samples with missing values:")
print(missing_stats[missing_stats > 0])
# 查看缺失值百分比
total_cells = expr_matrix.size
total_missing = expr_matrix.isnull().sum().sum()
print(f"Total missing: {total_missing}/{total_cells} ({total_missing/total_cells*100:.2f}%)")

三种常见处理策略:

# 策略1:基因层面——缺失超过 20% 样本就扔掉
gene_missing_pct = expr_matrix.isnull().mean(axis=1)
genes_to_keep = gene_missing_pct[gene_missing_pct < 0.2].index
expr_clean1 = expr_matrix.loc[genes_to_keep]
# 策略2:样本层面——缺失超过 10% 基因的样本扔掉
sample_missing_pct = expr_matrix.isnull().mean(axis=0)
samples_to_keep = sample_missing_pct[sample_missing_pct < 0.1].index
expr_clean2 = expr_matrix[samples_to_keep]
# 策略3:填充——用基因的中位数填充(比均值更稳健)
expr_filled = expr_matrix.T.fillna(expr_matrix.median(axis=1)).T
# 注意 .T 转置两次:fillna 默认按列填充,转置后按行填充

为什么用中位数不用均值? 表达数据通常服从对数正态分布,均值受极端值(极高表达基因)影响大:

Zgene=log2(TPM+1)μgeneσgeneZ_{gene} = \frac{\log_2(TPM + 1) - \mu_{gene}}{\sigma_{gene}}

如果 μgene\mu_{gene} 用均值估计,一个异常高表达的样本会拉偏整个基因的归一化。中位数(median)对异常值稳健。

2.2 低表达基因过滤#

# 过滤条件:至少在 3 个样本中 TPM > 1
# 与 DESeq2 的 pre-filtering 逻辑一致
expr_tpm = expr_filled.copy()
min_samples = 3
min_expression = 1.0
keep_genes = (expr_tpm > min_expression).sum(axis=1) >= min_samples
expr_filtered = expr_tpm.loc[keep_genes]
print(f"Before filtering: {expr_tpm.shape[0]} genes")
print(f"After filtering: {expr_filtered.shape[0]} genes")
print(f"Removed: {expr_tpm.shape[0] - expr_filtered.shape[0]} genes")

2.3 归一化——log2 转换#

# 生信表达矩阵标准操作:TPM/FPKM → log2(TPM + 1)
# 加 1 防止 log2(0) = -inf
expr_log2 = np.log2(expr_filtered + 1)
# 验证分布是否改善
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
axes[0].hist(expr_filtered.values.flatten(), bins=100, color='steelblue', alpha=0.7)
axes[0].set_title('Before log2 transform')
axes[0].set_xlabel('TPM')
axes[1].hist(expr_log2.values.flatten(), bins=100, color='coral', alpha=0.7)
axes[1].set_title('After log2 transform')
axes[1].set_xlabel('log2(TPM+1)')
plt.tight_layout()
plt.savefig('log2_distribution.png', dpi=150)

log2 转换的价值:

X=log2(TPM+1)X' = \log_2(TPM + 1)

这个变换把指数分布拉成正态近似——大多数下游统计方法(t 检验、Pearson 相关、PCA)都假设正态性。

3. 场景二:差异表达结果清洗与注释#

3.1 数据质量检查#

# 查看基本信息
print(deg_df.info())
print(deg_df.describe())
# 检查重复
duplicates = deg_df[deg_df['gene_id'].duplicated(keep=False)]
print(f"Duplicate gene_ids: {len(duplicates)}")
# 检查异常值(pvalue 不应该 >1)
bad_pval = deg_df[deg_df['pvalue'] > 1]
print(f"pvalue > 1: {len(bad_pval)} rows")
# 检查缺失 padj(差异分析没做完?)
missing_padj = deg_df[deg_df['padj'].isnull()]
print(f"Missing padj: {len(missing_padj)} rows")

3.2 筛选差异基因#

# 经典阈值:|log2FC| > 1 且 padj < 0.05
sig_degs = deg_df[
(deg_df['padj'] < 0.05) &
(deg_df['log2FC'].abs() > 1)
].copy()
# 添加方向标签
sig_degs['direction'] = np.where(sig_degs['log2FC'] > 0, 'Up', 'Down')
print(f"Significant DEGs: {len(sig_degs)}")
print(sig_degs['direction'].value_counts())

3.3 多条件筛选(Pandas query)#

# 用 query() 方法写复杂筛选——比链式索引快且更可读
top_up = deg_df.query('padj < 0.01 and log2FC > 2 and baseMean > 100')
top_down = deg_df.query('padj < 0.01 and log2FC < -2 and baseMean > 100')
# 合并
top_degs = pd.concat([top_up, top_down])
print(f"Top DEGs: {len(top_degs)}")
print(top_degs[['gene_id', 'log2FC', 'padj']].head(10))

3.4 合并基因注释#

# 模拟基因注释表(gene_id → gene_symbol, chr, biotype)
annot = pd.DataFrame({
'gene_id': genes,
'gene_symbol': [f"SYMBOL{i:04d}" for i in range(2000)],
'chr': np.random.choice([f"chr{c}" for c in range(1, 23)], 2000),
'biotype': np.random.choice(['protein_coding', 'lncRNA', 'pseudogene'], 2000, p=[0.7, 0.2, 0.1])
})
# LEFT JOIN:保留所有差异基因,补上注释
sig_annotated = sig_degs.merge(
annot,
on='gene_id',
how='left' # 左连接:保留所有差异基因
)
# 检查哪些基因没匹配到注释(通常是 gene_id 格式不一致)
unmatched = sig_annotated[sig_annotated['gene_symbol'].isnull()]
print(f"Unmatched genes: {len(unmatched)}")
if len(unmatched) > 0:
print(unmatched[['gene_id']].head())

merge 的四种方式:

how含义适用场景
inner交集只要两边都有的
left保留左边全部保留所有差异基因
right保留右边全部保留所有注释项
outer并集全面检查缺失

3.5 按 biotype 分组统计#

# groupby + agg:分组聚合
biotype_stats = sig_annotated.groupby('biotype').agg(
count=('gene_id', 'count'),
mean_log2FC=('log2FC', 'mean'),
up_count=('direction', lambda x: (x == 'Up').sum()),
down_count=('direction', lambda x: (x == 'Down').sum())
).reset_index()
biotype_stats['pct'] = (biotype_stats['count'] / biotype_stats['count'].sum() * 100).round(1)
print(biotype_stats)

4. 场景三:样本注释表合并与质控#

4.1 统一样本名#

# 样本名不一致是最大的坑。先把所有样本名标准化
expr_matrix.columns = expr_matrix.columns.str.strip().str.upper()
sample_info['sample_id'] = sample_info['sample_id'].str.strip().str.upper()
# 验证一致性
expr_samples = set(expr_matrix.columns)
info_samples = set(sample_info['sample_id'])
print(f"In expression matrix, not in info: {expr_samples - info_samples}")
print(f"In info, not in expression matrix: {info_samples - expr_samples}")

4.2 表达矩阵与样本表对齐#

# 把表达矩阵转置(样本 × 基因),然后 merge 样本信息
expr_long = expr_log2.T # 转置:样本为行
expr_long.index.name = 'sample_id'
expr_long = expr_long.reset_index()
# 合并
full_data = expr_long.merge(
sample_info,
on='sample_id',
how='inner' # 只要两边都有的样本
)
print(f"Matched samples: {len(full_data)}")
# 把 sample_id 设为 index,方便后续分析
full_data = full_data.set_index('sample_id')

4.3 按分组计算均值#

# 分组计算每个基因的平均表达
gene_cols = [c for c in full_data.columns if c.startswith('GENE')]
group_means = full_data.groupby('group')[gene_cols].mean()
print(group_means.shape) # (2 组, N 基因)
print(group_means.iloc[:, :5]) # 前 5 个基因

4.4 PCA 数据准备(为 sklearn 准备数据)#

from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
# 提取表达矩阵部分
X = full_data[gene_cols].values
# 标准化(PCA 前必须)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# PCA
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)
# 做成 DataFrame
pca_df = pd.DataFrame(X_pca, columns=['PC1', 'PC2'], index=full_data.index)
pca_df = pca_df.join(full_data[['group', 'batch']])
# 查看方差解释率
explained_var = pca.explained_variance_ratio_
print(f"PC1: {explained_var[0]*100:.1f}%, PC2: {explained_var[1]*100:.1f}%")

方差解释率的计算:

Variance Explainedk=λki=1nλi×100%\text{Variance Explained}_k = \frac{\lambda_k}{\sum_{i=1}^{n} \lambda_i} \times 100\%

其中 λk\lambda_k 是第 kk 个主成分的特征值(eigenvalue)。PC1 + PC2 之和 > 50% 说明前两个主成分捕获了大部分变异——分组效应强。

5. 完整数据清洗 Pipeline#

def clean_expression_pipeline(expr_df, sample_df, min_expression=1.0, min_samples=3):
"""
表达矩阵清洗 pipeline
"""
# 1. 统一列名
expr_df.columns = expr_df.columns.str.strip().str.upper()
sample_df['sample_id'] = sample_df['sample_id'].str.strip().str.upper()
# 2. 样本交集
common_samples = list(set(expr_df.columns) & set(sample_df['sample_id']))
expr_df = expr_df[common_samples]
sample_df = sample_df[sample_df['sample_id'].isin(common_samples)]
# 3. 填充缺失值(中位数)
expr_df = expr_df.T.fillna(expr_df.median(axis=1)).T
# 4. 过滤低表达基因
keep = (expr_df > min_expression).sum(axis=1) >= min_samples
expr_df = expr_df.loc[keep]
# 5. log2 转换
expr_log2 = np.log2(expr_df + 1)
print(f"Pipeline complete: {expr_log2.shape[0]} genes × {expr_log2.shape[1]} samples")
return expr_log2, sample_df
# 运行
expr_clean, meta_clean = clean_expression_pipeline(expr_matrix, sample_info)
print(f"Cleaned: {expr_clean.shape}")

6. 踩坑记录#

坑1:SettingWithCopyWarning——Pandas 最烦人的警告#

# 会触发警告的写法
sig_degs = deg_df[deg_df['padj'] < 0.05]
sig_degs['direction'] = 'Up' # SettingWithCopyWarning!
# 正确:用 .copy()
sig_degs = deg_df[deg_df['padj'] < 0.05].copy()
sig_degs['direction'] = 'Up' # 安静

原因: Pandas 的链式索引返回的可能是 view(视图)也可能是 copy(副本)。改 view 没问题,改 copy 不会影响原始 DataFrame——Pandas 不确定你想要哪种,所以报警告。.copy() 明确告诉 Pandas:“我就要副本”。

坑2:merge 后行数暴增(多对多 join)#

# 如果 gene_id 在两边都有重复...
print(deg_df['gene_id'].duplicated().sum()) # 如果有重复
print(annot['gene_id'].duplicated().sum()) # 也有重复
# merge 后行数可能是 N_deg × N_annot ——爆炸!
# 解决:merge 前去重
annot_unique = annot.drop_duplicates(subset='gene_id')
deg_unique = deg_df.drop_duplicates(subset='gene_id')
result = deg_unique.merge(annot_unique, on='gene_id', how='left')

坑3:np.nan == np.nan 返回 False#

import math
print(np.nan == np.nan) # False!
# 正确检测 NaN
print(pd.isna(np.nan)) # True
print(np.isnan(np.nan)) # True
# 在 DataFrame 中:
deg_df['padj'].isna().sum() # 这才是对的
# deg_df['padj'] == np.nan # 永远是 False!

这是 IEEE 754 浮点规范定义的——NaN 不等于任何值,包括它自己。

坑4:groupby 之后结果类型混乱#

# groupby 返回的可能是 Series 也可能是 DataFrame,取决于你之后链了什么
result = deg_df.groupby('gene_id')['log2FC'].mean()
# -> Series
result = deg_df.groupby('gene_id')[['log2FC', 'padj']].mean()
# -> DataFrame(因为用了双层方括号)
# 不确定类型时,强制转 DataFrame:
result = deg_df.groupby('gene_id')['log2FC'].mean().reset_index()
# 现在确定是 DataFrame

坑5:CSV 读取时自动把基因名转成日期#

# 如果你的表达矩阵第一列基因名包含 "SEPT1"、"MARCH1" 这样的...
df = pd.read_csv('expression.csv', index_col=0)
# SEPT1 被 Pandas 自动识别为 "1-Sep"(9 月 1 日)!
# 解决:读入时强制所有列为字符串
df = pd.read_csv('expression.csv', index_col=0, dtype=str)
# 然后再转换数值列
df = df.apply(pd.to_numeric, errors='coerce')

这个坑是用 Pandas 的经典地狱时刻——你的基因列表突然变成了日期列表,而且不会报任何错。


本文于 2025-11-18 在 Debian 13 + Python 3.10 + Pandas 2.1.4 上实测。

文章分享

如果这篇文章对你有帮助,欢迎分享给更多人!

Python Pandas数据清洗:表达矩阵、差异结果、样本注释
https://fg.ink/posts/python-pandas-bioinfo-clean/
作者
风观
发布于
2025-11-18
许可协议
CC BY-NC-SA 4.0
Profile Image of the Author
风观
风有来路,观有所思
分类
标签
站点统计
文章
52
分类
1
标签
38
总字数
64,085
运行时长
0
最后活动
0 天前

文章目录