SpectraRust/scripts/analyze_fortran.py
2026-03-21 16:23:35 +08:00

435 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
分析 TLUSTY Fortran 文件,提取函数依赖信息。
用法:
python3 analyze_fortran.py # 输出 CSV带完整依赖
python3 analyze_fortran.py --tree # 输出依赖树(文本格式)
python3 analyze_fortran.py --priority # 输出重构优先级列表
"""
import os
import re
import glob
import argparse
from collections import defaultdict
def extract_includes(content):
    """Return the base names of the files pulled in by INCLUDE statements.

    Only single-quoted names ending in ``.FOR`` (matched case-insensitively)
    are recognised.  The returned names have the extension removed, keep
    their original letter case and source order, and may contain duplicates.
    ``IMPLIC`` (in any letter case) is left out of the result.
    """
    include_re = re.compile(r"INCLUDE\s*'([^']+)\.FOR'", re.IGNORECASE)
    names = []
    for match in include_re.finditer(content):
        stem = match.group(1)
        if stem.upper() == 'IMPLIC':
            continue
        names.append(stem)
    return names
def extract_commons(content):
    """Return the names of the COMMON blocks declared in ``content``.

    A declaration is recognised when a line starts (after optional
    whitespace) with ``COMMON`` followed by ``/NAME/``; blanks around the
    name inside the slashes are tolerated.  Only the first block name of
    each COMMON statement is captured.

    Fortran identifiers are case-insensitive, so names are upper-cased
    before de-duplication, and the result is sorted so that repeated runs
    produce identical output.

    Args:
        content: Fortran source text.

    Returns:
        Sorted list of unique, upper-cased COMMON block names.
    """
    names = re.findall(r'(?i)^\s*COMMON\s*/\s*(\w+)\s*/', content, re.MULTILINE)
    return sorted({name.upper() for name in names})
# Fortran intrinsic names (they never need to be tracked as dependencies).
FORTRAN_INTRINSICS = {
    'SIN', 'COS', 'TAN', 'ASIN', 'ACOS', 'ATAN', 'ATAN2',
    'SINH', 'COSH', 'TANH',
    'EXP', 'LOG', 'LOG10', 'LOG2',
    'SQRT', 'ABS', 'MOD', 'SIGN',
    'MAX', 'MIN', 'MAX0', 'MIN0', 'MAX1', 'MIN1', 'AMAX0', 'AMIN0',
    'INT', 'IFIX', 'IDINT', 'FLOAT', 'SNGL', 'DBLE', 'CMPLX',
    'REAL', 'AIMAG', 'CONJG',
    'ICHAR', 'CHAR', 'INDEX', 'LEN', 'LGE', 'LGT', 'LLE', 'LLT',
    'DOT_PRODUCT', 'MATMUL', 'TRANSPOSE', 'RESHAPE',
    'SIZE', 'SHAPE', 'LBOUND', 'UBOUND',
    'ALLOCATED', 'ALLOCATE', 'DEALLOCATE',
    'KIND', 'SELECTED_REAL_KIND', 'SELECTED_INT_KIND',
    'DIGITS', 'EPSILON', 'HUGE', 'TINY', 'PRECISION', 'RANGE',
    'FLOOR', 'CEILING', 'NINT', 'ANINT',
    'ADJUSTL', 'ADJUSTR', 'TRIM', 'REPEAT', 'SCAN', 'VERIFY',
    'PRESENT', 'ASSOCIATED',
    # Math functions commonly used in TLUSTY
    'ERF', 'ERFC', 'GAMMA', 'LOG_GAMMA',
}


def extract_calls(content, known_functions=None):
    """Extract the names referenced by CALL statements and function calls.

    Args:
        content: Fortran source text.
        known_functions: Collection of upper-cased names of known program
            units.  It is used to tell a function reference apart from an
            array access; when it is empty or ``None`` only CALL statements
            are reported.

    Returns:
        Sorted list of unique, upper-cased callee names.  Sorting keeps the
        output stable from run to run.
    """
    calls = set()

    # 1. CALL statements, with or without an argument list:
    #    "CALL NAME(...)" or a bare "CALL NAME" at the end of a line.
    #    \b keeps identifiers that merely end in "CALL" from matching.
    call_stmts = re.findall(r'(?i)\bCALL\s+(\w+)(?:\s*\(|\s*$|\s*\n)', content)
    calls.update(name.upper() for name in call_stmts)

    # 2. Function references: NAME( preceded by '=', '(', ',' or an
    #    arithmetic operator.  The opening parenthesis is only looked at,
    #    not consumed, so that the inner name of a nested reference such as
    #    FOO(BAR(X)) is still found.  \w* lets names contain underscores.
    if known_functions:
        candidates = re.findall(r'(?i)[=(,+\-*/]\s*([A-Z]\w*)(?=\s*\()', content)
        for name in candidates:
            upper = name.upper()
            if upper in known_functions and upper not in FORTRAN_INTRINSICS:
                calls.add(upper)

    return sorted(calls)
def has_file_io(content):
    """Report whether ``content`` contains an OPEN, READ or WRITE statement.

    A statement is recognised by the keyword (any letter case) followed by
    an opening parenthesis, whatever unit is given inside it.  The keyword
    must start at a word boundary, so identifiers that merely end in one of
    the keywords (for example ``SPREAD(`` or ``MY_READ(``) do not count.

    Args:
        content: Fortran source text.

    Returns:
        ``True`` if at least one such statement is found, else ``False``.
    """
    io_statement = r'\b(?:OPEN|READ|WRITE)\s*\('
    return re.search(io_statement, content, re.IGNORECASE) is not None
def extract_unit_info(content, filename):
    """Identify the program units declared in a Fortran source text.

    At most one unit of each kind is reported: the first SUBROUTINE, the
    first FUNCTION (optionally preceded by a type such as ``REAL*8``,
    ``LOGICAL`` or ``DOUBLE PRECISION``) and the first BLOCK DATA.

    Args:
        content: Fortran source text.
        filename: File name, used as a fallback unit name.

    Returns:
        List of ``(unit_type, NAME)`` tuples with upper-cased names.  An
        unnamed BLOCK DATA is reported as ``'_UNNAMED_'``.  When nothing is
        recognised the list holds a single ``('UNKNOWN', NAME)`` entry whose
        name is the file name without its extension.
    """
    units = []

    # SUBROUTINE
    sub_match = re.search(r'(?i)^\s*SUBROUTINE\s+(\w+)', content, re.MULTILINE)
    if sub_match:
        units.append(('SUBROUTINE', sub_match.group(1).upper()))

    # FUNCTION, with an optional result-type prefix
    func_match = re.search(
        r'(?i)^\s*(?:(?:REAL|INTEGER|LOGICAL|COMPLEX|CHARACTER)(?:\*\d+)?'
        r'|DOUBLE\s*(?:PRECISION|COMPLEX))?\s*FUNCTION\s+(\w+)',
        content, re.MULTILINE)
    if func_match:
        units.append(('FUNCTION', func_match.group(1).upper()))

    # BLOCK DATA.  The optional name must sit on the same line: only blanks
    # and tabs may separate it from the keyword, otherwise the first word of
    # the following statement would be mistaken for the block name.
    block_match = re.search(r'(?i)^\s*BLOCK\s*DATA\b(?:[ \t]+(\w+))?', content, re.MULTILINE)
    if block_match:
        name = block_match.group(1).upper() if block_match.group(1) else '_UNNAMED_'
        units.append(('BLOCK DATA', name))

    # Nothing recognised: fall back to the file name.
    if not units:
        base = os.path.splitext(filename)[0]
        units.append(('UNKNOWN', base.upper()))
    return units
# Special mappings: one Rust file implements several Fortran functions.
SPECIAL_MAPPINGS = {
    # Rust file name -> [list of Fortran function names]
    'gfree': ['gfree0', 'gfreed', 'gfree1'],
    'interpolate': ['yint', 'lagran'],
    'sgmer': ['sgmer0', 'sgmer1', 'sgmerd'],
    'ctdata': ['hction', 'hctrecom'],
    'cross': ['cross', 'crossd'],
    'expint': ['eint', 'expinx'],
    'erfcx': ['erfcx', 'erfcin'],
    'lineqs': ['lineqs', 'lineqs_nr'],
    'gamsp': ['gamsp'],  # alias
    'bhe': ['bhe', 'bhed', 'bhez'],  # hydrostatic equilibrium equation
    'comset': ['comset'],  # Compton scattering parameter setup
    'ghydop': ['ghydop'],  # hydrogen opacity (Gomez tables)
    'levgrp': ['levgrp'],  # level grouping
    'profil': ['profil'],  # standard absorption profile
    'linspl': ['linspl'],  # line profile setup
}


def find_rust_module(fortran_name, rust_dir):
    """Return the Rust module path that implements ``fortran_name``.

    A file ``<fortran_name>.rs`` inside ``rust_dir`` takes precedence;
    otherwise the first ``SPECIAL_MAPPINGS`` entry listing the name is used.
    The comparison is case-sensitive.

    Returns:
        ``"src/math/<module>.rs"``, or an empty string when no module is
        found.
    """
    candidate = os.path.join(rust_dir, f"{fortran_name}.rs")
    if os.path.exists(candidate):
        return f"src/math/{fortran_name}.rs"

    owner = next(
        (module for module, names in SPECIAL_MAPPINGS.items() if fortran_name in names),
        None,
    )
    if owner is None:
        return ""
    return f"src/math/{owner}.rs"
def get_transitive_deps(unit_name, units_dict, visited=None):
    """Collect every unit reachable from ``unit_name`` through call edges.

    The call graph is walked once with a single shared "already expanded"
    set, so the cost is linear in the number of units and edges even when
    many units share the same callees, and deep graphs cannot exhaust the
    recursion limit.

    Args:
        unit_name: Name of the starting unit.
        units_dict: Mapping from unit name to an info dict whose
            ``'call_deps'`` entry lists the direct callees.
        visited: Optional set of names that must not be expanded.
            ``unit_name`` is added to it.

    Returns:
        Set of all direct and indirect callees.  Callees that are missing
        from ``units_dict`` are included but not expanded.  ``unit_name``
        itself appears only when a call cycle leads back to it.  An empty
        set is returned when ``unit_name`` is unknown or already in
        ``visited``.
    """
    if visited is None:
        visited = set()
    if unit_name in visited:
        return set()
    visited.add(unit_name)

    expanded = set(visited)
    all_deps = set()
    stack = [unit_name]
    while stack:
        current = stack.pop()
        if current not in units_dict:
            continue
        for dep in units_dict[current].get('call_deps', []):
            all_deps.add(dep)
            if dep not in expanded:
                expanded.add(dep)
                stack.append(dep)
    return all_deps
def get_pending_deps(unit_name, units_dict, visited=None):
    """Return the direct callees of ``unit_name`` that are not done yet.

    A callee is pending when it is missing from ``units_dict`` or when its
    ``'status'`` is anything other than ``'done'``.  The order of the
    ``'call_deps'`` list is kept.  ``visited`` is accepted but not used.

    Returns:
        List of pending callee names; empty when ``unit_name`` is unknown.
    """
    if unit_name not in units_dict:
        return []

    pending = []
    for callee in units_dict[unit_name].get('call_deps', []):
        is_done = callee in units_dict and units_dict[callee].get('status') == 'done'
        if not is_done:
            pending.append(callee)
    return pending
def get_transitive_pending_deps(unit_name, units_dict, visited=None):
    """Collect every not-yet-done unit reachable from ``unit_name``.

    All call edges are followed, including those through units that are
    already done.  A callee counts as pending when it is missing from
    ``units_dict`` or when its ``'status'`` is not ``'done'``.  The graph is
    walked once with a shared "already expanded" set, so the cost stays
    linear even when many units share the same callees.

    Args:
        unit_name: Name of the starting unit.
        units_dict: Mapping from unit name to an info dict with
            ``'call_deps'`` and ``'status'`` entries.
        visited: Optional set of names that must not be expanded.
            ``unit_name`` is added to it.

    Returns:
        Set of pending direct and indirect callees; empty when ``unit_name``
        is unknown or already in ``visited``.
    """
    if visited is None:
        visited = set()
    if unit_name in visited:
        return set()
    visited.add(unit_name)

    expanded = set(visited)
    pending_deps = set()
    stack = [unit_name]
    while stack:
        current = stack.pop()
        if current not in units_dict:
            continue
        for dep in units_dict[current].get('call_deps', []):
            if dep not in units_dict or units_dict[dep].get('status') != 'done':
                pending_deps.add(dep)
            if dep not in expanded:
                expanded.add(dep)
                stack.append(dep)
    return pending_deps
def get_transitive_commons(unit_name, units_dict, visited=None):
    """Collect the ``'common_deps'`` of ``unit_name`` and of all its callees.

    The call graph is walked once with a shared "already expanded" set, so
    the cost stays linear even when many units share the same callees.

    Args:
        unit_name: Name of the starting unit.
        units_dict: Mapping from unit name to an info dict with
            ``'common_deps'`` and ``'call_deps'`` entries.
        visited: Optional set of names that must not be expanded.
            ``unit_name`` is added to it.

    Returns:
        Set with the ``'common_deps'`` entries of the unit itself and of
        every unit reachable through call edges; empty when ``unit_name`` is
        unknown or already in ``visited``.
    """
    if visited is None:
        visited = set()
    if unit_name in visited:
        return set()
    visited.add(unit_name)

    expanded = set(visited)
    all_commons = set()
    stack = [unit_name]
    while stack:
        current = stack.pop()
        if current not in units_dict:
            continue
        info = units_dict[current]
        all_commons.update(info.get('common_deps', []))
        for dep in info.get('call_deps', []):
            if dep not in expanded:
                expanded.add(dep)
                stack.append(dep)
    return all_commons
def calculate_depth(unit_name, units_dict, memo=None):
    """Compute the dependency depth of a unit (a leaf has depth 0).

    The depth of a unit with callees is one more than the largest depth
    among them.  Units missing from ``units_dict`` have depth 0.

    Args:
        unit_name: Name of the unit.
        units_dict: Mapping from unit name to an info dict whose
            ``'call_deps'`` entry lists the direct callees.
        memo: Optional cache of already computed depths; it is updated in
            place so it can be shared between calls.

    Returns:
        The depth as a non-negative integer.  Inside a call cycle the edge
        that closes the cycle contributes depth 0, so the values of the
        units on the cycle depend on which of them is visited first.
    """
    if memo is None:
        memo = {}
    if unit_name in memo:
        return memo[unit_name]
    if unit_name not in units_dict:
        return 0

    calls = units_dict[unit_name].get('call_deps', [])

    # Provisional entry: if a call cycle (direct or mutual recursion) leads
    # back to this unit while it is still being evaluated, the lookup above
    # returns 0 instead of recursing without end.
    memo[unit_name] = 0
    depth = 1 + max((calculate_depth(dep, units_dict, memo) for dep in calls), default=-1)
    memo[unit_name] = depth
    return depth
def print_dependency_tree(unit_name, units_dict, indent=0, visited=None, prefix="", show_pending_count=True):
    """Print the call tree below ``unit_name`` as indented text.

    Each line shows a status marker (``[x]`` for a unit whose status is
    ``'done'``, ``[ ]`` otherwise), the unit name and, optionally, the number
    of direct callees that are still pending.  Units missing from
    ``units_dict`` and units already present on the current path (call
    cycles) are printed with a note and not expanded.

    Args:
        unit_name: Unit at the root of the (sub)tree.
        units_dict: Mapping from unit name to its info dict.
        indent: Nesting level; passed down incremented, not otherwise used.
        visited: Names on the path from the root; used for cycle detection.
            Every child gets its own copy, so a unit called from several
            places is printed under each of them.
        prefix: Text printed in front of this unit's line.
        show_pending_count: Whether to append the pending-callee count.
    """
    if visited is None:
        visited = set()
    if unit_name in visited:
        print(f"{prefix}[循环引用: {unit_name}]")
        return
    visited.add(unit_name)
    if unit_name not in units_dict:
        print(f"{prefix}{unit_name} [未找到/未实现]")
        return

    unit = units_dict[unit_name]
    status = unit.get('status', 'pending')
    # Distinct markers: with the same text in both branches the status
    # would not be visible at all.
    status_mark = "[x]" if status == "done" else "[ ]"

    # Number of direct callees that are not implemented yet
    pending_count = len(get_pending_deps(unit_name, units_dict))
    pending_str = f" ({pending_count}未实现)" if show_pending_count and pending_count > 0 else ""
    print(f"{prefix}{status_mark} {unit_name}{pending_str}")

    # Children are drawn below this line: the connector that led to this
    # unit becomes a vertical guide (more siblings follow) or blank space
    # (this was the last sibling), so connectors do not pile up per level.
    if prefix.endswith("├── "):
        child_base = prefix[:-4] + "│   "
    elif prefix.endswith("└── "):
        child_base = prefix[:-4] + "    "
    else:
        child_base = prefix

    calls = unit.get('call_deps', [])
    # Callees with more pending dependencies come first (more urgent).
    ordered = sorted(calls, key=lambda d: -len(get_pending_deps(d, units_dict)))
    last_index = len(ordered) - 1
    for i, dep in enumerate(ordered):
        connector = "└── " if i == last_index else "├── "
        print_dependency_tree(dep, units_dict, indent + 1, visited.copy(),
                              child_base + connector, show_pending_count)
def main():
    """Command-line entry point.

    Reads every ``*.f`` file of the extracted-sources directory, builds a
    dictionary describing each program unit (includes, COMMON blocks, calls,
    I/O usage, matching Rust module) and prints one of three reports:

    * ``--tree UNIT``: summary and dependency tree of one unit;
    * ``--priority``: table of not-yet-done units ordered for refactoring;
    * default: CSV with one row per unit (``--full`` adds the transitive
      COMMON and call dependencies).

    The two directories default to the original fixed locations and can be
    overridden with ``--extracted-dir`` and ``--rust-dir``.
    """
    import sys  # only needed for the warning below

    parser = argparse.ArgumentParser(description='分析 TLUSTY Fortran 文件依赖')
    parser.add_argument('--tree', metavar='UNIT', help='输出指定单元的依赖树')
    parser.add_argument('--priority', action='store_true', help='输出重构优先级列表')
    parser.add_argument('--full', action='store_true', help='输出完整传递依赖')
    parser.add_argument('--extracted-dir',
                        default="/home/fmq/program/tlusty/tl208-s54/rust/tlusty/extracted",
                        help='directory holding the extracted *.f files')
    parser.add_argument('--rust-dir',
                        default="/home/fmq/program/tlusty/tl208-s54/rust/src/math",
                        help='directory holding the Rust modules')
    args = parser.parse_args()
    extracted_dir = args.extracted_dir
    rust_dir = args.rust_dir

    # Read every source file once; both passes below work on these texts.
    sources = []
    for fpath in sorted(glob.glob(os.path.join(extracted_dir, "*.f"))):
        with open(fpath, 'r', encoding='utf-8', errors='ignore') as f:
            sources.append((os.path.basename(fpath), f.read()))
    if not sources:
        # Otherwise a wrong directory silently yields an empty report.
        print(f"warning: no *.f files found in {extracted_dir}", file=sys.stderr)

    # Pass 1: names of all defined SUBROUTINE / FUNCTION units.
    all_defined_units = set()
    for fname, content in sources:
        for _unit_type, unit_name in extract_unit_info(content, fname):
            all_defined_units.add(unit_name)

    # Pass 2: per-unit information (the known names filter function calls).
    units_dict = {}
    for fname, content in sources:
        base_name = os.path.splitext(fname)[0]
        includes = extract_includes(content)
        commons = extract_commons(content)
        calls = extract_calls(content, known_functions=all_defined_units)
        io = has_file_io(content)
        units = extract_unit_info(content, fname)
        is_pure = len(includes) <= 1 and len(commons) == 0 and not io
        rust_mod = find_rust_module(base_name, rust_dir)
        status = "done" if rust_mod else "pending"
        for unit_type, unit_name in units:
            units_dict[unit_name] = {
                'fortran_file': fname,
                'unit_type': unit_type,
                'is_pure': is_pure,
                'common_deps': includes + commons,
                'call_deps': calls,
                'has_io': io,
                'rust_module': rust_mod,
                'status': status,
            }

    # --tree mode: print the dependency tree of one unit.
    if args.tree:
        unit_name = args.tree.upper()
        if unit_name in units_dict:
            unit = units_dict[unit_name]
            trans_pending = get_transitive_pending_deps(unit_name, units_dict)
            trans_calls = get_transitive_deps(unit_name, units_dict)
            status_mark = "[x]" if unit['status'] == "done" else "[ ]"
            print(f"依赖树: {unit_name} {status_mark}")
            print("=" * 60)
            print(f"直接依赖: {len(unit['call_deps'])}, 传递依赖: {len(trans_calls)}, "
                  f"未实现: {len(trans_pending)}")
            if trans_pending:
                print(f"未实现依赖: {', '.join(sorted(trans_pending)[:10])}")
                if len(trans_pending) > 10:
                    print(f" ... 还有 {len(trans_pending) - 10}")
            print("-" * 60)
            print_dependency_tree(unit_name, units_dict)
        else:
            print(f"未找到单元: {unit_name}")
            # Offer names that contain the requested text.
            matches = [u for u in units_dict if args.tree.lower() in u.lower()]
            if matches:
                print(f"可能的匹配: {', '.join(matches[:10])}")
        return

    # --priority mode: print the refactoring priority list.
    if args.priority:
        priority_list = []
        memo = {}
        for unit_name, unit in units_dict.items():
            if unit['status'] == 'done':
                continue
            # Skip files in which no program unit was recognised
            # (for example files that hold only comments).
            if unit['unit_type'] == 'UNKNOWN':
                continue
            priority_list.append({
                'name': unit_name,
                'depth': calculate_depth(unit_name, units_dict, memo),
                'direct_calls': len(unit['call_deps']),
                'trans_calls': len(get_transitive_deps(unit_name, units_dict)),
                'direct_commons': len(unit['common_deps']),
                'trans_commons': len(get_transitive_commons(unit_name, units_dict)),
                'pending_deps': len(get_pending_deps(unit_name, units_dict)),
                'trans_pending': len(get_transitive_pending_deps(unit_name, units_dict)),
                'has_io': unit['has_io'],
                'is_pure': unit['is_pure'],
            })
        # Order: no I/O first, then fewer pending dependencies, then lower depth.
        priority_list.sort(key=lambda x: (x['has_io'], x['trans_pending'], x['depth'], x['trans_calls']))
        print("重构优先级列表 (优先无IO按未实现依赖排序)")
        print("=" * 100)
        print(f"{'单元名':<20} {'未实现':>6} {'传递未实现':>10} {'深度':>4} {'直接调用':>8} {'传递调用':>8} {'IO':>4}")
        print("-" * 100)
        for item in priority_list[:100]:  # show the first 100 entries
            # Distinct markers so the IO column actually carries information.
            io_mark = "Y" if item['has_io'] else "-"
            print(f"{item['name']:<20} {item['pending_deps']:>6} {item['trans_pending']:>10} "
                  f"{item['depth']:>4} {item['direct_calls']:>8} {item['trans_calls']:>8} {io_mark:>4}")
        return

    # Default mode: print CSV.
    if args.full:
        print("fortran_file,unit_name,unit_type,is_pure,common_deps,call_deps,"
              "trans_commons,trans_calls,has_io,rust_module,status")
    else:
        print("fortran_file,unit_name,unit_type,is_pure,common_deps,call_deps,has_io,rust_module,status")
    for unit_name, unit in units_dict.items():
        if args.full:
            # The transitive results are sets; sort them so that repeated
            # runs print identical rows.
            trans_commons = sorted(get_transitive_commons(unit_name, units_dict))
            trans_calls = sorted(get_transitive_deps(unit_name, units_dict))
            print(f"{unit['fortran_file']},{unit_name},{unit['unit_type']},{unit['is_pure']},"
                  f"\"{'|'.join(unit['common_deps'])}\",\"{'|'.join(unit['call_deps'])}\","
                  f"\"{'|'.join(trans_commons)}\",\"{'|'.join(trans_calls)}\","
                  f"{unit['has_io']},{unit['rust_module']},{unit['status']}")
        else:
            print(f"{unit['fortran_file']},{unit_name},{unit['unit_type']},{unit['is_pure']},"
                  f"\"{'|'.join(unit['common_deps'])}\",\"{'|'.join(unit['call_deps'])}\","
                  f"{unit['has_io']},{unit['rust_module']},{unit['status']}")
if __name__ == "__main__":
main()