#!/usr/bin/env python3
"""
Company-certified scheme cleaning v2 - full extraction, 3 workbooks x 2 sheet types.

  Sheet 1 (special construction schemes, "危大")  -> certified_schemes_detail.csv
  Sheet 2 (class I / II / III technical schemes)  -> certified_tech_schemes_detail.csv

Input:  the three "附件1" technical-scheme planning workbooks (xlsx)
Output: dashboard/data/认定数据/2026/ (fixed per-year directory)

The pipeline only runs when the file is executed as a script; importing the
module just defines the constants and helpers.
"""
import difflib
import json
from contextlib import closing
from datetime import date, datetime, timedelta
from pathlib import Path

import pandas as pd

# ═══ CONFIG ═══
RAW_DIR = Path("/mnt/y/WorkingEmail/OA收文_Incoming")
CERT_SRC_DIR = RAW_DIR / "2026-03-16_关于公布公司2026年度技术方案编制计划的通知"
OUT_DIR = Path("/mnt/y/Openclaw_Hub/03.资源/实施项目 wiki/dashboard/data/认定数据/2026")
PLATFORM_PARQUET = Path("/mnt/y/Openclaw_Hub/03.资源/实施项目 wiki/dashboard/data/2026-05-24/cleaned/methods_cleaned.parquet")

CERT_FILES = [
    CERT_SRC_DIR / "附件1:技术方案(含专项施工方案)编制计划表-建筑类项目.xlsx",
    CERT_SRC_DIR / "附件1:技术方案(含专项施工方案)编制计划表-非建筑轨道类.xlsx",
    CERT_SRC_DIR / "附件1:技术方案(含专项施工方案)编制计划表--轨道类项目.xlsx",
]

SHEET1_NAME = '表1-专项施工方案'
TARGET_REGION = '中东'
FIRST_DATA_ROW = 3          # data rows are read starting from worksheet row 3
MATCH_THRESHOLD = 0.75      # minimum similarity for a certified/platform project match

# Cell texts that mean "no value".
_BLANK_MARKERS = ('/', '-', 'None', 'nan')

# Column order shared by both detail tables.
DETAIL_COLUMNS = [
    '序号', '数据来源', '工作表', '所属区域', '所属国别', '项目名称', '方案名称',
    '编制单位', '工程类别', '分部工程类别', '方案等级', '是否超一定规模',
    '工程特点/说明', '计划开工日期', '方案计划报审日期',
]
_DATE_FIELDS = ('计划开工日期', '方案计划报审日期')

# Per-sheet source of the fields that differ between the two sheets:
# an int is a zero-based column index, a str is a constant value.
SHEET1_FIELDS = {
    '分部工程类别': 9,
    '方案等级': '危大',        # every sheet-1 row is a "危大" scheme
    '是否超一定规模': 11,
    '工程特点/说明': 12,
    '计划开工日期': 13,
    '方案计划报审日期': 14,
}
SHEET2_FIELDS = {
    '分部工程类别': '',        # sheet 2 has no such column
    '方案等级': 10,            # Ⅰ/Ⅱ/Ⅲ
    '是否超一定规模': '',      # sheet 2 has no such column
    '工程特点/说明': 9,
    '计划开工日期': 11,
    '方案计划报审日期': 13,
}

PROJECT_COLUMNS = ['项目名称', '认定_危大方案总数', '认定_超规数', '所属国别']
RESULT_COLUMNS = PROJECT_COLUMNS + [
    '认定_一般数', '平台_匹配项目', '平台_方案总数', '平台_超规数', '平台_一般数',
    '差额_超规', '差额_一般', '差额_合计', '匹配状态',
]

_EXCEL_EPOCH = datetime(1899, 12, 30)
_EXCEL_SERIAL_MIN = 40000       # numbers at or below this are not treated as serial dates
_EXCEL_SERIAL_MAX = 2958465     # 9999-12-31; anything larger would overflow datetime
_DATE_FORMATS = ('%Y-%m-%d', '%Y.%m.%d', '%Y/%m/%d')


