SpectraRust/.claude/skills/f2r-check/scripts/f2r_check.py
2026-03-27 11:59:23 +08:00

639 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Fortran to Rust 一致性检查工具 (f2r_check)
功能:
1. 对比 Fortran 模块与对应 Rust 模块
2. 检查函数签名、逻辑流程、变量映射等
3. 生成差异报告和修复建议
用法:
python3 f2r_check.py <fortran_file> # 检查单个模块
python3 f2r_check.py --all # 检查所有已实现模块
python3 f2r_check.py --module START # 检查指定模块
python3 f2r_check.py --diff START # 生成详细差异报告
python3 f2r_check.py --flow START # 检查控制流程一致性
"""
import os
import re
import sys
import argparse
import glob
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict, Set, Optional, Tuple
# ============================================================================
# Path configuration
# ============================================================================
# Directory that holds the extracted Fortran sources; check_module() looks for
# "<module>.f" here and check_all() globs "*.f" in it.
EXTRACTED_DIR = "/home/fmq/program/tlusty/tl208-s54/rust/tlusty/extracted"
# Root of the Rust source tree; find_rust_module() searches below its
# "tlusty/math" and "tlusty/io" sub-directories.
RUST_BASE_DIR = "/home/fmq/.zeroclaw/workspace/SpectraRust/src"
# ============================================================================
# Fortran intrinsic functions
# ============================================================================
# Upper-case names that must never be reported as user subroutine calls.
# Besides intrinsic functions the set also lists statement keywords.
FORTRAN_INTRINSICS = {
    # Trigonometric / hyperbolic / exponential
    'SIN', 'COS', 'TAN', 'ASIN', 'ACOS', 'ATAN', 'ATAN2',
    'SINH', 'COSH', 'TANH', 'EXP', 'LOG', 'LOG10', 'LOG2',
    # Arithmetic helpers
    'SQRT', 'ABS', 'MOD', 'SIGN', 'MAX', 'MIN', 'MAX0', 'MIN0',
    # Type conversion
    'INT', 'IFIX', 'IDINT', 'FLOAT', 'SNGL', 'DBLE', 'CMPLX',
    'REAL', 'AIMAG', 'CONJG', 'ICHAR', 'CHAR', 'INDEX', 'LEN',
    # Statement keywords
    'IF', 'THEN', 'ELSE', 'ENDIF', 'END', 'DO', 'CONTINUE',
    'RETURN', 'STOP', 'PAUSE', 'GOTO', 'CALL', 'SUBROUTINE',
    'FUNCTION', 'PROGRAM', 'MODULE', 'USE', 'IMPLICIT',
    'PARAMETER', 'DATA', 'DIMENSION', 'COMMON', 'SAVE',
    'EXTERNAL', 'INTRINSIC', 'READ', 'WRITE', 'OPEN', 'CLOSE',
    'FORMAT', 'PRINT',
    # Special functions
    'ERF', 'ERFC', 'GAMMA',
}
# ============================================================================
# 数据结构
# ============================================================================
@dataclass
class FortranSubroutine:
    """Facts collected about one Fortran subroutine by parse_fortran_file()."""

    # Subroutine name, upper-cased.
    name: str
    # Base name of the source file the subroutine was read from.
    file: str
    # Dummy-argument names taken from the SUBROUTINE statement.
    params: List[str] = field(default_factory=list)
    # Upper-cased targets of CALL statements.
    calls: List[str] = field(default_factory=list)
    # Stems of INCLUDE'd '.FOR' files.
    includes: List[str] = field(default_factory=list)
    # Names of the COMMON blocks that are declared.
    commons: List[str] = field(default_factory=list)
    # True when an OPEN / READ(unit) / WRITE(unit) statement was detected.
    has_io: bool = False
    # Source text split into lines.
    lines: List[str] = field(default_factory=list)
    # Control-flow entries of the form "<KIND>: <statement>".
    control_flow: List[str] = field(default_factory=list)
@dataclass
class RustFunction:
    """Facts collected about one Rust function by extract_rust_function()."""

    # Function name, lower-cased.
    name: str
    # Path of the Rust file; filled in by the caller after extraction.
    file: str
    # Parameter declarations (only entries that contain a ':').
    params: List[str] = field(default_factory=list)
    # Upper-cased names of the functions that the body appears to call.
    calls: List[str] = field(default_factory=list)
    # True when the body mentions one of the known reader/writer helpers.
    has_io: bool = False
    # Function body split into lines.
    lines: List[str] = field(default_factory=list)
    # Control-flow entries of the form "<KIND>: <statement>".
    control_flow: List[str] = field(default_factory=list)
    # True when the body looks like an empty or placeholder implementation.
    is_stub: bool = False
@dataclass
class CheckResult:
    """Outcome of comparing one Fortran subroutine with its Rust counterpart."""

    # Name of the Fortran subroutine that was checked.
    fortran_name: str
    # Name of the Rust function it was compared with ("" when none was found).
    rust_name: str
    # Fortran source file ("" when it does not exist).
    fortran_file: str
    # Rust source file ("" when no module was located).
    rust_file: str
    # One of 'match', 'mismatch', 'missing', 'partial'.
    status: str
    # Human-readable problems that were detected.
    issues: List[str] = field(default_factory=list)
    # Control-flow differences and other details shown in verbose mode.
    flow_diff: List[str] = field(default_factory=list)
    # Proposed fixes.
    suggestions: List[str] = field(default_factory=list)
# ============================================================================
# Module mapping (copied from analyze_fortran.py)
# ============================================================================
# Rust module name -> Fortran routines that are implemented inside that module.
SPECIAL_MAPPINGS = {
    'gfree': ['gfree0', 'gfreed', 'gfree1'],
    'interpolate': ['yint', 'lagran'],
    'sgmer': ['sgmer0', 'sgmer1', 'sgmerd'],
    'ctdata': ['hction', 'hctrecom'],
    'cross': ['cross', 'crossd'],
    'expint': ['eint', 'expinx'],
    'erfcx': ['erfcx', 'erfcin'],
    'lineqs': ['lineqs', 'lineqs_nr'],
    'gamsp': ['gamsp'],
    'bhe': ['bhe', 'bhed', 'bhez'],
    'comset': ['comset'],
    'ghydop': ['ghydop'],
    'levgrp': ['levgrp'],
    'profil': ['profil'],
    'linspl': ['linspl'],
    'convec': ['convec', 'convc1'],
}
# ============================================================================
# Fortran 解析函数
# ============================================================================
def strip_fortran_comments(content: str) -> str:
    """Return *content* without empty lines and full-line Fortran comments.

    A line counts as a comment when its first character is 'C' (either case),
    '!' or '*'. Lines that contain only whitespace are kept.
    """
    comment_markers = ('C', 'c', '!', '*')
    kept = [
        text
        for text in content.split('\n')
        if text and not text.startswith(comment_markers)
    ]
    return '\n'.join(kept)
def extract_fortran_params(content: str) -> List[str]:
    """Return the dummy arguments of the first SUBROUTINE statement.

    The text between the parentheses of ``SUBROUTINE NAME(P1, P2, ...)`` is
    split on commas and each piece is stripped; empty pieces are dropped.
    An empty list is returned when no such statement is found. Continuation
    markers inside a multi-line argument list are not removed.
    """
    found = re.search(r'(?i)SUBROUTINE\s+(\w+)\s*\(([^)]*)\)', content)
    if found is None:
        return []
    names = []
    for piece in found.group(2).split(','):
        piece = piece.strip()
        if piece:
            names.append(piece)
    return names
def extract_fortran_calls(content: str) -> List[str]:
    """Return the upper-cased targets of all CALL statements.

    Comment lines are removed first. Names listed in FORTRAN_INTRINSICS are
    skipped. Duplicates are kept, in order of appearance.
    """
    source = strip_fortran_comments(content)
    targets = re.findall(r'(?i)CALL\s+(\w+)(?:\s*\(|\s*$|\s*\n)', source)
    result = []
    for target in targets:
        upper_name = target.upper()
        if upper_name not in FORTRAN_INTRINSICS:
            result.append(upper_name)
    return result
def extract_fortran_includes(content: str) -> List[str]:
    """Return the stems of all ``INCLUDE '<stem>.FOR'`` statements.

    Matching is case-insensitive. The stem 'IMPLIC' (any case) is left out.
    """
    pattern = re.compile(r"INCLUDE\s*'([^']+)\.FOR'", re.IGNORECASE)
    stems = []
    for hit in pattern.finditer(content):
        stem = hit.group(1)
        if stem.upper() == 'IMPLIC':
            continue
        stems.append(stem)
    return stems
def extract_fortran_commons(content: str) -> List[str]:
    """Return the names of the named COMMON blocks declared in *content*.

    Only statements of the form ``COMMON /NAME/ ...`` that start a line
    (after optional whitespace) are recognised. Each name is reported once.

    The result is sorted so that reports built from it are identical from
    run to run; a plain ``list(set(...))`` has an order that depends on
    string hash randomization.
    """
    names = re.findall(r'(?i)^\s*COMMON\s*/(\w+)/', content, re.MULTILINE)
    return sorted(set(names))
def extract_control_flow(content: str) -> List[str]:
    """Return the control-flow statements of a Fortran source, in source order.

    Comment lines are removed first. Every remaining line is tested against
    the patterns below, in order, and the first pattern that matches decides
    the kind. Each entry has the form ``"<KIND>: <statement>"`` where the
    statement is stripped and truncated to 60 characters.

    The two-word spellings ``GO TO`` and ``END IF`` are recognised alongside
    ``GOTO`` and ``ENDIF``, and an IF condition may contain nested
    parentheses (array references, function calls).
    """
    code_content = strip_fortran_comments(content)
    # Order matters: ELSE IF has to be tried before the plain ELSE pattern.
    statement_patterns = [
        (re.compile(r'^\s*CALL\s+(\w+)', re.IGNORECASE), 'CALL'),
        # ".+" instead of "[^)]+" so that "IF (A(I).GT.0) THEN" is matched.
        (re.compile(r'^\s*IF\s*\(.+\)\s*(THEN|GO\s*TO)', re.IGNORECASE), 'IF'),
        (re.compile(r'^\s*ELSE\s*IF', re.IGNORECASE), 'ELSEIF'),
        (re.compile(r'^\s*ELSE\b', re.IGNORECASE), 'ELSE'),
        (re.compile(r'^\s*END\s*IF\b', re.IGNORECASE), 'ENDIF'),
        (re.compile(r'^\s*DO\s+\d+', re.IGNORECASE), 'DO'),
        (re.compile(r'^\s*(\d+)\s+CONTINUE', re.IGNORECASE), 'LABEL'),
        (re.compile(r'^\s*GO\s*TO\s+(\d+)', re.IGNORECASE), 'GOTO'),
        (re.compile(r'^\s*RETURN\b', re.IGNORECASE), 'RETURN'),
        (re.compile(r'^\s*STOP\b', re.IGNORECASE), 'STOP'),
    ]
    flow = []
    for line in code_content.split('\n'):
        for pattern, flow_type in statement_patterns:
            if pattern.search(line):
                flow.append(f"{flow_type}: {line.strip()[:60]}")
                break
    return flow
def has_file_io(content: str) -> bool:
    """Tell whether *content* contains a file I/O statement.

    Recognised (case-insensitively): ``OPEN(``, and ``READ(`` / ``WRITE(``
    followed by a numeric unit.
    """
    io_markers = (r'OPEN\s*\(', r'READ\s*\(\s*\d+', r'WRITE\s*\(\s*\d+')
    return any(
        re.search(marker, content, re.IGNORECASE) is not None
        for marker in io_markers
    )
def parse_fortran_file(fpath: str) -> Optional[FortranSubroutine]:
    """Read a Fortran file and describe the first subroutine found in it.

    The file is decoded as UTF-8 with undecodable bytes ignored. Returns
    None when no line starts with a SUBROUTINE statement.
    """
    with open(fpath, 'r', encoding='utf-8', errors='ignore') as handle:
        source = handle.read()

    # The first SUBROUTINE statement provides the name.
    header = re.search(r'(?i)^\s*SUBROUTINE\s+(\w+)', source, re.MULTILINE)
    if header is None:
        return None

    info = FortranSubroutine(
        name=header.group(1).upper(),
        file=os.path.basename(fpath),
    )
    info.params = extract_fortran_params(source)
    info.calls = extract_fortran_calls(source)
    info.includes = extract_fortran_includes(source)
    info.commons = extract_fortran_commons(source)
    info.has_io = has_file_io(source)
    info.lines = source.split('\n')
    info.control_flow = extract_control_flow(source)
    return info
# ============================================================================
# Rust 解析函数
# ============================================================================
def extract_rust_function(content: str, func_name: str) -> Optional[RustFunction]:
    """Locate ``pub fn <func_name>`` in Rust source and summarise it.

    The name is matched case-insensitively and may be followed by a generic
    parameter list, e.g. ``pub fn name<R: BufRead, W: Write>(...)``.
    Returns None when no such function is found. The ``file`` attribute of
    the result is left empty.
    """
    signature_re = rf'(?i)pub\s+fn\s+{func_name.lower()}\s*(?:<[^>]+>)?\s*\(([^)]*)\)'
    signature = re.search(signature_re, content, re.IGNORECASE | re.DOTALL)
    if signature is None:
        return None

    # Keep only the pieces that look like "name: Type".
    params = []
    for piece in signature.group(1).split(','):
        if piece.strip() and ':' in piece:
            params.append(piece.strip())

    # Find the function body by counting braces after the signature.
    depth = 0
    body_begin = signature.end()
    func_body = ""
    for pos in range(signature.end(), len(content)):
        ch = content[pos]
        if ch == '{':
            if depth == 0:
                body_begin = pos
            depth += 1
        elif ch == '}':
            depth -= 1
            if depth == 0:
                func_body = content[body_begin:pos + 1]
                break

    # Markers of an empty or simplified (placeholder) implementation.
    stub_patterns = (
        r'//\s*简化实现',
        r'//\s*TODO',
        r'//\s*注:',
        r'//\s*待实现',
        r'简化版本',
        r'框架就绪',
        r'unimplemented!',
        r'todo!',
    )
    is_stub = any(
        re.search(marker, func_body, re.IGNORECASE) for marker in stub_patterns
    )

    # Patterns whose single capture group is the name of a called routine.
    call_patterns = (
        r'(\w+)\s*\(&mut\s+\w+_params',
        r'(\w+)\s*\(&\w+_params',
        r'(\w+)\s*\(\s*&mut',
        r'(\w+)_pure\s*\(',
        r'(\w+)_io\s*\(',
        # Callback interface: callbacks.call_xxx(ij)
        r'callbacks\.call_(\w+)\s*\(',
        # Fully qualified function: crate::tlusty::math::xxx::yyy()
        r'crate::tlusty::math::\w+::(\w+)\s*\(',
        # Fully qualified module-level call: crate::tlusty::math::xxx(...)
        r'crate::tlusty::math::(\w+)\s*\(',
        # Inlined functions: dwnfr1(...), sgmer1(...)
        r'\b(dwnfr1|sgmer1|gfree1|sffhmi|ffcros)\s*\(',
        # Direct calls made by OPACF0
        r'\b(gfree0|dwnfr0|wnstor|sabolf|linpro|opadd|opact1)\s*\(',
        # Direct calls made by OPCTAB
        r'\b(rayleigh)\s*\(',
        # Alias (quit_func is an alias of quit)
        r'\b(quit_func|quit)\s*\(',
    )
    calls = [
        hit
        for call_re in call_patterns
        for hit in re.findall(call_re, func_body, re.IGNORECASE)
    ]

    # I/O is assumed when one of the reader/writer helpers is mentioned.
    io_hit = re.search(r'FortranReader|FortranWriter|read_value|write_raw', func_body)

    return RustFunction(
        name=func_name.lower(),
        file="",
        params=params,
        calls=list(set(c.upper() for c in calls)),
        has_io=bool(io_hit),
        lines=func_body.split('\n'),
        control_flow=extract_rust_control_flow(func_body),
        is_stub=is_stub,
    )
def extract_rust_control_flow(content: str) -> List[str]:
    """Return the control-flow statements found in Rust source, line by line.

    Every line is tested against the rules below in order; the first rule
    that matches (case-insensitively) decides the kind. Each entry has the
    form ``"<KIND>: <line>"`` with the line stripped and cut to 60 characters.
    """
    rules = [
        (re.compile(source, re.IGNORECASE), kind)
        for source, kind in (
            (r'(\w+)\s*\(&mut\s+\w+_params', 'CALL'),
            (r'(\w+)_pure\s*\(', 'CALL'),
            (r'if\s+.+\s*\{', 'IF'),
            (r'}\s*else\s+if', 'ELSEIF'),
            (r'}\s*else\s*\{', 'ELSE'),
            (r'while\s+', 'WHILE'),
            (r'for\s+.+\s+in', 'FOR'),
            (r'match\s+', 'MATCH'),
            (r'return\s+', 'RETURN'),
            (r'break\s*', 'BREAK'),
            (r'continue\s*', 'CONTINUE'),
        )
    ]
    flow = []
    for raw_line in content.split('\n'):
        kind = next((k for rx, k in rules if rx.search(raw_line)), None)
        if kind is not None:
            flow.append(f"{kind}: {raw_line.strip()[:60]}")
    return flow
def find_rust_module(fortran_name: str) -> Optional[str]:
    """Return the path of the Rust file that implements *fortran_name*.

    Locations are probed in this order and the first existing file wins:

    1. ``tlusty/math/<name>.rs``
    2. ``tlusty/math/<subdir>/<name>.rs`` for every known sub-directory
    3. ``tlusty/io/<name>.rs``
    4. for every SPECIAL_MAPPINGS entry that lists the routine, the same
       ``tlusty/math`` locations using the mapped module name

    Returns None when nothing is found.
    """
    math_subdirs = [
        'ali', 'atomic', 'continuum', 'convection', 'eos', 'hydrogen',
        'interpolation', 'io', 'odf', 'opacity', 'partition', 'population',
        'radiative', 'rates', 'solvers', 'special', 'temperature', 'utils'
    ]
    math_root = os.path.join(RUST_BASE_DIR, 'tlusty', 'math')

    def math_locations(stem):
        # tlusty/math itself first, then each of its sub-directories.
        yield os.path.join(math_root, f"{stem}.rs")
        for subdir in math_subdirs:
            yield os.path.join(math_root, subdir, f"{stem}.rs")

    lowered = fortran_name.lower()
    candidates = list(math_locations(lowered))
    candidates.append(os.path.join(RUST_BASE_DIR, 'tlusty', 'io', f"{lowered}.rs"))
    for rust_mod, fortran_funcs in SPECIAL_MAPPINGS.items():
        if lowered in (f.lower() for f in fortran_funcs):
            candidates.extend(math_locations(rust_mod))

    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return None
# ============================================================================
# 对比检查函数
# ============================================================================
def compare_modules(fortran_sub: FortranSubroutine, rust_func: RustFunction) -> CheckResult:
    """Compare a parsed Fortran subroutine with its Rust implementation.

    The result starts as 'match'. It becomes 'partial' when the Rust
    function is flagged as a stub and 'mismatch' when a Fortran CALL has no
    Rust counterpart ('mismatch' takes precedence over 'partial'). Missing
    I/O and control-flow differences are reported without changing the
    status.
    """
    result = CheckResult(
        fortran_name=fortran_sub.name,
        rust_name=rust_func.name,
        fortran_file=fortran_sub.file,
        rust_file=rust_func.file,
        status='match',
    )

    # 1. Placeholder / simplified implementation?
    if rust_func.is_stub:
        result.status = 'partial'
        result.issues.append("⚠️ Rust 实现是简化版本/占位符")
        result.suggestions.append("需要完整实现此模块")

    # 2. Every Fortran CALL should have a Rust counterpart. Rust names are
    #    normalised by dropping a trailing _pure / _io / _func suffix.
    fortran_calls = set(fortran_sub.calls)
    normalized_rust_calls = {
        re.sub(r'_(pure|io|func)$', '', call.lower()).upper()
        for call in rust_func.calls
    }
    missing_calls = sorted(fortran_calls - normalized_rust_calls)
    if missing_calls:
        result.status = 'mismatch'
        result.issues.append(f"❌ 缺少调用: {', '.join(missing_calls)}")
        for call in missing_calls:
            result.suggestions.append(f"添加调用: {call.lower()}(&mut params)")

    # 3. I/O present in Fortran but not in Rust.
    if fortran_sub.has_io and not rust_func.has_io:
        result.issues.append("⚠️ Fortran 有 I/O, Rust 没有")

    # 4. Number of control-flow statements.
    if len(fortran_sub.control_flow) != len(rust_func.control_flow):
        result.flow_diff.append(f"控制语句数量: Fortran={len(fortran_sub.control_flow)}, Rust={len(rust_func.control_flow)}")

    # 5. INCLUDE files and COMMON blocks used on the Fortran side.
    if fortran_sub.includes:
        result.flow_diff.append(f"Fortran INCLUDE: {', '.join(fortran_sub.includes)}")
    if fortran_sub.commons:
        result.flow_diff.append(f"Fortran COMMON: {', '.join(fortran_sub.commons)}")

    # 6. For a stub, list the first 20 Fortran control-flow statements.
    if rust_func.is_stub:
        result.flow_diff.append("Fortran 完整流程:")
        for i, flow in enumerate(fortran_sub.control_flow[:20]):
            result.flow_diff.append(f" {i+1}. {flow}")
        if len(fortran_sub.control_flow) > 20:
            result.flow_diff.append(f" ... 还有 {len(fortran_sub.control_flow) - 20}")
    return result
# ============================================================================
# 输出格式
# ============================================================================
def print_result(result: CheckResult, verbose: bool = False):
    """Print one check result to standard output.

    The header line shows a status icon and the Fortran routine name,
    followed by both file names, the detected issues, the flow differences
    (only when *verbose* is true) and at most ten suggestions.

    The icons are the ones used in the summary printed by check_all(), so
    every status has a visible marker.
    """
    status_icons = {
        'match': '✅',
        'mismatch': '❌',
        'missing': '❓',
        'partial': '⚠️',
    }
    # An unknown status is printed without an icon.
    icon = status_icons.get(result.status, '')
    print(f"\n{icon} {result.fortran_name}")
    print(f" Fortran: {result.fortran_file}")
    print(f" Rust: {result.rust_file}")
    if result.issues:
        print("\n 问题:")
        for issue in result.issues:
            print(f" {issue}")
    if result.flow_diff and verbose:
        print("\n 流程差异:")
        for diff in result.flow_diff:
            print(f" {diff}")
    if result.suggestions:
        print("\n 修复建议:")
        for i, sug in enumerate(result.suggestions[:10], 1):
            print(f" {i}. {sug}")
def generate_diff_report(fortran_sub: FortranSubroutine, rust_func: RustFunction) -> str:
    """Build a detailed, human-readable difference report.

    The report contains the first 50 lines of each source, the control flow
    of both sides, and the list of Fortran calls, each marked ✅ when the
    Rust side has a matching call and ❌ otherwise. Rust call names are
    compared case-insensitively after dropping a trailing _pure / _io /
    _func suffix, the same normalisation compare_modules() applies.
    """
    max_lines = 50
    rule = "-" * 40
    report = []

    def add_listing(lines):
        # Numbered excerpt, followed by a note when the source was truncated.
        for number, text in enumerate(lines[:max_lines], 1):
            report.append(f"{number:4d}: {text}")
        if len(lines) > max_lines:
            report.append(f" ... 还有 {len(lines) - max_lines}")

    report.append("=" * 70)
    report.append(f"差异报告: {fortran_sub.name}")
    report.append("=" * 70)

    report.append("\n## Fortran 代码 ({})".format(fortran_sub.file))
    report.append(rule)
    add_listing(fortran_sub.lines)

    report.append("\n## Rust 代码 ({})".format(rust_func.file))
    report.append(rule)
    add_listing(rust_func.lines)

    report.append("\n## Fortran 控制流程")
    report.append(rule)
    report.extend(f" {flow}" for flow in fortran_sub.control_flow)

    report.append("\n## Rust 控制流程")
    report.append(rule)
    report.extend(f" {flow}" for flow in rust_func.control_flow)

    report.append("\n## 调用对比")
    report.append(rule)
    rust_calls = {
        re.sub(r'_(pure|io|func)$', '', call.lower())
        for call in rust_func.calls
    }
    report.append("Fortran 调用:")
    for call in sorted(set(fortran_sub.calls)):
        status = "✅" if call.lower() in rust_calls else "❌"
        report.append(f" {status} {call}")
    return "\n".join(report)
# ============================================================================
# 主函数
# ============================================================================
def check_module(module_name: str, verbose: bool = False) -> CheckResult:
    """Check a single module and return the comparison result.

    A result with status 'missing' is returned when the Fortran file does
    not exist, cannot be parsed, has no Rust module, or the Rust module
    lacks the expected function (``<name>`` or ``<name>_pure``). When
    *verbose* is true the detailed difference report is appended to the
    result's ``flow_diff``.
    """

    def unavailable(name, issue, rust_name="", fortran_file="", rust_file="",
                    suggestions=None):
        # Every early exit shares the 'missing' status.
        return CheckResult(
            fortran_name=name,
            rust_name=rust_name,
            fortran_file=fortran_file,
            rust_file=rust_file,
            status='missing',
            issues=[issue],
            suggestions=suggestions if suggestions is not None else [],
        )

    # Locate the Fortran source.
    fortran_file = os.path.join(EXTRACTED_DIR, f"{module_name.lower()}.f")
    if not os.path.exists(fortran_file):
        return unavailable(module_name.upper(), f"Fortran 文件不存在: {fortran_file}")

    # Parse the Fortran source.
    fortran_sub = parse_fortran_file(fortran_file)
    if not fortran_sub:
        return unavailable(
            module_name.upper(),
            "无法解析 Fortran 文件",
            fortran_file=fortran_file,
        )

    # Locate the Rust module.
    rust_file = find_rust_module(fortran_sub.name)
    if not rust_file:
        return unavailable(
            fortran_sub.name,
            "Rust 模块未实现",
            fortran_file=fortran_sub.file,
            suggestions=[f"创建 Rust 文件: src/tlusty/.../{fortran_sub.name.lower()}.rs"],
        )

    # Parse the Rust module; fall back to the "<name>_pure" variant.
    with open(rust_file, 'r', encoding='utf-8', errors='ignore') as handle:
        rust_content = handle.read()
    rust_func = (
        extract_rust_function(rust_content, fortran_sub.name)
        or extract_rust_function(rust_content, f"{fortran_sub.name}_pure")
    )
    if not rust_func:
        return unavailable(
            fortran_sub.name,
            "Rust 函数未找到",
            rust_name=fortran_sub.name.lower(),
            fortran_file=fortran_sub.file,
            rust_file=rust_file,
            suggestions=[f"{rust_file} 中添加函数: pub fn {fortran_sub.name.lower()}(...)"],
        )
    rust_func.file = rust_file

    # Compare both sides.
    result = compare_modules(fortran_sub, rust_func)
    if verbose:
        detailed = generate_diff_report(fortran_sub, rust_func)
        result.flow_diff.extend(detailed.split('\n'))
    return result
def check_all(verbose: bool = False):
    """Check every ``*.f`` file in EXTRACTED_DIR and print a summary.

    Results are printed grouped by status (mismatch, partial, missing,
    match) and followed by the number of results per status.
    """
    tally = dict.fromkeys(('match', 'mismatch', 'partial', 'missing'), 0)
    collected = []
    for fpath in sorted(glob.glob(os.path.join(EXTRACTED_DIR, "*.f"))):
        stem, _ = os.path.splitext(os.path.basename(fpath))
        outcome = check_module(stem.upper(), verbose=False)
        tally[outcome.status] += 1
        collected.append(outcome)

    # Print the results grouped by status.
    for wanted in ('mismatch', 'partial', 'missing', 'match'):
        for outcome in collected:
            if outcome.status != wanted:
                continue
            print_result(outcome, verbose)

    print("\n" + "=" * 70)
    print("统计:")
    print(f" ✅ 匹配: {tally['match']}")
    print(f" ⚠️ 部分实现: {tally['partial']}")
    print(f" ❌ 不匹配: {tally['mismatch']}")
    print(f" ❓ 未实现: {tally['missing']}")
    print(f" 总计: {sum(tally.values())}")
def main():
    """Parse the command line and run the requested check.

    ``--all`` checks every module. ``--diff`` and ``--flow`` both run a
    verbose check of the named module. A bare module name runs a check whose
    verbosity follows ``--verbose``. Without any of these the help text is
    printed.
    """
    parser = argparse.ArgumentParser(description='Fortran to Rust 一致性检查')
    parser.add_argument('module', nargs='?', help='要检查的模块名')
    parser.add_argument('--all', action='store_true', help='检查所有模块')
    parser.add_argument('--diff', metavar='MODULE', help='生成详细差异报告')
    parser.add_argument('--flow', metavar='MODULE', help='检查控制流程')
    parser.add_argument('--verbose', '-v', action='store_true', help='详细输出')
    args = parser.parse_args()

    if args.all:
        check_all(args.verbose)
        return

    # --diff takes precedence over --flow; both produce the verbose report.
    detailed_target = args.diff or args.flow
    if detailed_target:
        print_result(check_module(detailed_target, verbose=True), verbose=True)
        return

    if args.module:
        print_result(check_module(args.module, args.verbose), args.verbose)
        return

    parser.print_help()


if __name__ == "__main__":
    main()