SpectraRust/.claude/skills/fortran-analyzer/scripts/analyze_fortran.py
Asfmq 0674b4f174 feat: 添加 9 个 SYNSPEC 数学模块 (第8批)
新增模块:
- count_words: 字符串单词计数工具
- divhe2: He II Stark 轮廓除数参数计算
- extprf: 谱线轮廓波长外推 (Cooper 公式)
- feautr: Lyman-α Stark 加宽 (Feautrier 方法)
- gamhe: 中性氦 Stark 加宽参数
- griem: Griem Stark 阻尼参数计算
- intrp: 二分法高效插值程序
- partdv: 配分函数计算 (含压力效应)
- sffhmi_old: H- 自由-自由截面 (Kurucz 公式)

改进:
- 修复 fortran-analyzer 注释行误匹配问题

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-25 06:52:44 +08:00

483 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
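"""
Analyze TLUSTY Fortran source files and extract function dependency information.

Usage:
    python3 analyze_fortran.py                # print CSV with direct dependencies
    python3 analyze_fortran.py --full         # print CSV including transitive dependencies
    python3 analyze_fortran.py --tree UNIT    # print the dependency tree of UNIT (text format)
    python3 analyze_fortran.py --priority     # print the refactoring priority list
"""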
import os
import re
import glob
import argparse
from collections import defaultdict
def extract_includes(content):
    """Return the base names of files referenced by INCLUDE statements.

    Only includes written as ``INCLUDE 'NAME.FOR'`` are recognised (matched
    case-insensitively).  The returned names drop the ``.FOR`` suffix and keep
    the case used in the source, in order of appearance.  The ``IMPLIC``
    include is left out of the result.
    """
    include_re = re.compile(r"INCLUDE\s*'([^']+)\.FOR'", re.IGNORECASE)
    kept = []
    for match in include_re.finditer(content):
        stem = match.group(1)
        if stem.upper() == 'IMPLIC':
            continue
        kept.append(stem)
    return kept
def extract_commons(content):
    """Return the names of the named COMMON blocks declared in ``content``.

    A declaration is recognised when ``COMMON /NAME/`` starts a line
    (case-insensitive, optional blanks around the slashes).  Only the first
    block name of each statement is picked up.

    Names are upper-cased before de-duplication because Fortran identifiers
    are case-insensitive, and the result is sorted so that the output does not
    depend on set iteration order.

    Returns:
        Sorted list of unique, upper-cased COMMON block names.
    """
    names = re.findall(r'(?i)^\s*COMMON\s*/\s*(\w+)\s*/', content, re.MULTILINE)
    return sorted({name.upper() for name in names})
# Names that must never be reported as call dependencies: Fortran intrinsic
# functions plus statement keywords that the call-matching regexes can pick up.
FORTRAN_INTRINSICS = {
    # Trigonometric and hyperbolic functions
    'SIN', 'COS', 'TAN', 'ASIN', 'ACOS', 'ATAN', 'ATAN2',
    'SINH', 'COSH', 'TANH',
    # Exponentials, logarithms and basic arithmetic helpers
    'EXP', 'LOG', 'LOG10', 'LOG2',
    'SQRT', 'ABS', 'MOD', 'SIGN',
    'MAX', 'MIN', 'MAX0', 'MIN0', 'MAX1', 'MIN1', 'AMAX0', 'AMIN0',
    # Type conversion
    'INT', 'IFIX', 'IDINT', 'FLOAT', 'SNGL', 'DBLE', 'CMPLX',
    'REAL', 'AIMAG', 'CONJG',
    # Character handling
    'ICHAR', 'CHAR', 'INDEX', 'LEN', 'LGE', 'LGT', 'LLE', 'LLT',
    # Array operations and inquiry
    'DOT_PRODUCT', 'MATMUL', 'TRANSPOSE', 'RESHAPE',
    'SIZE', 'SHAPE', 'LBOUND', 'UBOUND',
    'ALLOCATED', 'ALLOCATE', 'DEALLOCATE',
    # Kind and numeric inquiry
    'KIND', 'SELECTED_REAL_KIND', 'SELECTED_INT_KIND',
    'DIGITS', 'EPSILON', 'HUGE', 'TINY', 'PRECISION', 'RANGE',
    # Rounding
    'FLOOR', 'CEILING', 'NINT', 'ANINT',
    # String utilities
    'ADJUSTL', 'ADJUSTR', 'TRIM', 'REPEAT', 'SCAN', 'VERIFY',
    'PRESENT', 'ASSOCIATED',
    # Math functions commonly used in TLUSTY
    'ERF', 'ERFC', 'GAMMA', 'LOG_GAMMA',
    # Fortran statement keywords (not functions, must not be tracked)
    'IF', 'THEN', 'ELSE', 'ENDIF', 'END', 'DO', 'CONTINUE', 'RETURN',
    'STOP', 'PAUSE', 'GOTO', 'CALL', 'SUBROUTINE', 'FUNCTION',
    'PROGRAM', 'MODULE', 'USE', 'IMPLICIT', 'PARAMETER', 'DATA',
    'DIMENSION', 'COMMON', 'SAVE', 'EXTERNAL', 'INTRINSIC',
    'READ', 'WRITE', 'OPEN', 'CLOSE', 'FORMAT', 'PRINT',
}
def strip_fortran_comments(content):
    """Drop fixed-form Fortran comment lines and empty lines from ``content``.

    A line is discarded when it is empty or when its first character
    (column 1) is ``C``/``c``, ``!`` or ``*``.  All remaining lines are kept
    unchanged and re-joined with ``'\\n'``.  Inline comments further along a
    line are not removed.
    """
    comment_markers = ('C', '!', '*')
    surviving = [
        row
        for row in content.split('\n')
        if len(row) != 0 and row[0].upper() not in comment_markers
    ]
    return '\n'.join(surviving)
def extract_calls(content, known_functions=None):
    """Collect the names of subprograms referenced by ``content``.

    Comment lines are removed first, so neither CALL statements nor function
    references inside comments are reported.

    Args:
        content: Fortran source text.
        known_functions: Optional collection of upper-case unit names.  When
            given, an identifier followed by ``(`` and preceded by ``=``,
            ``(`` or ``,`` is reported as a function call if it is one of
            these names.  This is what separates function calls from array
            accesses.  When empty or None only CALL statements are reported.

    Returns:
        Sorted list of unique, upper-cased callee names, excluding anything
        listed in FORTRAN_INTRINSICS.
    """
    calls = set()

    # Strip comment lines once and use the result for BOTH scans below.
    code_content = strip_fortran_comments(content)

    # 1. CALL statements, with or without an argument list:
    #    "CALL NAME(...)" or a bare "CALL NAME" at the end of a line.
    call_stmts = re.findall(r'(?i)CALL\s+(\w+)(?:\s*\(|\s*$|\s*\n)', code_content)
    calls.update(c.upper() for c in call_stmts if c.upper() not in FORTRAN_INTRINSICS)

    # 2. Function references, restricted to names known to be program units.
    if known_functions:
        # \w* lets names with underscores match.  The lookahead leaves the
        # opening parenthesis unconsumed so that a nested call such as
        # F(G(x)) also yields G.
        func_refs = re.findall(r'(?i)[=(,]\s*([A-Z]\w*)\s*(?=\()', code_content)
        for ref in func_refs:
            name = ref.upper()
            if name in known_functions and name not in FORTRAN_INTRINSICS:
                calls.add(name)

    # Sorted so the result does not depend on set iteration order.
    return sorted(calls)
def has_file_io(content):
    """Report whether ``content`` contains an OPEN, READ or WRITE statement.

    A statement is recognised as the keyword (any case) followed by optional
    blanks and an opening parenthesis.  The keyword must start at a word
    boundary, so identifiers that merely end in one of the keywords
    (``IREAD(``, ``SPREAD(`` ...) are not mistaken for I/O statements.

    The text is scanned as-is; comment lines are not filtered out.

    Returns:
        True if at least one such statement is found, otherwise False.
    """
    io_statement = re.compile(r'\b(?:OPEN|READ|WRITE)\s*\(', re.IGNORECASE)
    return io_statement.search(content) is not None
def extract_unit_info(content, filename):
    """Identify the program units declared in a Fortran source text.

    At most one SUBROUTINE, one FUNCTION and one BLOCK DATA are reported (the
    first occurrence of each).  All names are upper-cased.

    Args:
        content: Fortran source text.
        filename: File name; its stem is the fallback unit name.

    Returns:
        List of ``(unit_type, unit_name)`` tuples where ``unit_type`` is
        ``'SUBROUTINE'``, ``'FUNCTION'``, ``'BLOCK DATA'`` or ``'UNKNOWN'``.
        An unnamed BLOCK DATA is reported with the name ``'_UNNAMED_'``.
        When nothing is recognised, a single ``('UNKNOWN', STEM)`` entry is
        returned.
    """
    units = []

    sub_match = re.search(r'(?i)^\s*SUBROUTINE\s+(\w+)', content, re.MULTILINE)
    if sub_match:
        units.append(('SUBROUTINE', sub_match.group(1).upper()))

    # Optional result type in front of FUNCTION.  LOGICAL, COMPLEX and
    # CHARACTER are accepted as well as the numeric types, otherwise such
    # functions would be classified as UNKNOWN.
    func_match = re.search(
        r'(?i)^\s*'
        r'(?:(?:REAL|INTEGER|LOGICAL|COMPLEX)(?:\*\d+)?'
        r'|DOUBLE\s*(?:PRECISION|COMPLEX)'
        r'|CHARACTER(?:\*\d+|\*\(\*\))?)?'
        r'\s*FUNCTION\s+(\w+)',
        content,
        re.MULTILINE,
    )
    if func_match:
        units.append(('FUNCTION', func_match.group(1).upper()))

    # BLOCK DATA: the optional name must sit on the same line.  Only blanks
    # and tabs are allowed between the parts (plus a trailing '\r' for CRLF
    # files); '\s' would cross the newline and take a one-word statement on
    # the following line as the block name.
    block_match = re.search(
        r'(?i)^[ \t]*BLOCK[ \t]*DATA[ \t]*(\w+)?[ \t\r]*$', content, re.MULTILINE
    )
    if block_match:
        name = block_match.group(1).upper() if block_match.group(1) else '_UNNAMED_'
        units.append(('BLOCK DATA', name))

    # Nothing recognised: fall back to the file name without extension.
    if not units:
        base = os.path.splitext(filename)[0]
        units.append(('UNKNOWN', base.upper()))

    return units
# Special mappings: one Rust file that implements several Fortran routines.
# Key: Rust module (file stem); value: list of Fortran routine names.
SPECIAL_MAPPINGS = {
    'gfree': ['gfree0', 'gfreed', 'gfree1'],
    'interpolate': ['yint', 'lagran'],
    'sgmer': ['sgmer0', 'sgmer1', 'sgmerd'],
    'ctdata': ['hction', 'hctrecom'],
    'cross': ['cross', 'crossd'],
    'expint': ['eint', 'expinx'],
    'erfcx': ['erfcx', 'erfcin'],
    'lineqs': ['lineqs', 'lineqs_nr'],
    'gamsp': ['gamsp'],                 # alias
    'bhe': ['bhe', 'bhed', 'bhez'],     # hydrostatic equilibrium equation
    'comset': ['comset'],               # Compton scattering parameter setup
    'ghydop': ['ghydop'],               # hydrogen opacity (Gomez tables)
    'levgrp': ['levgrp'],               # level grouping
    'profil': ['profil'],               # standard absorption profile
    'linspl': ['linspl'],               # line profile setup
    'convec': ['convec', 'convc1'],     # mixing-length convection
}
def find_rust_module(fortran_name, rust_math_dir, rust_io_dir):
    """Locate the Rust source file that implements a Fortran routine.

    Lookup order:
      1. ``<rust_math_dir>/<name>.rs``  -> ``src/math/<name>.rs``
      2. ``<rust_io_dir>/<name>.rs``    -> ``src/io/<name>.rs``
      3. the first SPECIAL_MAPPINGS entry listing the routine, provided
         ``<rust_math_dir>/<module>.rs`` exists -> ``src/math/<module>.rs``

    The Fortran name is lower-cased before any comparison.

    Returns:
        The project-relative path of the Rust file, or ``""`` if none exists.
    """
    stem = fortran_name.lower()

    # Direct hit: a Rust file named after the routine itself.
    for directory, label in ((rust_math_dir, "math"), (rust_io_dir, "io")):
        if os.path.exists(os.path.join(directory, f"{stem}.rs")):
            return f"src/{label}/{stem}.rs"

    # Indirect hit: only the first mapping that lists the routine is
    # considered, and the mapped file must actually exist in the math dir.
    owner = next(
        (
            rust_mod
            for rust_mod, fortran_funcs in SPECIAL_MAPPINGS.items()
            if stem in [f.lower() for f in fortran_funcs]
        ),
        None,
    )
    if owner is not None and os.path.exists(os.path.join(rust_math_dir, f"{owner}.rs")):
        return f"src/math/{owner}.rs"

    return ""
def get_transitive_deps(unit_name, units_dict, visited=None):
    """Return every unit reachable from ``unit_name`` through ``call_deps``.

    The traversal is iterative and expands each unit once, so the cost is
    linear in the size of the call graph even when many units share the same
    dependencies.

    Args:
        unit_name: Name of the starting unit.
        units_dict: Mapping of unit name to a dict with a ``'call_deps'`` list.
        visited: Optional set of unit names that must not be expanded.  If it
            already contains ``unit_name`` an empty set is returned; otherwise
            ``unit_name`` is added to it.

    Returns:
        Set of callee names.  Names that are not keys of ``units_dict`` are
        included but not expanded.  The starting unit itself appears only if
        something reachable calls it back.
    """
    if visited is None:
        visited = set()
    if unit_name in visited:
        return set()
    visited.add(unit_name)

    # Private working copy: the caller's set only gains the start unit.
    seen = set(visited)
    all_deps = set()
    stack = [unit_name]
    while stack:
        unit = units_dict.get(stack.pop())
        if unit is None:
            continue
        for dep in unit.get('call_deps', []):
            all_deps.add(dep)
            if dep not in seen:
                seen.add(dep)
                stack.append(dep)
    return all_deps
def get_pending_deps(unit_name, units_dict, visited=None):
    """Return the direct callees of ``unit_name`` that are not yet done.

    A callee counts as pending when it is not a key of ``units_dict`` or when
    its ``'status'`` is anything other than ``'done'``.  The order of
    ``call_deps`` is preserved.  ``visited`` is accepted but not used.

    Returns:
        List of pending callee names; empty if ``unit_name`` is unknown.
    """
    if unit_name not in units_dict:
        return []

    unresolved = []
    for callee in units_dict[unit_name].get('call_deps', []):
        finished = callee in units_dict and units_dict[callee].get('status') == 'done'
        if not finished:
            unresolved.append(callee)
    return unresolved
def get_transitive_pending_deps(unit_name, units_dict, visited=None):
    """Return all not-yet-done units reachable from ``unit_name``.

    Every unit reachable through ``call_deps`` is expanded, including units
    that are themselves done, and each of its direct callees is reported when
    that callee is missing from ``units_dict`` or its ``'status'`` is not
    ``'done'``.

    The traversal is iterative and expands each unit once, so the cost is
    linear in the size of the call graph.

    Args:
        unit_name: Name of the starting unit.
        units_dict: Mapping of unit name to a dict with ``'call_deps'`` and
            ``'status'`` entries.
        visited: Optional set of unit names that must not be expanded.  If it
            already contains ``unit_name`` an empty set is returned; otherwise
            ``unit_name`` is added to it.

    Returns:
        Set of pending unit names.
    """
    if visited is None:
        visited = set()
    if unit_name in visited:
        return set()
    visited.add(unit_name)

    # Private working copy: the caller's set only gains the start unit.
    seen = set(visited)
    pending_deps = set()
    stack = [unit_name]
    while stack:
        unit = units_dict.get(stack.pop())
        if unit is None:
            continue
        for dep in unit.get('call_deps', []):
            dep_unit = units_dict.get(dep)
            if dep_unit is None or dep_unit.get('status') != 'done':
                pending_deps.add(dep)
            # Done units are still expanded: their own callees may be pending.
            if dep not in seen:
                seen.add(dep)
                stack.append(dep)
    return pending_deps
def get_transitive_commons(unit_name, units_dict, visited=None):
    """Return the ``common_deps`` of ``unit_name`` and of everything it calls.

    The result is the union of the ``'common_deps'`` entries of the starting
    unit and of every unit reachable from it through ``call_deps``.  Callees
    that are not keys of ``units_dict`` contribute nothing.

    The traversal is iterative and expands each unit once, so the cost is
    linear in the size of the call graph.

    Args:
        unit_name: Name of the starting unit.
        units_dict: Mapping of unit name to a dict with ``'common_deps'`` and
            ``'call_deps'`` lists.
        visited: Optional set of unit names that must not be expanded.  If it
            already contains ``unit_name`` an empty set is returned; otherwise
            ``unit_name`` is added to it.

    Returns:
        Set of ``common_deps`` names.
    """
    if visited is None:
        visited = set()
    if unit_name in visited:
        return set()
    visited.add(unit_name)

    # Private working copy: the caller's set only gains the start unit.
    seen = set(visited)
    all_commons = set()
    stack = [unit_name]
    while stack:
        unit = units_dict.get(stack.pop())
        if unit is None:
            continue
        all_commons.update(unit.get('common_deps', []))
        for dep in unit.get('call_deps', []):
            if dep not in seen:
                seen.add(dep)
                stack.append(dep)
    return all_commons
def calculate_depth(unit_name, units_dict, memo=None):
    """Compute the dependency depth of a unit (leaves have depth 0).

    The depth is one more than the largest depth among the unit's
    ``call_deps``.  Units without callees and names missing from
    ``units_dict`` have depth 0.  Direct self-calls are ignored.

    Call cycles do not cause unbounded recursion: while a unit is being
    evaluated it holds a provisional depth of 0 in ``memo``, so an edge that
    leads back to it is treated as pointing to a leaf.  Depths of units inside
    a cycle therefore depend on which unit of the cycle is evaluated first.

    Args:
        unit_name: Name of the unit to evaluate.
        units_dict: Mapping of unit name to a dict with a ``'call_deps'`` list.
        memo: Optional cache of already computed depths, updated in place.

    Returns:
        The depth as a non-negative integer.
    """
    if memo is None:
        memo = {}
    if unit_name in memo:
        return memo[unit_name]
    if unit_name not in units_dict:
        return 0

    calls = units_dict[unit_name].get('call_deps', [])
    if not calls:
        memo[unit_name] = 0
        return 0

    # Provisional entry: breaks cycles such as A -> B -> A.
    memo[unit_name] = 0
    deepest = max(
        (calculate_depth(dep, units_dict, memo) for dep in calls if dep != unit_name),
        default=0,
    )
    depth = deepest + 1
    memo[unit_name] = depth
    return depth
def print_dependency_tree(unit_name, units_dict, indent=0, visited=None, prefix="", show_pending_count=True):
    """Print the call tree rooted at ``unit_name`` as indented text.

    Each unit is printed on its own line, followed by its callees.  Callees
    are ordered by their number of pending direct dependencies, largest
    first.  A unit that already appears on the current path is printed as a
    cycle marker, and a name missing from ``units_dict`` is printed as
    not found; neither is expanded further.

    Args:
        unit_name: Unit to print.
        units_dict: Mapping of unit name to its info dict.
        indent: Nesting level, passed down incremented; not used for output.
        visited: Names on the current path; ``unit_name`` is added to it.
        prefix: Text printed in front of this unit's line.
        show_pending_count: Whether to append the pending-dependency count.
    """
    seen = set() if visited is None else visited

    if unit_name in seen:
        print(f"{prefix}[循环引用: {unit_name}]")
        return
    seen.add(unit_name)

    if unit_name not in units_dict:
        print(f"{prefix}{unit_name} [未找到/未实现]")
        return

    node = units_dict[unit_name]
    # NOTE(review): both status marks are empty strings here, so done and
    # pending units print identically - possibly symbols were lost; confirm.
    status_mark = "" if node.get('status', 'pending') == "done" else ""

    pending_count = len(get_pending_deps(unit_name, units_dict))
    pending_str = ""
    if show_pending_count and pending_count > 0:
        pending_str = f" ({pending_count}未实现)"
    print(f"{prefix}{status_mark} {unit_name}{pending_str}")

    def urgency(dep):
        # Negated so that callees with more pending dependencies sort first.
        if dep not in units_dict:
            return 0
        return -len(get_pending_deps(dep, units_dict))

    children = sorted(node.get('call_deps', []), key=urgency)
    last_index = len(children) - 1
    for position, dep in enumerate(children):
        connector = "└── " if position == last_index else "├── "
        # Each branch gets its own copy so siblings do not see each other.
        print_dependency_tree(dep, units_dict, indent + 1, seen.copy(), prefix + connector, show_pending_count)
# Default locations, used when the matching command-line option is omitted.
_DEFAULT_EXTRACTED_DIR = "/home/fmq/program/tlusty/tl208-s54/rust/tlusty/extracted"
_DEFAULT_RUST_MATH_DIR = "/home/fmq/.zeroclaw/workspace/SpectraRust/src/math"
_DEFAULT_RUST_IO_DIR = "/home/fmq/.zeroclaw/workspace/SpectraRust/src/io"


def _load_units(extracted_dir, rust_math_dir, rust_io_dir):
    """Read every ``*.f`` file in ``extracted_dir`` and build the unit table.

    Returns a dict mapping unit name to its info dict (source file, unit
    type, purity flag, COMMON/INCLUDE and call dependencies, I/O flag, Rust
    module path and status).
    """
    # Each file is read and classified once; the result feeds both passes.
    sources = []
    for fpath in sorted(glob.glob(os.path.join(extracted_dir, "*.f"))):
        with open(fpath, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()
        fname = os.path.basename(fpath)
        sources.append((fname, content, extract_unit_info(content, fname)))

    # Pass 1: names of all defined units, used to tell function calls from
    # array accesses in pass 2.
    all_defined_units = {unit_name for _, _, units in sources for _, unit_name in units}

    # Pass 2: per-file analysis.  Every unit found in a file shares that
    # file's dependency information.
    units_dict = {}
    for fname, content, units in sources:
        base_name = os.path.splitext(fname)[0]
        includes = extract_includes(content)
        commons = extract_commons(content)
        calls = extract_calls(content, known_functions=all_defined_units)
        io = has_file_io(content)
        is_pure = len(includes) <= 1 and len(commons) == 0 and not io
        rust_mod = find_rust_module(base_name, rust_math_dir, rust_io_dir)
        status = "done" if rust_mod else "pending"
        for unit_type, unit_name in units:
            units_dict[unit_name] = {
                'fortran_file': fname,
                'unit_type': unit_type,
                'is_pure': is_pure,
                'common_deps': includes + commons,
                'call_deps': calls,
                'has_io': io,
                'rust_module': rust_mod,
                'status': status,
            }
    return units_dict


def _print_tree_report(requested, units_dict):
    """Print the summary and dependency tree for the unit named ``requested``."""
    unit_name = requested.upper()
    if unit_name not in units_dict:
        print(f"未找到单元: {unit_name}")
        # Suggest units whose name contains the requested text.
        matches = [u for u in units_dict if requested.lower() in u.lower()]
        if matches:
            print(f"可能的匹配: {', '.join(matches[:10])}")
        return

    unit = units_dict[unit_name]
    trans_pending = get_transitive_pending_deps(unit_name, units_dict)
    trans_calls = get_transitive_deps(unit_name, units_dict)
    # NOTE(review): both status marks are empty strings - possibly symbols
    # were lost; confirm.
    status_mark = "" if unit['status'] == "done" else ""
    print(f"依赖树: {unit_name} {status_mark}")
    print("=" * 60)
    print(f"直接依赖: {len(unit['call_deps'])}, 传递依赖: {len(trans_calls)}, "
          f"未实现: {len(trans_pending)}")
    if trans_pending:
        print(f"未实现依赖: {', '.join(sorted(trans_pending)[:10])}")
        if len(trans_pending) > 10:
            print(f" ... 还有 {len(trans_pending) - 10}")
    print("-" * 60)
    print_dependency_tree(unit_name, units_dict)


def _print_priority_list(units_dict):
    """Print the not-yet-done units ordered by refactoring priority."""
    priority_list = []
    memo = {}
    for unit_name, unit in units_dict.items():
        if unit['status'] == 'done':
            continue
        # Skip files with no recognisable program unit and BLOCK DATA units
        # (data initialisation, not callable code).
        if unit['unit_type'] in ('UNKNOWN', 'BLOCK DATA'):
            continue
        priority_list.append({
            'name': unit_name,
            'depth': calculate_depth(unit_name, units_dict, memo),
            'direct_calls': len(unit['call_deps']),
            'trans_calls': len(get_transitive_deps(unit_name, units_dict)),
            'pending_deps': len(get_pending_deps(unit_name, units_dict)),
            'trans_pending': len(get_transitive_pending_deps(unit_name, units_dict)),
            'has_io': unit['has_io'],
        })

    # Fewest pending dependencies first, then no I/O, then shallow depth.
    priority_list.sort(key=lambda x: (x['trans_pending'], x['has_io'], x['depth'], x['trans_calls']))

    print("重构优先级列表")
    print("=" * 100)
    print(f"{'单元名':<20} {'未实现':>6} {'传递未实现':>10} {'深度':>4} {'直接调用':>8} {'传递调用':>8} {'IO':>4}")
    print("-" * 100)
    for item in priority_list[:100]:  # only the first 100 entries are shown
        # NOTE(review): both I/O marks are empty strings - possibly symbols
        # were lost; confirm.
        io_mark = "" if item['has_io'] else ""
        print(f"{item['name']:<20} {item['pending_deps']:>6} {item['trans_pending']:>10} "
              f"{item['depth']:>4} {item['direct_calls']:>8} {item['trans_calls']:>8} {io_mark:>4}")


def _quoted(names):
    """Join ``names`` with '|' and wrap the result in double quotes."""
    return '"' + '|'.join(names) + '"'


def _print_csv(units_dict, full):
    """Print one CSV row per unit; ``full`` adds the transitive columns."""
    if full:
        print("fortran_file,unit_name,unit_type,is_pure,common_deps,call_deps,"
              "trans_commons,trans_calls,has_io,rust_module,status")
    else:
        print("fortran_file,unit_name,unit_type,is_pure,common_deps,call_deps,has_io,rust_module,status")

    for unit_name, unit in units_dict.items():
        fields = [
            unit['fortran_file'],
            unit_name,
            unit['unit_type'],
            str(unit['is_pure']),
            _quoted(unit['common_deps']),
            _quoted(unit['call_deps']),
        ]
        if full:
            # The transitive results are sets; sort them so the output is
            # identical from run to run.
            fields.append(_quoted(sorted(get_transitive_commons(unit_name, units_dict))))
            fields.append(_quoted(sorted(get_transitive_deps(unit_name, units_dict))))
        fields.extend([str(unit['has_io']), unit['rust_module'], unit['status']])
        print(','.join(fields))


def main():
    """Command-line entry point.

    Modes:
      * ``--tree UNIT``: summary and dependency tree of one unit.
      * ``--priority``: refactoring priority list of the pending units.
      * default: CSV with one row per unit; ``--full`` adds transitive
        dependency columns.

    The source and Rust directories default to the values this script has
    always used and can be overridden with ``--extracted-dir``,
    ``--rust-math-dir`` and ``--rust-io-dir``.
    """
    import sys  # local import: only needed for the warning below

    parser = argparse.ArgumentParser(description='分析 TLUSTY Fortran 文件依赖')
    parser.add_argument('--tree', metavar='UNIT', help='输出指定单元的依赖树')
    parser.add_argument('--priority', action='store_true', help='输出重构优先级列表')
    parser.add_argument('--full', action='store_true', help='输出完整传递依赖')
    parser.add_argument('--extracted-dir', default=_DEFAULT_EXTRACTED_DIR,
                        help='提取后的 Fortran 源文件目录')
    parser.add_argument('--rust-math-dir', default=_DEFAULT_RUST_MATH_DIR,
                        help='Rust math 模块目录')
    parser.add_argument('--rust-io-dir', default=_DEFAULT_RUST_IO_DIR,
                        help='Rust io 模块目录')
    args = parser.parse_args()

    units_dict = _load_units(args.extracted_dir, args.rust_math_dir, args.rust_io_dir)
    if not units_dict:
        # Make a wrong or missing source directory visible instead of
        # silently printing an empty report.
        print(f"警告: 在 {args.extracted_dir} 中未找到 *.f 文件", file=sys.stderr)

    if args.tree:
        _print_tree_report(args.tree, units_dict)
        return

    if args.priority:
        _print_priority_list(units_dict)
        return

    _print_csv(units_dict, full=args.full)


if __name__ == "__main__":
    main()