def parse_date(val):
    """Normalise an Excel cell value to a ``YYYY-MM-DD`` string.

    Args:
        val: ``None``, a ``date``/``datetime``, an Excel serial number
            (days since 1899-12-30), or any value whose text form is a date.

    Returns:
        ``''`` for ``None``, blank text and the placeholders ``/ - None nan``;
        the normalised date when the value can be parsed; otherwise the first
        10 characters of the stripped text, unchanged.
    """
    if val is None:
        return ''
    if isinstance(val, date):  # datetime is a subclass of date
        return val.strftime('%Y-%m-%d')
    if isinstance(val, (int, float)) and _EXCEL_SERIAL_MIN < val <= _EXCEL_SERIAL_MAX:
        # Excel serial date. The upper bound keeps values such as 20260316
        # from overflowing datetime; they fall through to the text path.
        return (_EXCEL_EPOCH + timedelta(days=int(val))).strftime('%Y-%m-%d')

    text = str(val).strip()
    if not text or text in _BLANK_MARKERS:
        return ''

    # Parse the date token on its own (time-of-day removed) so non-padded
    # dates such as "2026-3-5 00:00:00" are recognised; the 10-character
    # prefix is kept as a second candidate for text like "2026-03-16abc".
    head = text.split()[0].split('T')[0]
    for candidate in (head, text[:10]):
        for fmt in _DATE_FORMATS:
            try:
                return datetime.strptime(candidate, fmt).strftime('%Y-%m-%d')
            except ValueError:
                continue
    # Not a recognisable date: keep (at most) the first 10 characters.
    return text[:10]


def _cell_text(row, idx):
    """Return cell ``idx`` of ``row`` as stripped text; '' if absent or falsy."""
    if idx >= len(row) or not row[idx]:
        return ''
    return str(row[idx]).strip()


def _rows_to_records(ws, source_name, sheet_name, fields, seen):
    """Extract the target-region rows of one worksheet as record dicts.

    Rows are skipped when the region (column 1) is not ``TARGET_REGION``, when
    the scheme name (column 6) is blank or a placeholder, or when the
    (project, scheme, unit) key is already in ``seen``. ``seen`` is updated in
    place so de-duplication carries across workbooks.
    """
    records = []
    for row in ws.iter_rows(min_row=FIRST_DATA_ROW, max_row=ws.max_row, values_only=True):
        region = _cell_text(row, 1)
        if region != TARGET_REGION:
            continue
        scheme = _cell_text(row, 6)
        if not scheme or scheme in _BLANK_MARKERS:
            continue
        project = _cell_text(row, 3)
        unit = _cell_text(row, 7)

        # Cross-file de-duplication: keep only the first occurrence.
        key = (project, scheme, unit)
        if key in seen:
            continue
        seen.add(key)

        record = {
            '序号': _cell_text(row, 0),
            '数据来源': source_name,
            '工作表': sheet_name,
            '所属区域': region,
            '所属国别': _cell_text(row, 2),
            '项目名称': project,
            '方案名称': scheme,
            '编制单位': unit,
            '工程类别': _cell_text(row, 8),
        }
        for field, spec in fields.items():
            if isinstance(spec, str):
                record[field] = spec
            elif field in _DATE_FIELDS:
                record[field] = parse_date(row[spec]) if spec < len(row) else ''
            else:
                record[field] = _cell_text(row, spec)
        records.append(record)
    return records


def _collect_special_schemes(files):
    """Read sheet 1 of every workbook in ``files`` into one detail DataFrame."""
    # Imported here so the module can be imported without openpyxl installed.
    import openpyxl

    records, seen = [], set()
    for fpath in files:
        if not fpath.exists():
            print(f" ⚠️ 跳过: {fpath.name}")
            continue
        # closing() guarantees the workbook is released even if a row fails.
        with closing(openpyxl.load_workbook(fpath, data_only=True)) as wb:
            if SHEET1_NAME not in wb.sheetnames:
                print(f" ⚠️ {fpath.name}: 无 表1-专项施工方案")
                continue
            found = _rows_to_records(wb[SHEET1_NAME], fpath.name, SHEET1_NAME, SHEET1_FIELDS, seen)
        records.extend(found)
        print(f" {fpath.name}: {len(found)} 中东条目")
    # Explicit columns keep the frame usable when nothing was extracted.
    return pd.DataFrame(records, columns=DETAIL_COLUMNS)


