// src/services/batch/asset.rs use std::sync::Arc; use std::fs; use tokio::sync::Mutex; use serde::{Serialize, Deserialize}; use tracing::{info, warn, error}; use sqlx::{SqlitePool, Row}; use crate::Config; use crate::clients::qiniu::QiniuClient; use crate::services::download::Downloader; #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum SyncAction { Download, Parse, Translate, All, } #[derive(Debug, Clone, Serialize)] pub struct AssetSyncStatus { pub active: bool, pub total: i32, pub downloaded: i32, pub parsed: i32, pub download_failed: i32, pub parse_failed: i32, pub current_bibcode: String, pub logs: Vec, pub action: Option, } impl AssetSyncStatus { pub fn new() -> Self { AssetSyncStatus { active: false, total: 0, downloaded: 0, parsed: 0, download_failed: 0, parse_failed: 0, current_bibcode: String::new(), logs: Vec::new(), action: None, } } pub fn add_log(&mut self, log: String) { info!("{}", log); // 保留最新的100条日志 self.logs.push(log); if self.logs.len() > 100 { self.logs.remove(0); } } } pub struct AssetSync; impl AssetSync { /// 启动后台批量下载与结构化解析任务 pub fn start_process( db: SqlitePool, config: Config, downloader: Arc, qiniu: Arc, dict: Arc, action: SyncAction, bibcodes: Vec, status: Arc>, ) { tokio::spawn(async move { let total = bibcodes.len() as i32; { let mut s = status.lock().await; s.active = true; s.total = total; s.downloaded = 0; s.parsed = 0; s.download_failed = 0; s.parse_failed = 0; s.current_bibcode = String::new(); s.logs.clear(); s.action = Some(action); let action_desc = match action { SyncAction::Download => "下载", SyncAction::Parse => "解析", SyncAction::Translate => "翻译", SyncAction::All => "下载与解析", }; s.add_log(format!("批量{}任务启动,共 {} 篇文献需处理。", action_desc, total)); } let mut dl_count = 0; let mut dl_failed_count = 0; let mut join_handles = Vec::new(); for bibcode in bibcodes { // 每次循环前,检查是否被外部停止了(active 设为 false) { let s = status.lock().await; if !s.active { info!("收到停止指令,批量处理任务终止。"); return; } } { let mut s = status.lock().await; s.current_bibcode = bibcode.clone(); s.add_log(format!("开始处理文献: {}", bibcode)); } // 1. 获取文献元数据与当前路径状态 let paper_res = sqlx::query( "SELECT arxiv_id, doi, pdf_path, html_path, markdown_path, doctype, translation_path FROM papers WHERE bibcode = ?" ) .bind(&bibcode) .fetch_optional(&db) .await; let (arxiv_id, doi, mut pdf_path, mut html_path, markdown_path, doctype, translation_path) = match paper_res { Ok(Some(row)) => { let arxiv_id: String = row.get(0); let doi: String = row.get(1); let pdf_path: Option = row.get(2); let html_path: Option = row.get(3); let markdown_path: Option = row.get(4); let doctype: Option = row.get(5); let translation_path: Option = row.get(6); (arxiv_id, doi, pdf_path, html_path, markdown_path, doctype, translation_path) } _ => { let mut s = status.lock().await; s.add_log(format!("数据库中未找到文献 {} 记录,跳过", bibcode)); continue; } }; // 1b. 检查 doctype,如果是 proposal, abstract, catalog, dataset, software, circular 等无数字全文的文件,直接跳过处理 let doctype_str = doctype.unwrap_or_else(|| "article".to_string()).to_lowercase(); if doctype_str == "proposal" || doctype_str == "abstract" || doctype_str == "catalog" || doctype_str == "dataset" || doctype_str == "software" || doctype_str == "circular" || doctype_str == "newsletter" || doctype_str == "obituary" { let mut s = status.lock().await; s.add_log(format!("文献 {} 的类型为 {} (无数字版全文),跳过下载与解析。", bibcode, doctype_str)); // 同样更新处理进度,防止任务进度条卡住 if action == SyncAction::Download || action == SyncAction::All { dl_count += 1; s.downloaded = dl_count; } if action == SyncAction::Parse || action == SyncAction::All { s.parsed += 1; } continue; } // 2. 检查并执行下载 if action == SyncAction::Download || action == SyncAction::All { let is_pdf_exist = pdf_path.as_ref().map(|p| config.library_dir.join(p).exists()).unwrap_or(false); let is_html_exist = html_path.as_ref().map(|p| config.library_dir.join(p).exists()).unwrap_or(false); if !is_pdf_exist && !is_html_exist { // 需要执行下载 { let mut s = status.lock().await; s.add_log(format!("文献 {} 本地无 PDF/HTML,开始下载...", bibcode)); } let (pdf_res, html_res) = if !arxiv_id.is_empty() { downloader.download_arxiv_direct(&arxiv_id, &config.library_dir).await } else { let doi_opt = if !doi.is_empty() { Some(doi.as_str()) } else { None }; downloader.download_paper(&bibcode, doi_opt, &config.library_dir).await }; if pdf_res.is_ok() || html_res.is_ok() { let pdf_rel = match pdf_res { Ok(p) => Some(p.strip_prefix(&config.library_dir).unwrap_or(&p).to_string_lossy().to_string()), Err(e) => Some(format!("error: {}", e)), }; let html_rel = match html_res { Ok(p) => Some(p.strip_prefix(&config.library_dir).unwrap_or(&p).to_string_lossy().to_string()), Err(e) => Some(format!("error: {}", e)), }; // 更新路径变量与数据库 pdf_path = pdf_rel.clone(); html_path = html_rel.clone(); let _ = sqlx::query("UPDATE papers SET pdf_path = ?, html_path = ? WHERE bibcode = ?") .bind(pdf_rel) .bind(html_rel) .bind(&bibcode) .execute(&db) .await; dl_count += 1; { let mut s = status.lock().await; s.downloaded = dl_count; s.add_log(format!("文献 {} 下载成功!", bibcode)); } } else { dl_failed_count += 1; let mut s = status.lock().await; s.download_failed = dl_failed_count; let pdf_err = match pdf_res { Err(e) => format!("error: {}", e), _ => "error: 未知错误".to_string(), }; let html_err = match html_res { Err(e) => format!("error: {}", e), _ => "error: 未知错误".to_string(), }; s.add_log(format!("文献 {} 下载失败。PDF: {}, HTML: {}", bibcode, pdf_err, html_err)); let _ = sqlx::query("UPDATE papers SET pdf_path = ?, html_path = ? WHERE bibcode = ?") .bind(&pdf_err) .bind(&html_err) .bind(&bibcode) .execute(&db) .await; } // 每次下载尝试后,加入 3-5 秒随机延迟,防爬防封 let delay_secs = 3 + (rand::random::() % 3); tokio::time::sleep(tokio::time::Duration::from_secs(delay_secs)).await; } else { { let mut s = status.lock().await; s.add_log(format!("文献 {} 本地已存在 PDF 或 HTML,跳过下载。", bibcode)); } dl_count += 1; { let mut s = status.lock().await; s.downloaded = dl_count; } } } // 3. 检查并执行结构化解析(Markdown 转换) if action == SyncAction::Parse || action == SyncAction::All { let is_md_exist = markdown_path.as_ref().map(|p| config.library_dir.join(p).exists()).unwrap_or(false); if !is_md_exist { if pdf_path.is_some() || html_path.is_some() { { let mut s = status.lock().await; s.add_log(format!("文献 {} 开始进行排版提取与 Markdown 转换...", bibcode)); } let mut relative_md_path = String::new(); // 确定源链接 let source_url = if bibcode.len() == 19 { format!("https://ui.adsabs.harvard.edu/abs/{}/abstract", bibcode) } else if !arxiv_id.is_empty() { format!("https://ui.adsabs.harvard.edu/abs/arXiv:{}/abstract", arxiv_id) } else { format!("https://ui.adsabs.harvard.edu/abs/{}/abstract", bibcode) }; // 策略 1:HTML 优先 if let Some(html_rel) = &html_path { let html_abs = config.library_dir.join(html_rel); if html_abs.exists() { if let Ok(md) = crate::services::parser::html_to_markdown(&html_abs) { // 构建 Meta 头 let paper_meta_res = sqlx::query("SELECT title, authors, pub, year, keywords FROM papers WHERE bibcode = ?") .bind(&bibcode) .fetch_optional(&db) .await; if let Ok(Some(meta_row)) = paper_meta_res { let title: String = meta_row.get(0); let authors_json: String = meta_row.get(1); let pub_journal: String = meta_row.get(2); let year: String = meta_row.get(3); let keywords_json: String = meta_row.get(4); let authors: Vec = serde_json::from_str(&authors_json).unwrap_or_default(); let keywords: Vec = serde_json::from_str(&keywords_json).unwrap_or_default(); let front_matter = format!( "---\ntitle: {}\nauthor: [{}]\npublisher: {}\nsource: \"{}\"\ndate: \"{}\"\ntags: \"{}\"\n---\n\n", serde_json::to_string(&title).unwrap_or_else(|_| format!("\"{}\"", title)), authors.iter().map(|a| format!("\"{}\"", a)).collect::>().join(", "), serde_json::to_string(&pub_journal).unwrap_or_else(|_| format!("\"{}\"", pub_journal)), source_url, year, keywords.join(",") ); let parsed_markdown = format!("{}{}", front_matter, md); let md_filename = format!("{}.md", bibcode); let md_dest = config.library_dir.join("Markdown").join(&md_filename); let _ = fs::create_dir_all(md_dest.parent().unwrap()); if fs::write(&md_dest, &parsed_markdown).is_ok() { relative_md_path = format!("Markdown/{}", md_filename); } } } } } if !relative_md_path.is_empty() { // HTML 解析成功,直接写入数据库并记录成功 let _ = sqlx::query("UPDATE papers SET markdown_path = ? WHERE bibcode = ?") .bind(&relative_md_path) .bind(&bibcode) .execute(&db) .await; { let mut s = status.lock().await; s.parsed += 1; s.add_log(format!("文献 {} HTML 本地解析成功!", bibcode)); } } else { // HTML 解析失败或无 HTML,执行 PDF 回退(异步非阻塞提交 MinerU) if let Some(pdf_rel) = &pdf_path { let pdf_abs = config.library_dir.join(pdf_rel); if pdf_abs.exists() { // 检查是否已经是 mineru_batch: 状态 let existing_batch_id = markdown_path.as_ref() .and_then(|p| p.strip_prefix("mineru_batch:")) .map(|s| s.trim().to_string()); let db_clone = db.clone(); let config_clone = config.clone(); let qiniu_clone = qiniu.clone(); let status_clone = status.clone(); let bibcode_clone = bibcode.clone(); let source_url_clone = source_url.clone(); let mut submitted_ok = true; let batch_id = if let Some(id) = existing_batch_id { { let mut s = status.lock().await; s.add_log(format!("文献 {} 检测到未完成的 MinerU 任务,正在恢复轮询 (Batch ID: {})...", bibcode, id)); } id } else { { let mut s = status.lock().await; s.add_log(format!("文献 {} PDF 提交后台解析 (MinerU)...", bibcode)); } match crate::services::parser::submit_pdf_to_mineru(&pdf_abs, &config).await { Ok(id) => { // 提交成功,立刻把 batch_id 存入数据库以备断点续跑 let marker = format!("mineru_batch:{}", id); let _ = sqlx::query("UPDATE papers SET markdown_path = ? WHERE bibcode = ?") .bind(&marker) .bind(&bibcode) .execute(&db) .await; id } Err(e) => { let mut s = status.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} PDF 提交 MinerU 失败: {}", bibcode, e)); let err_reason = format!("error: {}", e); let _ = sqlx::query("UPDATE papers SET markdown_path = ? WHERE bibcode = ?") .bind(&err_reason) .bind(&bibcode) .execute(&db) .await; submitted_ok = false; String::new() } } }; if submitted_ok { let handle = tokio::spawn(async move { match crate::services::parser::poll_and_extract_mineru(&batch_id, &bibcode_clone, &qiniu_clone, &config_clone).await { Ok(md) => { let paper_meta_res = sqlx::query("SELECT title, authors, pub, year, keywords FROM papers WHERE bibcode = ?") .bind(&bibcode_clone) .fetch_optional(&db_clone) .await; let mut rel_md = String::new(); if let Ok(Some(meta_row)) = paper_meta_res { let title: String = meta_row.get(0); let authors_json: String = meta_row.get(1); let pub_journal: String = meta_row.get(2); let year: String = meta_row.get(3); let keywords_json: String = meta_row.get(4); let authors: Vec = serde_json::from_str(&authors_json).unwrap_or_default(); let keywords: Vec = serde_json::from_str(&keywords_json).unwrap_or_default(); let front_matter = format!( "---\ntitle: {}\nauthor: [{}]\npublisher: {}\nsource: \"{}\"\ndate: \"{}\"\ntags: \"{}\"\n---\n\n", serde_json::to_string(&title).unwrap_or_else(|_| format!("\"{}\"", title)), authors.iter().map(|a| format!("\"{}\"", a)).collect::>().join(", "), serde_json::to_string(&pub_journal).unwrap_or_else(|_| format!("\"{}\"", pub_journal)), source_url_clone, year, keywords.join(",") ); let parsed_markdown = format!("{}{}", front_matter, md); let md_filename = format!("{}.md", bibcode_clone); let md_dest = config_clone.library_dir.join("Markdown").join(&md_filename); let _ = fs::create_dir_all(md_dest.parent().unwrap()); if fs::write(&md_dest, &parsed_markdown).is_ok() { rel_md = format!("Markdown/{}", md_filename); } } if !rel_md.is_empty() { let _ = sqlx::query("UPDATE papers SET markdown_path = ? WHERE bibcode = ?") .bind(&rel_md) .bind(&bibcode_clone) .execute(&db_clone) .await; let mut s = status_clone.lock().await; s.parsed += 1; s.add_log(format!("文献 {} PDF (MinerU) 解析成功!", bibcode_clone)); } else { let mut s = status_clone.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} PDF 写入 Markdown 失败。", bibcode_clone)); let _ = sqlx::query("UPDATE papers SET markdown_path = 'error: PDF 写入 Markdown 失败' WHERE bibcode = ?") .bind(&bibcode_clone) .execute(&db_clone) .await; } } Err(e) => { let mut s = status_clone.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} PDF 结构解析失败 (MinerU): {}", bibcode_clone, e)); let err_reason = format!("error: {}", e); let _ = sqlx::query("UPDATE papers SET markdown_path = ? WHERE bibcode = ?") .bind(&err_reason) .bind(&bibcode_clone) .execute(&db_clone) .await; } } }); join_handles.push(handle); } } else { let mut s = status.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} 本地 PDF 文件不存在,无法解析。", bibcode)); let _ = sqlx::query("UPDATE papers SET markdown_path = 'error: 本地 PDF 文件不存在' WHERE bibcode = ?") .bind(&bibcode) .execute(&db) .await; } } else { let mut s = status.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} HTML 转换失败,且无本地 PDF,无法解析。", bibcode)); let _ = sqlx::query("UPDATE papers SET markdown_path = 'error: HTML 转换失败且无本地 PDF' WHERE bibcode = ?") .bind(&bibcode) .execute(&db) .await; } } } else { let mut s = status.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} 无本地 PDF/HTML,无法解析,跳过。", bibcode)); } } else { { let mut s = status.lock().await; s.add_log(format!("文献 {} 已存在解析后的 Markdown,跳过。", bibcode)); } let mut s = status.lock().await; s.parsed += 1; } } // 4. 检查并执行翻译 if action == SyncAction::Translate { let is_tr_exist = translation_path.as_ref().map(|p| config.library_dir.join(p).exists() && !p.starts_with("error:")).unwrap_or(false); if !is_tr_exist { if let Some(md_rel) = &markdown_path { if !md_rel.starts_with("error:") { let md_abs = config.library_dir.join(md_rel); if md_abs.exists() { { let mut s = status.lock().await; s.add_log(format!("文献 {} 开始调用 LLM 翻译...", bibcode)); } match fs::read_to_string(&md_abs) { Ok(english_markdown) => { match crate::services::translation::translate_markdown(&english_markdown, &dict, &config).await { Ok(translated_markdown) => { let tr_filename = format!("{}_zh.md", bibcode); let tr_dest = config.library_dir.join("Translation").join(&tr_filename); let _ = fs::create_dir_all(tr_dest.parent().unwrap()); if fs::write(&tr_dest, &translated_markdown).is_ok() { let relative_tr_path = format!("Translation/{}", tr_filename); let _ = sqlx::query("UPDATE papers SET translation_path = ? WHERE bibcode = ?") .bind(&relative_tr_path) .bind(&bibcode) .execute(&db) .await; let mut s = status.lock().await; s.parsed += 1; s.add_log(format!("文献 {} 翻译成功!", bibcode)); } else { let error_msg = "error: 写入翻译文件失败"; let _ = sqlx::query("UPDATE papers SET translation_path = ? WHERE bibcode = ?") .bind(error_msg) .bind(&bibcode) .execute(&db) .await; let mut s = status.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} 翻译文件写入失败。", bibcode)); } } Err(e) => { let error_msg = format!("error: {}", e); let _ = sqlx::query("UPDATE papers SET translation_path = ? WHERE bibcode = ?") .bind(&error_msg) .bind(&bibcode) .execute(&db) .await; let mut s = status.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} 翻译失败: {}", bibcode, e)); } } } Err(e) => { let error_msg = format!("error: 读取英文 Markdown 失败: {}", e); let _ = sqlx::query("UPDATE papers SET translation_path = ? WHERE bibcode = ?") .bind(&error_msg) .bind(&bibcode) .execute(&db) .await; let mut s = status.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} 读取英文 Markdown 失败: {}", bibcode, e)); } } } else { let error_msg = "error: 英文 Markdown 文件不存在"; let _ = sqlx::query("UPDATE papers SET translation_path = ? WHERE bibcode = ?") .bind(error_msg) .bind(&bibcode) .execute(&db) .await; let mut s = status.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} 英文 Markdown 文件不存在,无法翻译。", bibcode)); } } else { let error_msg = "error: 英文 Markdown 文件处于解析失败状态"; let _ = sqlx::query("UPDATE papers SET translation_path = ? WHERE bibcode = ?") .bind(error_msg) .bind(&bibcode) .execute(&db) .await; let mut s = status.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} 英文 Markdown 解析失败,跳过翻译。", bibcode)); } } else { let error_msg = "error: 尚未解析英文 Markdown 路径为 NULL"; let _ = sqlx::query("UPDATE papers SET translation_path = ? WHERE bibcode = ?") .bind(error_msg) .bind(&bibcode) .execute(&db) .await; let mut s = status.lock().await; s.parse_failed += 1; s.add_log(format!("文献 {} 尚未解析英文 Markdown,跳过翻译。", bibcode)); } } else { { let mut s = status.lock().await; s.add_log(format!("文献 {} 已存在翻译,跳过。", bibcode)); } let mut s = status.lock().await; s.parsed += 1; } } } if !join_handles.is_empty() { { let mut s = status.lock().await; s.add_log(format!("本地下载与快速解析已完成,正在等待后台共 {} 个 MinerU 异步解析任务结束...", join_handles.len())); } for handle in join_handles { let _ = handle.await; } } { let mut s = status.lock().await; s.active = false; let action_desc = match action { SyncAction::Download => "下载", SyncAction::Parse => "解析", SyncAction::Translate => "翻译", SyncAction::All => "下载与解析", }; s.add_log(format!("批量{}任务顺利完成!", action_desc)); } }); } } #[cfg(test)] mod tests { use super::*; use sqlx::sqlite::SqlitePoolOptions; use std::fs; #[tokio::test] async fn test_process_status_log_rotation() { let mut status = AssetSyncStatus::new(); assert!(!status.active); for i in 0..150 { status.add_log(format!("log {}", i)); } assert_eq!(status.logs.len(), 100); assert_eq!(status.logs[0], "log 50"); assert_eq!(status.logs[99], "log 149"); } #[tokio::test] async fn test_bulk_processor_already_exists() -> anyhow::Result<()> { let pool = SqlitePoolOptions::new() .max_connections(1) .connect("sqlite::memory:") .await?; // 运行迁移 sqlx::migrate!("./migrations") .run(&pool) .await?; // 创建临时目录 let test_id = rand::random::(); let temp_dir = std::env::temp_dir().join(format!("astro_research_test_{}", test_id)); fs::create_dir_all(&temp_dir)?; // 准备子目录 let pdf_dir = temp_dir.join("PDF"); let html_dir = temp_dir.join("HTML"); let md_dir = temp_dir.join("Markdown"); fs::create_dir_all(&pdf_dir)?; fs::create_dir_all(&html_dir)?; fs::create_dir_all(&md_dir)?; // 写入已存在的文件 let bibcode = "2026A&A...123..456X".to_string(); let pdf_file_rel = format!("PDF/{}.pdf", bibcode); let html_file_rel = format!("HTML/{}.html", bibcode); fs::write(temp_dir.join(&pdf_file_rel), b"%PDF-1.5 test")?; fs::write(temp_dir.join(&html_file_rel), b"

