From 87537eb8b0415613986165c017d42ba276ebcff5 Mon Sep 17 00:00:00 2001 From: Copilot Date: Wed, 3 Jun 2026 05:35:22 -0500 Subject: [PATCH] fix(scrape): preserve partial temps on OOM; large-file salvage merge OOM/aborted channel exports no longer delete partial temp downloads. Salvage uses grep boundary repair with python merge/validate for files over 64 MiB. Retain stale temps when merge fails instead of discarding. --- ...3-043-fix-kotor-validation-closure-plan.md | 35 ++++ docs/recurring-scrape-merge-readiness.md | 4 +- scripts/run-discord-scrape.sh | 176 ++++++++++++++---- 3 files changed, 180 insertions(+), 35 deletions(-) create mode 100644 docs/plans/2026-06-03-043-fix-kotor-validation-closure-plan.md diff --git a/docs/plans/2026-06-03-043-fix-kotor-validation-closure-plan.md b/docs/plans/2026-06-03-043-fix-kotor-validation-closure-plan.md new file mode 100644 index 00000000..c74b8862 --- /dev/null +++ b/docs/plans/2026-06-03-043-fix-kotor-validation-closure-plan.md @@ -0,0 +1,35 @@ +--- +title: "fix: Preserve partial exports on OOM skip; large-file salvage" +type: fix +status: complete +date: 2026-06-03 +origin: /lfg — yes_general re-downloads because OOM skip deletes partial temp; salvage fails on 500MB+ JSON +--- + +# fix: Preserve partial exports on OOM skip; large-file salvage + +## Problem + +1. **OOM skip discards progress:** When export exits 134/137/139, `scrape_target` SKIPs the channel and `rm -rf`s the temp dir — losing partial downloads (514 MB, 1 GB). +2. **Salvage fails on large files:** Python marker salvage + `jq empty` on 500 MB+ truncated JSON fails in container (`mktemp` / memory). +3. **Re-download loop:** Stale temps discarded → incremental starts from 2021 archive cursor → 35+ min re-fetch every run. + +## Requirements + +| ID | Requirement | +|----|-------------| +| R1 | On SKIPPED export (exit 2), **do not** delete temp dir — leave for next-run salvage | +| R2 | `salvage_truncated_json` uses grep/head boundary repair; mktemp uses `${TMPDIR:-/tmp}` | +| R3 | Skip full-file `jq empty` on exports > 64 MiB; validate via python message-count probe | +| R4 | Large merge (>64 MiB combined) uses python id-merge instead of jq | +| R5 | Smoke tests pass; salvage-stale smoke unchanged | +| R6 | Salvage current 1 GB yes_general temp, merge into archive, verify `--after` advances | + +## Verification + +```bash +./scripts/tests/run-discord-scrape-smoke.sh +DCE_MIN_FREE_MB=0 ./scripts/run-all-smokes.sh +# After merge, incremental should show recent dateRange.after not 2021 +``` + diff --git a/docs/recurring-scrape-merge-readiness.md b/docs/recurring-scrape-merge-readiness.md index 77763cc1..c512cbf3 100644 --- a/docs/recurring-scrape-merge-readiness.md +++ b/docs/recurring-scrape-merge-readiness.md @@ -111,9 +111,9 @@ DCE_MIN_FREE_MB=0 ./scripts/run-operator-validation.sh --sync-gui --per-target - | expanded_kotor_discord | pass | pass | validation-resume | | eod_discord | pass | pass | validation-resume | | DS_Discord_msgs | pass | pass | validation-resume; some channels forbidden | -| KotOR_discord_msgs | **in progress** | — | plan 041 retry after abort-skip fix; log `logs/kotor-retry-20260530.log` | +| KotOR_discord_msgs | **in progress** | — | `yes_general` has ~5 years of backlog (archive cursor was Jan 2021); plan 043 preserves partial temps on OOM skip | -**KotOR remediation (plan 040–041):** `run-discord-scrape.sh` skips channels when export exits 134/137/139 (abort/OOM) or log matches disk/forbidden patterns. Offline regression: `run-discord-scrape-smoke.sh` `skip-abort` target. Re-run: +**KotOR / yes_general (plan 040–043):** Incremental `--after` works for all channels; most return `UNCHANGED` in seconds. `yes_general` archive last message was **2021-01-17** — the first catch-up legitimately fetches years of history. Prior bug: OOM skip **deleted** partial temp exports, causing re-download loops. Plan 043 preserves partial temps and salvages on next run. ```bash docker compose build # or podman-compose build diff --git a/scripts/run-discord-scrape.sh b/scripts/run-discord-scrape.sh index 0c36ebd1..c31bcf65 100755 --- a/scripts/run-discord-scrape.sh +++ b/scripts/run-discord-scrape.sh @@ -549,34 +549,83 @@ message_count() { jq -r '(.messages | length) // 0' "$export_path" } -salvage_truncated_json() { +file_size_bytes() { + local path=$1 + stat -c '%s' "$path" 2>/dev/null || stat -f '%z' "$path" 2>/dev/null || echo 0 +} + +LARGE_EXPORT_BYTES=67108864 + +json_is_valid() { + local path=$1 + local size + size=$(file_size_bytes "$path") + if (( size > LARGE_EXPORT_BYTES )); then + python3 - "$path" <<'PY' >/dev/null 2>&1 +import json, sys +with open(sys.argv[1]) as f: + json.load(f) +PY + return $? + fi + jq empty "$path" >/dev/null 2>&1 +} + +message_count_fast() { local export_path=$1 - if jq empty "$export_path" >/dev/null 2>&1; then + local size count + size=$(file_size_bytes "$export_path") + if (( size > LARGE_EXPORT_BYTES )); then + count=$(python3 - "$export_path" <<'PY' +import json, sys +with open(sys.argv[1]) as f: + print(len(json.load(f).get("messages", []))) +PY +) + printf '%s\n' "$count" return 0 fi - command -v python3 >/dev/null 2>&1 || return 1 - python3 - "$export_path" <<'PY' || return 1 -import sys -from pathlib import Path + message_count "$export_path" +} -path = Path(sys.argv[1]) -data = path.read_bytes() -marker = b"},\n {" -idx = data.rfind(marker) -if idx < 0: - sys.exit(1) +salvage_truncated_json() { + local export_path=$1 + if json_is_valid "$export_path"; then + return 0 + fi -truncated = data[: idx + 1] -suffix = b'\n ],\n "messageCount": 0\n}' -path.write_bytes(truncated + suffix) -PY - jq empty "$export_path" >/dev/null 2>&1 || return 1 - local temp_file - temp_file=$(mktemp "${TMPDIR:-/tmp}/dce-salvage-fix.XXXXXX.json") - if jq '.messageCount = (.messages | length)' "$export_path" >"$temp_file" 2>/dev/null; then - mv -f "$temp_file" "$export_path" + local last_boundary_line temp_salvage + last_boundary_line=$(grep -n '^ },' "$export_path" | tail -1 | cut -d: -f1) + [[ -n "$last_boundary_line" ]] || return 1 + + temp_salvage=$(mktemp "${TMPDIR:-/tmp}/dce-salvage.XXXXXX") + { + head -n "$last_boundary_line" "$export_path" | sed '$ s/,$//' + printf ' ],\n "messageCount": 0\n}\n' + } >"$temp_salvage" + + if json_is_valid "$temp_salvage"; then + mv -f "$temp_salvage" "$export_path" else - rm -f "$temp_file" + rm -f "$temp_salvage" + return 1 + fi + + local temp_fix + temp_fix=$(mktemp "${TMPDIR:-/tmp}/dce-salvage-fix.XXXXXX") + if jq '.messageCount = (.messages | length)' "$export_path" >"$temp_fix" 2>/dev/null; then + mv -f "$temp_fix" "$export_path" + else + rm -f "$temp_fix" + python3 - "$export_path" <<'PY' 2>/dev/null || true +import json, sys +path = sys.argv[1] +with open(path) as f: + data = json.load(f) +data["messageCount"] = len(data.get("messages", [])) +with open(path, "w") as f: + json.dump(data, f, ensure_ascii=False) +PY fi } @@ -611,8 +660,8 @@ salvage_stale_temp_exports() { continue fi - local salvage_count - salvage_count=$(message_count "$stale_export") + local salvage_count merged_ok=0 + salvage_count=$(message_count_fast "$stale_export") if (( salvage_count == 0 )); then rm -rf "$stale_dir" continue @@ -620,22 +669,35 @@ salvage_stale_temp_exports() { if [[ -n "$destination_path" && -f "$destination_path" ]]; then salvage_merged="$stale_dir/merged.json" - if merge_exports "$destination_path" "$stale_export" "$salvage_merged" && [[ -s "$salvage_merged" ]]; then - if jq empty "$salvage_merged" >/dev/null 2>&1; then + if merge_exports_auto "$destination_path" "$stale_export" "$salvage_merged" && [[ -s "$salvage_merged" ]]; then + if json_is_valid "$salvage_merged"; then local before_count after_count - before_count=$(message_count "$destination_path") + before_count=$(message_count_fast "$destination_path") commit_merged_export "$destination_path" "$salvage_merged" - after_count=$(message_count "$destination_path") - log " SALVAGED $destination_path (+$((after_count - before_count)) messages from stale temp, $before_count → $after_count)" + after_count=$(message_count_fast "$destination_path") + if (( after_count > before_count )); then + log " SALVAGED $destination_path (+$((after_count - before_count)) messages from stale temp, $before_count → $after_count)" + merged_ok=1 + else + log " Stale temp merged with no new messages, discarding: $stale_dir" + merged_ok=1 + fi + else + log " Stale temp merge produced invalid JSON, retaining for retry: $stale_dir" fi + else + log " Stale temp merge failed, retaining for retry: $stale_dir" fi elif [[ -n "$destination_path" ]]; then mkdir -p "$(dirname "$destination_path")" cp "$stale_export" "$destination_path" log " SALVAGED $destination_path (${salvage_count} messages from stale temp, new archive)" + merged_ok=1 fi - rm -rf "$stale_dir" + if (( merged_ok )); then + rm -rf "$stale_dir" + fi done } @@ -702,11 +764,55 @@ commit_merged_export() { atomic_path=$(mktemp -p "$(dirname "$destination_path")" ".$(basename "$destination_path").dce-replace.XXXXXX") cp "$merged_path" "$atomic_path" - jq empty "$atomic_path" >/dev/null 2>&1 || die "Merged export is not valid JSON: $atomic_path" + json_is_valid "$atomic_path" || die "Merged export is not valid JSON: $atomic_path" assert_export_channel_identity "$atomic_path" "$(channel_id_from_export "$destination_path")" mv -f "$atomic_path" "$destination_path" } +merge_exports_large() { + python3 - "$1" "$2" "$3" <<'PY' +import json, sys + +existing_path, incr_path, out_path = sys.argv[1:4] +with open(existing_path) as f: + base = json.load(f) +with open(incr_path) as f: + incr = json.load(f) + +by_id = {m["id"]: m for m in base.get("messages", [])} +for m in incr.get("messages", []): + by_id[m["id"]] = m + +base["messages"] = sorted(by_id.values(), key=lambda m: (m.get("timestamp") or "", m["id"])) +if "messageCount" in base or "messageCount" in incr: + base["messageCount"] = len(base["messages"]) +base.setdefault("dateRange", {}) +if incr.get("dateRange", {}).get("before"): + base["dateRange"]["before"] = incr["dateRange"]["before"] +if incr.get("dateRange", {}).get("after") and not base["dateRange"].get("after"): + base["dateRange"]["after"] = incr["dateRange"]["after"] +if incr.get("exportedAt"): + base["exportedAt"] = incr["exportedAt"] + +with open(out_path, "w") as f: + json.dump(base, f, ensure_ascii=False, indent=2) +PY +} + +merge_exports_auto() { + local existing_path=$1 + local incremental_path=$2 + local merged_path=$3 + local combined_size + + combined_size=$(( $(file_size_bytes "$existing_path") + $(file_size_bytes "$incremental_path") )) + if (( combined_size > LARGE_EXPORT_BYTES )); then + merge_exports_large "$existing_path" "$incremental_path" "$merged_path" + else + merge_exports "$existing_path" "$incremental_path" "$merged_path" + fi +} + merge_exports() { local existing_path=$1 local incremental_path=$2 @@ -1068,7 +1174,11 @@ scrape_target() { case "$export_status" in 0) ;; 2) - rm -rf "$temp_dir" + if [[ -s "$temp_export" ]]; then + log " Preserving partial export temp for salvage on next run: $temp_dir" + else + rm -rf "$temp_dir" + fi skipped_channels=$((skipped_channels + 1)) record_channel_result "$target_name" "$channel_id" "$guild_label" "${destination_path:-n/a}" SKIPPED "$before_count" 0 "$before_count" continue @@ -1105,7 +1215,7 @@ scrape_target() { continue fi - merge_exports "$destination_path" "$temp_export" "$temp_merged" + merge_exports_auto "$destination_path" "$temp_export" "$temp_merged" [[ -s "$temp_merged" ]] || die "Merged export is empty for channel $channel_id." jq empty "$temp_merged" >/dev/null 2>&1 || die "Merged export is not valid JSON: $temp_merged" assert_export_channel_identity "$temp_merged" "$channel_id"