Python Pandas数据清洗:表达矩阵、差异结果、样本注释
2116 字
11 分钟
Python Pandas数据清洗:表达矩阵、差异结果、样本注释
生信数据清洗常见场景:表达矩阵过滤、差异结果筛选与注释、样本元数据合并。用 Pandas 替代 Excel 可将处理时间从小时级压缩到分钟级。本文通过三个生信场景实操讲解 Pandas:缺失值处理、分组聚合、多表连接,附带 5 个踩坑记录。
实测环境:Debian 13,Python 3.10,Pandas 2.1.4,NumPy 1.26。
1. 环境与数据准备
# 安装mamba install -c conda-forge pandas numpy openpyxl -yimport pandas as pdimport numpy as npfrom pathlib import Path
# 查看版本print(f"Pandas: {pd.__version__}")print(f"NumPy: {np.__version__}")模拟三份数据
# 1. 表达矩阵(基因 × 样本,TPM 值)np.random.seed(42)genes = [f"GENE{i:04d}" for i in range(2000)]samples = [f"S{str(i).zfill(2)}" for i in range(1, 13)]
expr_data = np.random.lognormal(mean=2, sigma=1.5, size=(2000, 12))expr_matrix = pd.DataFrame( expr_data, index=genes, columns=samples)# 故意加一些 0 和缺失值expr_matrix.iloc[0:5, 0:2] = 0expr_matrix.iloc[10:15, 3] = np.nanexpr_matrix.iloc[100, 5] = np.nan
print(f"Expression matrix shape: {expr_matrix.shape}")print(expr_matrix.head())# 2. 差异表达结果deg_data = { 'gene_id': np.random.choice(genes, 1500, replace=False), 'log2FC': np.random.normal(0, 1.5, 1500), 'pvalue': np.random.uniform(0, 0.5, 1500), 'baseMean': np.random.lognormal(3, 1, 1500)}deg_df = pd.DataFrame(deg_data)deg_df['padj'] = deg_df['pvalue'].apply(lambda x: min(x * len(deg_df), 1.0))# 故意制造一些混乱:gene_id 有重复、缺失deg_df.loc[0:10, 'gene_id'] = 'GENE????' # 脏数据deg_df.loc[20:25, 'padj'] = np.nan
print(f"DEG results shape: {deg_df.shape}")print(deg_df.head())# 3. 样本注释表sample_info = pd.DataFrame({ 'sample_id': samples, 'group': ['Control'] * 6 + ['Treatment'] * 6, 'batch': ['A', 'A', 'B', 'B', 'C', 'C'] * 2, 'age': np.random.randint(25, 70, 12), 'sex': np.random.choice(['M', 'F'], 12), 'rin': np.round(np.random.uniform(6.0, 10.0, 12), 1)})# 故意制造问题:sample_id 写成大写sample_info.loc[0, 'sample_id'] = 'S01' # 应该是 S01 但后面要匹配小写...sample_info.loc[5, 'rin'] = np.nan
print(sample_info)2. 场景一:表达矩阵清洗
2.1 缺失值处理——定位和填充
# 查看缺失值分布missing_stats = expr_matrix.isnull().sum()print("Samples with missing values:")print(missing_stats[missing_stats > 0])
# 查看缺失值百分比total_cells = expr_matrix.sizetotal_missing = expr_matrix.isnull().sum().sum()print(f"Total missing: {total_missing}/{total_cells} ({total_missing/total_cells*100:.2f}%)")三种常见处理策略:
# 策略1:基因层面——缺失超过 20% 样本就扔掉gene_missing_pct = expr_matrix.isnull().mean(axis=1)genes_to_keep = gene_missing_pct[gene_missing_pct < 0.2].indexexpr_clean1 = expr_matrix.loc[genes_to_keep]
# 策略2:样本层面——缺失超过 10% 基因的样本扔掉sample_missing_pct = expr_matrix.isnull().mean(axis=0)samples_to_keep = sample_missing_pct[sample_missing_pct < 0.1].indexexpr_clean2 = expr_matrix[samples_to_keep]
# 策略3:填充——用基因的中位数填充(比均值更稳健)expr_filled = expr_matrix.T.fillna(expr_matrix.median(axis=1)).T# 注意 .T 转置两次:fillna 默认按列填充,转置后按行填充为什么用中位数不用均值? 表达数据通常服从对数正态分布,均值受极端值(极高表达基因)影响大:
如果 用均值估计,一个异常高表达的样本会拉偏整个基因的归一化。中位数(median)对异常值稳健。
2.2 低表达基因过滤
# 过滤条件:至少在 3 个样本中 TPM > 1# 与 DESeq2 的 pre-filtering 逻辑一致expr_tpm = expr_filled.copy()min_samples = 3min_expression = 1.0
keep_genes = (expr_tpm > min_expression).sum(axis=1) >= min_samplesexpr_filtered = expr_tpm.loc[keep_genes]
print(f"Before filtering: {expr_tpm.shape[0]} genes")print(f"After filtering: {expr_filtered.shape[0]} genes")print(f"Removed: {expr_tpm.shape[0] - expr_filtered.shape[0]} genes")2.3 归一化——log2 转换
# 生信表达矩阵标准操作:TPM/FPKM → log2(TPM + 1)# 加 1 防止 log2(0) = -infexpr_log2 = np.log2(expr_filtered + 1)
# 验证分布是否改善import matplotlib.pyplot as pltfig, axes = plt.subplots(1, 2, figsize=(12, 4))axes[0].hist(expr_filtered.values.flatten(), bins=100, color='steelblue', alpha=0.7)axes[0].set_title('Before log2 transform')axes[0].set_xlabel('TPM')axes[1].hist(expr_log2.values.flatten(), bins=100, color='coral', alpha=0.7)axes[1].set_title('After log2 transform')axes[1].set_xlabel('log2(TPM+1)')plt.tight_layout()plt.savefig('log2_distribution.png', dpi=150)log2 转换的价值:
这个变换把指数分布拉成正态近似——大多数下游统计方法(t 检验、Pearson 相关、PCA)都假设正态性。
3. 场景二:差异表达结果清洗与注释
3.1 数据质量检查
# 查看基本信息print(deg_df.info())print(deg_df.describe())
# 检查重复duplicates = deg_df[deg_df['gene_id'].duplicated(keep=False)]print(f"Duplicate gene_ids: {len(duplicates)}")
# 检查异常值(pvalue 不应该 >1)bad_pval = deg_df[deg_df['pvalue'] > 1]print(f"pvalue > 1: {len(bad_pval)} rows")
# 检查缺失 padj(差异分析没做完?)missing_padj = deg_df[deg_df['padj'].isnull()]print(f"Missing padj: {len(missing_padj)} rows")3.2 筛选差异基因
# 经典阈值:|log2FC| > 1 且 padj < 0.05sig_degs = deg_df[ (deg_df['padj'] < 0.05) & (deg_df['log2FC'].abs() > 1)].copy()
# 添加方向标签sig_degs['direction'] = np.where(sig_degs['log2FC'] > 0, 'Up', 'Down')
print(f"Significant DEGs: {len(sig_degs)}")print(sig_degs['direction'].value_counts())3.3 多条件筛选(Pandas query)
# 用 query() 方法写复杂筛选——比链式索引快且更可读top_up = deg_df.query('padj < 0.01 and log2FC > 2 and baseMean > 100')top_down = deg_df.query('padj < 0.01 and log2FC < -2 and baseMean > 100')
# 合并top_degs = pd.concat([top_up, top_down])print(f"Top DEGs: {len(top_degs)}")print(top_degs[['gene_id', 'log2FC', 'padj']].head(10))3.4 合并基因注释
# 模拟基因注释表(gene_id → gene_symbol, chr, biotype)annot = pd.DataFrame({ 'gene_id': genes, 'gene_symbol': [f"SYMBOL{i:04d}" for i in range(2000)], 'chr': np.random.choice([f"chr{c}" for c in range(1, 23)], 2000), 'biotype': np.random.choice(['protein_coding', 'lncRNA', 'pseudogene'], 2000, p=[0.7, 0.2, 0.1])})
# LEFT JOIN:保留所有差异基因,补上注释sig_annotated = sig_degs.merge( annot, on='gene_id', how='left' # 左连接:保留所有差异基因)
# 检查哪些基因没匹配到注释(通常是 gene_id 格式不一致)unmatched = sig_annotated[sig_annotated['gene_symbol'].isnull()]print(f"Unmatched genes: {len(unmatched)}")if len(unmatched) > 0: print(unmatched[['gene_id']].head())merge 的四种方式:
| how | 含义 | 适用场景 |
|---|---|---|
inner | 交集 | 只要两边都有的 |
left | 保留左边全部 | 保留所有差异基因 |
right | 保留右边全部 | 保留所有注释项 |
outer | 并集 | 全面检查缺失 |
3.5 按 biotype 分组统计
# groupby + agg:分组聚合biotype_stats = sig_annotated.groupby('biotype').agg( count=('gene_id', 'count'), mean_log2FC=('log2FC', 'mean'), up_count=('direction', lambda x: (x == 'Up').sum()), down_count=('direction', lambda x: (x == 'Down').sum())).reset_index()
biotype_stats['pct'] = (biotype_stats['count'] / biotype_stats['count'].sum() * 100).round(1)print(biotype_stats)4. 场景三:样本注释表合并与质控
4.1 统一样本名
# 样本名不一致是最大的坑。先把所有样本名标准化expr_matrix.columns = expr_matrix.columns.str.strip().str.upper()sample_info['sample_id'] = sample_info['sample_id'].str.strip().str.upper()
# 验证一致性expr_samples = set(expr_matrix.columns)info_samples = set(sample_info['sample_id'])
print(f"In expression matrix, not in info: {expr_samples - info_samples}")print(f"In info, not in expression matrix: {info_samples - expr_samples}")4.2 表达矩阵与样本表对齐
# 把表达矩阵转置(样本 × 基因),然后 merge 样本信息expr_long = expr_log2.T # 转置:样本为行expr_long.index.name = 'sample_id'expr_long = expr_long.reset_index()
# 合并full_data = expr_long.merge( sample_info, on='sample_id', how='inner' # 只要两边都有的样本)
print(f"Matched samples: {len(full_data)}")
# 把 sample_id 设为 index,方便后续分析full_data = full_data.set_index('sample_id')4.3 按分组计算均值
# 分组计算每个基因的平均表达gene_cols = [c for c in full_data.columns if c.startswith('GENE')]group_means = full_data.groupby('group')[gene_cols].mean()
print(group_means.shape) # (2 组, N 基因)print(group_means.iloc[:, :5]) # 前 5 个基因4.4 PCA 数据准备(为 sklearn 准备数据)
from sklearn.decomposition import PCAfrom sklearn.preprocessing import StandardScaler
# 提取表达矩阵部分X = full_data[gene_cols].values
# 标准化(PCA 前必须)scaler = StandardScaler()X_scaled = scaler.fit_transform(X)
# PCApca = PCA(n_components=2)X_pca = pca.fit_transform(X_scaled)
# 做成 DataFramepca_df = pd.DataFrame(X_pca, columns=['PC1', 'PC2'], index=full_data.index)pca_df = pca_df.join(full_data[['group', 'batch']])
# 查看方差解释率explained_var = pca.explained_variance_ratio_print(f"PC1: {explained_var[0]*100:.1f}%, PC2: {explained_var[1]*100:.1f}%")方差解释率的计算:
其中 是第 个主成分的特征值(eigenvalue)。PC1 + PC2 之和 > 50% 说明前两个主成分捕获了大部分变异——分组效应强。
5. 完整数据清洗 Pipeline
def clean_expression_pipeline(expr_df, sample_df, min_expression=1.0, min_samples=3): """ 表达矩阵清洗 pipeline """ # 1. 统一列名 expr_df.columns = expr_df.columns.str.strip().str.upper() sample_df['sample_id'] = sample_df['sample_id'].str.strip().str.upper()
# 2. 样本交集 common_samples = list(set(expr_df.columns) & set(sample_df['sample_id'])) expr_df = expr_df[common_samples] sample_df = sample_df[sample_df['sample_id'].isin(common_samples)]
# 3. 填充缺失值(中位数) expr_df = expr_df.T.fillna(expr_df.median(axis=1)).T
# 4. 过滤低表达基因 keep = (expr_df > min_expression).sum(axis=1) >= min_samples expr_df = expr_df.loc[keep]
# 5. log2 转换 expr_log2 = np.log2(expr_df + 1)
print(f"Pipeline complete: {expr_log2.shape[0]} genes × {expr_log2.shape[1]} samples") return expr_log2, sample_df
# 运行expr_clean, meta_clean = clean_expression_pipeline(expr_matrix, sample_info)print(f"Cleaned: {expr_clean.shape}")6. 踩坑记录
坑1:SettingWithCopyWarning——Pandas 最烦人的警告
# 会触发警告的写法sig_degs = deg_df[deg_df['padj'] < 0.05]sig_degs['direction'] = 'Up' # SettingWithCopyWarning!
# 正确:用 .copy()sig_degs = deg_df[deg_df['padj'] < 0.05].copy()sig_degs['direction'] = 'Up' # 安静原因: Pandas 的链式索引返回的可能是 view(视图)也可能是 copy(副本)。改 view 没问题,改 copy 不会影响原始 DataFrame——Pandas 不确定你想要哪种,所以报警告。.copy() 明确告诉 Pandas:“我就要副本”。
坑2:merge 后行数暴增(多对多 join)
# 如果 gene_id 在两边都有重复...print(deg_df['gene_id'].duplicated().sum()) # 如果有重复print(annot['gene_id'].duplicated().sum()) # 也有重复# merge 后行数可能是 N_deg × N_annot ——爆炸!
# 解决:merge 前去重annot_unique = annot.drop_duplicates(subset='gene_id')deg_unique = deg_df.drop_duplicates(subset='gene_id')result = deg_unique.merge(annot_unique, on='gene_id', how='left')坑3:np.nan == np.nan 返回 False
import mathprint(np.nan == np.nan) # False!
# 正确检测 NaNprint(pd.isna(np.nan)) # Trueprint(np.isnan(np.nan)) # True
# 在 DataFrame 中:deg_df['padj'].isna().sum() # 这才是对的# deg_df['padj'] == np.nan # 永远是 False!这是 IEEE 754 浮点规范定义的——NaN 不等于任何值,包括它自己。
坑4:groupby 之后结果类型混乱
# groupby 返回的可能是 Series 也可能是 DataFrame,取决于你之后链了什么result = deg_df.groupby('gene_id')['log2FC'].mean()# -> Series
result = deg_df.groupby('gene_id')[['log2FC', 'padj']].mean()# -> DataFrame(因为用了双层方括号)
# 不确定类型时,强制转 DataFrame:result = deg_df.groupby('gene_id')['log2FC'].mean().reset_index()# 现在确定是 DataFrame坑5:CSV 读取时自动把基因名转成日期
# 如果你的表达矩阵第一列基因名包含 "SEPT1"、"MARCH1" 这样的...df = pd.read_csv('expression.csv', index_col=0)# SEPT1 被 Pandas 自动识别为 "1-Sep"(9 月 1 日)!
# 解决:读入时强制所有列为字符串df = pd.read_csv('expression.csv', index_col=0, dtype=str)
# 然后再转换数值列df = df.apply(pd.to_numeric, errors='coerce')这个坑是用 Pandas 的经典地狱时刻——你的基因列表突然变成了日期列表,而且不会报任何错。
本文于 2025-11-18 在 Debian 13 + Python 3.10 + Pandas 2.1.4 上实测。
文章分享
如果这篇文章对你有帮助,欢迎分享给更多人!
Python Pandas数据清洗:表达矩阵、差异结果、样本注释
https://fg.ink/posts/python-pandas-bioinfo-clean/ 相关文章 智能推荐
1
jq与csvkit:JSON/CSV数据处理
技术 jq处理NCBI/Ensembl/ENA等生信API返回的JSON数据,csvkit处理CSV/TSV元数据表格。过滤、转换、统计、合并一步到位。Debian 13实测,含6个踩坑记录
2
Biopython序列处理:文件读写与NCBI数据获取
技术 Biopython核心模块实操:SeqIO读FASTA/FASTQ、Seq对象序列操作、Entrez从NCBI获取数据、SeqRecord对象、多序列比对文件解析。含7个生信高频场景代码
3
生信自学路线图:从Linux基础到独立分析
技术 生信学习路线:Linux基础→Conda环境→数据获取→质控比对→差异分析→可视化→可重复性。附每个阶段的学习资源和避坑指南
4
R语言tidyverse:dplyr/ggplot2/tidyr数据清洗
技术 dplyr数据操作、ggplot2基础绘图、tidyr长宽转换、readr读写、stringr字符串处理。生信表达矩阵、差异结果、样本注释表的日常操作全解。Debian实测
5
ENSEMBL BioMart批量数据导出:REST API与biomaRt
技术 BioMart REST API + biomaRt R包批量导出基因注释:GO/KEGG/InterPro/PFAM/跨物种同源基因。Python+R双语言实现,含7个踩坑记录
随机文章 随机推荐