Test Paper

Content

")?; // 插入数据库记录 sqlx::query( "INSERT INTO papers (bibcode, title, authors, pub, year, keywords, abstract, arxiv_id, doi, pdf_path, html_path, markdown_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" ) .bind(&bibcode) .bind("Test Title") .bind("[\"Author A\"]") .bind("Test Journal") .bind("2026") .bind("[\"Key\"]") .bind("Test abstract") .bind("") .bind("10.1000/test.doi") .bind(&pdf_file_rel) .bind(&html_file_rel) .bind(None::) .execute(&pool) .await?; let mut config = Config::from_env(); config.library_dir = temp_dir.clone(); let downloader = Arc::new(Downloader::new()); let qiniu = Arc::new(QiniuClient::new("test_access".to_string(), "test_secret".to_string(), "test_bucket".to_string(), "test_domain".to_string())); let status = Arc::new(Mutex::new(AssetSyncStatus::new())); let dict = Arc::new(crate::services::translation::Dictionary::new()); AssetSync::start_process( pool.clone(), config, downloader, qiniu, dict, SyncAction::All, vec![bibcode.clone()], status.clone(), ); // 轮询直至 active 为 false let mut success = false; for _ in 0..50 { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; let s = status.lock().await; if !s.active { success = true; break; } } assert!(success); // 检查状态 { let s = status.lock().await; assert_eq!(s.total, 1); assert_eq!(s.downloaded, 1); // 存在本地文件时,直接 downloaded = 1 assert_eq!(s.parsed, 1); // 应该成功解析了 markdown } // 检查数据库和本地文件是否生成 let row = sqlx::query("SELECT markdown_path FROM papers WHERE bibcode = ?") .bind(&bibcode) .fetch_one(&pool) .await?; let md_path_rel: String = row.get(0); assert_eq!(md_path_rel, format!("Markdown/{}.md", bibcode)); assert!(temp_dir.join(&md_path_rel).exists()); // 清理临时目录 let _ = fs::remove_dir_all(&temp_dir); Ok(()) } #[tokio::test] async fn test_bulk_processor_stop() -> anyhow::Result<()> { let pool = SqlitePoolOptions::new() .max_connections(1) .connect("sqlite::memory:") .await?; sqlx::migrate!("./migrations") .run(&pool) .await?; let test_id = rand::random::(); let temp_dir = std::env::temp_dir().join(format!("astro_research_test_stop_{}", test_id)); fs::create_dir_all(&temp_dir)?; // Setup directories fs::create_dir_all(temp_dir.join("PDF"))?; fs::create_dir_all(temp_dir.join("Markdown"))?; let bib1 = "2026A&A...123..456A".to_string(); let bib2 = "2026MNRAS.530.1234B".to_string(); // Write dummy files to skip download/parsing for both fs::write(temp_dir.join(format!("PDF/{}.pdf", bib1)), b"PDF")?; fs::write(temp_dir.join(format!("Markdown/{}.md", bib1)), b"MD")?; fs::write(temp_dir.join(format!("PDF/{}.pdf", bib2)), b"PDF")?; fs::write(temp_dir.join(format!("Markdown/{}.md", bib2)), b"MD")?; // Seed DB for bib1 sqlx::query( "INSERT INTO papers (bibcode, title, authors, pub, year, keywords, abstract, arxiv_id, doi, pdf_path, markdown_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" ) .bind(&bib1) .bind("Paper 1") .bind("[]") .bind("A&A") .bind("2026") .bind("[]") .bind("") .bind("") .bind("") .bind(format!("PDF/{}.pdf", bib1)) .bind(format!("Markdown/{}.md", bib1)) .execute(&pool) .await?; // Seed DB for bib2 sqlx::query( "INSERT INTO papers (bibcode, title, authors, pub, year, keywords, abstract, arxiv_id, doi, pdf_path, markdown_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" ) .bind(&bib2) .bind("Paper 2") .bind("[]") .bind("MNRAS") .bind("2026") .bind("[]") .bind("") .bind("") .bind("") .bind(format!("PDF/{}.pdf", bib2)) .bind(format!("Markdown/{}.md", bib2)) .execute(&pool) .await?; let mut config = Config::from_env(); config.library_dir = temp_dir.clone(); let downloader = Arc::new(Downloader::new()); let qiniu = Arc::new(QiniuClient::new("test_access".to_string(), "test_secret".to_string(), "test_bucket".to_string(), "test_domain".to_string())); let status = Arc::new(Mutex::new(AssetSyncStatus::new())); let dict = Arc::new(crate::services::translation::Dictionary::new()); AssetSync::start_process( pool.clone(), config, downloader, qiniu, dict, SyncAction::All, vec![bib1.clone(), bib2.clone()], status.clone(), ); // Wait until bib1 starts processing, then stop it immediately let mut stopped = false; for _ in 0..10000 { tokio::task::yield_now().await; let mut s = status.lock().await; if s.active && s.current_bibcode == bib1 { s.active = false; stopped = true; break; } } assert!(stopped); // Wait until active becomes false let mut success = false; for _ in 0..100 { tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; let s = status.lock().await; if !s.active { success = true; break; } } assert!(success); // Verify that bib2 was not processed (downloaded/parsed stats should be at most 1) { let s = status.lock().await; assert!(s.downloaded <= 1); assert!(s.parsed <= 1); } // Clean up let _ = fs::remove_dir_all(&temp_dir); Ok(()) } }