def _collect_tech_schemes(files):
    """Read sheet 2 of every workbook in ``files`` into one detail DataFrame.

    Sheet 2 is the first sheet whose name contains both '表2' and '技术方案'.
    Missing workbooks are skipped silently.
    """
    # Imported here so the module can be imported without openpyxl installed.
    import openpyxl

    records, seen = [], set()
    for fpath in files:
        if not fpath.exists():
            continue
        with closing(openpyxl.load_workbook(fpath, data_only=True)) as wb:
            sheet_name = next(
                (sn for sn in wb.sheetnames if '表2' in sn and '技术方案' in sn), None
            )
            if not sheet_name:
                print(f" ⚠️ {fpath.name}: 无 表2 类工作表")
                continue
            found = _rows_to_records(wb[sheet_name], fpath.name, sheet_name, SHEET2_FIELDS, seen)
        records.extend(found)
        print(f" {fpath.name} [{sheet_name}]: {len(found)} 中东条目")
    return pd.DataFrame(records, columns=DETAIL_COLUMNS)


def _summarize_projects(df_schemes):
    """Aggregate sheet-1 schemes per project: total, over-scale count, country."""
    if df_schemes.empty:
        return pd.DataFrame(columns=PROJECT_COLUMNS)
    return (
        df_schemes.groupby('项目名称')
        .agg(
            认定_危大方案总数=('方案名称', 'count'),
            认定_超规数=('是否超一定规模', lambda s: (s == '是').sum()),
            所属国别=('所属国别', 'first'),
        )
        .reset_index()
    )


def _strip_project_suffix(name):
    """Drop the generic '工程项目' / '项目' wording before comparing names."""
    return str(name).replace('工程项目', '').replace('项目', '').strip()


def _best_platform_match(cert_name, candidates):
    """Return ``(platform_name, ratio)`` of the most similar platform project.

    ``candidates`` is a list of ``(platform_name, cleaned_name)`` pairs.
    Similarity is the difflib ratio of the cleaned names, raised to at least
    0.95 when one cleaned name contains the other.
    """
    cert_clean = _strip_project_suffix(cert_name)
    best_name, best_ratio = None, 0
    for plat_name, plat_clean in candidates:
        ratio = difflib.SequenceMatcher(None, cert_clean, plat_clean).ratio()
        # Both sides must be non-empty: '' is a substring of every string and
        # would otherwise force a 0.95 score against an unrelated project.
        if cert_clean and plat_clean and (cert_clean in plat_clean or plat_clean in cert_clean):
            ratio = max(ratio, 0.95)
        if ratio > best_ratio:
            best_name, best_ratio = plat_name, ratio
    return best_name, best_ratio


def _compare_with_platform(df_proj, df_platform):
    """Join the per-project certified counts with the platform registrations."""
    # Equality test on purpose: only rows whose flag equals True are kept.
    valid = df_platform[df_platform['是否有效登记'] == True]  # noqa: E712
    platform_counts = valid.groupby('项目名称').agg(
        平台_方案总数=('方案名称', 'count'),
        平台_超规数=('是否超一定规模', lambda s: (s == '是').sum()),
    ).reset_index()

    # Built once instead of once per certified project.
    stats = {
        name: (int(total), int(over))
        for name, total, over in zip(
            platform_counts['项目名称'],
            platform_counts['平台_方案总数'],
            platform_counts['平台_超规数'],
        )
    }
    candidates = [(name, _strip_project_suffix(name)) for name in stats]

    rows = []
    for cert in df_proj.to_dict('records'):
        total = cert['认定_危大方案总数']
        over = cert['认定_超规数']
        general = total - over  # ordinary = total - over-scale
        best_name, best_ratio = _best_platform_match(cert['项目名称'], candidates)
        if best_name and best_ratio >= MATCH_THRESHOLD:
            plat_total, plat_over = stats[best_name]
            status = '✅'
        else:
            best_name, plat_total, plat_over = '', 0, 0
            status = '⚠️ 未匹配'
        plat_general = plat_total - plat_over
        cert.update({
            '认定_一般数': general,
            '平台_匹配项目': best_name,
            '平台_方案总数': plat_total,
            '平台_超规数': plat_over,
            '平台_一般数': plat_general,
            '差额_超规': plat_over - over,
            '差额_一般': plat_general - general,
            '差额_合计': plat_total - total,
            '匹配状态': status,
        })
        rows.append(cert)
    return pd.DataFrame(rows, columns=RESULT_COLUMNS)


