#!/usr/bin/env python3 """ 分析 TLUSTY Fortran 文件,提取函数依赖信息。 用法: python3 analyze_fortran.py # 输出 CSV(带完整依赖) python3 analyze_fortran.py --tree # 输出依赖树(文本格式) python3 analyze_fortran.py --priority # 输出重构优先级列表 """ import os import re import glob import argparse from collections import defaultdict def extract_includes(content): """提取 INCLUDE 文件列表""" includes = re.findall(r"INCLUDE\s*'([^']+)\.FOR'", content, re.IGNORECASE) return [inc for inc in includes if inc.upper() != 'IMPLIC'] def extract_commons(content): """提取 COMMON 块名称""" # 匹配 COMMON/NAME/ 或 common/name/ commons = re.findall(r'(?i)^\s*COMMON\s*/(\w+)/', content, re.MULTILINE) return list(set(commons)) def extract_calls(content): """提取 CALL 语句调用的子程序""" calls = re.findall(r'(?i)CALL\s+(\w+)\s*\(', content) # 也提取函数调用 ( FUNCTION 形式 ) funcs = re.findall(r'(?i)^\s*(?:REAL|INTEGER|DOUBLE\s*PRECISION)?\s*FUNCTION\s+(\w+)', content, re.MULTILINE) # 统一转为大写 return list(set(c.upper() for c in calls + funcs)) def has_file_io(content): """检查是否有文件 I/O""" patterns = [ r'OPEN\s*\(', r'READ\s*\(\s*\d+', r'WRITE\s*\(\s*\d+', r'write\s*\(', r'read\s*\(', ] for p in patterns: if re.search(p, content, re.IGNORECASE): return True return False def extract_unit_info(content, filename): """提取单元信息""" units = [] # 匹配 SUBROUTINE sub_match = re.search(r'(?i)^\s*SUBROUTINE\s+(\w+)', content, re.MULTILINE) if sub_match: units.append(('SUBROUTINE', sub_match.group(1).upper())) # 匹配 FUNCTION func_match = re.search(r'(?i)^\s*(?:REAL(?:\*\d+)?|INTEGER(?:\*\d+)?|DOUBLE\s*PRECISION)?\s*FUNCTION\s+(\w+)', content, re.MULTILINE) if func_match: units.append(('FUNCTION', func_match.group(1).upper())) # 匹配 BLOCK DATA block_match = re.search(r'(?i)^\s*BLOCK\s*DATA\s+(\w+)?', content, re.MULTILINE) if block_match: name = block_match.group(1).upper() if block_match.group(1) else '_UNNAMED_' units.append(('BLOCK DATA', name)) # 如果都没匹配到,使用文件名 if not units: base = os.path.splitext(filename)[0] units.append(('UNKNOWN', base.upper())) return units # 特殊映射:一个 Rust 文件实现多个 Fortran 函数 SPECIAL_MAPPINGS = { # Rust 文件名 -> [Fortran 函数名列表] 'gfree': ['gfree0', 'gfreed', 'gfree1'], 'interpolate': ['yint', 'lagran'], 'sgmer': ['sgmer0', 'sgmer1', 'sgmerd'], 'ctdata': ['hction', 'hctrecom'], 'cross': ['cross', 'crossd'], 'expint': ['eint', 'expinx'], 'erfcx': ['erfcx', 'erfcin'], 'lineqs': ['lineqs', 'lineqs_nr'], 'gamsp': ['gamsp'], # alias } def find_rust_module(fortran_name, rust_dir): """查找对应的 Rust 模块""" # 先检查直接匹配 rust_file = os.path.join(rust_dir, f"{fortran_name}.rs") if os.path.exists(rust_file): return f"src/math/{fortran_name}.rs" # 检查特殊映射 for rust_mod, fortran_funcs in SPECIAL_MAPPINGS.items(): if fortran_name in fortran_funcs: return f"src/math/{rust_mod}.rs" return "" def get_transitive_deps(unit_name, units_dict, visited=None): """递归获取所有传递调用依赖""" if visited is None: visited = set() if unit_name in visited: return set() visited.add(unit_name) if unit_name not in units_dict: return set() direct_calls = units_dict[unit_name].get('call_deps', []) all_deps = set(direct_calls) for dep in direct_calls: all_deps.update(get_transitive_deps(dep, units_dict, visited.copy())) return all_deps def get_pending_deps(unit_name, units_dict, visited=None): """获取尚未实现的直接依赖""" if unit_name not in units_dict: return [] calls = units_dict[unit_name].get('call_deps', []) pending = [d for d in calls if d not in units_dict or units_dict[d].get('status') != 'done'] return pending def get_transitive_pending_deps(unit_name, units_dict, visited=None): """递归获取所有传递的未实现依赖""" if visited is None: visited = set() if unit_name in visited: return set() visited.add(unit_name) if unit_name not in units_dict: return set() direct_calls = units_dict[unit_name].get('call_deps', []) # 未实现的直接依赖 pending_deps = set(d for d in direct_calls if d not in units_dict or units_dict[d].get('status') != 'done') # 递归获取所有依赖的未实现依赖 for dep in direct_calls: pending_deps.update(get_transitive_pending_deps(dep, units_dict, visited.copy())) return pending_deps def get_transitive_commons(unit_name, units_dict, visited=None): """递归获取所有传递 COMMON 依赖""" if visited is None: visited = set() if unit_name in visited: return set() visited.add(unit_name) if unit_name not in units_dict: return set() direct_commons = set(units_dict[unit_name].get('common_deps', [])) direct_calls = units_dict[unit_name].get('call_deps', []) all_commons = direct_commons.copy() for dep in direct_calls: all_commons.update(get_transitive_commons(dep, units_dict, visited.copy())) return all_commons def calculate_depth(unit_name, units_dict, memo=None): """计算依赖深度(叶子节点深度为0)""" if memo is None: memo = {} if unit_name in memo: return memo[unit_name] if unit_name not in units_dict: return 0 calls = units_dict[unit_name].get('call_deps', []) if not calls: memo[unit_name] = 0 return 0 max_dep_depth = 0 for dep in calls: if dep != unit_name: # 避免自引用 max_dep_depth = max(max_dep_depth, calculate_depth(dep, units_dict, memo)) depth = max_dep_depth + 1 memo[unit_name] = depth return depth def print_dependency_tree(unit_name, units_dict, indent=0, visited=None, prefix="", show_pending_count=True): """打印依赖树(文本格式)""" if visited is None: visited = set() if unit_name in visited: print(f"{prefix}[循环引用: {unit_name}]") return visited.add(unit_name) if unit_name not in units_dict: print(f"{prefix}{unit_name} [未找到/未实现]") return unit = units_dict[unit_name] status = unit.get('status', 'pending') status_mark = "✓" if status == "done" else "○" # 计算未实现依赖数 pending_count = len(get_pending_deps(unit_name, units_dict)) pending_str = f" ({pending_count}未实现)" if show_pending_count and pending_count > 0 else "" print(f"{prefix}{status_mark} {unit_name}{pending_str}") calls = unit.get('call_deps', []) # 按未实现依赖数排序(未实现多的在前,因为更紧迫) pending_sorted = sorted(calls, key=lambda d: -len(get_pending_deps(d, units_dict) if d in units_dict else [])) for i, dep in enumerate(pending_sorted): is_last = (i == len(pending_sorted) - 1) connector = "└── " if is_last else "├── " print_dependency_tree(dep, units_dict, indent + 1, visited.copy(), prefix + connector, show_pending_count) def main(): parser = argparse.ArgumentParser(description='分析 TLUSTY Fortran 文件依赖') parser.add_argument('--tree', metavar='UNIT', help='输出指定单元的依赖树') parser.add_argument('--priority', action='store_true', help='输出重构优先级列表') parser.add_argument('--full', action='store_true', help='输出完整传递依赖') args = parser.parse_args() extracted_dir = "/home/fmq/program/tlusty/tl208-s54/rust/tlusty/extracted" rust_dir = "/home/fmq/program/tlusty/tl208-s54/rust/src/math" # 收集所有单元信息 units_dict = {} fortran_files = sorted(glob.glob(os.path.join(extracted_dir, "*.f"))) for fpath in fortran_files: fname = os.path.basename(fpath) base_name = os.path.splitext(fname)[0] with open(fpath, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() includes = extract_includes(content) commons = extract_commons(content) calls = extract_calls(content) io = has_file_io(content) units = extract_unit_info(content, fname) is_pure = len(includes) <= 1 and len(commons) == 0 and not io rust_mod = find_rust_module(base_name, rust_dir) status = "done" if rust_mod else "pending" for unit_type, unit_name in units: units_dict[unit_name] = { 'fortran_file': fname, 'unit_type': unit_type, 'is_pure': is_pure, 'common_deps': includes + commons, 'call_deps': calls, 'has_io': io, 'rust_module': rust_mod, 'status': status, } # --tree 模式:输出依赖树 if args.tree: unit_name = args.tree.upper() if unit_name in units_dict: unit = units_dict[unit_name] trans_pending = get_transitive_pending_deps(unit_name, units_dict) trans_calls = get_transitive_deps(unit_name, units_dict) status_mark = "✓" if unit['status'] == "done" else "○" print(f"依赖树: {unit_name} {status_mark}") print("=" * 60) print(f"直接依赖: {len(unit['call_deps'])}, 传递依赖: {len(trans_calls)}, " f"未实现: {len(trans_pending)}") if trans_pending: print(f"未实现依赖: {', '.join(sorted(trans_pending)[:10])}") if len(trans_pending) > 10: print(f" ... 还有 {len(trans_pending) - 10} 个") print("-" * 60) print_dependency_tree(unit_name, units_dict) else: print(f"未找到单元: {unit_name}") # 尝试模糊匹配 matches = [u for u in units_dict if args.tree.lower() in u.lower()] if matches: print(f"可能的匹配: {', '.join(matches[:10])}") return # --priority 模式:输出重构优先级 if args.priority: # 计算每个单元的依赖深度和传递依赖数 priority_list = [] memo = {} for unit_name, unit in units_dict.items(): if unit['status'] == 'done': continue depth = calculate_depth(unit_name, units_dict, memo) trans_calls = len(get_transitive_deps(unit_name, units_dict)) trans_commons = len(get_transitive_commons(unit_name, units_dict)) pending_deps = len(get_pending_deps(unit_name, units_dict)) trans_pending = len(get_transitive_pending_deps(unit_name, units_dict)) priority_list.append({ 'name': unit_name, 'depth': depth, 'direct_calls': len(unit['call_deps']), 'trans_calls': trans_calls, 'direct_commons': len(unit['common_deps']), 'trans_commons': trans_commons, 'pending_deps': pending_deps, 'trans_pending': trans_pending, 'has_io': unit['has_io'], 'is_pure': unit['is_pure'], }) # 按优先级排序:未实现依赖少 > 深度低 > 无IO priority_list.sort(key=lambda x: (x['trans_pending'], x['depth'], x['trans_calls'], x['has_io'])) print("重构优先级列表 (按未实现依赖排序)") print("=" * 100) print(f"{'单元名':<20} {'未实现':>6} {'传递未实现':>10} {'深度':>4} {'直接调用':>8} {'传递调用':>8} {'IO':>4}") print("-" * 100) for item in priority_list[:100]: # 显示前100个 io_mark = "✓" if item['has_io'] else "○" print(f"{item['name']:<20} {item['pending_deps']:>6} {item['trans_pending']:>10} " f"{item['depth']:>4} {item['direct_calls']:>8} {item['trans_calls']:>8} {io_mark:>4}") return # 默认模式:输出 CSV(带完整依赖) if args.full: print("fortran_file,unit_name,unit_type,is_pure,common_deps,call_deps," "trans_commons,trans_calls,has_io,rust_module,status") else: print("fortran_file,unit_name,unit_type,is_pure,common_deps,call_deps,has_io,rust_module,status") memo = {} for unit_name, unit in units_dict.items(): if args.full: trans_commons = get_transitive_commons(unit_name, units_dict) trans_calls = get_transitive_deps(unit_name, units_dict) print(f"{unit['fortran_file']},{unit_name},{unit['unit_type']},{unit['is_pure']}," f"\"{'|'.join(unit['common_deps'])}\",\"{'|'.join(unit['call_deps'])}\"," f"\"{'|'.join(trans_commons)}\",\"{'|'.join(trans_calls)}\"," f"{unit['has_io']},{unit['rust_module']},{unit['status']}") else: print(f"{unit['fortran_file']},{unit_name},{unit['unit_type']},{unit['is_pure']}," f"\"{'|'.join(unit['common_deps'])}\",\"{'|'.join(unit['call_deps'])}\"," f"{unit['has_io']},{unit['rust_module']},{unit['status']}") if __name__ == "__main__": main()