fix(scrape): preserve partial temps on OOM; large-file salvage merge

OOM/aborted channel exports no longer delete partial temp downloads.
Salvage uses grep boundary repair with python merge/validate for files
over 64 MiB. Retain stale temps when merge fails instead of discarding.
This commit is contained in:
Copilot 2026-06-03 05:35:22 -05:00
parent c13c4167be
commit 87537eb8b0
3 changed files with 180 additions and 35 deletions

View file

@ -0,0 +1,35 @@
---
title: "fix: Preserve partial exports on OOM skip; large-file salvage"
type: fix
status: complete
date: 2026-06-03
origin: /lfg — yes_general re-downloads because OOM skip deletes partial temp; salvage fails on 500MB+ JSON
---
# fix: Preserve partial exports on OOM skip; large-file salvage
## Problem
1. **OOM skip discards progress:** When export exits 134/137/139, `scrape_target` SKIPs the channel and `rm -rf`s the temp dir — losing partial downloads (514 MB, 1 GB).
2. **Salvage fails on large files:** Python marker salvage + `jq empty` on 500 MB+ truncated JSON fails in container (`mktemp` / memory).
3. **Re-download loop:** Stale temps discarded → incremental starts from 2021 archive cursor → 35+ min re-fetch every run.
## Requirements
| ID | Requirement |
|----|-------------|
| R1 | On SKIPPED export (exit 2), **do not** delete temp dir — leave for next-run salvage |
| R2 | `salvage_truncated_json` uses grep/head boundary repair; mktemp uses `${TMPDIR:-/tmp}` |
| R3 | Skip full-file `jq empty` on exports > 64 MiB; validate via python message-count probe |
| R4 | Large merge (>64 MiB combined) uses python id-merge instead of jq |
| R5 | Smoke tests pass; salvage-stale smoke unchanged |
| R6 | Salvage current 1 GB yes_general temp, merge into archive, verify `--after` advances |
## Verification
```bash
./scripts/tests/run-discord-scrape-smoke.sh
DCE_MIN_FREE_MB=0 ./scripts/run-all-smokes.sh
# After merge, incremental should show recent dateRange.after not 2021
```

View file