def _write_table(df, stem):
    """Write ``df`` to OUT_DIR as ``<stem>.csv`` and ``<stem>.parquet``; return the CSV path."""
    csv_path = OUT_DIR / f"{stem}.csv"
    df.to_csv(csv_path, index=False, encoding='utf-8-sig')
    df.to_parquet(OUT_DIR / f"{stem}.parquet", index=False)
    return csv_path


def main():
    """Run the extraction, the platform comparison, and write all outputs."""
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    rule = "=" * 60

    # ── Sheet 1: special construction schemes ──
    print(rule)
    print("📖 表1-专项施工方案(3文件合并)...")
    print(rule)
    df_schemes = _collect_special_schemes(CERT_FILES)
    is_over = df_schemes['是否超一定规模'] == '是'
    print(f"\n 表1 中东方案合计: {len(df_schemes)}")
    print(f" 超规: {is_over.sum()}, 一般: {(~is_over).sum()}")
    print(f" 覆盖项目: {df_schemes['项目名称'].nunique()}")

    # ── Sheet 2: class I / II / III technical schemes ──
    print("\n" + rule)
    print("📖 表2-I II Ⅲ类技术方案(3文件合并)...")
    print(rule)
    df_tech = _collect_tech_schemes(CERT_FILES)
    grade_counts = df_tech['方案等级'].value_counts().to_dict()
    print(f"\n 表2 中东方案合计: {len(df_tech)}")
    print(f" 等级分布: {grade_counts}")
    print(f" 覆盖项目: {df_tech['项目名称'].nunique()}")

    # ── Totals ──
    print("\n" + rule)
    print("📊 汇总统计")
    print(rule)
    print(f" 表1 危大方案: {len(df_schemes)} 项(超规{is_over.sum()})")
    print(f" 表2 技术方案: {len(df_tech)} 项(Ⅰ/Ⅱ/Ⅲ类)")
    print(f" 认定总计: {len(df_schemes) + len(df_tech)} 项")
    all_projects = set(df_schemes['项目名称'].unique()) | set(df_tech['项目名称'].unique())
    print(f" 覆盖项目: {len(all_projects)}")

    # ── Per-project summary (sheet 1 vs. OA platform) ──
    df_proj = _summarize_projects(df_schemes)
    if PLATFORM_PARQUET.exists():
        df_result = _compare_with_platform(df_proj, pd.read_parquet(PLATFORM_PARQUET))
        print(f"\n 对标: {len(df_result)} 项目, 匹配 {(df_result['匹配状态']=='✅').sum()}")
    else:
        df_result = df_proj.copy()
        print(" ⚠️ 未找到平台 Parquet,跳过对标")

    # ── Outputs ──
    csv_path = _write_table(df_schemes, "certified_schemes_detail")
    print(f"\n✅ 表1方案明细: {csv_path} ({len(df_schemes)}行)")
    csv_path = _write_table(df_tech, "certified_tech_schemes_detail")
    print(f"✅ 表2技术方案明细: {csv_path} ({len(df_tech)}行)")
    csv_path = _write_table(df_result, "certified_schemes")
    print(f"✅ 项目对比: {csv_path} ({len(df_result)}行)")

    # Validation report; counts are converted to plain ints explicitly so
    # they are always serialised as JSON numbers.
    report = {
        "表1_危大方案": len(df_schemes),
        "表1_超规": int(is_over.sum()),
        "表2_技术方案": len(df_tech),
        "表2_等级分布": {str(grade): int(count) for grade, count in grade_counts.items()},
        "认定总计": len(df_schemes) + len(df_tech),
        "覆盖项目": len(all_projects),
        "数据来源": [f.name for f in CERT_FILES],
    }
    report_path = OUT_DIR / "certified_validation.json"
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"✅ 校验报告: {report_path}")


if __name__ == "__main__":
    main()