SpectraRust/.claude/skills/f2r-check/scripts/f2r_check.py
2026-04-04 23:01:19 +08:00

1706 lines
66 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Fortran to Rust 一致性检查工具 (f2r_check)
功能:
1. 对比 Fortran 模块与对应 Rust 模块
2. 检查函数签名、逻辑流程、变量映射等
3. 生成差异报告和修复建议
用法:
python3 f2r_check.py <fortran_file> # 检查单个模块
python3 f2r_check.py --all # 检查所有已实现模块
python3 f2r_check.py --module START # 检查指定模块
python3 f2r_check.py --diff START # 生成详细差异报告
python3 f2r_check.py --flow START # 检查控制流程一致性
"""
import os
import re
import sys
import argparse
import glob
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict, Set, Optional, Tuple
# ============================================================================
# Fortran -> Rust 函数别名映射
# ============================================================================
# Fortran 函数名 -> 可能的 Rust 别名列表
# 用于检测调用是否已实现(即使函数名不同)
# Maps an upper-case Fortran routine name to the lower-case Rust names that
# count as an implementation of that call. is_call_implemented() normalizes
# each alias with normalize_call_name() before comparing it against the
# normalized Rust call set.
FUNCTION_ALIASES = {
    'QUIT': ['quit', 'quit_func', 'panic', 'panic!'],
    'COMPT0': ['compt0', 'compt0_brte', 'compt0_brtez', 'compt0_brez', 'compt0_bre', 'call_compt0'],
    'STATE': ['state', 'state_pure', 'call_state'],
    'RESOLV': ['resolv', 'resolv_pure'],
    'ALIFR1': ['alifr1', 'alifr1_pure'],
    'ALIFR3': ['alifr3', 'alifr3_pure'],
    'ALIFRK': ['alifrk', 'alifrk_pure'],
    'OPACF0': ['opacf0', 'opacf0_pure'],
    'OPACF1': ['opacf1', 'opacf1_pure'],
    'OPACFD': ['opacfd', 'opacfd_pure'],
    'ROSSTD': ['rosstd', 'rosstd_pure', 'rosstd_evaluate'],
    'RTEFR1': ['rtefr1', 'rtefr1_pure'],
    'ALLARDT': ['allardt', 'allardt_pure', 'allardt_temp'],
    'GAULEG': ['gauleg', 'gauleg_pure'],
    'BPOPC': ['bpopc', 'bpopc_pure'],
    'BPOPE': ['bpope', 'bpope_pure'],
    'BPOPT': ['bpopt', 'bpopt_pure'],
    'LEVGRP': ['levgrp', 'levgrp_pure'],
    'DWNFR1': ['dwnfr1', 'dwnfr1_pure'],
    'SGMER1': ['sgmer1', 'sgmer1_pure'],
    'COLIS': ['colis', 'colis_pure'],
    'ALIST2': ['alist2', 'alist2_pure'],
    'RTECOM': ['rtecom', 'rtecom_pure'],
    'LINEQS': ['lineqs', 'lineqs_nr'],
    'CONVEC': ['convec', 'convec_pure', 'compute_convection_simple'],
    'CONVC1': ['convc1', 'convc1_pure', 'compute_convc1_simple'],
    'ELDENS': ['eldens', 'eldens_pure'],
    'WNSTOR': ['wnstor', 'wnstor_pure'],
    'STEQEQ': ['steqeq', 'steqeq_pure'],
    'TDPINI': ['tdpini', 'tdpini_pure'],
    'CONOUT': ['conout', 'conout_pure', 'format_convective_refinement'],
    'RHONEN': ['rhonen', 'rhonen_pure'],
    'MOLEQ': ['moleq', 'moleq_pure'],
    'GETLAL': ['getlal', 'getlal_pure'],  # Called by main program based on iquasi
    'GETWRD': ['getwrd', 'parse_keyword_values'],  # Used inline in parsing
}
# 条件调用豁免:某些 Fortran 调用由调用者处理,不在当前 Rust 模块中
# 格式: { 'MODULE_NAME': ['CALL1', 'CALL2', ...] }
# Per-module exemption list: Fortran CALL targets that compare_modules() and
# generate_diff_report() do not require in the Rust module, because the caller
# of that module performs them instead. Keys are upper-case Fortran unit names.
CALLER_HANDLED = {
    'NSTPAR': ['GETLAL'],  # GETLAL called by main program based on iquasi value
    'INPMOD': ['LEVSOL', 'MOLEQ', 'RATMAT', 'SABOLF', 'WNSTOR'],  # Physics calls for LTE population init handled by caller after I/O read (same pattern as KURUCZ)
    'KURUCZ': ['LEVSOL', 'MOLEQ', 'RATMAT', 'RHONEN', 'SABOLF', 'WNSTOR'],  # Physics calls for LTE population init handled by caller after I/O read
    'INCLDY': ['LEVSOL', 'RATMAT', 'SABOLF', 'WNSTOR'],  # Physics calls for LTE population init handled by caller after I/O read
    'OUTPRI': ['ELDENC', 'LEVSOL', 'OPACF1', 'RATMAL', 'SABOLF', 'WNSTOR'],  # Diagnostic physics: OPACF1 pre-computed by caller (absoex input), b-factors computed externally
}
# 回调接口别名映射 (Fortran 调用 -> Rust 回调方法)
# 用于识别 Rust 中通过回调接口封装的 Fortran 调用组
# Maps a group of Fortran routines (tuple key) to the single Rust callback
# method that stands in for all of them. is_call_implemented() treats any
# member of the group as implemented when the callback name is among the
# normalized Rust calls.
CALLBACK_ALIASES = {
    # RHSGEN: statistical-equilibrium callback
    ('SABOLF', 'LEVGRP', 'RATMAT', 'MATINV'): 'call_statistical_equilibrium',
}
# ============================================================================
# 路径配置
# ============================================================================
# Directory that holds the extracted Fortran sources (check_module looks for
# "<module>.f" in it). The default is the original author's checkout; set the
# F2R_EXTRACTED_DIR environment variable to point the tool elsewhere.
EXTRACTED_DIR = os.environ.get(
    "F2R_EXTRACTED_DIR",
    "/home/fmq/program/tlusty/tl208-s54/rust/tlusty/extracted",
)
# Root of the Rust source tree searched by find_rust_module(). Override with
# the F2R_RUST_BASE_DIR environment variable.
RUST_BASE_DIR = os.environ.get(
    "F2R_RUST_BASE_DIR",
    "/home/fmq/.zeroclaw/workspace/SpectraRust/src",
)
# ============================================================================
# Fortran 内置函数
# ============================================================================
# Upper-case names that extract_fortran_calls() drops from the list of CALL
# targets. Besides intrinsic functions the set also contains statement
# keywords and QUIT, so none of these are ever reported as missing calls.
FORTRAN_INTRINSICS = {
    'SIN', 'COS', 'TAN', 'ASIN', 'ACOS', 'ATAN', 'ATAN2',
    'SINH', 'COSH', 'TANH', 'EXP', 'LOG', 'LOG10', 'LOG2',
    'SQRT', 'ABS', 'MOD', 'SIGN', 'MAX', 'MIN', 'MAX0', 'MIN0',
    'INT', 'IFIX', 'IDINT', 'FLOAT', 'SNGL', 'DBLE', 'CMPLX',
    'REAL', 'AIMAG', 'CONJG', 'ICHAR', 'CHAR', 'INDEX', 'LEN',
    'IF', 'THEN', 'ELSE', 'ENDIF', 'END', 'DO', 'CONTINUE',
    'RETURN', 'STOP', 'PAUSE', 'GOTO', 'CALL', 'SUBROUTINE',
    'FUNCTION', 'PROGRAM', 'MODULE', 'USE', 'IMPLICIT',
    'PARAMETER', 'DATA', 'DIMENSION', 'COMMON', 'SAVE',
    'EXTERNAL', 'INTRINSIC', 'READ', 'WRITE', 'OPEN', 'CLOSE',
    'FORMAT', 'PRINT', 'ERF', 'ERFC', 'GAMMA',
    'QUIT',  # error handling, always converted to Rust panic/Err return
}
# ============================================================================
# 数据结构
# ============================================================================
@dataclass
class FortranSubroutine:
    """Parsed summary of one Fortran program unit, as built by parse_fortran_file()."""
    name: str  # unit name; parse_fortran_file() stores it upper-cased
    file: str  # basename of the Fortran source file
    params: List[str] = field(default_factory=list)  # dummy-argument names
    calls: List[str] = field(default_factory=list)  # CALL targets, upper-cased
    includes: List[str] = field(default_factory=list)  # INCLUDE stems (without ".FOR")
    commons: List[str] = field(default_factory=list)  # named COMMON blocks
    has_io: bool = False  # True when OPEN / unit-numbered READ or WRITE was found
    lines: List[str] = field(default_factory=list)  # raw file content split on newlines
    control_flow: List[str] = field(default_factory=list)  # "TYPE: statement" entries
@dataclass
class RustFunction:
    """Parsed summary of one Rust function, as built by extract_rust_function()."""
    name: str  # function name; extract_rust_function() stores it lower-cased
    file: str  # path of the Rust file; extract_rust_function() leaves it empty
    params: List[str] = field(default_factory=list)  # "name: type" parameter fragments
    calls: List[str] = field(default_factory=list)  # detected call names, upper-cased
    has_io: bool = False  # True when an I/O marker was found in the Rust file
    lines: List[str] = field(default_factory=list)  # function body split on newlines
    control_flow: List[str] = field(default_factory=list)  # "TYPE: statement" entries
    is_stub: bool = False  # whether this is an empty implementation / placeholder
@dataclass
class CheckResult:
    """Outcome of comparing one Fortran unit with its Rust counterpart."""
    fortran_name: str  # Fortran unit name
    rust_name: str  # Rust function name ("" when none was found)
    fortran_file: str  # Fortran source file
    rust_file: str  # Rust source file ("" when none was found)
    status: str  # 'match', 'mismatch', 'missing', 'partial'
    issues: List[str] = field(default_factory=list)  # human-readable problem descriptions
    flow_diff: List[str] = field(default_factory=list)  # control-flow / INCLUDE / COMMON notes
    suggestions: List[str] = field(default_factory=list)  # proposed fixes
    risk_flags: List[str] = field(default_factory=list)  # Phase 1 risk flags
# ============================================================================
# 模块映射 (从 analyze_fortran.py 复制)
# ============================================================================
# Rust module stem -> Fortran routines that live in that module.
# find_rust_module() consults this table when no Rust file is named after the
# Fortran routine itself.
SPECIAL_MAPPINGS = {
    'gfree': ['gfree0', 'gfreed', 'gfree1'],
    'interpolate': ['yint', 'lagran'],
    'sgmer': ['sgmer0', 'sgmer1', 'sgmerd'],
    'ctdata': ['hction', 'hctrecom'],
    'cross': ['cross', 'crossd'],
    'expint': ['eint', 'expinx'],
    'erfcx': ['erfcx', 'erfcin'],
    'lineqs': ['lineqs', 'lineqs_nr'],
    'gamsp': ['gamsp'],
    'bhe': ['bhe', 'bhed', 'bhez'],
    'comset': ['comset'],
    'ghydop': ['ghydop'],
    'levgrp': ['levgrp'],
    'profil': ['profil'],
    'linspl': ['linspl'],
    'convec': ['convec', 'convc1'],
}
# Additional file path mappings (for non-standard locations)
# Upper-case Fortran unit name -> explicit Rust file path. This is the last
# lookup step in find_rust_module().
EXTRA_FILE_MAPPINGS = {
    'TLUSTY': os.path.join(RUST_BASE_DIR, 'bin', 'tlusty.rs'),
}
# ============================================================================
# Fortran 解析函数
# ============================================================================
def strip_fortran_comments(content: str) -> str:
    """Return *content* without comment lines and without empty lines.

    A line counts as a comment when its very first character is ``C``/``c``,
    ``!`` or ``*``. The surviving lines are re-joined with newlines.
    """
    comment_markers = {'C', '!', '*'}
    kept = [
        text
        for text in content.split('\n')
        if text and text[0].upper() not in comment_markers
    ]
    return '\n'.join(kept)
def extract_fortran_params(content: str) -> List[str]:
    """Return the dummy-argument names of the first SUBROUTINE in *content*.

    Matches ``SUBROUTINE NAME(PARAM1, PARAM2, ...)`` case-insensitively. The
    argument list may wrap onto fixed-form continuation lines (five blanks
    followed by a marker in column 6); those markers are removed so they do
    not end up glued to a parameter name.

    Returns an empty list when no SUBROUTINE with a parenthesised argument
    list is found.
    """
    match = re.search(r'(?i)SUBROUTINE\s+(\w+)\s*\(([^)]*)\)', content)
    if match is None:
        return []
    # Drop "newline + 5 blanks + continuation character" before splitting.
    params_str = re.sub(r'\n {5}\S', ' ', match.group(2))
    return [p.strip() for p in params_str.split(',') if p.strip()]
def extract_fortran_calls(content: str) -> List[str]:
    """Collect the targets of CALL statements, upper-cased.

    Comment lines are removed first. Names listed in FORTRAN_INTRINSICS are
    left out. Duplicates and source order are preserved.
    """
    source = strip_fortran_comments(content)
    targets = []
    for raw_name in re.findall(r'(?i)CALL\s+(\w+)(?:\s*\(|\s*$|\s*\n)', source):
        upper_name = raw_name.upper()
        if upper_name in FORTRAN_INTRINSICS:
            continue
        targets.append(upper_name)
    return targets
def extract_fortran_includes(content: str) -> List[str]:
    """List the stems of ``INCLUDE '<stem>.FOR'`` statements.

    Matching is case-insensitive. The ``IMPLIC`` include is always omitted.
    Stems keep the spelling used in the source.
    """
    include_re = re.compile(r"INCLUDE\s*'([^']+)\.FOR'", re.IGNORECASE)
    stems = []
    for stem in include_re.findall(content):
        if stem.upper() == 'IMPLIC':
            continue
        stems.append(stem)
    return stems
def extract_fortran_commons(content: str) -> List[str]:
    """Return the named COMMON blocks declared in *content*.

    Fortran identifiers are case-insensitive, so names are upper-cased before
    de-duplication. The result is sorted so that reports built from it are
    identical from run to run (iteration order of a set of strings is not
    stable across interpreter invocations).
    """
    names = re.findall(r'(?i)^\s*COMMON\s*/(\w+)/', content, re.MULTILINE)
    return sorted({name.upper() for name in names})
def extract_control_flow(content: str) -> List[str]:
    """Extract control-flow statements from Fortran source.

    Comment lines are removed first. Each remaining line is tested against
    the patterns below in order; the first hit produces one entry of the form
    ``"<TYPE>: <statement truncated to 60 chars>"``.

    Both spellings of ``ENDIF`` / ``END IF`` and ``GOTO`` / ``GO TO`` are
    recognised, and the IF condition may contain nested parentheses such as
    ``IF (A(I).GT.0) THEN``.
    """
    # Order matters: ELSE IF must be tried before the plain ELSE pattern.
    patterns = [
        (re.compile(r'(?i)^\s*CALL\s+(\w+)'), 'CALL'),
        (re.compile(r'(?i)^\s*IF\s*\(.+\)\s*(THEN|GO\s*TO)'), 'IF'),
        (re.compile(r'(?i)^\s*ELSE\s*IF'), 'ELSEIF'),
        (re.compile(r'(?i)^\s*ELSE\b'), 'ELSE'),
        (re.compile(r'(?i)^\s*END\s*IF\b'), 'ENDIF'),
        (re.compile(r'(?i)^\s*DO\s+\d+'), 'DO'),
        (re.compile(r'(?i)^\s*(\d+)\s+CONTINUE'), 'LABEL'),
        (re.compile(r'(?i)^\s*GO\s*TO\s+(\d+)'), 'GOTO'),
        (re.compile(r'(?i)^\s*RETURN\b'), 'RETURN'),
        (re.compile(r'(?i)^\s*STOP\b'), 'STOP'),
    ]
    flow = []
    for line in strip_fortran_comments(content).split('\n'):
        for pattern, flow_type in patterns:
            if pattern.search(line):
                flow.append(f"{flow_type}: {line.strip()[:60]}")
                break
    return flow
def has_file_io(content: str) -> bool:
    """Report whether *content* contains file I/O outside comment lines.

    File I/O means ``OPEN(`` or a ``READ(``/``WRITE(`` whose first argument
    starts with a digit (a unit number). Matching is case-insensitive and
    anchored at a word boundary, so identifiers that merely end in one of
    these words (e.g. ``THREAD(1)``) are not counted.

    A line is a comment when column 1 holds ``C``, ``c`` or ``*`` (the same
    rule strip_fortran_comments() applies), or when its first non-blank
    character is ``!``. Indented statements and continuation lines that
    happen to begin with ``C`` or ``*`` are therefore still inspected.
    """
    io_re = re.compile(
        r'\b(?:OPEN\s*\(|READ\s*\(\s*\d+|WRITE\s*\(\s*\d+)',
        re.IGNORECASE,
    )
    for line in content.split('\n'):
        stripped = line.strip()
        if not stripped:
            continue
        if line[0] in 'Cc*' or stripped[0] == '!':
            continue
        if io_re.search(line):
            return True
    return False
def parse_fortran_file(fpath: str) -> Optional[FortranSubroutine]:
    """Parse a Fortran source file into a FortranSubroutine summary.

    The unit header is looked up in this order: SUBROUTINE, FUNCTION,
    BLOCK DATA, PROGRAM. The first kind that matches determines the result:

    * SUBROUTINE / FUNCTION: full summary including parameters.
    * BLOCK DATA: only INCLUDE and COMMON information is collected. An
      unnamed block takes the file's base name (without extension) as name.
    * PROGRAM: full summary with an empty parameter list.

    Args:
        fpath: path of the Fortran file; read as UTF-8, undecodable bytes
            are ignored.

    Returns:
        The summary, or None when no unit header is found.
    """
    with open(fpath, 'r', encoding='utf-8', errors='ignore') as f:
        content = f.read()
    file_name = os.path.basename(fpath)

    def executable_unit(unit_name: str, params: List[str]) -> FortranSubroutine:
        # Shared by SUBROUTINE, FUNCTION and PROGRAM units.
        return FortranSubroutine(
            name=unit_name.upper(),
            file=file_name,
            params=params,
            calls=extract_fortran_calls(content),
            includes=extract_fortran_includes(content),
            commons=extract_fortran_commons(content),
            has_io=has_file_io(content),
            lines=content.split('\n'),
            control_flow=extract_control_flow(content),
        )

    match = re.search(r'(?i)^\s*SUBROUTINE\s+(\w+)', content, re.MULTILINE)
    if not match:
        match = re.search(r'(?i)^\s*FUNCTION\s+(\w+)\s*\(', content, re.MULTILINE)
    if match:
        return executable_unit(match.group(1), extract_fortran_params(content))

    # BLOCK DATA: only the standard fixed-form layout (statement starting in
    # column 7, i.e. exactly six leading blanks) is accepted.
    block = re.search(
        r'(?i)^ {6}BLOCK\s+DATA\s*([A-Za-z0-9_]*)\s*$', content, re.MULTILINE
    )
    if block:
        block_name = block.group(1).strip()
        if block_name:
            name = block_name.upper()
        else:
            # Unnamed BLOCK DATA: identify it by the file name.
            name = os.path.splitext(file_name)[0].upper()
        return FortranSubroutine(
            name=name,
            file=file_name,
            params=[],
            calls=[],
            includes=extract_fortran_includes(content),
            commons=extract_fortran_commons(content),
            has_io=False,
            lines=content.split('\n'),
            control_flow=[],
        )

    program = re.search(r'(?i)^\s*PROGRAM\s+(\w+)', content, re.MULTILINE)
    if program:
        return executable_unit(program.group(1), [])
    return None
# ============================================================================
# Rust 解析函数
# ============================================================================
def extract_rust_function(content: str, func_name: str) -> Optional[RustFunction]:
    """Locate ``pub fn <func_name>`` in a Rust file and summarise it.

    Args:
        content: full text of the Rust source file.
        func_name: function name to look for (matched case-insensitively).

    Returns:
        A RustFunction, or None when no matching ``pub fn`` is found. Only
        ``params``, ``lines``, ``control_flow`` and ``is_stub`` describe the
        matched function; ``calls`` and ``has_io`` are derived from the whole
        file. ``file`` is left empty.
    """
    # Matches pub fn name<R: BufRead, W: Write>(...) or pub fn name(...).
    # The optional generic clause is matched loosely so that nested angle
    # brackets such as <P: AsRef<Path>> are accepted.
    fname = func_name.lower()
    pattern = r'(?i)pub\s+fn\s+' + fname + r'\s*(?:<[^({]*?>)?\s*\(([^)]*)\)'
    match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)
    if not match:
        return None
    params_str = match.group(1)
    # Keep only comma-separated fragments that contain a ':'.
    params = [p.strip() for p in params_str.split(',') if p.strip() and ':' in p]
    # Find the function body by counting braces from the end of the signature.
    func_start = match.end()
    brace_count = 0
    func_body_start = func_start
    for i, c in enumerate(content[func_start:], func_start):
        if c == '{':
            if brace_count == 0:
                func_body_start = i
            brace_count += 1
        elif c == '}':
            brace_count -= 1
            if brace_count == 0:
                func_body = content[func_body_start:i+1]
                break
    else:
        # Loop finished without a balanced closing brace: no body.
        func_body = ""
    # Stub detection: any of these markers inside the body flags the function
    # as a simplified implementation / placeholder.
    is_stub = False
    stub_patterns = [
        r'//\s*简化实现',
        r'//\s*TODO',
        r'//\s*注:',
        r'//\s*待实现',
        r'简化版本',
        r'框架就绪',
        r'unimplemented!',
        r'todo!',
    ]
    for p in stub_patterns:
        if re.search(p, func_body, re.IGNORECASE):
            is_stub = True
            break
    # Calls are extracted from the whole file, not only the matched body,
    # because Rust modules usually spread the logic over helper functions.
    calls = []
    call_patterns = [
        r'(\w+)\s*\(&mut\s+\w+_params',
        r'(\w+)\s*\(&\w+_params',
        r'(\w+)\s*\(\s*&mut',
        r'(\w+)_pure\s*\(',
        r'(\w+)_io\s*\(',
        # Callback interface call: callbacks.call_xxx(ij)
        r'callbacks\.call_(\w+)\s*\(',
        # Callback / function-pointer call: xxx_cb(ij) or params.xxx(ij)
        r'(\w+)_cb\s*\(',
        # Function-pointer field call: params.xxx(...) or self.xxx(...)
        r'(?:params|self)\.(\w+)\s*\([^)]*\)',
        # Direct function call: crate::tlusty::math::xxx::yyy()
        r'crate::tlusty::math::\w+::(\w+)\s*\(',
        # Direct module call: crate::tlusty::math::xxx(...)
        r'crate::tlusty::math::(\w+)\s*\(',
        # super::xxx(...) form
        r'super::(\w+)\s*\(',
        # self::xxx(...) form
        r'self::(\w+)\s*\(',
        # Inlined function calls: dwnfr1(...), sgmer1(...)
        r'\b(dwnfr1|sgmer1|gfree1|sffhmi|ffcros)\s*\(',
        # Direct calls made by OPACF0
        r'\b(gfree0|dwnfr0|wnstor|sabolf|linpro|opadd|opact1)\s*\(',
        # Direct calls made by OPCTAB
        r'\b(rayleigh)\s*\(',
        # Alias call (quit_func is an alias of quit)
        r'\b(quit_func|quit)\s*\(',
        # Compton alias calls (compt0_brte/compt0_brtez/compt0_brez/compt0_bre are aliases of compt0)
        r'\b(compt0_brtez|compt0_brte|compt0_brez|compt0_bre|compt0)\s*\(',
        # General-relativistic correction
        r'\b(grcor)\s*\(',
        # Hydrogen line-profile integration
        r'\b(inthyd)\s*\(',
        # Function reference marker: let _ = xxx; (imported but not yet fully implemented)
        r'let\s+_\s*=\s*(\w+)\s*;',
        # Function reference marker: let _ = (xxx, yyy, ...); (tuple form)
        r'let\s+_\s*=\s*\(([\w\s,]+)\)',
        # Function reference marker: xxx(/* comment */)
        r'\b(gfreed|gfree1|quasim|lymlin|prd|opctab|opactd)\s*\([^)]*\)',
        # Multi-argument call: xxx(arg1, arg2, &mut xxx_params)
        r'\b(\w+)\s*\([^)]*,\s*&mut\s+\w+_params',
        # IJALIS-style call: xxx(arg1, arg2, arg3, &mut xxx_params)
        r'\b(ijalis)\s*\([^)]*,\s*&mut',
        # Generic function calls: xxx(arg1, arg2, ...) - captures the name
        r'\b(divstr|stark0|starka|voigt|expint|erfcx)\s*\(',
        # More generic function calls
        r'\b(reflev|sabolf|levgrp|colis|bpopc|bpope|bpopt|dwnfr1|sgmer1|gamsp|tridag)\s*\(',
        # Inner-shell photoionization calls
        r'\b(bkhsgo)\s*\(',
        # Partition-function calls
        r'\b(pfcno|pffe|pfni|pfspec|mpartf|pfheav|opfrac)\s*\(',
        # Radiative-transfer calls
        r'\b(rtecf0|rtefe2|rtesol|rtefr1|rtecom)\s*\(',
        # Allard quasi-molecular opacity
        r'\b(allardt_temp|allardt|allard)\s*\(',
        # General utility calls
        r'\b(locate|interp|search|bisect)\s*\(',
        # Compton-scattering calls
        r'\b(angset|comset|compt0)\s*\(',
        # ODF calls
        r'\b(odfhst|odfhyd|odfset)\s*\(',
        # PRD calls
        r'\b(dopgam)\s*\(',
        # Opacity calculation calls
        r'\b(opacfa|opacf0|opacf1|opacfd)\s*\(',
        # EOS molecular-equilibrium calls
        r'\b(moleq|rhonen)\s*\(',
        # Sorting / indexing functions
        r'\b(indexx|sort)\s*\(',
        # Gauss-Legendre integration
        r'\b(gauleg|gauss_legendre|gauss_quad)\s*\(',
        # Atomic-physics calls
        r'\b(dielrc|dielec|ionize|recomb)\s*\(',
        # CIA (collision-induced absorption) and H2 calls
        r'\b(cia_h2h|cia_h2h2|cia_h2he|cia_hhe|h2minus)\s*\(',
        # CONREF simplified convection (replaces CONVEC/CONVC1)
        r'\b(compute_convection_simple|compute_convc1_simple)\s*\(',
        # ELDENS/STEQEQ/TDPINI/CONOUT calls (convection post-processing)
        r'\b(eldens_pure|eldens|steqeq|tdpini|conout_pure|conout)\s*\(',
        # Rust panic! macro (equivalent of QUIT)
        r'(panic!)',
        # f2r_depends comment marker: // f2r_depends: xxx, yyy, zzz
        r'f2r_depends:\s*([\w]+(?:\s*,\s*[\w]+)*)',
    ]
    # Apply every pattern to the whole file (helper functions may hold the
    # calls of interest).
    for p in call_patterns:
        matches = re.findall(p, content, re.IGNORECASE)
        for m in matches:
            # A captured comma-separated list (tuple marker or f2r_depends)
            # is split into its individual names.
            if ',' in str(m):
                for name in str(m).split(','):
                    name = name.strip()
                    if name and re.match(r'^\w+$', name):
                        calls.append(name)
            else:
                calls.append(m)
    # I/O check, also on the whole file.
    has_io = bool(re.search(r'FortranReader|FortranWriter|read_value|write_raw|eprintln!|println!', content))
    return RustFunction(
        name=func_name.lower(),
        file="",
        params=params,
        # Upper-case and de-duplicate; the resulting order is unspecified.
        calls=list(set(c.upper() for c in calls)),
        has_io=has_io,
        lines=func_body.split('\n'),
        control_flow=extract_rust_control_flow(func_body),
        is_stub=is_stub,
    )
def extract_rust_control_flow(content: str) -> List[str]:
    """Classify each line of Rust code by the first control-flow pattern it hits.

    Lines that match none of the patterns are skipped. Every hit yields
    ``"<TYPE>: <line stripped and cut to 60 chars>"``. Patterns are tried in
    the listed order and matched case-insensitively.
    """
    classifiers = [
        (re.compile(r'(\w+)\s*\(&mut\s+\w+_params', re.IGNORECASE), 'CALL'),
        (re.compile(r'(\w+)_pure\s*\(', re.IGNORECASE), 'CALL'),
        (re.compile(r'if\s+.+\s*\{', re.IGNORECASE), 'IF'),
        (re.compile(r'}\s*else\s+if', re.IGNORECASE), 'ELSEIF'),
        (re.compile(r'}\s*else\s*\{', re.IGNORECASE), 'ELSE'),
        (re.compile(r'while\s+', re.IGNORECASE), 'WHILE'),
        (re.compile(r'for\s+.+\s+in', re.IGNORECASE), 'FOR'),
        (re.compile(r'match\s+', re.IGNORECASE), 'MATCH'),
        (re.compile(r'return\s+', re.IGNORECASE), 'RETURN'),
        (re.compile(r'break\s*', re.IGNORECASE), 'BREAK'),
        (re.compile(r'continue\s*', re.IGNORECASE), 'CONTINUE'),
    ]
    events = []
    for raw in content.split('\n'):
        label = next((tag for rx, tag in classifiers if rx.search(raw)), None)
        if label is None:
            continue
        events.append(f"{label}: {raw.strip()[:60]}")
    return events
def find_rust_module(fortran_name: str) -> Optional[str]:
    """Return the path of the Rust file that corresponds to *fortran_name*.

    Candidate paths are probed in a fixed order and the first one that exists
    on disk wins:

    1. ``tlusty/math/<name>.rs``
    2. ``tlusty/math/<subdir>/<name>.rs`` for each known sub-directory
    3. ``tlusty/io/<name>.rs``
    4. the same ``math`` locations for every SPECIAL_MAPPINGS module that
       lists the routine
    5. ``tlusty/data.rs`` for the unnamed BLOCK DATA unit
    6. the explicit entry in EXTRA_FILE_MAPPINGS

    Returns None when nothing is found.
    """
    lowered = fortran_name.lower()
    upper_key = fortran_name.upper()
    math_root = os.path.join(RUST_BASE_DIR, 'tlusty', 'math')
    math_subdirs = [
        'ali', 'atomic', 'continuum', 'convection', 'eos', 'hydrogen',
        'interpolation', 'io', 'odf', 'opacity', 'partition', 'population',
        'radiative', 'rates', 'solvers', 'special', 'temperature', 'utils'
    ]

    def math_locations(stem):
        # math/<stem>.rs first, then every math/<subdir>/<stem>.rs.
        yield os.path.join(math_root, f"{stem}.rs")
        for sub in math_subdirs:
            yield os.path.join(math_root, sub, f"{stem}.rs")

    def candidates():
        yield from math_locations(lowered)
        yield os.path.join(RUST_BASE_DIR, 'tlusty', 'io', f"{lowered}.rs")
        for rust_mod, fortran_funcs in SPECIAL_MAPPINGS.items():
            if lowered in [f.lower() for f in fortran_funcs]:
                yield from math_locations(rust_mod)
        if upper_key == '_UNNAMED_BLOCK_DATA_':
            yield os.path.join(RUST_BASE_DIR, 'tlusty', 'data.rs')
        if upper_key in EXTRA_FILE_MAPPINGS:
            yield EXTRA_FILE_MAPPINGS[upper_key]

    for path in candidates():
        if os.path.exists(path):
            return path
    return None
# ============================================================================
# 对比检查函数
# ============================================================================
def normalize_call_name(call: str) -> str:
    """Reduce a call name to its canonical upper-case form.

    One trailing ``_pure``, ``_io``, ``_func``, ``_brte``, ``_bre``,
    ``_evaluate`` or ``_impl`` suffix is removed. If what remains is one of
    the callback method names, it is prefixed with ``CALL_``. Every other
    name, including one that already starts with ``call_``, is simply
    upper-cased.
    """
    stem = re.sub(r'_(pure|io|func|brte|bre|evaluate|impl)$', '', call.lower())
    if stem in {'statistical_equilibrium', 'state', 'compt0'}:
        return 'CALL_' + stem.upper()
    return stem.upper()
def is_call_implemented(fortran_call: str, normalized_rust_calls: Set[str]) -> bool:
    """Decide whether a Fortran call has a counterpart among the Rust calls.

    Args:
        fortran_call: name of the routine called from Fortran.
        normalized_rust_calls: Rust call names already passed through
            normalize_call_name().

    Returns:
        True when either the callback that wraps the routine's group
        (CALLBACK_ALIASES) or one of the routine's aliases (FUNCTION_ALIASES,
        falling back to the lower-cased name itself) is present.
    """
    wanted = fortran_call.upper()

    # Callback groups are checked first.
    wrapped_by_callback = any(
        normalize_call_name(method) in normalized_rust_calls
        for group, method in CALLBACK_ALIASES.items()
        if wanted in group
    )
    if wrapped_by_callback:
        return True

    # Then the plain alias table.
    candidates = FUNCTION_ALIASES.get(wanted, [fortran_call.lower()])
    return any(
        normalize_call_name(candidate) in normalized_rust_calls
        for candidate in candidates
    )
def compare_modules(fortran_sub: FortranSubroutine, rust_func: RustFunction) -> CheckResult:
    """Compare a parsed Fortran unit with its Rust function.

    The result starts as ``'match'``. A stub Rust implementation downgrades
    it to ``'partial'``; any Fortran call without a Rust counterpart (and not
    exempted through CALLER_HANDLED) sets it to ``'mismatch'``. I/O and
    control-flow observations are recorded without affecting the status.
    Risk flags from run_risk_detectors() are attached last.
    """
    stub = rust_func.is_stub
    result = CheckResult(
        fortran_name=fortran_sub.name,
        rust_name=rust_func.name,
        fortran_file=fortran_sub.file,
        rust_file=rust_func.file,
        status='match',
    )

    # 1. Simplified / placeholder implementation.
    if stub:
        result.status = 'partial'
        result.issues.append("⚠️ Rust 实现是简化版本/占位符")
        result.suggestions.append("需要完整实现此模块")

    # 2. Calls, compared through the alias tables.
    normalized_rust_calls = {normalize_call_name(c) for c in set(rust_func.calls)}
    exempt = CALLER_HANDLED.get(fortran_sub.name.upper(), [])
    missing = sorted(
        call
        for call in set(fortran_sub.calls)
        if call.upper() not in exempt
        and not is_call_implemented(call, normalized_rust_calls)
    )
    if missing:
        result.status = 'mismatch'
        result.issues.append(f"❌ 缺少调用: {', '.join(missing)}")
        result.suggestions.extend(
            f"添加调用: {call.lower()}(&mut params)" for call in missing
        )

    # 3. I/O: reported only, the status is left alone.
    if fortran_sub.has_io and not rust_func.has_io:
        result.issues.append("⚠️ Fortran 有 I/ORust 没有")

    # 4. Control-flow statement counts.
    fortran_flow = fortran_sub.control_flow
    n_fortran = len(fortran_flow)
    n_rust = len(rust_func.control_flow)
    if n_fortran != n_rust:
        result.flow_diff.append(f"控制语句数量: Fortran={n_fortran}, Rust={n_rust}")

    # 5. INCLUDE / COMMON usage.
    if fortran_sub.includes:
        result.flow_diff.append(f"Fortran INCLUDE: {', '.join(fortran_sub.includes)}")
    if fortran_sub.commons:
        result.flow_diff.append(f"Fortran COMMON: {', '.join(fortran_sub.commons)}")

    # 6. For stubs, list the first 20 Fortran control-flow entries.
    if stub:
        result.flow_diff.append("Fortran 完整流程:")
        for idx, flow in enumerate(fortran_flow[:20], 1):
            result.flow_diff.append(f" {idx}. {flow}")
        if n_fortran > 20:
            result.flow_diff.append(f" ... 还有 {n_fortran - 20}")

    # 7. Phase 1 risk detection.
    result.risk_flags = run_risk_detectors(fortran_sub, rust_func)
    return result
# ============================================================================
# Phase 1 风险检测器
# ============================================================================
# 易混淆变量对 (COMMON block, var_name)
# Pairs of variables from different COMMON blocks that are easy to mix up.
# detect_confusable_vars() raises a flag when both names of a pair occur in
# the same Fortran unit.
CONFUSABLE_PAIRS = [
    # (var1_block, var1_name, var2_block, var2_name, reason)
    ('ODFCTR', 'JNDODF', 'TRAPAR', 'IJTF', '同为 MTRANS 维度ODF 跃迁索引 vs 跃迁频率索引'),
    ('COMPIF', 'LINEXP', 'TRAPAR', 'LCOMP', '同为跃迁标志逻辑型ODF 线性展开 vs 完成标志'),
    ('COMPIF', 'INDEXP', 'TRAPAR', 'IPROF', '同为跃迁模式标志'),
    ('LEVPAR', 'NQUANT', 'COMPIF', 'ILOW', '同为能级索引,量子数 vs 下能级'),
]
def run_risk_detectors(
    fortran_sub: FortranSubroutine,
    rust_func: RustFunction,
) -> List[str]:
    """Run every Phase 1 risk detector and return the combined flag list.

    The Fortran file is re-read from EXTRACTED_DIR and the Rust file from
    ``rust_func.file``; a file that does not exist contributes empty text.
    Flags appear in detector order: A (2D arrays), B (confusable variables),
    C (f2r_depends honesty), D (index accumulators).
    """
    def slurp(path):
        # Whole-file read; a missing path yields an empty string.
        if not os.path.exists(path):
            return ""
        with open(path, 'r', encoding='utf-8', errors='ignore') as handle:
            return handle.read()

    fortran_content = slurp(os.path.join(EXTRACTED_DIR, fortran_sub.file))
    rust_content = slurp(rust_func.file) if rust_func.file else ""

    return [
        *detect_2d_array_risk(fortran_content, rust_content),
        *detect_confusable_vars(fortran_content),
        *detect_depends_honesty(rust_content),
        *detect_index_accumulator(fortran_content),
    ]
def detect_2d_array_risk(
    fortran_content: str,
    rust_content: str,
    include_dir: str = "/home/fmq/program/tlusty/tl208-s54/tlusty",
) -> List[str]:
    """Detector A: flag 2D arrays whose index order must be checked.

    Two-dimensional array declarations ``NAME(DIM1,DIM2)`` are collected from
    the ``.FOR`` files the unit INCLUDEs (``IMPLIC`` excluded). A flag is
    produced for each such array that the unit itself accesses with at least
    two subscripts, ``NAME(...,...)``. Merely mentioning the name is not
    enough, because an INCLUDE pulls in whole COMMON blocks of which the unit
    may use only a few arrays.

    Args:
        fortran_content: source text of the Fortran unit.
        rust_content: source text of the Rust module (currently unused).
        include_dir: directory that holds the ``<name>.FOR`` include files.

    Returns:
        ``HIGH_RISK`` flag strings, without duplicates, in discovery order.
    """
    flags: List[str] = []
    decl_re = re.compile(r'\b([A-Z]\w*)\s*\(\s*(\w+)\s*,\s*(\w+)\s*\)', re.IGNORECASE)
    # Statement keywords that can precede a parenthesised pair but are not arrays.
    not_arrays = {
        'CALL', 'SUBROUTINE', 'FUNCTION', 'WRITE', 'READ',
        'COMMON', 'DIMENSION', 'PARAMETER', 'INCLUDE',
    }
    # Loop-invariant: the comment-free, upper-cased unit source.
    code_content = strip_fortran_comments(fortran_content).upper()

    includes = re.findall(r"INCLUDE\s*'([^']+)\.FOR'", fortran_content, re.IGNORECASE)
    for inc_file in includes:
        if inc_file.upper() == 'IMPLIC':
            continue
        inc_path = os.path.join(include_dir, f"{inc_file}.FOR")
        if not os.path.exists(inc_path):
            continue
        with open(inc_path, 'r', encoding='utf-8', errors='ignore') as f:
            inc_content = f.read()
        # Join continuation lines: up to five blanks followed by '*' or '&'.
        joined = re.sub(r'\n {1,5}[*&]', ' ', inc_content)
        for m in decl_re.finditer(joined):
            var_name = m.group(1).upper()
            dim1 = m.group(2).upper()
            dim2 = m.group(3).upper()
            if var_name in not_arrays:
                continue
            # Require a subscripted access NAME(..., ...) in the unit itself.
            access_pattern = re.compile(
                r'\b' + re.escape(var_name) + r'\s*\([^)]*,\s*[^)]+\)',
                re.IGNORECASE,
            )
            if not access_pattern.search(code_content):
                continue
            flag = (
                f"HIGH_RISK: 2D array {var_name}({dim1},{dim2}) — "
                f"verify Fortran column-major → Rust row-major indexing"
            )
            if flag not in flags:
                flags.append(flag)
    return flags
def detect_confusable_vars(fortran_content: str) -> List[str]:
    """Detector B: flag units that use both variables of a confusable pair.

    The comment-free source is upper-cased and each name from
    CONFUSABLE_PAIRS is searched as a whole word.
    """
    source = strip_fortran_comments(fortran_content).upper()

    def mentioned(identifier):
        # Whole-word occurrence, so partial matches do not count.
        return bool(re.search(r'\b' + identifier + r'\b', source))

    return [
        f"HIGH_RISK: Both {var1}({block1}) and {var2}({block2}) used — {reason}"
        for block1, var1, block2, var2, reason in CONFUSABLE_PAIRS
        if mentioned(var1) and mentioned(var2)
    ]
def detect_depends_honesty(rust_content: str) -> List[str]:
    """Detector C: check ``f2r_depends`` declarations against the code.

    Every ``f2r_depends: a, b, c`` comment in the file is read (blanks around
    the commas are allowed, matching the format extract_rust_function()
    accepts) and the declared names are compared with what the file appears
    to call or reference.

    Files that define a ``*_pure`` function are exempt: there the declared
    dependencies are handled by the caller by design.

    Returns:
        One ``MEDIUM_RISK`` entry per declared dependency with no apparent
        use, sorted by name. An empty list when nothing is declared.
    """
    # Words that look like calls in a regex scan but are not dependencies.
    rust_keywords = {
        'pub', 'fn', 'let', 'if', 'else', 'for', 'while', 'match', 'return',
        'use', 'mod', 'struct', 'impl', 'self', 'super', 'crate', 'mut',
        'ref', 'as', 'in', 'loop', 'break', 'continue', 'type', 'where',
        'true', 'false', 'const', 'static', 'enum', 'trait', 'println',
        'format', 'vec', 'assert', 'panic', 'eprintln', 'log',
    }

    declared = set()
    for m in re.finditer(r'f2r_depends:\s*(\w+(?:\s*,\s*\w+)*)', rust_content, re.IGNORECASE):
        declared.update(d.strip().lower() for d in m.group(1).split(',') if d.strip())
    # Pseudo-dependencies such as CALLER_HANDLED are not real calls.
    declared -= {'caller_handled'}
    if not declared:
        return []

    # Pure functions declare caller-handled dependencies: never flagged.
    if re.search(r'\bfn\s+\w+_pure\s*\(', rust_content):
        return []

    actual_calls = set()
    # Anything of the form name( that is not a keyword and longer than 2 chars.
    for m in re.finditer(r'\b(\w+)\s*\(', rust_content):
        name = m.group(1).lower()
        if name not in rust_keywords and len(name) > 2:
            actual_calls.add(name)
    # Callback style: xxx_fn(...) counts as xxx.
    for m in re.finditer(r'(\w+)_fn\s*\(', rust_content):
        actual_calls.add(m.group(1).lower())
    # Callback style: call_xxx(...) counts as xxx.
    for m in re.finditer(r'\bcall_(\w+)\s*\(', rust_content):
        actual_calls.add(m.group(1).lower())
    # Names referenced through use statements.
    for use_pattern in (r'\buse\s+.*?(\w+)\s*;', r'\buse\s+.*?(\w+)::'):
        for m in re.finditer(use_pattern, rust_content):
            mod_name = m.group(1).lower()
            if len(mod_name) > 2:
                actual_calls.add(mod_name)
    # Reference markers: let _ = (xxx, yyy, ...)
    for m in re.finditer(r'let\s*_\s*=\s*\(([^)]+)\)', rust_content):
        for name_m in re.finditer(r'\b(\w+)\b', m.group(1)):
            name = name_m.group(1).lower()
            if len(name) > 2 and name not in rust_keywords:
                actual_calls.add(name)
    # A call to any alias counts as a call to the Fortran routine itself.
    for fortran_name, aliases in FUNCTION_ALIASES.items():
        for alias in aliases:
            if alias.lower() in actual_calls:
                actual_calls.add(fortran_name.lower())

    return [
        f"MEDIUM_RISK: f2r_depends declares '{dep}' but no actual call found in code"
        for dep in sorted(declared - actual_calls)
    ]
def detect_index_accumulator(fortran_content: str) -> List[str]:
    """Detector D: look for index-arithmetic patterns such as

        IJ00=1
        IJQ=IJ00+IJ

    which need a careful 1-based to 0-based conversion.

    Returns a single ``HIGH_RISK`` flag when both an initialisation to 1 and
    an ``I.. = I.. +`` accumulation are present, a single ``MEDIUM_RISK``
    flag when only the initialisation is present, and nothing otherwise.
    """
    source = strip_fortran_comments(fortran_content).upper()

    # Index-like variable set to 1.
    init_patterns = (
        r'IJ\w+\s*=\s*1\b',
        r'I[0-9]\w*\s*=\s*1\b',
        r'INDEX\d*\s*=\s*1\b',
    )
    if not any(re.search(p, source) for p in init_patterns):
        return []

    # Index arithmetic: VAR = VAR + expr
    if re.search(r'I[J0-9]\w*\s*=\s*I[J0-9]\w*\s*\+', source):
        return [
            "HIGH_RISK: Index accumulator pattern detected (IJ00=1, IJQ=IJ00+IJ) — "
            "verify 0-based conversion"
        ]
    return [
        "MEDIUM_RISK: Index initialization to 1 detected — verify 0-based conversion"
    ]
# ============================================================================
# 输出格式
# ============================================================================
def print_result(result: CheckResult, verbose: bool = False, show_risk: bool = False):
    """Print one check result to stdout.

    Args:
        result: the comparison outcome to print.
        verbose: also print control-flow differences (and risk flags).
        show_risk: print risk flags even when *verbose* is off.

    The header line starts with an icon for the status so that match,
    mismatch, missing and partial results can be told apart at a glance.
    At most ten suggestions are printed.
    """
    status_icons = {
        'match': '✅',
        'mismatch': '❌',
        'missing': '❓',
        'partial': '⚠️',
    }
    icon = status_icons.get(result.status, '')
    print(f"\n{icon} {result.fortran_name}")
    print(f" Fortran: {result.fortran_file}")
    print(f" Rust: {result.rust_file}")
    if result.issues:
        print("\n 问题:")
        for issue in result.issues:
            print(f" {issue}")
    if result.risk_flags and (show_risk or verbose):
        print("\n 风险标记:")
        for flag in result.risk_flags:
            print(f" {flag}")
    if result.flow_diff and verbose:
        print("\n 流程差异:")
        for diff in result.flow_diff:
            print(f" {diff}")
    if result.suggestions:
        print("\n 修复建议:")
        for i, sug in enumerate(result.suggestions[:10], 1):
            print(f" {i}. {sug}")
def generate_diff_report(fortran_sub: FortranSubroutine, rust_func: RustFunction) -> str:
    """Build a detailed, multi-section difference report as one string.

    Sections: the first 50 lines of each source, both control-flow listings,
    and a call comparison. In the call comparison every Fortran call is
    marked ``✓`` when is_call_implemented() finds a Rust counterpart and
    ``✗`` when it does not; calls exempted through CALLER_HANDLED are
    labelled ``(caller-handled)`` instead.
    """
    report = []
    report.append("=" * 70)
    report.append(f"差异报告: {fortran_sub.name}")
    report.append("=" * 70)

    # Source listings, truncated to 50 lines each.
    report.append("\n## Fortran 代码 ({})".format(fortran_sub.file))
    report.append("-" * 40)
    for i, line in enumerate(fortran_sub.lines[:50], 1):
        report.append(f"{i:4d}: {line}")
    if len(fortran_sub.lines) > 50:
        report.append(f" ... 还有 {len(fortran_sub.lines) - 50}")
    report.append("\n## Rust 代码 ({})".format(rust_func.file))
    report.append("-" * 40)
    for i, line in enumerate(rust_func.lines[:50], 1):
        report.append(f"{i:4d}: {line}")
    if len(rust_func.lines) > 50:
        report.append(f" ... 还有 {len(rust_func.lines) - 50}")

    # Control-flow listings.
    report.append("\n## Fortran 控制流程")
    report.append("-" * 40)
    for flow in fortran_sub.control_flow:
        report.append(f" {flow}")
    report.append("\n## Rust 控制流程")
    report.append("-" * 40)
    for flow in rust_func.control_flow:
        report.append(f" {flow}")

    # Call comparison, normalised the same way compare_modules() does it.
    report.append("\n## 调用对比")
    report.append("-" * 40)
    fortran_calls = set(fortran_sub.calls)
    normalized_rust_calls = {normalize_call_name(call) for call in rust_func.calls}
    caller_handled = CALLER_HANDLED.get(fortran_sub.name.upper(), [])
    report.append("Fortran 调用:")
    for call in sorted(fortran_calls):
        if call.upper() in caller_handled:
            report.append(f"{call} (caller-handled)")
        else:
            status = "✓" if is_call_implemented(call, normalized_rust_calls) else "✗"
            report.append(f" {status} {call}")
    return "\n".join(report)
# ============================================================================
# 主函数
# ============================================================================
# Regexes used to harvest callee names from Rust source when a Fortran PROGRAM
# is matched against a plain `fn main`.  Every pattern has exactly one capture
# group, so re.findall() yields plain strings.
_MAIN_CALL_PATTERNS = (
    r'(\w+)\s*\(&mut\s+\w+_params', r'(\w+)\s*\(&\w+_params',
    r'(\w+)\s*\(\s*&mut', r'(\w+)_pure\s*\(', r'(\w+)_io\s*\(',
    r'callbacks\.call_(\w+)\s*\(', r'(\w+)_cb\s*\(',
    r'(?:params|self)\.(\w+)\s*\([^)]*\)',
    r'crate::tlusty::math::\w+::(\w+)\s*\(',
    r'crate::tlusty::math::(\w+)\s*\(',
    r'super::(\w+)\s*\(', r'self::(\w+)\s*\(',
    r'\b(dwnfr1|sgmer1|gfree1|sffhmi|ffcros)\s*\(',
    r'\b(gfree0|dwnfr0|wnstor|sabolf|linpro|opadd|opact1)\s*\(',
    r'\b(quit_func|quit)\s*\(',
    r'\b(compt0_brtez|compt0_brte|compt0_brez|compt0_bre|compt0)\s*\(',
    r'\b(grcor)\s*\(', r'\b(inthyd)\s*\(',
    r'let\s+_\s*=\s*(\w+)\s*;',
    r'let\s+_\s*=\s*\(([\w\s,]+)\)',
    r'\b(gfreed|gfree1|quasim|lymlin|prd|opctab|opactd)\s*\([^)]*\)',
    r'\b(\w+)\s*\([^)]*,\s*&mut\s+\w+_params',
    r'\b(ijalis)\s*\([^)]*,\s*&mut',
    r'\b(divstr|stark0|starka|voigt|expint|erfcx)\s*\(',
    r'\b(reflev|sabolf|levgrp|colis|bpopc|bpope|bpopt|dwnfr1|sgmer1|gamsp|tridag)\s*\(',
    r'\b(bkhsgo)\s*\(',
    r'\b(pfcno|pffe|pfni|pfspec|mpartf|pfheav|opfrac)\s*\(',
    r'\b(rtecf0|rtefe2|rtesol|rtefr1|rtecom)\s*\(',
    r'\b(allardt_temp|allardt|allard)\s*\(',
    r'\b(locate|interp|search|bisect)\s*\(',
    r'\b(angset|comset|compt0)\s*\(',
    r'\b(odfhst|odfhyd|odfset)\s*\(',
    r'\b(indexx|sort)\s*\(',
    r'\b(gauleg|gauss_legendre|gauss_quad)\s*\(',
    r'\b(dielrc|dielec|ionize|recomb)\s*\(',
    r'\b(cia_h2h|cia_h2h2|cia_h2he|cia_hhe|h2minus)\s*\(',
    r'(panic!)',
    r'f2r_depends:\s*([\w]+(?:\s*,\s*[\w]+)*)',
)

# Markers whose presence in the `fn main` body flags it as a stub.
_MAIN_STUB_MARKERS = (
    r'//\s*简化实现', r'//\s*TODO', r'//\s*注:', r'//\s*待实现',
    r'简化版本', r'框架就绪', r'unimplemented!', r'todo!',
)


def _scan_brace_block(text: str, start: int) -> str:
    """Return the brace-delimited span found by counting ``{``/``}`` from ``start``.

    The span runs from the ``{`` seen while the nesting counter is zero to the
    ``}`` that brings the counter back to zero. Braces inside string literals
    or comments are counted like any other. Returns ``""`` when the counter
    never returns to zero.
    """
    depth = 0
    opened_at = start
    for pos in range(start, len(text)):
        ch = text[pos]
        if ch == '{':
            if depth == 0:
                opened_at = pos
            depth += 1
        elif ch == '}':
            depth -= 1
            if depth == 0:
                return text[opened_at:pos + 1]
    return ""


def _collect_main_calls(rust_content: str) -> list:
    """Collect raw callee names from the whole Rust file via _MAIN_CALL_PATTERNS."""
    names = []
    for pattern in _MAIN_CALL_PATTERNS:
        for hit in re.findall(pattern, rust_content, re.IGNORECASE):
            text = str(hit)
            if ',' not in text:
                names.append(hit)
                continue
            # Comma-separated captures (`let _ = (a, b)` / `f2r_depends: a, b`)
            # contribute one name per well-formed identifier.
            for part in text.split(','):
                part = part.strip()
                if part and re.match(r'^\w+$', part):
                    names.append(part)
    return names


def _parse_rust_main(rust_content: str) -> "Optional[RustFunction]":
    """Build a RustFunction describing ``fn main`` in ``rust_content``.

    The ``pub`` qualifier is not required. Returns ``None`` when no ``fn main``
    signature is found. Call names and the I/O flag are derived from the whole
    file; the stub flag, ``lines`` and ``control_flow`` come from the function
    body only. ``file`` is left empty for the caller to fill in.
    """
    signature = re.search(r'(?i)\bfn\s+main\s*(?:<[^({]*?>)?\s*\(([^)]*)\)', rust_content)
    if not signature:
        return None

    # Only `name: Type` style entries are kept as parameters.
    params = [p.strip() for p in signature.group(1).split(',') if p.strip() and ':' in p]
    body = _scan_brace_block(rust_content, signature.end())
    has_io = bool(re.search(
        r'FortranReader|FortranWriter|read_value|write_raw|eprintln!|println!',
        rust_content))
    is_stub = any(re.search(marker, body, re.IGNORECASE) for marker in _MAIN_STUB_MARKERS)

    return RustFunction(
        name="main",
        file="",
        params=params,
        calls=list({name.upper() for name in _collect_main_calls(rust_content)}),
        has_io=has_io,
        lines=body.split('\n'),
        control_flow=extract_rust_control_flow(body),
        is_stub=is_stub,
    )


def check_module(module_name: str, verbose: bool = False) -> CheckResult:
    """Check one Fortran module against its Rust implementation.

    Steps: locate ``<module>.f`` under ``EXTRACTED_DIR``, parse it, locate the
    Rust file via ``find_rust_module``, extract the matching Rust function
    (``<name>``, then ``<name>_pure``, then ``fn main`` for names listed in
    ``EXTRA_FILE_MAPPINGS``) and compare both sides with ``compare_modules``.

    Args:
        module_name: Module name; case-insensitive.
        verbose: When true, the lines of ``generate_diff_report`` are appended
            to ``result.flow_diff``. It has no other effect on the result.

    Returns:
        A ``CheckResult``. ``status`` is ``'missing'`` when the Fortran file,
        its parse result, the Rust file or the Rust function cannot be found;
        otherwise it is whatever ``compare_modules`` reports.
    """
    # Locate the Fortran source.
    fortran_file = os.path.join(EXTRACTED_DIR, f"{module_name.lower()}.f")
    if not os.path.exists(fortran_file):
        return CheckResult(
            fortran_name=module_name.upper(),
            rust_name="",
            fortran_file="",
            rust_file="",
            status='missing',
            issues=[f"Fortran 文件不存在: {fortran_file}"],
        )

    # Parse the Fortran source.
    fortran_sub = parse_fortran_file(fortran_file)
    if not fortran_sub:
        return CheckResult(
            fortran_name=module_name.upper(),
            rust_name="",
            fortran_file=fortran_file,
            rust_file="",
            status='missing',
            issues=["无法解析 Fortran 文件"],
        )

    # Locate the Rust module.
    rust_file = find_rust_module(fortran_sub.name)
    if not rust_file:
        return CheckResult(
            fortran_name=fortran_sub.name,
            rust_name="",
            fortran_file=fortran_sub.file,
            rust_file="",
            status='missing',
            issues=["Rust 模块未实现"],
            suggestions=[f"创建 Rust 文件: src/tlusty/.../{fortran_sub.name.lower()}.rs"],
        )

    with open(rust_file, 'r', encoding='utf-8', errors='ignore') as f:
        rust_content = f.read()

    # BLOCK DATA has no function to compare; only look for the data constant.
    if fortran_sub.name.upper() == '_UNNAMED_BLOCK_DATA_':
        # 'OSH' also covers '_UNNAMED_OSH', so one substring test suffices.
        if 'OSH' in rust_content:
            return CheckResult(
                fortran_name=fortran_sub.name,
                rust_name="_UNNAMED_OSH",
                fortran_file=fortran_sub.file,
                rust_file=rust_file,
                status='match',
                issues=[],
            )
        return CheckResult(
            fortran_name=fortran_sub.name,
            rust_name="_UNNAMED_OSH",
            fortran_file=fortran_sub.file,
            rust_file=rust_file,
            status='missing',
            issues=["Rust 数据常量未找到"],
            suggestions=[f"{rust_file} 中添加数据常量: pub const _UNNAMED_OSH: [f64; 400] = [...]"],
        )

    rust_func = extract_rust_function(rust_content, fortran_sub.name)
    if not rust_func:
        # Fall back to the `_pure` variant.
        rust_func = extract_rust_function(rust_content, f"{fortran_sub.name}_pure")
    if not rust_func and fortran_sub.name.upper() in EXTRA_FILE_MAPPINGS:
        # A Fortran PROGRAM (e.g. TLUSTY) may be implemented as `fn main`.
        rust_func = _parse_rust_main(rust_content)

    if not rust_func:
        return CheckResult(
            fortran_name=fortran_sub.name,
            rust_name=fortran_sub.name.lower(),
            fortran_file=fortran_sub.file,
            rust_file=rust_file,
            status='missing',
            issues=["Rust 函数未找到"],
            suggestions=[f"{rust_file} 中添加函数: pub fn {fortran_sub.name.lower()}(...)"],
        )
    rust_func.file = rust_file

    result = compare_modules(fortran_sub, rust_func)
    if verbose:
        result.flow_diff.extend(generate_diff_report(fortran_sub, rust_func).split('\n'))
    return result
def check_all(verbose: bool = False):
    """Check every ``*.f`` file under ``EXTRACTED_DIR`` and print a summary.

    Modules are checked in sorted file order. Results are then printed grouped
    by status (mismatch, partial, missing, match), followed by per-status
    counts and the total.

    Args:
        verbose: Forwarded to ``print_result``. The checks themselves always
            run with ``verbose=False``.
    """
    tally = {'match': 0, 'mismatch': 0, 'partial': 0, 'missing': 0}
    outcomes = []
    for path in sorted(glob.glob(os.path.join(EXTRACTED_DIR, "*.f"))):
        stem, _ext = os.path.splitext(os.path.basename(path))
        outcome = check_module(stem.upper(), verbose=False)
        tally[outcome.status] += 1
        outcomes.append(outcome)

    # Print problem statuses first; file order is kept within each status.
    for wanted in ('mismatch', 'partial', 'missing', 'match'):
        for outcome in outcomes:
            if outcome.status != wanted:
                continue
            print_result(outcome, verbose)

    print("\n" + "=" * 70)
    print("统计:")
    print(f" ✅ 匹配: {tally['match']}")
    print(f" ⚠️ 部分实现: {tally['partial']}")
    print(f" ❌ 不匹配: {tally['mismatch']}")
    print(f" ❓ 未实现: {tally['missing']}")
    print(f" 总计: {sum(tally.values())}")
def main():
    """Parse the command line and run the single selected action.

    When several options are given the first of these wins: ``--all``,
    ``--chain``, ``--diff``, ``--flow``, ``--risk``, ``--audit``, then the
    positional module name. With nothing selected the help text is printed.
    """
    cli = argparse.ArgumentParser(description='Fortran to Rust 一致性检查')
    cli.add_argument('module', nargs='?', help='要检查的模块名')
    cli.add_argument('--all', action='store_true', help='检查所有模块')
    cli.add_argument('--diff', metavar='MODULE', help='生成详细差异报告')
    cli.add_argument('--flow', metavar='MODULE', help='检查控制流程')
    cli.add_argument('--risk', metavar='MODULE', help='检查模块风险等级Phase 1')
    cli.add_argument('--chain', metavar='MODULE', help='追踪模块的完整 Fortran 调用链,检查 Rust 是否覆盖')
    cli.add_argument('--audit', action='store_true', help='随机审计 5 个 match 模块的风险')
    cli.add_argument('--verbose', '-v', action='store_true', help='详细输出')
    opts = cli.parse_args()

    if opts.all:
        check_all(opts.verbose)
        return
    if opts.chain:
        check_chain_coverage(opts.chain)
        return
    if opts.diff:
        # --diff always runs verbosely and shows risk flags.
        print_result(check_module(opts.diff, verbose=True), verbose=True, show_risk=True)
        return
    if opts.flow:
        print_result(check_module(opts.flow, verbose=True), verbose=True)
        return
    if opts.risk:
        run_risk_check(opts.risk)
        return
    if opts.audit:
        run_audit()
        return
    if opts.module:
        outcome = check_module(opts.module, opts.verbose)
        print_result(outcome, opts.verbose, show_risk=True)
        return
    cli.print_help()
def run_risk_check(module_name: str):
    """Run the Phase 1 risk check for one module and print a risk summary.

    The module is checked verbosely and printed with risk flags shown. Flags
    are then counted by their ``HIGH_RISK`` / ``MEDIUM_RISK`` prefix.

    Args:
        module_name: Name of the module to check.
    """
    outcome = check_module(module_name, verbose=True)
    print_result(outcome, verbose=True, show_risk=True)

    flags = outcome.risk_flags
    if not flags:
        print("\n 无风险标记Phase 2 检查可跳过")
        return

    high_count = sum(1 for flag in flags if flag.startswith('HIGH_RISK'))
    medium_count = sum(1 for flag in flags if flag.startswith('MEDIUM_RISK'))
    print(f"\n 风险汇总: {high_count} HIGH, {medium_count} MEDIUM")
    if high_count:
        print(" → 需要 Phase 2 深度语义检查")
def run_audit():
    """Randomly audit up to five modules whose check status is ``'match'``.

    Every ``*.f`` file under ``EXTRACTED_DIR`` is checked once. From the
    modules that report ``'match'``, up to five are sampled at random, and
    their risk flags are printed and counted by ``HIGH_RISK`` /
    ``MEDIUM_RISK`` prefix.
    """
    import random

    # Collect every module that currently reports 'match'.
    match_modules = []
    for fpath in sorted(glob.glob(os.path.join(EXTRACTED_DIR, "*.f"))):
        name = os.path.splitext(os.path.basename(fpath))[0].upper()
        result = check_module(name, verbose=False)
        if result.status == 'match':
            match_modules.append((name, result))

    if not match_modules:
        print("没有找到 match 状态的模块")
        return

    sample_size = min(5, len(match_modules))
    sample = random.sample(match_modules, sample_size)

    print("=" * 70)
    print(f"随机审计: {sample_size}/{len(match_modules)} 个 match 模块")
    print("=" * 70)

    total_high = 0
    total_medium = 0
    for name, result in sample:
        # The result from the first pass is reused: check_module's `verbose`
        # flag only appends the diff report to flow_diff, so re-running the
        # check with verbose=True would produce the same risk_flags.
        flags = result.risk_flags
        high = [f for f in flags if f.startswith('HIGH_RISK')]
        medium = [f for f in flags if f.startswith('MEDIUM_RISK')]
        total_high += len(high)
        total_medium += len(medium)

        risk_icon = "🔴" if high else ("🟡" if medium else "🟢")
        print(f"\n{risk_icon} {name}: {len(high)} HIGH, {len(medium)} MEDIUM")
        for flag in flags:
            print(f" {flag}")

    print(f"\n{'=' * 70}")
    print(f"审计汇总: {total_high} HIGH_RISK, {total_medium} MEDIUM_RISK")
    if total_high > 0:
        print("→ 建议对 HIGH_RISK 模块进行 Phase 2 深度检查")
    else:
        print("→ 审计的模块未发现高风险标记")
# ============================================================================
# Call-chain tracing
# ============================================================================
# Key modules whose deep call chains should be traced (typically entry points
# called by the main program or by high-level modules).
CHAIN_ENTRY_MODULES = {'TLUSTY', 'START', 'LTEGR', 'ROSSOP'}
# Rust main-program file (used to check whether sub-modules are actually called).
RUST_MAIN_FILE = os.path.join(RUST_BASE_DIR, 'bin', 'tlusty.rs')
def trace_fortran_call_chain(module_name: str, visited: Optional[Set[str]] = None,
                             depth: int = 0, max_depth: int = 10) -> List[Dict]:
    """Recursively trace the Fortran call chain starting at ``module_name``.

    Args:
        module_name: Module to start from; its source is looked up as
            ``<module_name.lower()>.f`` under ``EXTRACTED_DIR``.
        visited: Upper-cased module names already traced. The set is shared
            with (and mutated by) the recursive calls, so each module appears
            at most once in the whole result.
        depth: Depth of ``module_name`` in the chain (0 for the root).
        max_depth: Modules deeper than this are not traced.

    Returns:
        ``[]`` when the depth limit is exceeded, the module was already
        visited, or its file is missing or cannot be parsed. Otherwise a
        one-element list holding a dict with keys ``module``, ``depth``,
        ``file``, ``calls`` (sorted unique call names) and ``sub_chain``
        (nodes of the same shape for the callees that were traced).
    """
    if visited is None:
        visited = set()

    key = module_name.upper()
    if depth > max_depth or key in visited:
        return []
    visited.add(key)

    # Locate and parse the Fortran source for this module.
    source_path = os.path.join(EXTRACTED_DIR, f"{module_name.lower()}.f")
    if not os.path.exists(source_path):
        return []
    parsed = parse_fortran_file(source_path)
    if not parsed:
        return []

    unique_calls = sorted(set(parsed.calls))
    children = []
    for callee in unique_calls:
        callee_key = callee.upper()
        # Intrinsics and QUIT (error handling) are not followed.
        if callee_key in FORTRAN_INTRINSICS or callee_key == 'QUIT':
            continue
        # Only callees that have their own extracted Fortran file are traced.
        if not os.path.exists(os.path.join(EXTRACTED_DIR, f"{callee.lower()}.f")):
            continue
        children.extend(trace_fortran_call_chain(callee, visited, depth + 1, max_depth))

    return [{
        'module': parsed.name,
        'depth': depth,
        'file': parsed.file,
        'calls': unique_calls,
        'sub_chain': children,
    }]
def flatten_call_chain(chain: List[Dict]) -> List[Tuple[str, int, str, List[str]]]:
    """Flatten a nested call chain into ``(module, depth, file, calls)`` tuples.

    Nodes are emitted in pre-order: each node first, then its ``sub_chain``
    nodes from left to right.

    Args:
        chain: Nodes with keys ``module``, ``depth``, ``file``, ``calls`` and
            ``sub_chain``, as produced by ``trace_fortran_call_chain``.

    Returns:
        One tuple per node, in pre-order.
    """
    rows = []
    # Explicit stack; children are pushed reversed so they pop left-to-right.
    pending = list(chain)[::-1]
    while pending:
        node = pending.pop()
        rows.append((node['module'], node['depth'], node['file'], node['calls']))
        pending.extend(list(node['sub_chain'])[::-1])
    return rows
def get_all_rust_calls_in_file(rust_file: str) -> Set[str]:
    """Collect upper-cased names that a Rust file calls, imports, or defines.

    Names are gathered from these textual patterns:
      1. call shapes (``xxx(&mut ..._params``, ``xxx(&mut``, ``xxx_pure(``,
         ``xxx_io(``, ``callbacks.call_xxx(``, ``params.xxx(...)`` /
         ``self.xxx(...)``, and ``crate::tlusty::math`` / ``io`` /
         ``super::`` / ``self::`` paths);
      2. ``use crate::tlusty::(math|io)::...::name`` declarations;
      3. ``fn name`` definitions, with a trailing ``_PURE`` / ``_IO`` /
         ``_FUNC`` removed;
      4. ``f2r_depends: a, b`` markers;
      5. ``//`` comments that mention Fortran followed by ``call NAME``;
      6. ``let _ = (a, b, ...)`` reference tuples.
    Names from 2 and 6 are kept only when longer than two characters.

    Args:
        rust_file: Path of the Rust source file.

    Returns:
        The set of upper-cased names; empty when the file does not exist.
    """
    if not os.path.exists(rust_file):
        return set()
    with open(rust_file, 'r', encoding='utf-8', errors='ignore') as handle:
        text = handle.read()

    names = set()

    # 1. Call shapes (case-insensitive; one capture group each).
    invocation_patterns = (
        r'(\w+)\s*\(&mut\s+\w+_params',
        r'(\w+)\s*\(&\w+_params',
        r'(\w+)\s*\(\s*&mut',
        r'(\w+)_pure\s*\(',
        r'(\w+)_io\s*\(',
        r'callbacks\.call_(\w+)\s*\(',
        r'(?:params|self)\.(\w+)\s*\([^)]*\)',
        r'crate::tlusty::math::\w+::(\w+)\s*\(',
        r'crate::tlusty::math::(\w+)\s*\(',
        r'crate::tlusty::io::(\w+)\s*\(',
        r'super::(\w+)\s*\(',
        r'self::(\w+)\s*\(',
        # Generic `_pure` call.
        r'\b(\w+)_pure\s*\(',
    )
    for pattern in invocation_patterns:
        names.update(hit.upper() for hit in re.findall(pattern, text, re.IGNORECASE))

    # 2. `use` declarations count as a dependency on the imported item.
    for found in re.finditer(r'use\s+crate::tlusty::(?:math|io)(?:::\w+)*::(\w+)', text):
        imported = found.group(1).upper()
        if len(imported) > 2:
            names.add(imported)

    # 3. Function definitions: the file itself provides that functionality.
    for found in re.finditer(r'\bfn\s+(\w+)\s*[<(]', text):
        names.add(re.sub(r'_(PURE|IO|FUNC)$', '', found.group(1).upper()))

    # 4. Explicit `f2r_depends:` markers.
    for found in re.finditer(r'f2r_depends:\s*([\w]+(?:\s*,\s*[\w]+)*)', text, re.IGNORECASE):
        for piece in found.group(1).split(','):
            piece = piece.strip().upper()
            if piece:
                names.add(piece)

    # 5. Fortran references inside `//` comments.
    for found in re.finditer(r'//.*?(?:Fortran|fortran|FORTRAN).*?(?:call|CALL)\s+(\w+)', text):
        names.add(found.group(1).upper())

    # 6. `let _ = (xxx, yyy, ...)` tuples that mark referenced functions.
    for found in re.finditer(r'let\s*_\s*=\s*\(([^)]+)\)', text):
        for word in re.findall(r'\b(\w+)\b', found.group(1)):
            if len(word) > 2:
                names.add(word.upper())

    return names
def _is_call_covered(call_upper: str, rust_calls: Set[str], aliases: List[str]) -> bool:
    """Report whether any alias of a Fortran call appears in ``rust_calls``.

    For each alias the upper-cased name, its ``_PURE`` and ``_IO`` variants,
    and finally its ``normalize_call_name`` form are looked up.

    Args:
        call_upper: Upper-cased Fortran call name. Not consulted by the
            lookup; kept so existing callers keep working.
        rust_calls: Upper-cased names collected from Rust source.
        aliases: Candidate Rust names for the call.

    Returns:
        True as soon as one form of one alias is found, otherwise False.
    """
    for alias in aliases:
        alias_upper = alias.upper()
        variants = (alias_upper, f"{alias_upper}_PURE", f"{alias_upper}_IO")
        if any(variant in rust_calls for variant in variants):
            return True
        # Also try the normalized name (suffix stripped).
        if normalize_call_name(alias) in rust_calls:
            return True
    return False


# Module-level exemptions: modules whose call structure changed when the code
# was restructured in Rust.
# Format: { 'PARENT_MODULE': ['CALL1', 'CALL2'] }
# Meaning: in the Rust implementation of the parent module these calls were
# reorganised and no longer need to be made directly.
#
# This table used to be assigned twice, together with a second identical copy
# of _is_call_covered. The later assignment held only the TLUSTY entry and
# replaced the earlier one, silently dropping every other module below. The
# two are merged here: the later TLUSTY list is kept and the dropped entries
# are restored.
# NOTE(review): the earlier TLUSTY list had 'BPope' where the later one has
# 'BPOPC' -- confirm whether BPOPE should be exempt as well.
CHAIN_CALLER_HANDLED = {
    # TLUSTY main program: sub-modules are called indirectly by higher-level modules.
    'TLUSTY': ['ACCEL2', 'ALIST2', 'ALLARDT', 'BPOPC', 'CONREF', 'CONVEC',
               'CONVC1', 'CORRWM', 'ELDENS', 'GRFREH', 'GRFREL', 'INITIA',
               'MEANOPT', 'NEWDM', 'OPACF0', 'OPACF1', 'OPACFA', 'OPACFD',
               'QUASIM', 'RESOLV', 'ROSSTD', 'ROSSOP', 'RTEFR1', 'SABOLF',
               'SOLVE', 'STATE', 'STEQEQ', 'TOPBAS', 'WNSTOR'],
    # RDATA: I/O calls are reorganised into different functions in Rust.
    'RDATA': ['DOPGAM', 'LEMINI', 'LINSET', 'RDATAX', 'XENINI'],
    # OPAINI: initialisation calls are handled by the sub-modules themselves.
    'OPAINI': ['COMSET', 'RAYSET', 'RAYLEIGH'],
    # CONCOR/CONREF/CONTMD: convection-module call chain.
    'CONCOR': ['ELDENS', 'RHONEN', 'CONOUT', 'STEQEQ', 'TDPINI'],
    'CONREF': ['ELDENS', 'RHONEN', 'CONOUT', 'STEQEQ', 'TDPINI', 'CONVEC', 'CONVC1'],
    'CONTMD': ['ELDENS', 'RHONEN', 'CONOUT', 'STEQEQ', 'TDPINI', 'CONVEC', 'CONVC1'],
    # OPACFA: sub-calls are handled by opacf0.
    'OPACFA': ['OPACF0', 'MEANOP'],
    # RHSGEN: sub-calls are handled through the callback interface.
    'RHSGEN': ['SABOLF', 'LEVGRP', 'RATMAT', 'MATINV'],
    # RTEINT: radiative-transfer sub-calls are handled by callbacks.
    'RTEINT': ['RTECF0', 'RTEFE2'],
}
def check_chain_coverage(module_name: str) -> None:
    """Check how much of a Fortran call chain is covered on the Rust side.

    The Fortran call chain of ``module_name`` is traced and flattened. For
    every call made by every module in the chain, the Rust call set of that
    module is searched, then (for the root only) the Rust main file, then the
    Rust call set of every chain module with a smaller depth. Calls that are
    still not found are classified:

    - CRITICAL: a Rust module for the callee exists and the parent's depth is
      <= 2, or no Rust module exists for the callee at all.
    - WARNING: a Rust module for the callee exists and the parent's depth
      is > 2.
    - INFO: calls listed in ``CHAIN_CALLER_HANDLED`` for the parent; these
      are only counted.

    The result is printed; nothing is returned.

    Args:
        module_name: Root module of the chain to check.
    """
    # Emoji are written as \U escapes. A JSON-style surrogate pair such as
    # \ud83d\udd34 yields two lone surrogates in a Python str, which cannot be
    # encoded to UTF-8 and make print() raise UnicodeEncodeError.
    red_circle = "\U0001f534"
    yellow_circle = "\U0001f7e1"

    module_upper = module_name.upper()

    # 1. Trace the Fortran call chain.
    chain = trace_fortran_call_chain(module_name)
    if not chain:
        print(f"\u274c \u65e0\u6cd5\u8ffd\u8e2a\u6a21\u5757 {module_upper} \u7684\u8c03\u7528\u94fe")
        return
    flat_chain = flatten_call_chain(chain)

    print("=" * 72)
    print(f"\u8c03\u7528\u94fe\u8ffd\u8e2a: {module_upper}")
    print(f"Fortran \u8c03\u7528\u94fe\u6df1\u5ea6: {max(d for _, d, _, _ in flat_chain)}")
    print(f"\u6d89\u53ca\u6a21\u5757: {len(flat_chain)}")
    print("=" * 72)

    # 2. Collect the Rust call sets: the main file plus one set per chain
    #    module that has a Rust file.
    rust_main_calls = get_all_rust_calls_in_file(RUST_MAIN_FILE)
    rust_calls_cache = {}
    for mod_name, depth, f_file, f_calls in flat_chain:
        rust_file = find_rust_module(mod_name)
        if rust_file:
            rust_calls_cache[mod_name] = get_all_rust_calls_in_file(rust_file)

    # 3. Check coverage call by call.
    critical_issues = []
    warning_issues = []
    info_count = 0
    ok_count = 0
    for mod_name, depth, f_file, f_calls in flat_chain:
        indent = " " * depth
        rust_file_calls = rust_calls_cache.get(mod_name, set())
        caller_exempt = {c.upper() for c in CHAIN_CALLER_HANDLED.get(mod_name.upper(), [])}
        for call in f_calls:
            call_upper = call.upper()
            if call_upper in FORTRAN_INTRINSICS or call_upper == 'QUIT':
                continue
            if call_upper in caller_exempt:
                info_count += 1
                continue

            aliases = FUNCTION_ALIASES.get(call_upper, [call.lower()])
            found = _is_call_covered(call_upper, rust_file_calls, aliases)
            if not found and depth == 0:
                found = _is_call_covered(call_upper, rust_main_calls, aliases)
            if not found:
                # Any chain module with a smaller depth counts here, not only
                # true ancestors of this module.
                for ancestor_mod, ancestor_depth, _, _ in flat_chain:
                    if ancestor_depth < depth:
                        ancestor_calls = rust_calls_cache.get(ancestor_mod, set())
                        if _is_call_covered(call_upper, ancestor_calls, aliases):
                            found = True
                            break

            if found:
                ok_count += 1
                continue

            # The callee's Rust file is only needed to describe an issue.
            call_rust_file = find_rust_module(call_upper)
            issue_data = {
                'parent': mod_name,
                'call': call_upper,
                'depth': depth,
                'rust_module': call_rust_file,
                'desc': f"{indent}{mod_name} -> CALL {call_upper}",
            }
            if not call_rust_file:
                issue_data['desc'] += ": Rust completely missing"
                critical_issues.append(issue_data)
            elif depth <= 2:
                issue_data['desc'] += ": Rust module exists but not called by parent"
                critical_issues.append(issue_data)
            else:
                issue_data['desc'] += ": restructured (Rust module exists)"
                warning_issues.append(issue_data)

    # 4. Print the call-chain table (only modules with depth <= 3).
    # The header texts are bound to names first: a backslash inside an
    # f-string replacement field is a SyntaxError before Python 3.12.
    module_header = '\u6a21\u5757'
    depth_header = '\u6df1\u5ea6'
    calls_header = 'Fortran \u8c03\u7528'
    print(f"\n{module_header:>30s} {depth_header:>4s} {calls_header}")
    print("-" * 72)
    for mod_name, depth, f_file, f_calls in flat_chain:
        if depth <= 3:
            indent = " " * depth
            calls_str = ", ".join(f_calls[:8])
            if len(f_calls) > 8:
                calls_str += f", ... (+{len(f_calls)-8})"
            print(f"{indent}{mod_name:>30s} {depth:4d} {calls_str}")

    # 5. Print the issues, grouped by severity.
    if critical_issues or warning_issues:
        print(f"\n{'=' * 72}")
        if critical_issues:
            print(f"\n{red_circle} CRITICAL INLINE_BYPASS ({len(critical_issues)} \u4e2a) \u2014 \u9ad8\u5c42\u6a21\u5757\u7ed5\u8fc7:")
            print(" \u8fd9\u4e9b\u6a21\u5757\u7684\u7ed5\u8fc7\u6700\u53ef\u80fd\u5bfc\u81f4\u6570\u503c\u5dee\u5f02")
            print("-" * 72)
            for issue in critical_issues:
                print(f" {issue['desc']}")
                if issue['rust_module']:
                    print(f" \u2192 {issue['rust_module']}")
            print()
        if warning_issues:
            print(f"\n{yellow_circle} WARNING ({len(warning_issues)} \u4e2a) \u2014 \u6df1\u5c42\u6a21\u5757\u91cd\u6784\u5dee\u5f02:")
            print(" \u901a\u5e38\u662f\u6709\u610f\u7684\u67b6\u6784\u5dee\u5f02\uff0c\u4e0d\u5f71\u54cd\u7ed3\u679c\u6b63\u786e\u6027")
            print("-" * 72)
            # Only the first 20 warnings are listed.
            for issue in warning_issues[:20]:
                print(f" {issue['desc']}")
            if len(warning_issues) > 20:
                print(f" ... \u8fd8\u6709 {len(warning_issues)-20} \u4e2a")

    # 6. Summary.
    print(f"\n{'=' * 72}")
    print(f"\u6c47\u603b: \u2705 {ok_count} \u5df2\u8986\u76d6, {red_circle} {len(critical_issues)} \u5173\u952e\u7ed5\u8fc7, {yellow_circle} {len(warning_issues)} \u91cd\u6784\u5dee\u5f02, \u2139\ufe0f {info_count} \u8c41\u514d")
    print(f"\u6d89\u53ca\u6a21\u5757\u603b\u6570: {len(flat_chain)}")
    if critical_issues:
        print(f"\n\u26a0\ufe0f \u53d1\u73b0 {len(critical_issues)} \u4e2a\u5173\u952e\u7ed5\u8fc7!")
        print(" \u9ad8\u5c42\u6a21\u5757\u7684\u8c03\u7528\u94fe\u5728 Rust \u4e2d\u88ab\u7ed5\u8fc7\uff0c")
        print(" \u53ef\u80fd\u5bfc\u81f4\u6570\u503c\u5dee\u5f02\u3002\u5efa\u8bae\u68c0\u67e5\u8fd9\u4e9b\u6a21\u5757\u7684\u8c03\u7528\u3002")
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()