Skip to content

Commit 98a58ef

Browse files
committed
fix: tighten retry sync and api cache handling
1 parent 90c4d6c commit 98a58ef

7 files changed

Lines changed: 150 additions & 72 deletions

File tree

quantclass_sync_internal/gui/api.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,8 @@ def start_sync(self, retry_failed: bool = False) -> Dict[str, Any]:
451451
item.get("product", "") for item in failed
452452
if isinstance(item, dict) and item.get("product")
453453
]
454+
if not retry_products:
455+
return {"started": False, "message": "没有失败产品"}
454456

455457
self._progress = dict(_PROGRESS_INIT)
456458
self._progress["status"] = "syncing"

quantclass_sync_internal/orchestrator.py

Lines changed: 49 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,19 @@ def _is_business_day_only_product(product: str) -> bool:
381381

382382
return normalize_product_name(product) in BUSINESS_DAY_ONLY_PRODUCTS
383383

384+
385+
def _normalize_cached_api_dates(raw_dates: object, product: str) -> List[str]:
386+
"""兼容新旧缓存格式,统一转成日期列表。"""
387+
388+
if isinstance(raw_dates, str):
389+
items = [raw_dates]
390+
elif isinstance(raw_dates, (list, tuple, set)):
391+
items = [str(x) for x in raw_dates]
392+
else:
393+
items = []
394+
return _normalize_date_queue(items, product=product, apply_business_day_filter=False)
395+
396+
384397
def _normalize_date_queue(
385398
raw_dates: Sequence[str],
386399
*,
@@ -499,7 +512,7 @@ def _resolve_requested_dates_for_plan(
499512
t_product_start: float,
500513
catch_up_to_latest: bool = False,
501514
lock: Optional[threading.Lock] = None,
502-
api_date_cache: Optional[Dict[str, Tuple[str, str]]] = None,
515+
api_date_cache: Optional[Dict[str, Tuple[List[str], str]]] = None,
503516
) -> Tuple[List[str], bool]:
504517
"""
505518
解析单产品执行日期列表,并处理 timestamp 门控。
@@ -522,19 +535,19 @@ def _resolve_requested_dates_for_plan(
522535
if api_date_cache:
523536
cached = api_date_cache.get(product_name) or api_date_cache.get(plan.name)
524537
if cached:
525-
cached_date, checked_at_str = cached
538+
cached_dates, checked_at_str = cached
526539
if _is_cache_fresh(checked_at_str):
540+
api_latest_candidates = _normalize_cached_api_dates(cached_dates, product_name)
527541
# 计算缓存年龄用于日志
528542
try:
529543
checked_at = datetime.strptime(checked_at_str, "%Y-%m-%dT%H:%M:%S")
530544
age_seconds = (datetime.now() - checked_at).total_seconds()
531545
except ValueError:
532546
age_seconds = 0.0
533547
log_info(
534-
f"[{plan.name}] 使用缓存 API 日期 {cached_date}{int(age_seconds)}s 前查询)",
548+
f"[{plan.name}] 使用缓存 API 日期 {api_latest_candidates[-1]}{int(age_seconds)}s 前查询)",
535549
event="PRODUCT_PLAN", decision="cache_hit",
536550
)
537-
api_latest_candidates = [cached_date]
538551
cache_hit = True
539552

540553
if not cache_hit:
@@ -703,8 +716,14 @@ def _upsert_product_status_after_success(
703716
status.last_update_time = utc_now_iso()
704717
status.data_time = actual_time
705718
status.data_content_time = actual_time
706-
upsert_product_status(conn, status)
707-
write_local_timestamp(command_ctx.data_root, product, actual_time)
719+
try:
720+
upsert_product_status(conn, status, commit_immediately=False)
721+
write_local_timestamp(command_ctx.data_root, product, actual_time)
722+
conn.commit()
723+
except Exception:
724+
with contextlib.suppress(Exception):
725+
conn.rollback()
726+
raise
708727

709728

710729
def _collect_preprocess_source_successes(report: RunReport) -> List[ProductRunResult]:
@@ -916,7 +935,7 @@ def _prefetch_api_dates(
916935
hid: str,
917936
headers: Dict[str, str],
918937
max_workers: int = 8,
919-
) -> Dict[str, Tuple[str, str]]:
938+
) -> Dict[str, Tuple[List[str], str]]:
920939
"""并发预取产品的 API 最新日期,写入缓存并返回。
921940
922941
已在缓存中且未过期的产品跳过。失败的产品静默跳过,
@@ -947,18 +966,18 @@ def _prefetch_api_dates(
947966
f"[预取] 并发查询 {len(uncached)}/{len(products)} 个产品",
948967
event="PREFETCH", decision="fetching",
949968
)
950-
fetched: Dict[str, str] = {} # 写入仅在主线程的 as_completed 循环内,无并发写入
969+
fetched: Dict[str, List[str]] = {} # 写入仅在主线程的 as_completed 循环内,无并发写入
951970
# abort_event 只能拦截尚未开始的 worker,已在执行的请求会自然完成或超时
952971
abort_event = threading.Event()
953972
t_start = time.time()
954973

955-
def _fetch_one(product: str) -> Tuple[str, Optional[str]]:
974+
def _fetch_one(product: str) -> Tuple[str, Optional[List[str]]]:
956975
"""单产品 HTTP 查询,401/403 触发全局中止。"""
957976
if abort_event.is_set():
958977
return product, None
959978
try:
960-
date_str = get_latest_time(api_base, product, hid, headers)
961-
return product, date_str
979+
date_list = get_latest_times(api_base, product, hid, headers)
980+
return product, date_list
962981
except FatalRequestError as exc:
963982
# 认证失败时中止整个预取
964983
if exc.status_code in (401, 403):
@@ -973,9 +992,9 @@ def _fetch_one(product: str) -> Tuple[str, Optional[str]]:
973992
futures = {executor.submit(_fetch_one, p): p for p in uncached}
974993
for future in as_completed(futures, timeout=30):
975994
try:
976-
product, date_str = future.result()
977-
if date_str:
978-
fetched[product] = date_str
995+
product, date_list = future.result()
996+
if date_list:
997+
fetched[product] = date_list
979998
except Exception:
980999
pass
9811000
if abort_event.is_set():
@@ -999,15 +1018,15 @@ def _fetch_one(product: str) -> Tuple[str, Optional[str]]:
9991018
pass
10001019
# 合并:保留新鲜的已有缓存 + 刚预取的结果
10011020
checked_at_now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
1002-
merged: Dict[str, Tuple[str, str]] = dict(existing_cache)
1003-
for product, date_str in fetched.items():
1004-
merged[product] = (date_str, checked_at_now)
1021+
merged: Dict[str, Tuple[List[str], str]] = dict(existing_cache)
1022+
for product, date_list in fetched.items():
1023+
merged[product] = (list(date_list), checked_at_now)
10051024
return merged
10061025

10071026

10081027
def _estimate_sync_workload(
10091028
plans: Sequence[ProductPlan],
1010-
api_date_cache: Dict[str, Tuple[str, str]],
1029+
api_date_cache: Dict[str, Tuple[List[str], str]],
10111030
data_root: Path,
10121031
api_call_limit: int = 50,
10131032
course_type: str = "",
@@ -1032,7 +1051,8 @@ def _estimate_sync_workload(
10321051
local_date = infer_local_date_from_csv(data_root, product_name, rule)
10331052
# 读 API 最新日期(来自预取缓存)
10341053
cached = api_date_cache.get(product_name)
1035-
api_date = cached[0] if cached else None
1054+
api_dates = _normalize_cached_api_dates(cached[0], product_name) if cached else []
1055+
api_date = api_dates[-1] if api_dates else None
10361056
if not api_date:
10371057
# 无 API 日期,计为 1 次
10381058
products_list.append({
@@ -1053,9 +1073,10 @@ def _estimate_sync_workload(
10531073
try:
10541074
local_d = date.fromisoformat(local_date)
10551075
api_d = date.fromisoformat(api_date)
1056-
gap = max(0, (api_d - local_d).days)
1076+
calendar_gap = max(0, (api_d - local_d).days)
10571077
except ValueError:
1058-
gap = 1
1078+
calendar_gap = 1
1079+
gap = len(_expected_catchup_dates(local_date, api_date, product_name)) or calendar_gap
10591080
if gap == 0:
10601081
continue # 已是最新,不计入
10611082
products_list.append({
@@ -1238,7 +1259,6 @@ def _run_one_plan(plan: ProductPlan) -> Tuple[bool, float, SyncStats, str, str]:
12381259
continue
12391260

12401261
# 成功路径:total.merge + 状态持久化 + _append_result 在同一锁作用域
1241-
status_persist_warning = ""
12421262
with _lock:
12431263
total.merge(stats)
12441264
product_stats.merge(stats) # 累积本产品 stats 用于进度回调
@@ -1250,9 +1270,13 @@ def _run_one_plan(plan: ProductPlan) -> Tuple[bool, float, SyncStats, str, str]:
12501270
actual_time=actual_time,
12511271
)
12521272
except Exception as status_exc:
1253-
status_persist_warning = (
1254-
f"状态持久化失败(已忽略,不影响本次成功结果): {status_exc}"
1255-
)
1273+
raise ProductSyncError(
1274+
message=(
1275+
f"产品 {product} 状态持久化失败;"
1276+
f"为避免数据文件与 timestamp/状态库不一致,本次按失败处理。原始错误:{status_exc}"
1277+
),
1278+
reason_code=REASON_MERGE_ERROR,
1279+
) from status_exc
12561280
_append_result(
12571281
report,
12581282
product=product,
@@ -1265,12 +1289,6 @@ def _run_one_plan(plan: ProductPlan) -> Tuple[bool, float, SyncStats, str, str]:
12651289
source_path=source_path,
12661290
)
12671291
report.phase_sync_seconds += max(0.0, time.time() - t_sync_phase)
1268-
if status_persist_warning:
1269-
log_info(
1270-
f"[{plan.name}] {status_persist_warning}",
1271-
event="SYNC_WARN",
1272-
reason_code=reason_code,
1273-
)
12741292
continue
12751293
except ProductSyncError as exc:
12761294
# 可预期业务错误:带有明确 reason_code。

quantclass_sync_internal/status_store.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,18 @@
3737
_SOURCE_SYNC = "sync"
3838

3939

40+
def _normalize_api_date_candidates(raw: object) -> List[str]:
41+
"""把缓存里的 API 日期载荷规范化为升序去重列表。"""
42+
43+
if isinstance(raw, str):
44+
items = [x for x in re.split(r"[,\s]+", raw) if x]
45+
elif isinstance(raw, (list, tuple, set)):
46+
items = [str(x).strip() for x in raw if str(x).strip()]
47+
else:
48+
items = []
49+
return sorted({x for x in (normalize_data_date(item) for item in items) if x})
50+
51+
4052
def _status_db_has_rows(path: Path) -> bool:
4153
"""判断状态库是否可用(存在 product_status 且至少 1 行)。"""
4254

@@ -483,7 +495,7 @@ def _update_product_last_status(log_dir: Path, report: RunReport) -> None:
483495
tmp.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding="utf-8")
484496

485497

486-
def update_api_latest_dates(log_dir: Path, api_latest_dates: Dict[str, str]) -> None:
498+
def update_api_latest_dates(log_dir: Path, api_latest_dates: Dict[str, object]) -> None:
487499
"""将检查更新查到的 API 最新日期写入累积状态文件的 date_time 字段。
488500
489501
只更新 api_latest_dates 中有的产品,其他产品保持不变。
@@ -505,26 +517,31 @@ def update_api_latest_dates(log_dir: Path, api_latest_dates: Dict[str, str]) ->
505517
else:
506518
existing = _scan_reports_for_backfill(log_dir)
507519
checked_at = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
508-
for product, date_str in api_latest_dates.items():
520+
for product, raw_dates in api_latest_dates.items():
521+
candidates = _normalize_api_date_candidates(raw_dates)
522+
if not candidates:
523+
continue
524+
latest_date = candidates[-1]
509525
if product in existing:
510-
existing[product]["date_time"] = date_str
526+
existing[product]["date_time"] = latest_date
527+
existing[product]["api_dates"] = candidates
511528
existing[product]["checked_at"] = checked_at
512529
existing[product]["source"] = _SOURCE_API_CHECK
513530
else:
514531
existing[product] = {
515532
"status": "", "reason_code": "", "error": "",
516-
"date_time": date_str, "checked_at": checked_at,
533+
"date_time": latest_date, "api_dates": candidates, "checked_at": checked_at,
517534
"source": _SOURCE_API_CHECK,
518535
}
519536
with atomic_temp_path(status_path, tag="last_status") as tmp:
520537
tmp.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding="utf-8")
521538

522539

523-
def load_api_latest_dates(log_dir: Path) -> Dict[str, Tuple[str, str]]:
540+
def load_api_latest_dates(log_dir: Path) -> Dict[str, Tuple[List[str], str]]:
524541
"""读取 product_last_status.json 中由 check_updates 写入的 API 最新日期缓存。
525542
526543
只返回 source=="api_check" 的记录(排除同步结果写入的记录)。
527-
返回 {product: (date_time, checked_at)}。
544+
返回 {product: ([date_time...], checked_at)}。
528545
文件不存在或解析失败时返回空字典(静默降级)。
529546
"""
530547
status_path = log_dir / PRODUCT_LAST_STATUS_FILE
@@ -536,14 +553,15 @@ def load_api_latest_dates(log_dir: Path) -> Dict[str, Tuple[str, str]]:
536553
return {}
537554
if not isinstance(data, dict):
538555
return {}
539-
result: Dict[str, Tuple[str, str]] = {}
556+
result: Dict[str, Tuple[List[str], str]] = {}
540557
for product, info in data.items():
541558
if not isinstance(info, dict):
542559
continue
543560
# 只读取 check_updates 写入的记录,排除同步结果
544561
if info.get("source") != _SOURCE_API_CHECK:
545562
continue
546-
dt, ca = info.get("date_time"), info.get("checked_at")
547-
if isinstance(dt, str) and isinstance(ca, str):
548-
result[product] = (dt, ca)
563+
candidates = _normalize_api_date_candidates(info.get("api_dates", info.get("date_time")))
564+
checked_at = info.get("checked_at")
565+
if candidates and isinstance(checked_at, str):
566+
result[product] = (candidates, checked_at)
549567
return result

tests/test_gui_api.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,6 +1129,33 @@ def test_no_failed_products(self):
11291129
self.assertEqual(result["message"], "没有失败产品")
11301130

11311131

1132+
class TestStartSyncRetryFailedMalformedEntries(unittest.TestCase):
1133+
"""start_sync(retry_failed=True) 遇到坏数据时不应退化成全量同步。"""
1134+
1135+
def test_malformed_failed_products_returns_error(self):
1136+
with tempfile.TemporaryDirectory() as tmp_dir:
1137+
config_file = Path(tmp_dir) / "user_config.json"
1138+
config_file.write_text("{}", encoding="utf-8")
1139+
mock_config = _make_mock_config(tmp_dir)
1140+
1141+
with patch(f"{_API_MOD}.DEFAULT_USER_CONFIG_FILE", config_file), \
1142+
patch(f"{_API_MOD}.load_user_config_or_raise", return_value=mock_config), \
1143+
patch(f"{_API_MOD}.load_catalog_or_raise", return_value=["product-a"]):
1144+
1145+
from quantclass_sync_internal.gui.api import SyncApi
1146+
api = SyncApi()
1147+
1148+
with api._lock:
1149+
api._progress["run_summary"] = {
1150+
"failed_products": [{"error": "missing product key"}]
1151+
}
1152+
1153+
result = api.start_sync(retry_failed=True)
1154+
1155+
self.assertFalse(result["started"])
1156+
self.assertEqual(result["message"], "没有失败产品")
1157+
1158+
11321159
class TestStartSyncRetryFailedNoSummary(unittest.TestCase):
11331160
"""start_sync(retry_failed=True) 但无 run_summary 时返回错误。"""
11341161

tests/test_state_consistency_and_workdir_isolation.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from unittest.mock import patch
66

77

8-
from quantclass_sync_internal.constants import REASON_OK, STRATEGY_MERGE_KNOWN, TIMESTAMP_FILE_NAME
8+
from quantclass_sync_internal.constants import REASON_MERGE_ERROR, REASON_OK, STRATEGY_MERGE_KNOWN, TIMESTAMP_FILE_NAME
99
from quantclass_sync_internal.models import CommandContext, ProductPlan, SyncStats
1010
from quantclass_sync_internal import orchestrator
1111
from quantclass_sync_internal.reporting import _new_report
@@ -29,7 +29,7 @@ def setUp(self) -> None:
2929
def tearDown(self) -> None:
3030
self.tmp.cleanup()
3131

32-
def test_execute_plans_keeps_business_success_when_status_write_fails(self) -> None:
32+
def test_execute_plans_marks_error_when_status_write_fails(self) -> None:
3333
report = _new_report(self.ctx.run_id, mode="network")
3434

3535
def fake_process_product(
@@ -63,9 +63,9 @@ def fake_process_product(
6363
catch_up_to_latest=False,
6464
)
6565

66-
self.assertFalse(has_error)
67-
self.assertEqual(["ok"], [item.status for item in report.products])
68-
self.assertEqual(REASON_OK, report.products[0].reason_code)
66+
self.assertTrue(has_error)
67+
self.assertEqual(["error"], [item.status for item in report.products])
68+
self.assertEqual(REASON_MERGE_ERROR, report.products[0].reason_code)
6969

7070
def test_download_and_prepare_extract_scopes_workdir_by_run_id(self) -> None:
7171
work_dir = self.root / ".cache"

tests/test_status_store.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,15 +188,15 @@ class TestLoadApiLatestDates(unittest.TestCase):
188188
"""load_api_latest_dates 读取缓存日期。"""
189189

190190
def test_normal_read(self):
191-
"""正常读取,返回 {product: (date_time, checked_at)}。"""
191+
"""正常读取,返回 {product: ([date_time...], checked_at)}。"""
192192
with tempfile.TemporaryDirectory() as tmpdir:
193193
log_dir = Path(tmpdir)
194194
update_api_latest_dates(log_dir, {"prod-a": "2026-03-18", "prod-b": "2026-03-17"})
195195
cache = load_api_latest_dates(log_dir)
196196
self.assertIn("prod-a", cache)
197197
self.assertIn("prod-b", cache)
198-
date_time, checked_at = cache["prod-a"]
199-
self.assertEqual(date_time, "2026-03-18")
198+
date_times, checked_at = cache["prod-a"]
199+
self.assertEqual(date_times, ["2026-03-18"])
200200
self.assertIn("T", checked_at)
201201

202202
def test_missing_file_returns_empty(self):

0 commit comments

Comments
 (0)