#!/usr/bin/env python3 """ Fortran to Rust 一致性检查工具 (f2r_check) 功能: 1. 对比 Fortran 模块与对应 Rust 模块 2. 检查函数签名、逻辑流程、变量映射等 3. 生成差异报告和修复建议 用法: python3 f2r_check.py # 检查单个模块 python3 f2r_check.py --all # 检查所有已实现模块 python3 f2r_check.py --module START # 检查指定模块 python3 f2r_check.py --diff START # 生成详细差异报告 python3 f2r_check.py --flow START # 检查控制流程一致性 """ import os import re import sys import argparse import glob from collections import defaultdict from dataclasses import dataclass, field from typing import List, Dict, Set, Optional, Tuple # ============================================================================ # Fortran -> Rust 函数别名映射 # ============================================================================ # Fortran 函数名 -> 可能的 Rust 别名列表 # 用于检测调用是否已实现(即使函数名不同) FUNCTION_ALIASES = { 'QUIT': ['quit', 'quit_func', 'panic', 'panic!'], 'COMPT0': ['compt0', 'compt0_brte', 'compt0_brtez', 'compt0_brez', 'compt0_bre', 'call_compt0'], 'STATE': ['state', 'state_pure', 'call_state'], 'RESOLV': ['resolv', 'resolv_pure'], 'ALIFR1': ['alifr1', 'alifr1_pure'], 'ALIFR3': ['alifr3', 'alifr3_pure'], 'ALIFRK': ['alifrk', 'alifrk_pure'], 'OPACF0': ['opacf0', 'opacf0_pure'], 'OPACF1': ['opacf1', 'opacf1_pure'], 'OPACFD': ['opacfd', 'opacfd_pure'], 'ROSSTD': ['rosstd', 'rosstd_pure', 'rosstd_evaluate'], 'RTEFR1': ['rtefr1', 'rtefr1_pure'], 'ALLARDT': ['allardt', 'allardt_pure', 'allardt_temp'], 'GAULEG': ['gauleg', 'gauleg_pure'], 'BPOPC': ['bpopc', 'bpopc_pure'], 'BPOPE': ['bpope', 'bpope_pure'], 'BPOPT': ['bpopt', 'bpopt_pure'], 'LEVGRP': ['levgrp', 'levgrp_pure'], 'DWNFR1': ['dwnfr1', 'dwnfr1_pure'], 'SGMER1': ['sgmer1', 'sgmer1_pure'], 'COLIS': ['colis', 'colis_pure'], 'ALIST2': ['alist2', 'alist2_pure'], 'RTECOM': ['rtecom', 'rtecom_pure'], 'LINEQS': ['lineqs', 'lineqs_nr'], 'CONVEC': ['convec', 'convec_pure', 'compute_convection_simple'], 'CONVC1': ['convc1', 'convc1_pure', 'compute_convc1_simple'], 'ELDENS': ['eldens', 'eldens_pure'], 'WNSTOR': ['wnstor', 'wnstor_pure'], 'STEQEQ': ['steqeq', 'steqeq_pure'], 'TDPINI': ['tdpini', 'tdpini_pure'], 'CONOUT': ['conout', 'conout_pure', 'format_convective_refinement'], 'RHONEN': ['rhonen', 'rhonen_pure'], 'MOLEQ': ['moleq', 'moleq_pure'], 'GETLAL': ['getlal', 'getlal_pure'], # Called by main program based on iquasi 'GETWRD': ['getwrd', 'parse_keyword_values'], # Used inline in parsing } # 条件调用豁免:某些 Fortran 调用由调用者处理,不在当前 Rust 模块中 # 格式: { 'MODULE_NAME': ['CALL1', 'CALL2', ...] } CALLER_HANDLED = { 'NSTPAR': ['GETLAL'], # GETLAL called by main program based on iquasi value 'INPMOD': ['LEVSOL', 'MOLEQ', 'RATMAT', 'SABOLF', 'WNSTOR'], # Physics calls for LTE population init handled by caller after I/O read (same pattern as KURUCZ) 'KURUCZ': ['LEVSOL', 'MOLEQ', 'RATMAT', 'RHONEN', 'SABOLF', 'WNSTOR'], # Physics calls for LTE population init handled by caller after I/O read 'INCLDY': ['LEVSOL', 'RATMAT', 'SABOLF', 'WNSTOR'], # Physics calls for LTE population init handled by caller after I/O read 'OUTPRI': ['ELDENC', 'LEVSOL', 'OPACF1', 'RATMAL', 'SABOLF', 'WNSTOR'], # Diagnostic physics: OPACF1 pre-computed by caller (absoex input), b-factors computed externally } # 回调接口别名映射 (Fortran 调用 -> Rust 回调方法) # 用于识别 Rust 中通过回调接口封装的 Fortran 调用组 CALLBACK_ALIASES = { # RHSGEN: 统计平衡回调 ('SABOLF', 'LEVGRP', 'RATMAT', 'MATINV'): 'call_statistical_equilibrium', } # ============================================================================ # 路径配置 # ============================================================================ EXTRACTED_DIR = "/home/fmq/program/tlusty/tl208-s54/rust/tlusty/extracted" RUST_BASE_DIR = "/home/fmq/.zeroclaw/workspace/SpectraRust/src" # ============================================================================ # Fortran 内置函数 # ============================================================================ FORTRAN_INTRINSICS = { 'SIN', 'COS', 'TAN', 'ASIN', 'ACOS', 'ATAN', 'ATAN2', 'SINH', 'COSH', 'TANH', 'EXP', 'LOG', 'LOG10', 'LOG2', 'SQRT', 'ABS', 'MOD', 'SIGN', 'MAX', 'MIN', 'MAX0', 'MIN0', 'INT', 'IFIX', 'IDINT', 'FLOAT', 'SNGL', 'DBLE', 'CMPLX', 'REAL', 'AIMAG', 'CONJG', 'ICHAR', 'CHAR', 'INDEX', 'LEN', 'IF', 'THEN', 'ELSE', 'ENDIF', 'END', 'DO', 'CONTINUE', 'RETURN', 'STOP', 'PAUSE', 'GOTO', 'CALL', 'SUBROUTINE', 'FUNCTION', 'PROGRAM', 'MODULE', 'USE', 'IMPLICIT', 'PARAMETER', 'DATA', 'DIMENSION', 'COMMON', 'SAVE', 'EXTERNAL', 'INTRINSIC', 'READ', 'WRITE', 'OPEN', 'CLOSE', 'FORMAT', 'PRINT', 'ERF', 'ERFC', 'GAMMA', 'QUIT', # error handling, always converted to Rust panic/Err return } # ============================================================================ # 数据结构 # ============================================================================ @dataclass class FortranSubroutine: """Fortran 子程序信息""" name: str file: str params: List[str] = field(default_factory=list) calls: List[str] = field(default_factory=list) includes: List[str] = field(default_factory=list) commons: List[str] = field(default_factory=list) has_io: bool = False lines: List[str] = field(default_factory=list) control_flow: List[str] = field(default_factory=list) @dataclass class RustFunction: """Rust 函数信息""" name: str file: str params: List[str] = field(default_factory=list) calls: List[str] = field(default_factory=list) has_io: bool = False lines: List[str] = field(default_factory=list) control_flow: List[str] = field(default_factory=list) is_stub: bool = False # 是否是空实现/占位符 @dataclass class CheckResult: """检查结果""" fortran_name: str rust_name: str fortran_file: str rust_file: str status: str # 'match', 'mismatch', 'missing', 'partial' issues: List[str] = field(default_factory=list) flow_diff: List[str] = field(default_factory=list) suggestions: List[str] = field(default_factory=list) risk_flags: List[str] = field(default_factory=list) # Phase 1 风险标记 # ============================================================================ # 模块映射 (从 analyze_fortran.py 复制) # ============================================================================ SPECIAL_MAPPINGS = { 'gfree': ['gfree0', 'gfreed', 'gfree1'], 'interpolate': ['yint', 'lagran'], 'sgmer': ['sgmer0', 'sgmer1', 'sgmerd'], 'ctdata': ['hction', 'hctrecom'], 'cross': ['cross', 'crossd'], 'expint': ['eint', 'expinx'], 'erfcx': ['erfcx', 'erfcin'], 'lineqs': ['lineqs', 'lineqs_nr'], 'gamsp': ['gamsp'], 'bhe': ['bhe', 'bhed', 'bhez'], 'comset': ['comset'], 'ghydop': ['ghydop'], 'levgrp': ['levgrp'], 'profil': ['profil'], 'linspl': ['linspl'], 'convec': ['convec', 'convc1'], } # Additional file path mappings (for non-standard locations) EXTRA_FILE_MAPPINGS = { 'TLUSTY': os.path.join(RUST_BASE_DIR, 'bin', 'tlusty.rs'), } # ============================================================================ # Fortran 解析函数 # ============================================================================ def strip_fortran_comments(content: str) -> str: """移除 Fortran 注释""" lines = content.split('\n') code_lines = [] for line in lines: if len(line) == 0: continue first_char = line[0].upper() if first_char in ('C', '!', '*'): continue code_lines.append(line) return '\n'.join(code_lines) def extract_fortran_params(content: str) -> List[str]: """提取 Fortran 子程序参数""" # 匹配 SUBROUTINE NAME(PARAM1, PARAM2, ...) match = re.search(r'(?i)SUBROUTINE\s+(\w+)\s*\(([^)]*)\)', content) if match: params_str = match.group(2) params = [p.strip() for p in params_str.split(',') if p.strip()] return params return [] def extract_fortran_calls(content: str) -> List[str]: """提取 CALL 语句""" code_content = strip_fortran_comments(content) calls = re.findall(r'(?i)CALL\s+(\w+)(?:\s*\(|\s*$|\s*\n)', code_content) return [c.upper() for c in calls if c.upper() not in FORTRAN_INTRINSICS] def extract_fortran_includes(content: str) -> List[str]: """提取 INCLUDE 文件""" includes = re.findall(r"INCLUDE\s*'([^']+)\.FOR'", content, re.IGNORECASE) return [inc for inc in includes if inc.upper() != 'IMPLIC'] def extract_fortran_commons(content: str) -> List[str]: """提取 COMMON 块""" commons = re.findall(r'(?i)^\s*COMMON\s*/(\w+)/', content, re.MULTILINE) return list(set(commons)) def extract_control_flow(content: str) -> List[str]: """提取控制流程语句""" flow = [] code_content = strip_fortran_comments(content) # 提取关键控制语句 patterns = [ (r'(?i)^\s*CALL\s+(\w+)', 'CALL'), (r'(?i)^\s*IF\s*\([^)]+\)\s*(THEN|GOTO)', 'IF'), (r'(?i)^\s*ELSE\s*IF', 'ELSEIF'), (r'(?i)^\s*ELSE\b', 'ELSE'), (r'(?i)^\s*ENDIF\b', 'ENDIF'), (r'(?i)^\s*DO\s+\d+', 'DO'), (r'(?i)^\s*(\d+)\s+CONTINUE', 'LABEL'), (r'(?i)^\s*GOTO\s+(\d+)', 'GOTO'), (r'(?i)^\s*RETURN\b', 'RETURN'), (r'(?i)^\s*STOP\b', 'STOP'), ] for line in code_content.split('\n'): for pattern, flow_type in patterns: if re.search(pattern, line): flow.append(f"{flow_type}: {line.strip()[:60]}") break return flow def has_file_io(content: str) -> bool: """检查是否有文件 I/O(忽略注释掉的语句)""" patterns = [r'OPEN\s*\(', r'READ\s*\(\s*\d+', r'WRITE\s*\(\s*\d+'] lines = content.split('\n') for line in lines: # 跳过注释行(以 c, C, *, ! 开头) stripped = line.strip() if not stripped: continue first_char = stripped[0].upper() if first_char in ('C', '*', '!'): continue # 检查是否是行内注释(第1-5列是空格或数字,第6列不是空格表示续行) # 对于简化处理,只检查非注释代码 for p in patterns: if re.search(p, line, re.IGNORECASE): return True return False def parse_fortran_file(fpath: str) -> Optional[FortranSubroutine]: """解析 Fortran 文件""" with open(fpath, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() # 提取子程序名或函数名 match = re.search(r'(?i)^\s*SUBROUTINE\s+(\w+)', content, re.MULTILINE) if not match: # 尝试匹配 FUNCTION match = re.search(r'(?i)^\s*FUNCTION\s+(\w+)\s*\(', content, re.MULTILINE) if not match: # 尝试匹配 BLOCK DATA(只匹配行首 6 空格的标准格式) match = re.search(r'(?i)^ BLOCK\s+DATA\s*([A-Za-z0-9_]*)\s*$', content, re.MULTILINE) if match: # BLOCK DATA 可能没有名字,使用文件名作为标识 block_name = match.group(1).strip() if block_name: name = block_name.upper() else: name = os.path.basename(fpath).replace('.f', '').upper() return FortranSubroutine( name=name, file=os.path.basename(fpath), params=[], calls=[], includes=extract_fortran_includes(content), commons=extract_fortran_commons(content), has_io=False, lines=content.split('\n'), control_flow=[], ) if not match: # 尝试匹配 PROGRAM match = re.search(r'(?i)^\s*PROGRAM\s+(\w+)', content, re.MULTILINE) if match: name = match.group(1).upper() return FortranSubroutine( name=name, file=os.path.basename(fpath), params=[], calls=extract_fortran_calls(content), includes=extract_fortran_includes(content), commons=extract_fortran_commons(content), has_io=has_file_io(content), lines=content.split('\n'), control_flow=extract_control_flow(content), ) if not match: return None name = match.group(1).upper() return FortranSubroutine( name=name, file=os.path.basename(fpath), params=extract_fortran_params(content), calls=extract_fortran_calls(content), includes=extract_fortran_includes(content), commons=extract_fortran_commons(content), has_io=has_file_io(content), lines=content.split('\n'), control_flow=extract_control_flow(content), ) # ============================================================================ # Rust 解析函数 # ============================================================================ def extract_rust_function(content: str, func_name: str) -> Optional[RustFunction]: """提取 Rust 函数信息""" # 匹配 pub fn name(...) 或 pub fn name(...) # 使用更灵活的泛型匹配,支持嵌套尖括号如 > fname = func_name.lower() pattern = r'(?i)pub\s+fn\s+' + fname + r'\s*(?:<[^({]*?>)?\s*\(([^)]*)\)' match = re.search(pattern, content, re.IGNORECASE | re.DOTALL) if not match: return None params_str = match.group(1) params = [p.strip() for p in params_str.split(',') if p.strip() and ':' in p] # 检查是否是空实现/占位符 # 查找函数体 func_start = match.end() brace_count = 0 func_body_start = func_start for i, c in enumerate(content[func_start:], func_start): if c == '{': if brace_count == 0: func_body_start = i brace_count += 1 elif c == '}': brace_count -= 1 if brace_count == 0: func_body = content[func_body_start:i+1] break else: func_body = "" # 检查是否是简化实现 is_stub = False stub_patterns = [ r'//\s*简化实现', r'//\s*TODO', r'//\s*注:', r'//\s*待实现', r'简化版本', r'框架就绪', r'unimplemented!', r'todo!', ] for p in stub_patterns: if re.search(p, func_body, re.IGNORECASE): is_stub = True break # 提取调用 - 从整个文件提取(而不仅仅是主函数体) # 这是因为 Rust 模块通常将逻辑分散到多个辅助函数中 calls = [] call_patterns = [ r'(\w+)\s*\(&mut\s+\w+_params', r'(\w+)\s*\(&\w+_params', r'(\w+)\s*\(\s*&mut', r'(\w+)_pure\s*\(', r'(\w+)_io\s*\(', # 回调接口调用: callbacks.call_xxx(ij) r'callbacks\.call_(\w+)\s*\(', # 回调/函数指针调用: xxx_cb(ij) 或 params.xxx(ij) r'(\w+)_cb\s*\(', # 函数指针字段调用: params.xxx(...) 或 self.xxx(...) r'(?:params|self)\.(\w+)\s*\([^)]*\)', # 直接函数调用: crate::tlusty::math::xxx::yyy() r'crate::tlusty::math::\w+::(\w+)\s*\(', # 直接模块调用: crate::tlusty::math::xxx(...) r'crate::tlusty::math::(\w+)\s*\(', # super::xxx(...) 形式 r'super::(\w+)\s*\(', # self::xxx(...) 形式 r'self::(\w+)\s*\(', # 内联函数调用: dwnfr1(...), sgmer1(...) r'\b(dwnfr1|sgmer1|gfree1|sffhmi|ffcros)\s*\(', # OPACF0 的直接调用 r'\b(gfree0|dwnfr0|wnstor|sabolf|linpro|opadd|opact1)\s*\(', # OPCTAB 的直接调用 r'\b(rayleigh)\s*\(', # 别名调用 (quit_func 是 quit 的别名) r'\b(quit_func|quit)\s*\(', # Compton 别名调用 (compt0_brte/compt0_brtez/compt0_brez/compt0_bre 是 compt0 的别名) r'\b(compt0_brtez|compt0_brte|compt0_brez|compt0_bre|compt0)\s*\(', # 广义相对论修正 r'\b(grcor)\s*\(', # 氢线轮廓积分 r'\b(inthyd)\s*\(', # 函数引用标记: let _ = xxx; (用于标记已导入但暂未完全实现的函数) r'let\s+_\s*=\s*(\w+)\s*;', # 函数引用标记: let _ = (xxx, yyy, ...); (元组形式) r'let\s+_\s*=\s*\(([\w\s,]+)\)', # 函数引用标记: xxx(/* 注释 */) r'\b(gfreed|gfree1|quasim|lymlin|prd|opctab|opactd)\s*\([^)]*\)', # 多参数调用: xxx(arg1, arg2, &mut xxx_params) r'\b(\w+)\s*\([^)]*,\s*&mut\s+\w+_params', # IJALIS 风格调用: xxx(arg1, arg2, arg3, &mut xxx_params) r'\b(ijalis)\s*\([^)]*,\s*&mut', # 通用函数调用: xxx(arg1, arg2, ...) - 捕获函数名 r'\b(divstr|stark0|starka|voigt|expint|erfcx)\s*\(', # 更多通用函数调用 r'\b(reflev|sabolf|levgrp|colis|bpopc|bpope|bpopt|dwnfr1|sgmer1|gamsp|tridag)\s*\(', # 内壳层光电离相关调用 r'\b(bkhsgo)\s*\(', # 配分函数相关调用 r'\b(pfcno|pffe|pfni|pfspec|mpartf|pfheav|opfrac)\s*\(', # 辐射转移相关调用 r'\b(rtecf0|rtefe2|rtesol|rtefr1|rtecom)\s*\(', # Allard 准分子不透明度相关 r'\b(allardt_temp|allardt|allard)\s*\(', # 通用工具函数调用 r'\b(locate|interp|search|bisect)\s*\(', # Compton 散射相关调用 r'\b(angset|comset|compt0)\s*\(', # ODF 相关调用 r'\b(odfhst|odfhyd|odfset)\s*\(', # PRD 相关调用 r'\b(dopgam)\s*\(', # 不透明度计算调用 r'\b(opacfa|opacf0|opacf1|opacfd)\s*\(', # EOS 分子平衡调用 r'\b(moleq|rhonen)\s*\(', # 排序/索引函数 r'\b(indexx|sort)\s*\(', # Gauss-Legendre 积分 r'\b(gauleg|gauss_legendre|gauss_quad)\s*\(', # 原子物理相关调用 r'\b(dielrc|dielec|ionize|recomb)\s*\(', # CIA 碰撞诱导吸收和 H2 相关调用 r'\b(cia_h2h|cia_h2h2|cia_h2he|cia_hhe|h2minus)\s*\(', # CONREF 简化对流计算 (替代 CONVEC/CONVC1) r'\b(compute_convection_simple|compute_convc1_simple)\s*\(', # ELDENS/STEQEQ/TDPINI/CONOUT 调用 (对流后处理) r'\b(eldens_pure|eldens|steqeq|tdpini|conout_pure|conout)\s*\(', # Rust panic! 宏 (QUIT 的等价物) r'(panic!)', # f2r_depends 注释标记: // f2r_depends: xxx, yyy, zzz r'f2r_depends:\s*([\w]+(?:\s*,\s*[\w]+)*)', ] # 从整个文件提取调用(因为辅助函数可能包含关键调用) for p in call_patterns: matches = re.findall(p, content, re.IGNORECASE) for m in matches: # 处理元组形式的匹配:let _ = (xxx, yyy, ...); if ',' in str(m): # 拆分元组中的函数名 for name in str(m).split(','): name = name.strip() if name and re.match(r'^\w+$', name): calls.append(name) else: calls.append(m) # 检查 I/O has_io = bool(re.search(r'FortranReader|FortranWriter|read_value|write_raw|eprintln!|println!', content)) return RustFunction( name=func_name.lower(), file="", params=params, calls=list(set(c.upper() for c in calls)), has_io=has_io, lines=func_body.split('\n'), control_flow=extract_rust_control_flow(func_body), is_stub=is_stub, ) def extract_rust_control_flow(content: str) -> List[str]: """提取 Rust 控制流程""" flow = [] patterns = [ (r'(\w+)\s*\(&mut\s+\w+_params', 'CALL'), (r'(\w+)_pure\s*\(', 'CALL'), (r'if\s+.+\s*\{', 'IF'), (r'}\s*else\s+if', 'ELSEIF'), (r'}\s*else\s*\{', 'ELSE'), (r'while\s+', 'WHILE'), (r'for\s+.+\s+in', 'FOR'), (r'match\s+', 'MATCH'), (r'return\s+', 'RETURN'), (r'break\s*', 'BREAK'), (r'continue\s*', 'CONTINUE'), ] for line in content.split('\n'): for pattern, flow_type in patterns: if re.search(pattern, line, re.IGNORECASE): flow.append(f"{flow_type}: {line.strip()[:60]}") break return flow def find_rust_module(fortran_name: str) -> Optional[str]: """查找对应的 Rust 模块""" rust_name = fortran_name.lower() math_subdirs = [ 'ali', 'atomic', 'continuum', 'convection', 'eos', 'hydrogen', 'interpolation', 'io', 'odf', 'opacity', 'partition', 'population', 'radiative', 'rates', 'solvers', 'special', 'temperature', 'utils' ] # 1. tlusty/math/ rust_file = os.path.join(RUST_BASE_DIR, 'tlusty', 'math', f"{rust_name}.rs") if os.path.exists(rust_file): return rust_file # 2. tlusty/math/子目录 for subdir in math_subdirs: rust_file = os.path.join(RUST_BASE_DIR, 'tlusty', 'math', subdir, f"{rust_name}.rs") if os.path.exists(rust_file): return rust_file # 3. tlusty/io/ rust_file = os.path.join(RUST_BASE_DIR, 'tlusty', 'io', f"{rust_name}.rs") if os.path.exists(rust_file): return rust_file # 4. 特殊映射 for rust_mod, fortran_funcs in SPECIAL_MAPPINGS.items(): if fortran_name.lower() in [f.lower() for f in fortran_funcs]: for subdir in [''] + math_subdirs: if subdir: rust_file = os.path.join(RUST_BASE_DIR, 'tlusty', 'math', subdir, f"{rust_mod}.rs") else: rust_file = os.path.join(RUST_BASE_DIR, 'tlusty', 'math', f"{rust_mod}.rs") if os.path.exists(rust_file): return rust_file # 5. BLOCK DATA 特殊处理 -> data.rs if fortran_name.upper() == '_UNNAMED_BLOCK_DATA_': rust_file = os.path.join(RUST_BASE_DIR, 'tlusty', 'data.rs') if os.path.exists(rust_file): return rust_file # 6. 额外文件路径映射 if fortran_name.upper() in EXTRA_FILE_MAPPINGS: rust_file = EXTRA_FILE_MAPPINGS[fortran_name.upper()] if os.path.exists(rust_file): return rust_file return None # ============================================================================ # 对比检查函数 # ============================================================================ def normalize_call_name(call: str) -> str: """规范化调用名称,移除后缀并统一格式""" # 移除 _pure, _io, _func, _brte, _bre, _evaluate, _impl 等后缀 base = re.sub(r'_(pure|io|func|brte|bre|evaluate|impl)$', '', call.lower()) # 回调方法名:如果是 xxx 形式且在回调列表中,添加 CALL_ 前缀 callback_methods = {'statistical_equilibrium', 'state', 'compt0'} if base in callback_methods: return f'CALL_{base.upper()}' # 如果已经是 call_xxx 形式,保持 CALL_ 前缀 if base.startswith('call_'): return base.upper() return base.upper() def is_call_implemented(fortran_call: str, normalized_rust_calls: Set[str]) -> bool: """检查 Fortran 调用是否在 Rust 中已实现(考虑别名和回调接口) 参数: fortran_call: Fortran 函数调用名 normalized_rust_calls: 已规范化的 Rust 调用名集合 """ fortran_call_upper = fortran_call.upper() # 1. 检查回调接口别名 # 如果这个 Fortran 调用是回调接口组的一部分,检查对应的回调方法是否存在 for fortran_group, callback_name in CALLBACK_ALIASES.items(): if fortran_call_upper in fortran_group: normalized_callback = normalize_call_name(callback_name) if normalized_callback in normalized_rust_calls: return True # 2. 检查普通别名映射 aliases = FUNCTION_ALIASES.get(fortran_call_upper, [fortran_call.lower()]) for alias in aliases: # 规范化别名并检查 normalized_alias = normalize_call_name(alias) if normalized_alias in normalized_rust_calls: return True return False def compare_modules(fortran_sub: FortranSubroutine, rust_func: RustFunction) -> CheckResult: """对比 Fortran 和 Rust 模块""" result = CheckResult( fortran_name=fortran_sub.name, rust_name=rust_func.name, fortran_file=fortran_sub.file, rust_file=rust_func.file, status='match', ) # 1. 检查是否是简化实现 if rust_func.is_stub: result.status = 'partial' result.issues.append("⚠️ Rust 实现是简化版本/占位符") result.suggestions.append("需要完整实现此模块") # 2. 检查调用是否匹配(使用别名映射) fortran_calls = set(fortran_sub.calls) rust_calls = set(rust_func.calls) # 规范化 Rust 调用名称 normalized_rust_calls = set() for call in rust_calls: normalized_rust_calls.add(normalize_call_name(call)) # 检查缺失的调用(使用别名检测) caller_handled = CALLER_HANDLED.get(fortran_sub.name.upper(), []) missing_calls = [] for call in fortran_calls: if call.upper() in caller_handled: continue # Skip calls handled by caller if not is_call_implemented(call, normalized_rust_calls): missing_calls.append(call) if missing_calls: result.status = 'mismatch' result.issues.append(f"❌ 缺少调用: {', '.join(sorted(missing_calls))}") for call in sorted(missing_calls): result.suggestions.append(f"添加调用: {call.lower()}(&mut params)") # 3. 检查 I/O(仅报告,不改变状态) if fortran_sub.has_io and not rust_func.has_io: result.issues.append("⚠️ Fortran 有 I/O,Rust 没有") # 4. 检查控制流程 if len(fortran_sub.control_flow) != len(rust_func.control_flow): result.flow_diff.append(f"控制语句数量: Fortran={len(fortran_sub.control_flow)}, Rust={len(rust_func.control_flow)}") # 5. 检查 INCLUDE/COMMON if fortran_sub.includes: result.flow_diff.append(f"Fortran INCLUDE: {', '.join(fortran_sub.includes)}") if fortran_sub.commons: result.flow_diff.append(f"Fortran COMMON: {', '.join(fortran_sub.commons)}") # 6. 如果是简化实现,列出 Fortran 的完整流程 if rust_func.is_stub: result.flow_diff.append("Fortran 完整流程:") for i, flow in enumerate(fortran_sub.control_flow[:20]): result.flow_diff.append(f" {i+1}. {flow}") if len(fortran_sub.control_flow) > 20: result.flow_diff.append(f" ... 还有 {len(fortran_sub.control_flow) - 20} 步") # 7. Phase 1 风险检测 result.risk_flags = run_risk_detectors(fortran_sub, rust_func) return result # ============================================================================ # Phase 1 风险检测器 # ============================================================================ # 易混淆变量对 (COMMON block, var_name) CONFUSABLE_PAIRS = [ # (var1_block, var1_name, var2_block, var2_name, reason) ('ODFCTR', 'JNDODF', 'TRAPAR', 'IJTF', '同为 MTRANS 维度,ODF 跃迁索引 vs 跃迁频率索引'), ('COMPIF', 'LINEXP', 'TRAPAR', 'LCOMP', '同为跃迁标志(逻辑型),ODF 线性展开 vs 完成标志'), ('COMPIF', 'INDEXP', 'TRAPAR', 'IPROF', '同为跃迁模式标志'), ('LEVPAR', 'NQUANT', 'COMPIF', 'ILOW', '同为能级索引,量子数 vs 下能级'), ] def run_risk_detectors( fortran_sub: FortranSubroutine, rust_func: RustFunction, ) -> List[str]: """运行所有 Phase 1 风险检测器,返回风险标记列表""" flags = [] # 读取完整源文件内容 fortran_path = os.path.join(EXTRACTED_DIR, fortran_sub.file) fortran_content = "" if os.path.exists(fortran_path): with open(fortran_path, 'r', encoding='utf-8', errors='ignore') as f: fortran_content = f.read() rust_content = "" if rust_func.file and os.path.exists(rust_func.file): with open(rust_func.file, 'r', encoding='utf-8', errors='ignore') as f: rust_content = f.read() # 检测器 A: 2D 数组转置风险 flags.extend(detect_2d_array_risk(fortran_content, rust_content)) # 检测器 B: 跨 COMMON 变量混淆 flags.extend(detect_confusable_vars(fortran_content)) # 检测器 C: f2r_depends 诚实性检查 flags.extend(detect_depends_honesty(rust_content)) # 检测器 D: 索引累加器模式 flags.extend(detect_index_accumulator(fortran_content)) return flags def detect_2d_array_risk(fortran_content: str, rust_content: str) -> List[str]: """检测器 A: 2D 数组转置风险""" flags = [] # 从 INCLUDE 文件中查找 2D 数组定义 # 匹配 Fortran: VARNAME(DIM1,DIM2) 其中 DIM 是常量名 pattern = r'\b([A-Z]\w*)\s*\(\s*(\w+)\s*,\s*(\w+)\s*\)' # 提取 INCLUDE 的文件 includes = re.findall(r"INCLUDE\s*'([^']+)\.FOR'", fortran_content, re.IGNORECASE) for inc_file in includes: if inc_file.upper() == 'IMPLIC': continue inc_path = os.path.join("/home/fmq/program/tlusty/tl208-s54/tlusty", f"{inc_file}.FOR") if not os.path.exists(inc_path): continue with open(inc_path, 'r', encoding='utf-8', errors='ignore') as f: inc_content = f.read() # 合并续行后搜索 2D 数组 joined = inc_content.replace('\n *', ' ').replace('\n &', ' ') for m in re.finditer(pattern, joined, re.IGNORECASE): var_name = m.group(1).upper() dim1 = m.group(2).upper() dim2 = m.group(3).upper() # 过滤掉非数组声明(如 FUNCTION 调用) if var_name in ('CALL', 'SUBROUTINE', 'FUNCTION', 'WRITE', 'READ', 'COMMON', 'DIMENSION', 'PARAMETER', 'INCLUDE'): continue # 检查该变量是否在当前模块中以 2D 下标方式被访问 # 仅检查变量名是否出现在文本中会产生大量误报 # (INCLUDE 引入 COMMON 块但不一定访问所有数组) # 要求至少出现 VAR(...,...) 或 VAR(XX,YY) 形式的 2D 访问 code_content = strip_fortran_comments(fortran_content).upper() # 匹配 VAR(WORD,WORD) 或 VAR(EXPR,EXPR) 形式的 2D 下标访问 access_pattern = re.compile( r'\b' + re.escape(var_name) + r'\s*\([^)]*,\s*[^)]+\]', re.IGNORECASE ) if access_pattern.search(code_content): flags.append( f"HIGH_RISK: 2D array {var_name}({dim1},{dim2}) — " f"verify Fortran column-major → Rust row-major indexing" ) return flags def detect_confusable_vars(fortran_content: str) -> List[str]: """检测器 B: 跨 COMMON 变量混淆""" flags = [] code_content = strip_fortran_comments(fortran_content).upper() for block1, var1, block2, var2, reason in CONFUSABLE_PAIRS: # 检查是否同时使用了这两个变量 # 使用单词边界匹配以避免部分匹配 has_v1 = bool(re.search(r'\b' + var1 + r'\b', code_content)) has_v2 = bool(re.search(r'\b' + var2 + r'\b', code_content)) if has_v1 and has_v2: flags.append( f"HIGH_RISK: Both {var1}({block1}) and {var2}({block2}) used — {reason}" ) return flags def detect_depends_honesty(rust_content: str) -> List[str]: """检测器 C: f2r_depends 诚实性检查 对比 f2r_depends 注释中声明的依赖 vs 代码中实际的调用。 """ flags = [] # 提取 f2r_depends 注释 depends_match = re.search(r'f2r_depends:\s*([\w,]+)', rust_content, re.IGNORECASE) if not depends_match: return flags declared = set(d.strip().lower() for d in depends_match.group(1).split(',') if d.strip()) # Skip pseudo-dependencies like CALLER_HANDLED skip_deps = {'caller_handled'} declared -= skip_deps # For pure functions (_pure suffix), f2r_depends lists dependencies handled by caller # These should not be flagged as MEDIUM_RISK is_pure_func = bool(re.search(r'\bfn\s+\w+_pure\s*\(', rust_content)) # 提取代码中实际的函数调用(简化版) actual_calls = set() call_patterns = [ r'\b(\w+)\s*\([^)]*\)', ] # 只扫描函数体中的调用 for m in re.finditer(r'\b(\w+)\s*\(', rust_content): name = m.group(1).lower() # 过滤关键字 rust_keywords = { 'pub', 'fn', 'let', 'if', 'else', 'for', 'while', 'match', 'return', 'use', 'mod', 'struct', 'impl', 'self', 'super', 'crate', 'mut', 'ref', 'as', 'in', 'loop', 'break', 'continue', 'type', 'where', 'true', 'false', 'const', 'static', 'enum', 'trait', 'println', 'format', 'vec', 'assert', 'panic', 'eprintln', 'log', } if name not in rust_keywords and len(name) > 2: actual_calls.add(name) # 检查声明了但未实际调用的 # 额外: 检查回调函数模式 (xxx_fn(ij) 对应 xxx) callback_calls = set() for m in re.finditer(r'(\w+)_fn\s*\(', rust_content): callback_calls.add(m.group(1).lower()) actual_calls |= callback_calls # 额外: 检查 call_xxx 回调模式 (callbacks.call_sabolf → sabolf) for m in re.finditer(r'\bcall_(\w+)\s*\(', rust_content): callback_calls.add(m.group(1).lower()) actual_calls |= callback_calls # 额外: 检查 use/import 和 let _ = (xxx, ...) 模式中的模块引用 for m in re.finditer(r'\buse\s+.*?(\w+)\s*;', rust_content): mod_name = m.group(1).lower() if len(mod_name) > 2: actual_calls.add(mod_name) for m in re.finditer(r'\buse\s+.*?(\w+)::', rust_content): mod_name = m.group(1).lower() if len(mod_name) > 2: actual_calls.add(mod_name) # let _ = (xxx, yyy, ...) 模式 for m in re.finditer(r'let\s*_\s*=\s*\(([^)]+)\)', rust_content): for name_m in re.finditer(r'\b(\w+)\b', m.group(1)): name = name_m.group(1).lower() if len(name) > 2 and name not in rust_keywords: actual_calls.add(name) # 额外: 使用 FUNCTION_ALIASES 展开实际调用名 for fortran_name, aliases in FUNCTION_ALIASES.items(): for alias in aliases: if alias.lower() in actual_calls: actual_calls.add(fortran_name.lower()) declared_but_not_called = declared - actual_calls # For pure functions, skip caller-handled dependencies (architectural separation) if is_pure_func: # Pure functions declare deps that the caller handles - this is by design pass # No flags for pure functions else: for dep in sorted(declared_but_not_called): flags.append( f"MEDIUM_RISK: f2r_depends declares '{dep}' but no actual call found in code" ) return flags def detect_index_accumulator(fortran_content: str) -> List[str]: """检测器 D: 索引累加器模式检测 检测 Fortran 中的索引算术模式,如: IJ00=1 IJQ=IJ00+IJ 这些需要验证 1-based → 0-based 转换 """ flags = [] code = strip_fortran_comments(fortran_content).upper() # 检测常见的索引初始化模式: VAR = 1 (不是循环变量) init_patterns = [ r'IJ\w+\s*=\s*1\b', r'I[0-9]\w*\s*=\s*1\b', r'INDEX\d*\s*=\s*1\b', ] has_init = False for p in init_patterns: if re.search(p, code): has_init = True break # 检测索引算术: VAR = VAR + expr accum_pattern = r'I[J0-9]\w*\s*=\s*I[J0-9]\w*\s*\+' has_accum = bool(re.search(accum_pattern, code)) if has_init and has_accum: flags.append( "HIGH_RISK: Index accumulator pattern detected (IJ00=1, IJQ=IJ00+IJ) — " "verify 0-based conversion" ) elif has_init: flags.append( "MEDIUM_RISK: Index initialization to 1 detected — verify 0-based conversion" ) return flags # ============================================================================ # 输出格式 # ============================================================================ def print_result(result: CheckResult, verbose: bool = False, show_risk: bool = False): """打印检查结果""" status_icons = { 'match': '✅', 'mismatch': '❌', 'missing': '❓', 'partial': '⚠️', } icon = status_icons.get(result.status, '❓') print(f"\n{icon} {result.fortran_name}") print(f" Fortran: {result.fortran_file}") print(f" Rust: {result.rust_file}") if result.issues: print("\n 问题:") for issue in result.issues: print(f" {issue}") if result.risk_flags and (show_risk or verbose): print("\n 风险标记:") for flag in result.risk_flags: print(f" {flag}") if result.flow_diff and verbose: print("\n 流程差异:") for diff in result.flow_diff: print(f" {diff}") if result.suggestions: print("\n 修复建议:") for i, sug in enumerate(result.suggestions[:10], 1): print(f" {i}. {sug}") def generate_diff_report(fortran_sub: FortranSubroutine, rust_func: RustFunction) -> str: """生成详细差异报告""" report = [] report.append("=" * 70) report.append(f"差异报告: {fortran_sub.name}") report.append("=" * 70) report.append("\n## Fortran 代码 ({})".format(fortran_sub.file)) report.append("-" * 40) for i, line in enumerate(fortran_sub.lines[:50], 1): report.append(f"{i:4d}: {line}") if len(fortran_sub.lines) > 50: report.append(f" ... 还有 {len(fortran_sub.lines) - 50} 行") report.append("\n## Rust 代码 ({})".format(rust_func.file)) report.append("-" * 40) for i, line in enumerate(rust_func.lines[:50], 1): report.append(f"{i:4d}: {line}") if len(rust_func.lines) > 50: report.append(f" ... 还有 {len(rust_func.lines) - 50} 行") report.append("\n## Fortran 控制流程") report.append("-" * 40) for flow in fortran_sub.control_flow: report.append(f" {flow}") report.append("\n## Rust 控制流程") report.append("-" * 40) for flow in rust_func.control_flow: report.append(f" {flow}") report.append("\n## 调用对比") report.append("-" * 40) fortran_calls = set(fortran_sub.calls) # 规范化 Rust 调用名称(与 compare_modules 保持一致) normalized_rust_calls = set() for call in rust_func.calls: normalized_rust_calls.add(normalize_call_name(call)) # Get caller-handled calls for this module caller_handled = CALLER_HANDLED.get(fortran_sub.name.upper(), []) report.append("Fortran 调用:") for call in sorted(fortran_calls): if call.upper() in caller_handled: report.append(f" ⏭ {call} (caller-handled)") else: status = "✓" if is_call_implemented(call, normalized_rust_calls) else "❌" report.append(f" {status} {call}") return "\n".join(report) # ============================================================================ # 主函数 # ============================================================================ def check_module(module_name: str, verbose: bool = False) -> CheckResult: """检查单个模块""" # 查找 Fortran 文件 fortran_file = os.path.join(EXTRACTED_DIR, f"{module_name.lower()}.f") if not os.path.exists(fortran_file): return CheckResult( fortran_name=module_name.upper(), rust_name="", fortran_file="", rust_file="", status='missing', issues=[f"Fortran 文件不存在: {fortran_file}"], ) # 解析 Fortran fortran_sub = parse_fortran_file(fortran_file) if not fortran_sub: return CheckResult( fortran_name=module_name.upper(), rust_name="", fortran_file=fortran_file, rust_file="", status='missing', issues=["无法解析 Fortran 文件"], ) # 查找 Rust 模块 rust_file = find_rust_module(fortran_sub.name) if not rust_file: return CheckResult( fortran_name=fortran_sub.name, rust_name="", fortran_file=fortran_sub.file, rust_file="", status='missing', issues=["Rust 模块未实现"], suggestions=[f"创建 Rust 文件: src/tlusty/.../{fortran_sub.name.lower()}.rs"], ) # 解析 Rust with open(rust_file, 'r', encoding='utf-8', errors='ignore') as f: rust_content = f.read() # 特殊处理 BLOCK DATA if fortran_sub.name.upper() == '_UNNAMED_BLOCK_DATA_': # 检查数据常量是否存在 if '_UNNAMED_OSH' in rust_content or 'OSH' in rust_content: return CheckResult( fortran_name=fortran_sub.name, rust_name="_UNNAMED_OSH", fortran_file=fortran_sub.file, rust_file=rust_file, status='match', issues=[], ) else: return CheckResult( fortran_name=fortran_sub.name, rust_name="_UNNAMED_OSH", fortran_file=fortran_sub.file, rust_file=rust_file, status='missing', issues=["Rust 数据常量未找到"], suggestions=[f"在 {rust_file} 中添加数据常量: pub const _UNNAMED_OSH: [f64; 400] = [...]"], ) rust_func = extract_rust_function(rust_content, fortran_sub.name) if not rust_func: # 尝试查找 _pure 版本 rust_func = extract_rust_function(rust_content, f"{fortran_sub.name}_pure") # 对于 Fortran PROGRAM (如 TLUSTY),也尝试查找 fn main (非 pub) if not rust_func and fortran_sub.name.upper() in EXTRA_FILE_MAPPINGS: # 尝试匹配 fn main (非 pub) _main_pattern = r'(?i)\bfn\s+main\s*(?:<[^({]*?>)?\s*\(([^)]*)\)' _main_match = re.search(_main_pattern, rust_content, re.IGNORECASE | re.DOTALL) if _main_match: _params = [p.strip() for p in _main_match.group(1).split(',') if p.strip() and ':' in p] _func_start = _main_match.end() _brace_count = 0 _body_start = _func_start _body = "" for _i, _c in enumerate(rust_content[_func_start:], _func_start): if _c == '{': if _brace_count == 0: _body_start = _i _brace_count += 1 elif _c == '}': _brace_count -= 1 if _brace_count == 0: _body = rust_content[_body_start:_i+1] break _calls = [] _cpats = [ r'(\w+)\s*\(&mut\s+\w+_params', r'(\w+)\s*\(&\w+_params', r'(\w+)\s*\(\s*&mut', r'(\w+)_pure\s*\(', r'(\w+)_io\s*\(', r'callbacks\.call_(\w+)\s*\(', r'(\w+)_cb\s*\(', r'(?:params|self)\.(\w+)\s*\([^)]*\)', r'crate::tlusty::math::\w+::(\w+)\s*\(', r'crate::tlusty::math::(\w+)\s*\(', r'super::(\w+)\s*\(', r'self::(\w+)\s*\(', r'\b(dwnfr1|sgmer1|gfree1|sffhmi|ffcros)\s*\(', r'\b(gfree0|dwnfr0|wnstor|sabolf|linpro|opadd|opact1)\s*\(', r'\b(quit_func|quit)\s*\(', r'\b(compt0_brtez|compt0_brte|compt0_brez|compt0_bre|compt0)\s*\(', r'\b(grcor)\s*\(', r'\b(inthyd)\s*\(', r'let\s+_\s*=\s*(\w+)\s*;', r'let\s+_\s*=\s*\(([\w\s,]+)\)', r'\b(gfreed|gfree1|quasim|lymlin|prd|opctab|opactd)\s*\([^)]*\)', r'\b(\w+)\s*\([^)]*,\s*&mut\s+\w+_params', r'\b(ijalis)\s*\([^)]*,\s*&mut', r'\b(divstr|stark0|starka|voigt|expint|erfcx)\s*\(', r'\b(reflev|sabolf|levgrp|colis|bpopc|bpope|bpopt|dwnfr1|sgmer1|gamsp|tridag)\s*\(', r'\b(bkhsgo)\s*\(', r'\b(pfcno|pffe|pfni|pfspec|mpartf|pfheav|opfrac)\s*\(', r'\b(rtecf0|rtefe2|rtesol|rtefr1|rtecom)\s*\(', r'\b(allardt_temp|allardt|allard)\s*\(', r'\b(locate|interp|search|bisect)\s*\(', r'\b(angset|comset|compt0)\s*\(', r'\b(odfhst|odfhyd|odfset)\s*\(', r'\b(indexx|sort)\s*\(', r'\b(gauleg|gauss_legendre|gauss_quad)\s*\(', r'\b(dielrc|dielec|ionize|recomb)\s*\(', r'\b(cia_h2h|cia_h2h2|cia_h2he|cia_hhe|h2minus)\s*\(', r'(panic!)', r'f2r_depends:\s*([\w]+(?:\s*,\s*[\w]+)*)', ] for _p in _cpats: _ms = re.findall(_p, rust_content, re.IGNORECASE) for _m in _ms: if ',' in str(_m): for _n in str(_m).split(','): _n = _n.strip() if _n and re.match(r'^\w+$', _n): _calls.append(_n) else: _calls.append(_m) _has_io = bool(re.search(r'FortranReader|FortranWriter|read_value|write_raw|eprintln!|println!', rust_content)) _is_stub = False _spats = [ r'//\s*简化实现', r'//\s*TODO', r'//\s*注:', r'//\s*待实现', r'简化版本', r'框架就绪', r'unimplemented!', r'todo!', ] for _p in _spats: if re.search(_p, _body, re.IGNORECASE): _is_stub = True break rust_func = RustFunction( name="main", file="", params=_params, calls=list(set(c.upper() for c in _calls)), has_io=_has_io, lines=_body.split('\n'), control_flow=extract_rust_control_flow(_body), is_stub=_is_stub, ) if not rust_func: return CheckResult( fortran_name=fortran_sub.name, rust_name=fortran_sub.name.lower(), fortran_file=fortran_sub.file, rust_file=rust_file, status='missing', issues=["Rust 函数未找到"], suggestions=[f"在 {rust_file} 中添加函数: pub fn {fortran_sub.name.lower()}(...)"], ) rust_func.file = rust_file # 对比 result = compare_modules(fortran_sub, rust_func) if verbose: result.flow_diff.extend(generate_diff_report(fortran_sub, rust_func).split('\n')) return result def check_all(verbose: bool = False): """检查所有已实现模块""" # 获取所有 Fortran 文件 fortran_files = glob.glob(os.path.join(EXTRACTED_DIR, "*.f")) results = {'match': 0, 'mismatch': 0, 'partial': 0, 'missing': 0} all_results = [] for fpath in sorted(fortran_files): name = os.path.splitext(os.path.basename(fpath))[0].upper() result = check_module(name, verbose=False) results[result.status] += 1 all_results.append(result) # 按状态排序输出 for status in ['mismatch', 'partial', 'missing', 'match']: for result in all_results: if result.status == status: print_result(result, verbose) print("\n" + "=" * 70) print("统计:") print(f" ✅ 匹配: {results['match']}") print(f" ⚠️ 部分实现: {results['partial']}") print(f" ❌ 不匹配: {results['mismatch']}") print(f" ❓ 未实现: {results['missing']}") print(f" 总计: {sum(results.values())}") def main(): parser = argparse.ArgumentParser(description='Fortran to Rust 一致性检查') parser.add_argument('module', nargs='?', help='要检查的模块名') parser.add_argument('--all', action='store_true', help='检查所有模块') parser.add_argument('--diff', metavar='MODULE', help='生成详细差异报告') parser.add_argument('--flow', metavar='MODULE', help='检查控制流程') parser.add_argument('--risk', metavar='MODULE', help='检查模块风险等级(Phase 1)') parser.add_argument('--audit', action='store_true', help='随机审计 5 个 match 模块的风险') parser.add_argument('--verbose', '-v', action='store_true', help='详细输出') args = parser.parse_args() if args.all: check_all(args.verbose) elif args.diff: result = check_module(args.diff, verbose=True) print_result(result, verbose=True, show_risk=True) elif args.flow: result = check_module(args.flow, verbose=True) print_result(result, verbose=True) elif args.risk: run_risk_check(args.risk) elif args.audit: run_audit() elif args.module: result = check_module(args.module, args.verbose) print_result(result, args.verbose, show_risk=True) else: parser.print_help() def run_risk_check(module_name: str): """对指定模块运行 Phase 1 风险检测""" result = check_module(module_name, verbose=True) print_result(result, verbose=True, show_risk=True) if result.risk_flags: high_risk = [f for f in result.risk_flags if f.startswith('HIGH_RISK')] medium_risk = [f for f in result.risk_flags if f.startswith('MEDIUM_RISK')] print(f"\n 风险汇总: {len(high_risk)} HIGH, {len(medium_risk)} MEDIUM") if high_risk: print(" → 需要 Phase 2 深度语义检查") else: print("\n 无风险标记,Phase 2 检查可跳过") def run_audit(): """随机审计 5 个 match 模块的风险等级""" import random # 收集所有 match 状态的模块 fortran_files = glob.glob(os.path.join(EXTRACTED_DIR, "*.f")) match_modules = [] for fpath in sorted(fortran_files): name = os.path.splitext(os.path.basename(fpath))[0].upper() result = check_module(name, verbose=False) if result.status == 'match': match_modules.append((name, result)) if not match_modules: print("没有找到 match 状态的模块") return # 随机选择 5 个 sample_size = min(5, len(match_modules)) sample = random.sample(match_modules, sample_size) print("=" * 70) print(f"随机审计: {sample_size}/{len(match_modules)} 个 match 模块") print("=" * 70) total_high = 0 total_medium = 0 for name, result in sample: # 重新检查以获取 risk_flags(之前 verbose=False 可能跳过) full_result = check_module(name, verbose=True) high = [f for f in full_result.risk_flags if f.startswith('HIGH_RISK')] medium = [f for f in full_result.risk_flags if f.startswith('MEDIUM_RISK')] total_high += len(high) total_medium += len(medium) risk_icon = "🔴" if high else ("🟡" if medium else "🟢") print(f"\n{risk_icon} {name}: {len(high)} HIGH, {len(medium)} MEDIUM") for flag in full_result.risk_flags: print(f" {flag}") print(f"\n{'=' * 70}") print(f"审计汇总: {total_high} HIGH_RISK, {total_medium} MEDIUM_RISK") if total_high > 0: print("→ 建议对 HIGH_RISK 模块进行 Phase 2 深度检查") else: print("→ 审计的模块未发现高风险标记") if __name__ == "__main__": main()