@ -111,9 +111,9 @@ DCE_MIN_FREE_MB=0 ./scripts/run-operator-validation.sh --sync-gui --per-target -
| expanded_kotor_discord | pass | pass | validation-resume |
| eod_discord | pass | pass | validation-resume |
| DS_Discord_msgs | pass | pass | validation-resume; some channels forbidden |
| KotOR_discord_msgs | **in progress** | — | plan 041 retry after abort-skip fix; log `logs/kotor-retry-20260530.log` |
| KotOR_discord_msgs | **in progress** | — | `yes_general` has ~5 years of backlog (archive cursor was Jan 2021); plan 043 preserves partial temps on OOM skip |
**KotOR remediation (plan 040041):** `run-discord-scrape.sh` skips channels when export exits 134/137/139 (abort/OOM) or log matches disk/forbidden patterns. Offline regression: `run-discord-scrape-smoke.sh` `skip-abort` target. Re-run:
**KotOR / yes_general (plan 040043):** Incremental `--after` works for all channels; most return `UNCHANGED` in seconds. `yes_general` archive last message was **2021-01-17** — the first catch-up legitimately fetches years of history. Prior bug: OOM skip **deleted** partial temp exports, causing re-download loops. Plan 043 preserves partial temps and salvages on next run.
```bash
docker compose build # or podman-compose build

View file

@ -549,34 +549,83 @@ message_count() {
jq -r '(.messages | length) // 0' "$export_path"
}
salvage_truncated_json() {
file_size_bytes() {
local path=$1
stat -c '%s' "$path" 2>/dev/null || stat -f '%z' "$path" 2>/dev/null || echo 0
}
LARGE_EXPORT_BYTES=67108864
json_is_valid() {
local path=$1
local size
size=$(file_size_bytes "$path")
if (( size > LARGE_EXPORT_BYTES )); then
python3 - "$path" <<'PY' >/dev/null 2>&1
import json, sys
with open(sys.argv[1]) as f:
json.load(f)
PY
return $?
fi
jq empty "$path" >/dev/null 2>&1
}
message_count_fast() {
local export_path=$1
if jq empty "$export_path" >/dev/null 2>&1; then
local size count
size=$(file_size_bytes "$export_path")
if (( size > LARGE_EXPORT_BYTES )); then
count=$(python3 - "$export_path" <<'PY'
import json, sys
with open(sys.argv[1]) as f:
print(len(json.load(f).get("messages", [])))
PY
)
printf '%s\n' "$count"
return 0
fi
command -v python3 >/dev/null 2>&1 || return 1
python3 - "$export_path" <<'PY' || return 1
import sys
from pathlib import Path
message_count "$export_path"
}
path = Path(sys.argv[1])
data = path.read_bytes()
marker = b"},\n {"
idx = data.rfind(marker)
if idx < 0:
sys.exit(1)
salvage_truncated_json() {
local export_path=$1
if json_is_valid "$export_path"; then
return 0
fi
truncated = data[: idx + 1]
suffix = b'\n ],\n "messageCount": 0\n}'
path.write_bytes(truncated + suffix)
PY
jq empty "$export_path" >/dev/null 2>&1 || return 1
local temp_file
temp_file=$(mktemp "${TMPDIR:-/tmp}/dce-salvage-fix.XXXXXX.json")
if jq '.messageCount = (.messages | length)' "$export_path" >"$temp_file" 2>/dev/null; then
mv -f "$temp_file" "$export_path"
local last_boundary_line temp_salvage
last_boundary_line=$(grep -n '^ },' "$export_path" | tail -1 | cut -d: -f1)
[[ -n "$last_boundary_line" ]] || return 1
temp_salvage=$(mktemp "${TMPDIR:-/tmp}/dce-salvage.XXXXXX")
{
head -n "$last_boundary_line" "$export_path" | sed '$ s/,$//'
printf ' ],\n "messageCount": 0\n}\n'
} >"$temp_salvage"
if json_is_valid "$temp_salvage"; then
mv -f "$temp_salvage" "$export_path"
else
rm -f "$temp_file"
rm -f "$temp_salvage"
return 1
fi
local temp_fix
temp_fix=$(mktemp "${TMPDIR:-/tmp}/dce-salvage-fix.XXXXXX")
if jq '.messageCount = (.messages | length)' "$export_path" >"$temp_fix" 2>/dev/null; then
mv -f "$temp_fix" "$export_path"
else
rm -f "$temp_fix"
python3 - "$export_path" <<'PY' 2>/dev/null || true
import json, sys
path = sys.argv[1]
with open(path) as f:
data = json.load(f)
data["messageCount"] = len(data.get("messages", []))
with open(path, "w") as f:
json.dump(data, f, ensure_ascii=False)
PY
fi
}
@ -611,8 +660,8 @@ salvage_stale_temp_exports() {
continue
fi
local salvage_count
salvage_count=$(message_count "$stale_export")
local salvage_count merged_ok=0
salvage_count=$(message_count_fast "$stale_export")
if (( salvage_count == 0 )); then
rm -rf "$stale_dir"
continue
@ -620,22 +669,35 @@ salvage_stale_temp_exports() {
if [[ -n "$destination_path" && -f "$destination_path" ]]; then
salvage_merged="$stale_dir/merged.json"
if merge_exports "$destination_path" "$stale_export" "$salvage_merged" && [[ -s "$salvage_merged" ]]; then
if jq empty "$salvage_merged" >/dev/null 2>&1; then
if merge_exports_auto "$destination_path" "$stale_export" "$salvage_merged" && [[ -s "$salvage_merged" ]]; then
if json_is_valid "$salvage_merged"; then
local before_count after_count
before_count=$(message_count "$destination_path")
before_count=$(message_count_fast "$destination_path")
commit_merged_export "$destination_path" "$salvage_merged"
after_count=$(message_count "$destination_path")
after_count=$(message_count_fast "$destination_path")
if (( after_count > before_count )); then
log " SALVAGED $destination_path (+$((after_count - before_count)) messages from stale temp, $before_count$after_count)"
merged_ok=1
else
log " Stale temp merged with no new messages, discarding: $stale_dir"
merged_ok=1
fi
else
log " Stale temp merge produced invalid JSON, retaining for retry: $stale_dir"
fi
else
log " Stale temp merge failed, retaining for retry: $stale_dir"
fi
elif [[ -n "$destination_path" ]]; then
mkdir -p "$(dirname "$destination_path")"
cp "$stale_export" "$destination_path"
log " SALVAGED $destination_path (${salvage_count} messages from stale temp, new archive)"
merged_ok=1
fi
if (( merged_ok )); then
rm -rf "$stale_dir"
fi
done
}
@ -702,11 +764,55 @@ commit_merged_export() {
atomic_path=$(mktemp -p "$(dirname "$destination_path")" ".$(basename "$destination_path").dce-replace.XXXXXX")
cp "$merged_path" "$atomic_path"
jq empty "$atomic_path" >/dev/null 2>&1 || die "Merged export is not valid JSON: $atomic_path"
json_is_valid "$atomic_path" || die "Merged export is not valid JSON: $atomic_path"
assert_export_channel_identity "$atomic_path" "$(channel_id_from_export "$destination_path")"
mv -f "$atomic_path" "$destination_path"
}
merge_exports_large() {
python3 - "$1" "$2" "$3" <<'PY'
import json, sys
existing_path, incr_path, out_path = sys.argv[1:4]
with open(existing_path) as f:
base = json.load(f)
with open(incr_path) as f:
incr = json.load(f)
by_id = {m["id"]: m for m in base.get("messages", [])}
for m in incr.get("messages", []):
by_id[m["id"]] = m
base["messages"] = sorted(by_id.values(), key=lambda m: (m.get("timestamp") or "", m["id"]))
if "messageCount" in base or "messageCount" in incr:
base["messageCount"] = len(base["messages"])
base.setdefault("dateRange", {})
if incr.get("dateRange", {}).get("before"):
base["dateRange"]["before"] = incr["dateRange"]["before"]
if incr.get("dateRange", {}).get("after") and not base["dateRange"].get("after"):
base["dateRange"]["after"] = incr["dateRange"]["after"]
if incr.get("exportedAt"):
base["exportedAt"] = incr["exportedAt"]
with open(out_path, "w") as f:
json.dump(base, f, ensure_ascii=False, indent=2)
PY
}
merge_exports_auto() {
local existing_path=$1
local incremental_path=$2
local merged_path=$3
local combined_size
combined_size=$(( $(file_size_bytes "$existing_path") + $(file_size_bytes "$incremental_path") ))
if (( combined_size > LARGE_EXPORT_BYTES )); then
merge_exports_large "$existing_path" "$incremental_path" "$merged_path"
else
merge_exports "$existing_path" "$incremental_path" "$merged_path"
fi
}
merge_exports() {
local existing_path=$1
local incremental_path=$2
@ -1068,7 +1174,11 @@ scrape_target() {
case "$export_status" in
0) ;;
2)
if [[ -s "$temp_export" ]]; then
log " Preserving partial export temp for salvage on next run: $temp_dir"
else
rm -rf "$temp_dir"
fi
skipped_channels=$((skipped_channels + 1))
record_channel_result "$target_name" "$channel_id" "$guild_label" "${destination_path:-n/a}" SKIPPED "$before_count" 0 "$before_count"
continue
@ -1105,7 +1215,7 @@ scrape_target() {
continue
fi
merge_exports "$destination_path" "$temp_export" "$temp_merged"
merge_exports_auto "$destination_path" "$temp_export" "$temp_merged"
[[ -s "$temp_merged" ]] || die "Merged export is empty for channel $channel_id."
jq empty "$temp_merged" >/dev/null 2>&1 || die "Merged export is not valid JSON: $temp_merged"
assert_export_channel_identity "$temp_merged" "$channel_id"