# coding: utf-8
from __future__ import print_function, unicode_literals
import base64
import gzip
import hashlib
import json
import math
import os
import re
import shutil
import signal
import stat
import subprocess as sp
import tempfile
import threading
import time
import traceback
from copy import deepcopy
from queue import Queue
from .__init__ import ANYWIN, PY2, TYPE_CHECKING, WINDOWS
from .authsrv import LEELOO_DALLAS, VFS, AuthSrv
from .bos import bos
from .fsutil import Fstab
from .mtag import MParser, MTag
from .util import (
HAVE_SQLITE3,
SYMTIME,
MTHash,
Pebkac,
ProgressPrinter,
absreal,
atomic_move,
db_ex_chk,
djoin,
fsenc,
min_ex,
quotep,
ren_open,
rmdirs,
rmdirs_up,
s2hms,
s3dec,
s3enc,
sanitize_fn,
statdir,
vjoin,
vsplit,
w8b64dec,
w8b64enc,
)
if HAVE_SQLITE3:
import sqlite3
DB_VER = 5
try:
from typing import Any, Optional, Pattern, Union
except:
pass
if TYPE_CHECKING:
from .svchub import SvcHub
class Dbw(object):
def __init__(self, c: "sqlite3.Cursor", n: int, t: float) -> None:
self.c = c
self.n = n
self.t = t
class Mpqe(object):
"""pending files to tag-scan"""
def __init__(
self,
mtp: dict[str, MParser],
entags: set[str],
w: str,
abspath: str,
oth_tags: dict[str, Any],
):
        # empty mtp = use the regular mtag backend (no external parsers)
self.mtp = mtp
self.entags = entags
self.w = w
self.abspath = abspath
self.oth_tags = oth_tags
class Up2k(object):
def __init__(self, hub: "SvcHub") -> None:
self.hub = hub
self.asrv: AuthSrv = hub.asrv
self.args = hub.args
self.log_func = hub.log
self.salt = self.args.salt
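        # a "wark" identifies one file; a rough sketch of its derivation
        # (see up2k_wark_from_hashlist, defined further down this file):
        #   ident = "\n".join([salt, str(filesize)] + chunk_hashes)
        #   wark = base64.urlsafe_b64encode(sha512(ident).digest()[:33])
        # 33 bytes of digest = exactly 44 chars of urlsafe base64, hence: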
self.r_hash = re.compile("^[0-9a-zA-Z_-]{44}$")
self.gid = 0
self.stop = False
self.mutex = threading.Lock()
self.blocked: Optional[str] = None
self.pp: Optional[ProgressPrinter] = None
self.rescan_cond = threading.Condition()
self.need_rescan: set[str] = set()
self.db_act = 0.0
self.registry: dict[str, dict[str, dict[str, Any]]] = {}
self.flags: dict[str, dict[str, Any]] = {}
self.droppable: dict[str, list[str]] = {}
self.volstate: dict[str, str] = {}
self.dupesched: dict[str, list[tuple[str, str, float]]] = {}
self.snap_persist_interval = 300 # persist unfinished index every 5 min
        self.snap_discard_interval = 21600  # drop unfinished uploads after 6 hours of inactivity
self.snap_prev: dict[str, Optional[tuple[int, float]]] = {}
self.mtag: Optional[MTag] = None
self.entags: dict[str, set[str]] = {}
self.mtp_parsers: dict[str, dict[str, MParser]] = {}
self.pending_tags: list[tuple[set[str], str, str, dict[str, Any]]] = []
self.hashq: Queue[tuple[str, str, str, str, float]] = Queue()
self.tagq: Queue[tuple[str, str, str, str, str, float]] = Queue()
self.tag_event = threading.Condition()
self.n_hashq = 0
self.n_tagq = 0
self.mpool_used = False
self.cur: dict[str, "sqlite3.Cursor"] = {}
self.mem_cur = None
self.sqlite_ver = None
self.no_expr_idx = False
self.timeout = int(max(self.args.srch_time, 50) * 1.2) + 1
self.spools: set[tempfile.SpooledTemporaryFile[bytes]] = set()
if HAVE_SQLITE3:
# mojibake detector
self.mem_cur = self._orz(":memory:")
self.mem_cur.execute(r"create table a (b text)")
self.sqlite_ver = tuple([int(x) for x in sqlite3.sqlite_version.split(".")])
if self.sqlite_ver < (3, 9):
self.no_expr_idx = True
else:
t = "could not initialize sqlite3, will use in-memory registry only"
self.log(t, 3)
if ANYWIN:
            # setting lastmod immediately after a write usually fails on windows; queue it for a retry-thread
self.lastmod_q: list[tuple[str, int, tuple[int, int], bool]] = []
thr = threading.Thread(target=self._lastmodder, name="up2k-lastmod")
thr.daemon = True
thr.start()
self.fstab = Fstab(self.log_func)
if self.args.hash_mt < 2:
self.mth: Optional[MTHash] = None
else:
self.mth = MTHash(self.args.hash_mt)
if self.args.no_fastboot:
self.deferred_init()
def init_vols(self) -> None:
if self.args.no_fastboot:
return
t = threading.Thread(target=self.deferred_init, name="up2k-deferred-init")
t.daemon = True
t.start()
def reload(self) -> None:
self.gid += 1
self.log("reload #{} initiated".format(self.gid))
all_vols = self.asrv.vfs.all_vols
self.rescan(all_vols, list(all_vols.keys()), True)
def deferred_init(self) -> None:
all_vols = self.asrv.vfs.all_vols
have_e2d = self.init_indexes(all_vols, [])
if not self.pp and self.args.exit == "idx":
return self.hub.sigterm()
thr = threading.Thread(target=self._snapshot, name="up2k-snapshot")
thr.daemon = True
thr.start()
if have_e2d:
thr = threading.Thread(target=self._hasher, name="up2k-hasher")
thr.daemon = True
thr.start()
thr = threading.Thread(target=self._sched_rescan, name="up2k-rescan")
thr.daemon = True
thr.start()
if self.mtag:
for n in range(max(1, self.args.mtag_mt)):
name = "tagger-{}".format(n)
thr = threading.Thread(target=self._tagger, name=name)
thr.daemon = True
thr.start()
thr = threading.Thread(target=self._run_all_mtp, name="up2k-mtp-init")
thr.daemon = True
thr.start()
def log(self, msg: str, c: Union[int, str] = 0) -> None:
self.log_func("up2k", msg + "\033[K", c)
def _block(self, why: str) -> None:
self.blocked = why
self.log("uploads temporarily blocked due to " + why, 3)
def _unblock(self) -> None:
if self.blocked is not None:
self.blocked = None
if not self.stop:
self.log("uploads are now possible", 2)
def get_state(self) -> str:
mtpq: Union[int, str] = 0
q = "select count(w) from mt where k = 't:mtp'"
got_lock = False if PY2 else self.mutex.acquire(timeout=0.5)
if got_lock:
for cur in self.cur.values():
try:
mtpq += cur.execute(q).fetchone()[0]
except:
pass
self.mutex.release()
else:
mtpq = "(?)"
ret = {
"volstate": self.volstate,
"scanning": bool(self.pp),
"hashq": self.n_hashq,
"tagq": self.n_tagq,
"mtpq": mtpq,
"dbwt": "{:.2f}".format(
min(1000 * 24 * 60 * 60 - 1, time.time() - self.db_act)
),
}
return json.dumps(ret, indent=4)
def rescan(self, all_vols: dict[str, VFS], scan_vols: list[str], wait: bool) -> str:
if not wait and self.pp:
return "cannot initiate; scan is already in progress"
args = (all_vols, scan_vols)
t = threading.Thread(
target=self.init_indexes,
args=args,
name="up2k-rescan-{}".format(scan_vols[0] if scan_vols else "all"),
)
t.daemon = True
t.start()
return ""
def _sched_rescan(self) -> None:
volage = {}
cooldown = 0.0
timeout = time.time() + 3
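        # sleep until the next deadline (or until rescan_cond is poked),
        # then queue a rescan for every volume whose "scan" volflag
        # interval has expired; cooldown limits rescans to one per 10 sec,
        # and recent db activity (uploads) defers the scan further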
while True:
timeout = max(timeout, cooldown)
wait = max(0.1, timeout + 0.1 - time.time())
with self.rescan_cond:
self.rescan_cond.wait(wait)
now = time.time()
if now < cooldown:
continue
if self.pp:
cooldown = now + 1
continue
if self.args.no_lifetime:
timeout = now + 9001
else:
# important; not deferred by db_act
timeout = self._check_lifetimes()
with self.mutex:
for vp, vol in sorted(self.asrv.vfs.all_vols.items()):
maxage = vol.flags.get("scan")
if not maxage:
continue
if vp not in volage:
volage[vp] = now
deadline = volage[vp] + maxage
if deadline <= now:
self.need_rescan.add(vp)
timeout = min(timeout, deadline)
if self.db_act > now - self.args.db_act:
# recent db activity; defer volume rescan
act_timeout = self.db_act + self.args.db_act
if self.need_rescan:
timeout = now
if timeout < act_timeout:
timeout = act_timeout
t = "volume rescan deferred {:.1f} sec, due to database activity"
self.log(t.format(timeout - now))
continue
with self.mutex:
vols = list(sorted(self.need_rescan))
self.need_rescan.clear()
if vols:
cooldown = now + 10
err = self.rescan(self.asrv.vfs.all_vols, vols, False)
if err:
for v in vols:
self.need_rescan.add(v)
continue
for v in vols:
volage[v] = now
def _check_lifetimes(self) -> float:
now = time.time()
timeout = now + 9001
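        # volumes with the "lifetime" volflag expire uploads after that
        # many seconds; delete expired files (in batches of 100) and
        # return the next deadline so _sched_rescan can sleep until then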
if now: # diff-golf
for vp, vol in sorted(self.asrv.vfs.all_vols.items()):
lifetime = vol.flags.get("lifetime")
if not lifetime:
continue
cur = self.cur.get(vol.realpath)
if not cur:
continue
lifetime = int(lifetime)
timeout = min(timeout, now + lifetime)
nrm = 0
deadline = time.time() - lifetime
q = "select rd, fn from up where at > 0 and at < ? limit 100"
while True:
with self.mutex:
hits = cur.execute(q, (deadline,)).fetchall()
if not hits:
break
for rd, fn in hits:
if rd.startswith("//") or fn.startswith("//"):
rd, fn = s3dec(rd, fn)
fvp = "{}/{}".format(rd, fn).strip("/")
if vp:
fvp = "{}/{}".format(vp, fvp)
self._handle_rm(LEELOO_DALLAS, "", fvp)
nrm += 1
if nrm:
self.log("{} files graduated in {}".format(nrm, vp))
if timeout < 10:
continue
q = "select at from up where at > 0 order by at limit 1"
with self.mutex:
hits = cur.execute(q).fetchone()
if hits:
timeout = min(timeout, now + lifetime - (now - hits[0]))
return timeout
def _vis_job_progress(self, job: dict[str, Any]) -> str:
perc = 100 - (len(job["need"]) * 100.0 / len(job["hash"]))
path = os.path.join(job["ptop"], job["prel"], job["name"])
return "{:5.1f}% {}".format(perc, path)
def _vis_reg_progress(self, reg: dict[str, dict[str, Any]]) -> list[str]:
ret = []
for _, job in reg.items():
if job["need"]:
ret.append(self._vis_job_progress(job))
return ret
def _expr_idx_filter(self, flags: dict[str, Any]) -> tuple[bool, dict[str, Any]]:
if not self.no_expr_idx:
return False, flags
ret = {k: v for k, v in flags.items() if not k.startswith("e2t")}
if ret.keys() == flags.keys():
return False, flags
return True, ret
def init_indexes(self, all_vols: dict[str, VFS], scan_vols: list[str]) -> bool:
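        """
        scan the given volumes (all of them if scan_vols is empty); phases:
          1) e2ds: hash and index files,
          2) e2v: verify file contents against the db,
          3) e2ts: read media tags,
        then vacuum each volume that changed;
        returns True if any volume has e2d (uploads get indexed)
        """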
gid = self.gid
while self.pp and gid == self.gid:
time.sleep(0.1)
if gid != self.gid:
return False
if gid:
self.log("reload #{} running".format(self.gid))
self.pp = ProgressPrinter()
vols = list(all_vols.values())
t0 = time.time()
have_e2d = False
if self.no_expr_idx:
modified = False
for vol in vols:
m, f = self._expr_idx_filter(vol.flags)
if m:
vol.flags = f
modified = True
if modified:
msg = "disabling -e2t because your sqlite belongs in a museum"
self.log(msg, c=3)
live_vols = []
with self.mutex:
# only need to protect register_vpath but all in one go feels right
for vol in vols:
try:
bos.listdir(vol.realpath)
except:
self.volstate[vol.vpath] = "OFFLINE (cannot access folder)"
self.log("cannot access " + vol.realpath, c=1)
continue
if scan_vols and vol.vpath not in scan_vols:
continue
if not self.register_vpath(vol.realpath, vol.flags):
# self.log("db not enable for {}".format(m, vol.realpath))
continue
live_vols.append(vol)
if vol.vpath not in self.volstate:
self.volstate[vol.vpath] = "OFFLINE (pending initialization)"
vols = live_vols
need_vac = {}
need_mtag = False
for vol in vols:
if "e2t" in vol.flags:
need_mtag = True
if need_mtag and not self.mtag:
self.mtag = MTag(self.log_func, self.args)
if not self.mtag.usable:
self.mtag = None
# e2ds(a) volumes first
if next((zv for zv in vols if "e2ds" in zv.flags), None):
self._block("indexing")
for vol in vols:
if self.stop:
break
en: set[str] = set()
if "mte" in vol.flags:
en = set(vol.flags["mte"].split(","))
self.entags[vol.realpath] = en
if "e2d" in vol.flags:
have_e2d = True
if "e2ds" in vol.flags:
self.volstate[vol.vpath] = "busy (hashing files)"
_, vac = self._build_file_index(vol, list(all_vols.values()))
if vac:
need_vac[vol] = True
if "e2v" in vol.flags:
t = "online (integrity-check pending)"
elif "e2ts" in vol.flags:
t = "online (tags pending)"
else:
t = "online, idle"
self.volstate[vol.vpath] = t
self._unblock()
# file contents verification
for vol in vols:
if self.stop:
break
if "e2v" not in vol.flags:
continue
t = "online (verifying integrity)"
self.volstate[vol.vpath] = t
self.log("{} [{}]".format(t, vol.realpath))
nmod = self._verify_integrity(vol)
if nmod:
self.log("modified {} entries in the db".format(nmod), 3)
need_vac[vol] = True
if "e2ts" in vol.flags:
t = "online (tags pending)"
else:
t = "online, idle"
self.volstate[vol.vpath] = t
# open the rest + do any e2ts(a)
needed_mutagen = False
for vol in vols:
if self.stop:
break
if "e2ts" not in vol.flags:
continue
t = "online (reading tags)"
self.volstate[vol.vpath] = t
self.log("{} [{}]".format(t, vol.realpath))
nadd, nrm, success = self._build_tags_index(vol)
if not success:
needed_mutagen = True
if nadd or nrm:
need_vac[vol] = True
self.volstate[vol.vpath] = "online (mtp soon)"
for vol in need_vac:
reg = self.register_vpath(vol.realpath, vol.flags)
assert reg
cur, _ = reg
with self.mutex:
cur.connection.commit()
cur.execute("vacuum")
if self.stop:
return False
self.pp.end = True
msg = "{} volumes in {:.2f} sec"
self.log(msg.format(len(vols), time.time() - t0))
if needed_mutagen:
msg = "could not read tags because no backends are available (Mutagen or FFprobe)"
self.log(msg, c=1)
thr = None
if self.mtag:
t = "online (running mtp)"
if scan_vols:
thr = threading.Thread(target=self._run_all_mtp, name="up2k-mtp-scan")
else:
self.pp = None
t = "online, idle"
for vol in vols:
self.volstate[vol.vpath] = t
if thr:
thr.daemon = True
thr.start()
return have_e2d
def register_vpath(
self, ptop: str, flags: dict[str, Any]
) -> Optional[tuple["sqlite3.Cursor", str]]:
histpath = self.asrv.vfs.histtab.get(ptop)
if not histpath:
self.log("no histpath for [{}]".format(ptop))
return None
db_path = os.path.join(histpath, "up2k.db")
if ptop in self.registry:
try:
return self.cur[ptop], db_path
except:
return None
_, flags = self._expr_idx_filter(flags)
ft = "\033[0;32m{}{:.0}"
ff = "\033[0;35m{}{:.0}"
fv = "\033[0;36m{}:\033[1;30m{}"
fx = set(("html_head",))
a = [
(ft if v is True else ff if v is False else fv).format(k, str(v))
for k, v in flags.items()
if k not in fx
]
if a:
vpath = "?"
for k, v in self.asrv.vfs.all_vols.items():
if v.realpath == ptop:
vpath = k
if vpath:
vpath += "/"
zs = " ".join(sorted(a))
zs = zs.replace("30mre.compile(", "30m(") # nohash
self.log("/{} {}".format(vpath, zs), "35")
reg = {}
drp = None
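        # up2k.snap is a gzipped json snapshot of this volume's registry
        # (the unfinished uploads), written periodically by _snapshot;
        # load it so interrupted uploads survive a server restart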
path = os.path.join(histpath, "up2k.snap")
if bos.path.exists(path):
with gzip.GzipFile(path, "rb") as f:
j = f.read().decode("utf-8")
reg2 = json.loads(j)
try:
drp = reg2["droppable"]
reg2 = reg2["registry"]
except:
pass
for k, job in reg2.items():
path = os.path.join(job["ptop"], job["prel"], job["name"])
if bos.path.exists(path):
reg[k] = job
job["poke"] = time.time()
job["busy"] = {}
else:
self.log("ign deleted file in snap: [{}]".format(path))
if drp is None:
drp = [k for k, v in reg.items() if not v.get("need", [])]
else:
drp = [x for x in drp if x in reg]
t = "loaded snap {} |{}| ({})".format(path, len(reg.keys()), len(drp or []))
ta = [t] + self._vis_reg_progress(reg)
self.log("\n".join(ta))
self.flags[ptop] = flags
self.registry[ptop] = reg
self.droppable[ptop] = drp or []
self.regdrop(ptop, "")
if not HAVE_SQLITE3 or "e2d" not in flags or "d2d" in flags:
return None
bos.makedirs(histpath)
try:
cur = self._open_db(db_path)
self.cur[ptop] = cur
return cur, db_path
except:
msg = "cannot use database at [{}]:\n{}"
self.log(msg.format(ptop, traceback.format_exc()))
return None
def _build_file_index(self, vol: VFS, all_vols: list[VFS]) -> tuple[bool, bool]:
do_vac = False
top = vol.realpath
rei = vol.flags.get("noidx")
reh = vol.flags.get("nohash")
n4g = bool(vol.flags.get("noforget"))
dev = 0
if vol.flags.get("xdev"):
dev = bos.stat(top).st_dev
with self.mutex:
reg = self.register_vpath(top, vol.flags)
assert reg and self.pp
cur, db_path = reg
db = Dbw(cur, 0, time.time())
self.pp.n = next(db.c.execute("select count(w) from up"))[0]
excl = [
vol.realpath + "/" + d.vpath[len(vol.vpath) :].lstrip("/")
for d in all_vols
if d != vol and (d.vpath.startswith(vol.vpath + "/") or not vol.vpath)
]
excl += [absreal(x) for x in excl]
excl += list(self.asrv.vfs.histtab.values())
if WINDOWS:
excl = [x.replace("/", "\\") for x in excl]
else:
# ~/.wine/dosdevices/z:/ and such
excl += ["/dev", "/proc", "/run", "/sys"]
rtop = absreal(top)
n_add = n_rm = 0
try:
n_add = self._build_dir(
db,
top,
set(excl),
top,
rtop,
rei,
reh,
n4g,
[],
dev,
bool(vol.flags.get("xvol")),
)
if not n4g:
n_rm = self._drop_lost(db.c, top, excl)
except Exception as ex:
t = "failed to index volume [{}]:\n{}"
self.log(t.format(top, min_ex()), c=1)
if db_ex_chk(self.log, ex, db_path):
self.hub.log_stacks()
if db.n:
self.log("commit {} new files".format(db.n))
if self.args.no_dhash:
if db.c.execute("select d from dh").fetchone():
db.c.execute("delete from dh")
self.log("forgetting dhashes in {}".format(top))
elif n_add or n_rm:
self._set_tagscan(db.c, True)
db.c.connection.commit()
return True, bool(n_add or n_rm or do_vac)
def _build_dir(
self,
db: Dbw,
top: str,
excl: set[str],
cdir: str,
rcdir: str,
rei: Optional[Pattern[str]],
reh: Optional[Pattern[str]],
n4g: bool,
seen: list[str],
dev: int,
xvol: bool,
) -> int:
if xvol and not rcdir.startswith(top):
self.log("skip xvol: [{}] -> [{}]".format(cdir, rcdir), 6)
return 0
if rcdir in seen:
t = "bailing from symlink loop,\n prev: {}\n curr: {}\n from: {}"
self.log(t.format(seen[-1], rcdir, cdir), 3)
return 0
ret = 0
seen = seen + [rcdir]
unreg: list[str] = []
files: list[tuple[int, int, str]] = []
assert self.pp and self.mem_cur
self.pp.msg = "a{} {}".format(self.pp.n, cdir)
rd = cdir[len(top) :].strip("/")
if WINDOWS:
rd = rd.replace("\\", "/").strip("/")
g = statdir(self.log_func, not self.args.no_scandir, False, cdir)
gl = sorted(g)
partials = set([x[0] for x in gl if "PARTIAL" in x[0]])
for iname, inf in gl:
if self.stop:
return -1
rp = vjoin(rd, iname)
abspath = os.path.join(cdir, iname)
if rei and rei.search(abspath):
unreg.append(rp)
continue
lmod = int(inf.st_mtime)
sz = inf.st_size
if stat.S_ISDIR(inf.st_mode):
rap = absreal(abspath)
if dev and inf.st_dev != dev:
self.log("skip xdev {}->{}: {}".format(dev, inf.st_dev, abspath), 6)
continue
if abspath in excl or rap in excl:
unreg.append(rp)
continue
if iname == ".th" and bos.path.isdir(os.path.join(abspath, "top")):
# abandoned or foreign, skip
continue
# self.log(" dir: {}".format(abspath))
try:
ret += self._build_dir(
db, top, excl, abspath, rap, rei, reh, n4g, seen, dev, xvol
)
except:
t = "failed to index subdir [{}]:\n{}"
self.log(t.format(abspath, min_ex()), c=1)
elif not stat.S_ISREG(inf.st_mode):
self.log("skip type-{:x} file [{}]".format(inf.st_mode, abspath))
else:
# self.log("file: {}".format(abspath))
if rp.endswith(".PARTIAL") and time.time() - lmod < 60:
# rescan during upload
continue
if not sz and (
"{}.PARTIAL".format(iname) in partials
or ".{}.PARTIAL".format(iname) in partials
):
# placeholder for unfinished upload
continue
files.append((sz, lmod, iname))
# folder of 1000 files = ~1 MiB RAM best-case (tiny filenames);
# free up stuff we're done with before dhashing
gl = []
partials.clear()
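        # dhash = one hash covering this folder's entire (sz, mtime, name)
        # listing; if it matches the value stored in the dh table then
        # nothing changed since the last scan and the folder is skipped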
if not self.args.no_dhash:
if len(files) < 9000:
zh = hashlib.sha1(str(files).encode("utf-8", "replace"))
else:
zh = hashlib.sha1()
_ = [zh.update(str(x).encode("utf-8", "replace")) for x in files]
dhash = base64.urlsafe_b64encode(zh.digest()[:12]).decode("ascii")
sql = "select d from dh where d = ? and h = ?"
try:
c = db.c.execute(sql, (rd, dhash))
drd = rd
except:
drd = "//" + w8b64enc(rd)
c = db.c.execute(sql, (drd, dhash))
if c.fetchone():
return ret
seen_files = set([x[2] for x in files]) # for dropcheck
for sz, lmod, fn in files:
if self.stop:
return -1
rp = vjoin(rd, fn)
abspath = os.path.join(cdir, fn)
nohash = reh.search(abspath) if reh else False
if fn: # diff-golf
sql = "select w, mt, sz from up where rd = ? and fn = ?"
try:
c = db.c.execute(sql, (rd, fn))
except:
c = db.c.execute(sql, s3enc(self.mem_cur, rd, fn))
in_db = list(c.fetchall())
if in_db:
self.pp.n -= 1
dw, dts, dsz = in_db[0]
if len(in_db) > 1:
t = "WARN: multiple entries: [{}] => [{}] |{}|\n{}"
rep_db = "\n".join([repr(x) for x in in_db])
self.log(t.format(top, rp, len(in_db), rep_db))
dts = -1
if dts == lmod and dsz == sz and (nohash or dw[0] != "#" or not sz):
continue
t = "reindex [{}] => [{}] ({}/{}) ({}/{})".format(
top, rp, dts, lmod, dsz, sz
)
self.log(t)
self.db_rm(db.c, rd, fn)
ret += 1
db.n += 1
in_db = []
self.pp.msg = "a{} {}".format(self.pp.n, abspath)
if nohash or not sz:
wark = up2k_wark_from_metadata(self.salt, sz, lmod, rd, fn)
else:
if sz > 1024 * 1024:
self.log("file: {}".format(abspath))
try:
hashes = self._hashlist_from_file(
abspath, "a{}, ".format(self.pp.n)
)
except Exception as ex:
self.log("hash: {} @ [{}]".format(repr(ex), abspath))
continue
if not hashes:
return -1
wark = up2k_wark_from_hashlist(self.salt, sz, hashes)
self.db_add(db.c, wark, rd, fn, lmod, sz, "", 0)
db.n += 1
ret += 1
td = time.time() - db.t
if db.n >= 4096 or td >= 60:
self.log("commit {} new files".format(db.n))
db.c.connection.commit()
db.n = 0
db.t = time.time()
if not self.args.no_dhash:
db.c.execute("delete from dh where d = ?", (drd,))
db.c.execute("insert into dh values (?,?)", (drd, dhash))
if self.stop:
return -1
        # drop shadowed folders (subdirs that are mountpoints of other
        # volumes; their files belong to the nested volume's index)
for rd in unreg:
n = 0
q = "select count(w) from up where (rd = ? or rd like ?||'%') and at == 0"
for erd in [rd, "//" + w8b64enc(rd)]:
try:
n = db.c.execute(q, (erd, erd + "/")).fetchone()[0]
break
except:
pass
if n:
t = "forgetting {} shadowed autoindexed files in [{}] > [{}]"
self.log(t.format(n, top, rd))
q = "delete from dh where (d = ? or d like ?||'%')"
db.c.execute(q, (erd, erd + "/"))
q = "delete from up where (rd = ? or rd like ?||'%') and at == 0"
db.c.execute(q, (erd, erd + "/"))
ret += n
if n4g:
return ret
# drop missing files
q = "select fn from up where rd = ?"
try:
c = db.c.execute(q, (rd,))
except:
c = db.c.execute(q, ("//" + w8b64enc(rd),))
hits = [w8b64dec(x[2:]) if x.startswith("//") else x for (x,) in c]
rm_files = [x for x in hits if x not in seen_files]
n_rm = len(rm_files)
for fn in rm_files:
self.db_rm(db.c, rd, fn)
if n_rm:
self.log("forgot {} deleted files".format(n_rm))
return ret
def _drop_lost(self, cur: "sqlite3.Cursor", top: str, excl: list[str]) -> int:
rm = []
n_rm = 0
nchecked = 0
assert self.pp
# `_build_dir` did all unshadowed files; first do dirs:
ndirs = next(cur.execute("select count(distinct rd) from up"))[0]
c = cur.execute("select distinct rd from up order by rd desc")
for (drd,) in c:
nchecked += 1
if drd.startswith("//"):
rd = w8b64dec(drd[2:])
else:
rd = drd
abspath = os.path.join(top, rd)
self.pp.msg = "b{} {}".format(ndirs - nchecked, abspath)
try:
if os.path.isdir(abspath):
continue
except:
pass
rm.append(drd)
if rm:
q = "select count(w) from up where rd = ?"
for rd in rm:
n_rm += next(cur.execute(q, (rd,)))[0]
self.log("forgetting {} deleted dirs, {} files".format(len(rm), n_rm))
for rd in rm:
cur.execute("delete from dh where d = ?", (rd,))
cur.execute("delete from up where rd = ?", (rd,))
# then shadowed deleted files
n_rm2 = 0
c2 = cur.connection.cursor()
excl = [x[len(top) + 1 :] for x in excl if x.startswith(top + "/")]
q = "select rd, fn from up where (rd = ? or rd like ?||'%') order by rd"
for rd in excl:
for erd in [rd, "//" + w8b64enc(rd)]:
try:
c = cur.execute(q, (erd, erd + "/"))
break
except:
pass
crd = "///"
cdc: set[str] = set()
for drd, dfn in c:
rd, fn = s3dec(drd, dfn)
if crd != rd:
crd = rd
try:
cdc = set(os.listdir(os.path.join(top, rd)))
except:
cdc.clear()
if fn not in cdc:
q = "delete from up where rd = ? and fn = ?"
c2.execute(q, (drd, dfn))
n_rm2 += 1
if n_rm2:
self.log("forgetting {} shadowed deleted files".format(n_rm2))
c2.close()
return n_rm + n_rm2
def _verify_integrity(self, vol: VFS) -> int:
"""expensive; blocks database access until finished"""
ptop = vol.realpath
assert self.pp
cur = self.cur[ptop]
rei = vol.flags.get("noidx")
reh = vol.flags.get("nohash")
e2vu = "e2vu" in vol.flags
e2vp = "e2vp" in vol.flags
excl = [
d[len(vol.vpath) :].lstrip("/")
for d in self.asrv.vfs.all_vols
if d != vol.vpath and (d.startswith(vol.vpath + "/") or not vol.vpath)
]
qexa: list[str] = []
pexa: list[str] = []
for vpath in excl:
qexa.append("up.rd != ? and not up.rd like ?||'%'")
pexa.extend([vpath, vpath])
pex: tuple[Any, ...] = tuple(pexa)
qex = " and ".join(qexa)
if qex:
qex = " where " + qex
rewark: list[tuple[str, str, str, int, int]] = []
with self.mutex:
b_left = 0
n_left = 0
q = "select sz from up" + qex
for (sz,) in cur.execute(q, pex):
b_left += sz # sum() can overflow according to docs
n_left += 1
tf, _ = self._spool_warks(cur, "select w, rd, fn from up" + qex, pex, 0)
with gzip.GzipFile(mode="rb", fileobj=tf) as gf:
for zb in gf:
if self.stop:
return -1
w, drd, dfn = zb[:-1].decode("utf-8").split("\x00")
with self.mutex:
q = "select mt, sz from up where w = ? and rd = ? and fn = ?"
try:
mt, sz = cur.execute(q, (w, drd, dfn)).fetchone()
except:
# file moved/deleted since spooling
continue
n_left -= 1
b_left -= sz
if drd.startswith("//") or dfn.startswith("//"):
rd, fn = s3dec(drd, dfn)
else:
rd = drd
fn = dfn
abspath = os.path.join(ptop, rd, fn)
if rei and rei.search(abspath):
continue
nohash = reh.search(abspath) if reh else False
pf = "v{}, {:.0f}+".format(n_left, b_left / 1024 / 1024)
self.pp.msg = pf + abspath
st = bos.stat(abspath)
sz2 = st.st_size
mt2 = int(st.st_mtime)
if nohash or not sz2:
w2 = up2k_wark_from_metadata(self.salt, sz2, mt2, rd, fn)
else:
if sz2 > 1024 * 1024 * 32:
self.log("file: {}".format(abspath))
try:
hashes = self._hashlist_from_file(abspath, pf)
except Exception as ex:
self.log("hash: {} @ [{}]".format(repr(ex), abspath))
continue
if not hashes:
return -1
w2 = up2k_wark_from_hashlist(self.salt, sz2, hashes)
if w == w2:
continue
rewark.append((drd, dfn, w2, sz2, mt2))
t = "hash mismatch: {}\n db: {} ({} byte, {})\n fs: {} ({} byte, {})"
t = t.format(abspath, w, sz, mt, w2, sz2, mt2)
self.log(t, 1)
if e2vp and rewark:
self.hub.retcode = 1
os.kill(os.getpid(), signal.SIGTERM)
raise Exception("{} files have incorrect hashes".format(len(rewark)))
if not e2vu or not rewark:
return 0
with self.mutex:
for rd, fn, w, sz, mt in rewark:
q = "update up set w = ?, sz = ?, mt = ? where rd = ? and fn = ? limit 1"
cur.execute(q, (w, sz, int(mt), rd, fn))
cur.connection.commit()
return len(rewark)
def _build_tags_index(self, vol: VFS) -> tuple[int, int, bool]:
ptop = vol.realpath
with self.mutex:
reg = self.register_vpath(ptop, vol.flags)
assert reg and self.pp
cur = self.cur[ptop]
if not self.args.no_dhash:
with self.mutex:
c = cur.execute("select k from kv where k = 'tagscan'")
if not c.fetchone():
return 0, 0, bool(self.mtag)
ret = self._build_tags_index_2(ptop)
with self.mutex:
self._set_tagscan(cur, False)
cur.connection.commit()
return ret
def _set_tagscan(self, cur: "sqlite3.Cursor", need: bool) -> bool:
if self.args.no_dhash:
return False
c = cur.execute("select k from kv where k = 'tagscan'")
if bool(c.fetchone()) == need:
return False
if need:
cur.execute("insert into kv values ('tagscan',1)")
else:
cur.execute("delete from kv where k = 'tagscan'")
return True
def _build_tags_index_2(self, ptop: str) -> tuple[int, int, bool]:
entags = self.entags[ptop]
flags = self.flags[ptop]
cur = self.cur[ptop]
n_add = 0
n_rm = 0
if "e2tsr" in flags:
with self.mutex:
n_rm = cur.execute("select count(w) from mt").fetchone()[0]
if n_rm:
self.log("discarding {} media tags for a full rescan".format(n_rm))
cur.execute("delete from mt")
# integrity: drop tags for tracks that were deleted
if "e2t" in flags:
with self.mutex:
n = 0
c2 = cur.connection.cursor()
up_q = "select w from up where substr(w,1,16) = ?"
rm_q = "delete from mt where w = ?"
for (w,) in cur.execute("select w from mt"):
if not c2.execute(up_q, (w,)).fetchone():
c2.execute(rm_q, (w[:16],))
n += 1
c2.close()
if n:
t = "discarded media tags for {} deleted files"
self.log(t.format(n))
n_rm += n
with self.mutex:
cur.connection.commit()
# bail if a volflag disables indexing
if "d2t" in flags or "d2d" in flags:
return 0, n_rm, True
# add tags for new files
if "e2ts" in flags:
if not self.mtag:
return 0, n_rm, False
nq = 0
with self.mutex:
tf, nq = self._spool_warks(
cur, "select w from up order by rd, fn", (), 1
)
if not nq:
# self.log("tags ok")
self._unspool(tf)
return 0, n_rm, True
if nq == -1:
return -1, -1, True
with gzip.GzipFile(mode="rb", fileobj=tf) as gf:
n_add = self._e2ts_q(gf, nq, cur, ptop, entags)
self._unspool(tf)
return n_add, n_rm, True
def _e2ts_q(
self,
qf: gzip.GzipFile,
nq: int,
cur: "sqlite3.Cursor",
ptop: str,
entags: set[str],
) -> int:
assert self.pp and self.mtag
flags = self.flags[ptop]
mpool: Optional[Queue[Mpqe]] = None
if self.mtag.prefer_mt and self.args.mtag_mt > 1:
mpool = self._start_mpool()
n_add = 0
n_buf = 0
last_write = time.time()
for bw in qf:
if self.stop:
return -1
w = bw[:-1].decode("ascii")
with self.mutex:
try:
q = "select rd, fn, ip, at from up where substr(w,1,16)=? and +w=?"
rd, fn, ip, at = cur.execute(q, (w[:16], w)).fetchone()
except:
# file modified/deleted since spooling
continue
if rd.startswith("//") or fn.startswith("//"):
rd, fn = s3dec(rd, fn)
if "mtp" in flags:
q = "insert into mt values (?,'t:mtp','a')"
cur.execute(q, (w[:16],))
abspath = os.path.join(ptop, rd, fn)
self.pp.msg = "c{} {}".format(nq, abspath)
if not mpool:
n_tags = self._tagscan_file(cur, entags, w, abspath, ip, at)
else:
if ip:
oth_tags = {"up_ip": ip, "up_at": at}
else:
oth_tags = {}
mpool.put(Mpqe({}, entags, w, abspath, oth_tags))
with self.mutex:
n_tags = len(self._flush_mpool(cur))
n_add += n_tags
n_buf += n_tags
nq -= 1
td = time.time() - last_write
if n_buf >= 4096 or td >= self.timeout / 2:
self.log("commit {} new tags".format(n_buf))
with self.mutex:
cur.connection.commit()
last_write = time.time()
n_buf = 0
if mpool:
self._stop_mpool(mpool)
with self.mutex:
n_add += len(self._flush_mpool(cur))
with self.mutex:
cur.connection.commit()
return n_add
def _spool_warks(
self,
cur: "sqlite3.Cursor",
q: str,
params: tuple[Any, ...],
flt: int,
) -> tuple[tempfile.SpooledTemporaryFile[bytes], int]:
"""mutex me"""
n = 0
c2 = cur.connection.cursor()
tf = tempfile.SpooledTemporaryFile(1024 * 1024 * 8, "w+b", prefix="cpp-tq-")
with gzip.GzipFile(mode="wb", fileobj=tf) as gf:
for row in cur.execute(q, params):
if self.stop:
return tf, -1
if flt == 1:
q = "select w from mt where w = ?"
if c2.execute(q, (row[0][:16],)).fetchone():
continue
gf.write("{}\n".format("\x00".join(row)).encode("utf-8"))
n += 1
c2.close()
tf.seek(0)
self.spools.add(tf)
return tf, n
def _unspool(self, tf: tempfile.SpooledTemporaryFile[bytes]) -> None:
try:
self.spools.remove(tf)
except:
return
try:
tf.close()
except Exception as ex:
self.log("failed to delete spool: {}".format(ex), 3)
def _flush_mpool(self, wcur: "sqlite3.Cursor") -> list[str]:
ret = []
for x in self.pending_tags:
self._tag_file(wcur, *x)
ret.append(x[1])
self.pending_tags = []
return ret
def _run_all_mtp(self) -> None:
gid = self.gid
t0 = time.time()
for ptop, flags in self.flags.items():
if "mtp" in flags:
self._run_one_mtp(ptop, gid)
td = time.time() - t0
msg = "mtp finished in {:.2f} sec ({})"
self.log(msg.format(td, s2hms(td, True)))
self.pp = None
for k in list(self.volstate.keys()):
if "OFFLINE" not in self.volstate[k]:
self.volstate[k] = "online, idle"
if self.args.exit == "idx":
self.hub.sigterm()
def _run_one_mtp(self, ptop: str, gid: int) -> None:
if gid != self.gid:
return
entags = self.entags[ptop]
parsers = {}
for parser in self.flags[ptop]["mtp"]:
try:
parser = MParser(parser)
except:
self.log("invalid argument (could not find program): " + parser, 1)
return
for tag in entags:
if tag in parser.tags:
parsers[parser.tag] = parser
if self.args.mtag_vv:
t = "parsers for {}: \033[0m{}"
self.log(t.format(ptop, list(parsers.keys())), "1;30")
self.mtp_parsers[ptop] = parsers
q = "select count(w) from mt where k = 't:mtp'"
with self.mutex:
cur = self.cur[ptop]
cur = cur.connection.cursor()
wcur = cur.connection.cursor()
n_left = cur.execute(q).fetchone()[0]
mpool = self._start_mpool()
batch_sz = mpool.maxsize * 3
t_prev = time.time()
n_prev = n_left
n_done = 0
to_delete = {}
in_progress = {}
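        # batch-loop: grab up to batch_sz files still marked 't:mtp',
        # feed them to the worker pool, collect finished tags through
        # _flush_mpool, and delete each file's 't:mtp' marker once all
        # of its parsers have run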
while True:
did_nothing = True
with self.mutex:
if gid != self.gid:
break
q = "select w from mt where k = 't:mtp' limit ?"
zq = cur.execute(q, (batch_sz,)).fetchall()
warks = [str(x[0]) for x in zq]
jobs = []
for w in warks:
if w in in_progress:
continue
q = "select rd, fn, ip, at from up where substr(w,1,16)=? limit 1"
rd, fn, ip, at = cur.execute(q, (w,)).fetchone()
rd, fn = s3dec(rd, fn)
abspath = os.path.join(ptop, rd, fn)
q = "select k from mt where w = ?"
zq = cur.execute(q, (w,)).fetchall()
have: dict[str, Union[str, float]] = {x[0]: 1 for x in zq}
did_nothing = False
parsers = self._get_parsers(ptop, have, abspath)
if not parsers:
to_delete[w] = True
n_left -= 1
continue
if next((x for x in parsers.values() if x.pri), None):
q = "select k, v from mt where w = ?"
zq2 = cur.execute(q, (w,)).fetchall()
oth_tags = {str(k): v for k, v in zq2}
else:
oth_tags = {}
if ip:
oth_tags["up_ip"] = ip
oth_tags["up_at"] = at
jobs.append(Mpqe(parsers, set(), w, abspath, oth_tags))
in_progress[w] = True
with self.mutex:
done = self._flush_mpool(wcur)
for w in done:
to_delete[w] = True
did_nothing = False
in_progress.pop(w)
n_done += 1
for w in to_delete:
q = "delete from mt where w = ? and +k = 't:mtp'"
cur.execute(q, (w,))
to_delete = {}
if not warks:
break
if did_nothing:
with self.tag_event:
self.tag_event.wait(0.2)
if not jobs:
continue
try:
now = time.time()
s = ((now - t_prev) / (n_prev - n_left)) * n_left
h, s = divmod(s, 3600)
m, s = divmod(s, 60)
n_prev = n_left
t_prev = now
except:
h = 1
m = 1
msg = "mtp: {} done, {} left, eta {}h {:02d}m"
with self.mutex:
msg = msg.format(n_done, n_left, int(h), int(m))
self.log(msg, c=6)
for j in jobs:
n_left -= 1
mpool.put(j)
with self.mutex:
cur.connection.commit()
self._stop_mpool(mpool)
with self.mutex:
done = self._flush_mpool(wcur)
for w in done:
q = "delete from mt where w = ? and +k = 't:mtp'"
cur.execute(q, (w,))
cur.connection.commit()
if n_done:
self.log("mtp: scanned {} files in {}".format(n_done, ptop), c=6)
cur.execute("vacuum")
wcur.close()
cur.close()
def _get_parsers(
self, ptop: str, have: dict[str, Union[str, float]], abspath: str
) -> dict[str, MParser]:
try:
all_parsers = self.mtp_parsers[ptop]
except:
if self.args.mtag_vv:
self.log("no mtp defined for {}".format(ptop), "1;30")
return {}
entags = self.entags[ptop]
parsers = {}
for k, v in all_parsers.items():
if "ac" in entags or ".aq" in entags:
if "ac" in have or ".aq" in have:
# is audio, require non-audio?
if v.audio == "n":
if self.args.mtag_vv:
t = "skip mtp {}; is no-audio, have audio"
self.log(t.format(k), "1;30")
continue
# is not audio, require audio?
elif v.audio == "y":
if self.args.mtag_vv:
t = "skip mtp {}; is audio, have no-audio"
self.log(t.format(k), "1;30")
continue
if v.ext:
match = False
for ext in v.ext:
if abspath.lower().endswith("." + ext):
match = True
break
if not match:
if self.args.mtag_vv:
t = "skip mtp {}; need file-ext {}, have {}"
self.log(t.format(k, v.ext, abspath.rsplit(".")[-1]), "1;30")
continue
parsers[k] = v
parsers = {k: v for k, v in parsers.items() if v.force or k not in have}
return parsers
def _start_mpool(self) -> Queue[Mpqe]:
# mp.pool.ThreadPool and concurrent.futures.ThreadPoolExecutor
        # both do crazy runahead so let's reinvent another wheel
nw = max(1, self.args.mtag_mt)
assert self.mtag
if not self.mpool_used:
self.mpool_used = True
self.log("using {}x {}".format(nw, self.mtag.backend))
mpool: Queue[Mpqe] = Queue(nw)
for _ in range(nw):
thr = threading.Thread(
target=self._tag_thr, args=(mpool,), name="up2k-mpool"
)
thr.daemon = True
thr.start()
return mpool
def _stop_mpool(self, mpool: Queue[Mpqe]) -> None:
if not mpool:
return
for _ in range(mpool.maxsize):
mpool.put(Mpqe({}, set(), "", "", {}))
mpool.join()
def _tag_thr(self, q: Queue[Mpqe]) -> None:
assert self.mtag
while True:
qe = q.get()
if not qe.w:
q.task_done()
return
try:
if not qe.mtp:
if self.args.mtag_vv:
t = "tag-thr: {}({})"
self.log(t.format(self.mtag.backend, qe.abspath), "1;30")
tags = self.mtag.get(qe.abspath)
else:
if self.args.mtag_vv:
t = "tag-thr: {}({})"
self.log(t.format(list(qe.mtp.keys()), qe.abspath), "1;30")
tags = self.mtag.get_bin(qe.mtp, qe.abspath, qe.oth_tags)
vtags = [
"\033[36m{} \033[33m{}".format(k, v) for k, v in tags.items()
]
if vtags:
self.log("{}\033[0m [{}]".format(" ".join(vtags), qe.abspath))
with self.mutex:
self.pending_tags.append((qe.entags, qe.w, qe.abspath, tags))
except:
ex = traceback.format_exc()
self._log_tag_err(qe.mtp or self.mtag.backend, qe.abspath, ex)
finally:
if qe.mtp:
with self.tag_event:
self.tag_event.notify_all()
q.task_done()
def _log_tag_err(self, parser: Any, abspath: str, ex: Any) -> None:
msg = "{} failed to read tags from {}:\n{}".format(parser, abspath, ex)
self.log(msg.lstrip(), c=1 if "<Signals.SIG" in msg else 3)
def _tagscan_file(
self,
write_cur: "sqlite3.Cursor",
entags: set[str],
wark: str,
abspath: str,
ip: str,
at: float,
) -> int:
"""will mutex"""
assert self.mtag
if not bos.path.isfile(abspath):
return 0
try:
tags = self.mtag.get(abspath)
except Exception as ex:
self._log_tag_err("", abspath, ex)
return 0
if ip:
tags["up_ip"] = ip
tags["up_at"] = at
with self.mutex:
return self._tag_file(write_cur, entags, wark, abspath, tags)
def _tag_file(
self,
write_cur: "sqlite3.Cursor",
entags: set[str],
wark: str,
abspath: str,
tags: dict[str, Union[str, float]],
) -> int:
"""mutex me"""
assert self.mtag
if not bos.path.isfile(abspath):
return 0
if entags:
tags = {k: v for k, v in tags.items() if k in entags}
if not tags:
# indicate scanned without tags
tags = {"x": 0}
if not tags:
return 0
        # drop existing rows for all of these tags in one statement
        q = "delete from mt where w = ? and ({})".format(
            " or ".join(["+k = ?"] * len(tags))
        )
        args = [wark[:16]] + list(tags.keys())
        write_cur.execute(q, tuple(args))
ret = 0
for k, v in tags.items():
q = "insert into mt values (?,?,?)"
write_cur.execute(q, (wark[:16], k, v))
ret += 1
self._set_tagscan(write_cur, True)
return ret
def _orz(self, db_path: str) -> "sqlite3.Cursor":
return sqlite3.connect(db_path, self.timeout, check_same_thread=False).cursor()
# x.set_trace_callback(trace)
def _open_db(self, db_path: str) -> "sqlite3.Cursor":
existed = bos.path.exists(db_path)
cur = self._orz(db_path)
ver = self._read_ver(cur)
if not existed and ver is None:
return self._create_db(db_path, cur)
if ver == 4:
try:
t = "creating backup before upgrade: "
cur = self._backup_db(db_path, cur, ver, t)
self._upgrade_v4(cur)
ver = 5
except:
self.log("WARN: failed to upgrade from v4", 3)
if ver == DB_VER:
try:
self._add_dhash_tab(cur)
except:
pass
try:
nfiles = next(cur.execute("select count(w) from up"))[0]
self.log("OK: {} |{}|".format(db_path, nfiles))
return cur
except:
self.log("WARN: could not list files; DB corrupt?\n" + min_ex())
if (ver or 0) > DB_VER:
t = "database is version {}, this copyparty only supports versions <= {}"
raise Exception(t.format(ver, DB_VER))
msg = "creating new DB (old is bad); backup: "
if ver:
msg = "creating new DB (too old to upgrade); backup: "
cur = self._backup_db(db_path, cur, ver, msg)
db = cur.connection
cur.close()
db.close()
bos.unlink(db_path)
return self._create_db(db_path, None)
def _backup_db(
self, db_path: str, cur: "sqlite3.Cursor", ver: Optional[int], msg: str
) -> "sqlite3.Cursor":
bak = "{}.bak.{:x}.v{}".format(db_path, int(time.time()), ver)
self.log(msg + bak)
try:
c2 = sqlite3.connect(bak)
with c2:
cur.connection.backup(c2)
return cur
except:
t = "native sqlite3 backup failed; using fallback method:\n"
self.log(t + min_ex())
finally:
c2.close()
db = cur.connection
cur.close()
db.close()
shutil.copy2(fsenc(db_path), fsenc(bak))
return self._orz(db_path)
def _read_ver(self, cur: "sqlite3.Cursor") -> Optional[int]:
for tab in ["ki", "kv"]:
try:
c = cur.execute(r"select v from {} where k = 'sver'".format(tab))
except:
continue
rows = c.fetchall()
if rows:
return int(rows[0][0])
return None
def _create_db(
self, db_path: str, cur: Optional["sqlite3.Cursor"]
) -> "sqlite3.Cursor":
"""
collision in 2^(n/2) files where n = bits (6 bits/ch)
10*6/2 = 2^30 = 1'073'741'824, 24.1mb idx 1<<(3*10)
12*6/2 = 2^36 = 68'719'476'736, 24.8mb idx
16*6/2 = 2^48 = 281'474'976'710'656, 26.1mb idx
"""
if not cur:
cur = self._orz(db_path)
idx = r"create index up_w on up(substr(w,1,16))"
if self.no_expr_idx:
idx = r"create index up_w on up(w)"
for cmd in [
r"create table up (w text, mt int, sz int, rd text, fn text, ip text, at int)",
r"create index up_rd on up(rd)",
r"create index up_fn on up(fn)",
r"create index up_ip on up(ip)",
idx,
r"create table mt (w text, k text, v int)",
r"create index mt_w on mt(w)",
r"create index mt_k on mt(k)",
r"create index mt_v on mt(v)",
r"create table kv (k text, v int)",
r"insert into kv values ('sver', {})".format(DB_VER),
]:
cur.execute(cmd)
self._add_dhash_tab(cur)
self.log("created DB at {}".format(db_path))
return cur
def _upgrade_v4(self, cur: "sqlite3.Cursor") -> None:
for cmd in [
r"alter table up add column ip text",
r"alter table up add column at int",
r"create index up_ip on up(ip)",
r"update kv set v=5 where k='sver'",
]:
cur.execute(cmd)
cur.connection.commit()
def _add_dhash_tab(self, cur: "sqlite3.Cursor") -> None:
# v5 -> v5a
for cmd in [
r"create table dh (d text, h text)",
r"create index dh_d on dh(d)",
r"insert into kv values ('tagscan',1)",
]:
cur.execute(cmd)
cur.connection.commit()
def _job_volchk(self, cj: dict[str, Any]) -> None:
if not self.register_vpath(cj["ptop"], cj["vcfg"]):
if cj["ptop"] not in self.registry:
raise Pebkac(410, "location unavailable")
def handle_json(self, cj: dict[str, Any]) -> dict[str, Any]:
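        """
        the up2k handshake: from the client's file metadata and chunk
        hashes, either resume a partial upload, dedupe (symlink) an
        already-completed one, or start a new job; the returned "hash"
        is the list of chunks the server still needs
        """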
try:
            # acquiring with a timeout is a bit expensive (py3.9 = 10x, py3.11 = 2x)
if self.mutex.acquire(timeout=10):
self._job_volchk(cj)
self.mutex.release()
else:
t = "cannot receive uploads right now;\nserver busy with {}.\nPlease wait; the client will retry..."
raise Pebkac(503, t.format(self.blocked or "[unknown]"))
except TypeError:
# py2
with self.mutex:
self._job_volchk(cj)
cj["name"] = sanitize_fn(cj["name"], "", [".prologue.html", ".epilogue.html"])
cj["poke"] = now = self.db_act = time.time()
wark = self._get_wark(cj)
job = None
pdir = djoin(cj["ptop"], cj["prel"])
try:
dev = bos.stat(pdir).st_dev
except:
dev = 0
        # check if the filesystem supports sparse files;
        # refuse out-of-order / multithreaded uploading if sprs is False
sprs = self.fstab.get(pdir) != "ng"
with self.mutex:
cur = self.cur.get(cj["ptop"])
reg = self.registry[cj["ptop"]]
vfs = self.asrv.vfs.all_vols[cj["vtop"]]
n4g = vfs.flags.get("noforget")
if cur:
if self.no_expr_idx:
q = r"select * from up where w = ?"
argv = [wark]
else:
q = r"select * from up where substr(w,1,16) = ? and w = ?"
argv = [wark[:16], wark]
alts: list[tuple[int, int, dict[str, Any]]] = []
cur = cur.execute(q, tuple(argv))
for _, dtime, dsize, dp_dir, dp_fn, ip, at in cur:
if dp_dir.startswith("//") or dp_fn.startswith("//"):
dp_dir, dp_fn = s3dec(dp_dir, dp_fn)
dp_abs = "/".join([cj["ptop"], dp_dir, dp_fn])
try:
st = bos.stat(dp_abs)
if stat.S_ISLNK(st.st_mode):
# broken symlink
raise Exception()
except:
if n4g:
st = os.stat_result((0, -1, -1, 0, 0, 0, 0, 0, 0, 0))
else:
continue
j = {
"name": dp_fn,
"prel": dp_dir,
"vtop": cj["vtop"],
"ptop": cj["ptop"],
"sprs": sprs, # dontcare; finished anyways
"size": dsize,
"lmod": dtime,
"addr": ip,
"at": at,
"hash": [],
"need": [],
"busy": {},
}
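                    # rank the dupe candidates: same filesystem beats
                    # same folder beats same filename; ties go to the
                    # earliest db hit (the -len(alts) tiebreaker)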
score = (
(3 if st.st_dev == dev else 0)
+ (2 if dp_dir == cj["prel"] else 0)
+ (1 if dp_fn == cj["name"] else 0)
)
alts.append((score, -len(alts), j))
job = sorted(alts, reverse=True)[0][2] if alts else None
if job and wark in reg:
# self.log("pop " + wark + " " + job["name"] + " handle_json db", 4)
del reg[wark]
if job or wark in reg:
job = job or reg[wark]
if job["prel"] == cj["prel"] and job["name"] == cj["name"]:
# ensure the files haven't been deleted manually
names = [job[x] for x in ["name", "tnam"] if x in job]
for fn in names:
path = os.path.join(job["ptop"], job["prel"], fn)
try:
if bos.path.getsize(path) > 0:
# upload completed or both present
break
except:
# missing; restart
if not self.args.nw and not n4g:
job = None
break
else:
# file contents match, but not the path
src = os.path.join(job["ptop"], job["prel"], job["name"])
dst = os.path.join(cj["ptop"], cj["prel"], cj["name"])
vsrc = os.path.join(job["vtop"], job["prel"], job["name"])
vsrc = vsrc.replace("\\", "/") # just for prints anyways
if job["need"]:
self.log("unfinished:\n {0}\n {1}".format(src, dst))
err = "partial upload exists at a different location; please resume uploading here instead:\n"
err += "/" + quotep(vsrc) + " "
# registry is size-constrained + can only contain one unique wark;
# let want_recheck trigger symlink (if still in reg) or reupload
if cur:
dupe = (cj["prel"], cj["name"], cj["lmod"])
try:
self.dupesched[src].append(dupe)
except:
self.dupesched[src] = [dupe]
raise Pebkac(400, err)
elif "nodupe" in self.flags[job["ptop"]]:
self.log("dupe-reject:\n {0}\n {1}".format(src, dst))
err = "upload rejected, file already exists:\n"
err += "/" + quotep(vsrc) + " "
raise Pebkac(400, err)
else:
# symlink to the client-provided name,
# returning the previous upload info
job = deepcopy(job)
for k in ["ptop", "vtop", "prel"]:
job[k] = cj[k]
pdir = djoin(cj["ptop"], cj["prel"])
job["name"] = self._untaken(pdir, cj["name"], now, cj["addr"])
dst = os.path.join(job["ptop"], job["prel"], job["name"])
if not self.args.nw:
bos.unlink(dst) # TODO ed pls
try:
self._symlink(src, dst, lmod=cj["lmod"])
except:
if not n4g:
raise
if cur:
a = [cj[x] for x in "prel name lmod size addr".split()]
a += [cj.get("at") or time.time()]
self.db_add(cur, wark, *a)
cur.connection.commit()
if not job:
if vfs.lim:
ap1 = djoin(cj["ptop"], cj["prel"])
ap2, cj["prel"] = vfs.lim.all(
cj["addr"], cj["prel"], cj["size"], ap1, reg
)
bos.makedirs(ap2)
vfs.lim.nup(cj["addr"])
vfs.lim.bup(cj["addr"], cj["size"])
job = {
"wark": wark,
"t0": now,
"sprs": sprs,
"hash": deepcopy(cj["hash"]),
"need": [],
"busy": {},
}
# client-provided, sanitized by _get_wark: name, size, lmod
for k in [
"addr",
"vtop",
"ptop",
"prel",
"name",
"size",
"lmod",
"poke",
]:
job[k] = cj[k]
# one chunk may occur multiple times in a file;
# filter to unique values for the list of missing chunks
# (preserve order to reduce disk thrashing)
lut = {}
for k in cj["hash"]:
if k not in lut:
job["need"].append(k)
lut[k] = 1
try:
self._new_upload(job)
except:
self.registry[job["ptop"]].pop(job["wark"], None)
raise
purl = "{}/{}".format(job["vtop"], job["prel"]).strip("/")
purl = "/{}/".format(purl) if purl else "/"
return {
"name": job["name"],
"purl": purl,
"size": job["size"],
"lmod": job["lmod"],
"sprs": job.get("sprs", sprs),
"hash": job["need"],
"wark": wark,
}
def _untaken(self, fdir: str, fname: str, ts: float, ip: str) -> str:
if self.args.nw:
return fname
if self.args.plain_ip:
dip = ip.replace(":", ".")
else:
dip = self.hub.iphash.s(ip)
suffix = "-{:.6f}-{}".format(ts, dip)
with ren_open(fname, "wb", fdir=fdir, suffix=suffix) as zfw:
return zfw["orz"][1]
def _symlink(
self, src: str, dst: str, verbose: bool = True, lmod: float = 0
) -> None:
if verbose:
self.log("linking dupe:\n {0}\n {1}".format(src, dst))
if self.args.nw:
return
linked = False
try:
if self.args.no_dedup:
raise Exception("disabled in config")
lsrc = src
ldst = dst
fs1 = bos.stat(os.path.dirname(src)).st_dev
fs2 = bos.stat(os.path.dirname(dst)).st_dev
if fs1 == 0 or fs2 == 0:
# py2 on winxp or other unsupported combination
raise OSError(38, "filesystem does not have st_dev")
elif fs1 == fs2:
# same fs; make symlink as relative as possible
v = []
for p in [src, dst]:
if WINDOWS:
p = p.replace("\\", "/")
v.append(p.split("/"))
nsrc, ndst = v
nc = 0
for a, b in zip(nsrc, ndst):
if a != b:
break
nc += 1
if nc > 1:
zsl = nsrc[nc:]
hops = len(ndst[nc:]) - 1
lsrc = "../" * hops + "/".join(zsl)
try:
if self.args.hardlink:
os.link(fsenc(src), fsenc(dst))
linked = True
except Exception as ex:
self.log("cannot hardlink: " + repr(ex))
if self.args.never_symlink:
raise Exception("symlink-fallback disabled in cfg")
if not linked:
os.symlink(fsenc(lsrc), fsenc(ldst))
linked = True
except Exception as ex:
self.log("cannot link; creating copy: " + repr(ex))
shutil.copy2(fsenc(src), fsenc(dst))
if lmod and (not linked or SYMTIME):
times = (int(time.time()), int(lmod))
if ANYWIN:
self.lastmod_q.append((dst, 0, times, False))
else:
bos.utime(dst, times, False)
def handle_chunk(
self, ptop: str, wark: str, chash: str
) -> tuple[int, list[int], str, float, bool]:
with self.mutex:
self.db_act = time.time()
job = self.registry[ptop].get(wark)
if not job:
known = " ".join([x for x in self.registry[ptop].keys()])
self.log("unknown wark [{}], known: {}".format(wark, known))
raise Pebkac(400, "unknown wark")
if chash not in job["need"]:
msg = "chash = {} , need:\n".format(chash)
msg += "\n".join(job["need"])
self.log(msg)
raise Pebkac(400, "already got that but thanks??")
nchunk = [n for n, v in enumerate(job["hash"]) if v == chash]
if not nchunk:
raise Pebkac(400, "unknown chunk")
if chash in job["busy"]:
nh = len(job["hash"])
idx = job["hash"].index(chash)
t = "that chunk is already being written to:\n {}\n {} {}/{}\n {}"
raise Pebkac(400, t.format(wark, chash, idx, nh, job["name"]))
path = os.path.join(job["ptop"], job["prel"], job["tnam"])
chunksize = up2k_chunksize(job["size"])
ofs = [chunksize * x for x in nchunk]
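            # the same chunk can occur at several offsets within one
            # file; its data is uploaded once and written to every
            # offset in ofs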
if not job["sprs"]:
cur_sz = bos.path.getsize(path)
if ofs[0] > cur_sz:
t = "please upload sequentially using one thread;\nserver filesystem does not support sparse files.\n file: {}\n chunk: {}\n cofs: {}\n flen: {}"
t = t.format(job["name"], nchunk[0], ofs[0], cur_sz)
raise Pebkac(400, t)
job["busy"][chash] = 1
job["poke"] = time.time()
return chunksize, ofs, path, job["lmod"], job["sprs"]
def release_chunk(self, ptop: str, wark: str, chash: str) -> bool:
with self.mutex:
job = self.registry[ptop].get(wark)
if job:
job["busy"].pop(chash, None)
return True
def confirm_chunk(self, ptop: str, wark: str, chash: str) -> tuple[int, str]:
with self.mutex:
self.db_act = time.time()
try:
job = self.registry[ptop][wark]
pdir = os.path.join(job["ptop"], job["prel"])
src = os.path.join(pdir, job["tnam"])
dst = os.path.join(pdir, job["name"])
except Exception as ex:
return "confirm_chunk, wark, " + repr(ex) # type: ignore
job["busy"].pop(chash, None)
try:
job["need"].remove(chash)
except Exception as ex:
return "confirm_chunk, chash, " + repr(ex) # type: ignore
ret = len(job["need"])
if ret > 0:
return ret, src
if self.args.nw:
self.regdrop(ptop, wark)
return ret, dst
            # windows can't rename open files; the final rename happens later, in finish_upload()
if not ANYWIN or src == dst:
self._finish_upload(ptop, wark)
return ret, dst
def finish_upload(self, ptop: str, wark: str) -> None:
with self.mutex:
self._finish_upload(ptop, wark)
def _finish_upload(self, ptop: str, wark: str) -> None:
self.db_act = time.time()
try:
job = self.registry[ptop][wark]
pdir = os.path.join(job["ptop"], job["prel"])
src = os.path.join(pdir, job["tnam"])
dst = os.path.join(pdir, job["name"])
except Exception as ex:
raise Pebkac(500, "finish_upload, wark, " + repr(ex))
# self.log("--- " + wark + " " + dst + " finish_upload atomic " + dst, 4)
atomic_move(src, dst)
times = (int(time.time()), int(job["lmod"]))
if ANYWIN:
z1 = (dst, job["size"], times, job["sprs"])
self.lastmod_q.append(z1)
elif not job["hash"]:
try:
bos.utime(dst, times)
except:
pass
z2 = [job[x] for x in "ptop wark prel name lmod size addr".split()]
z2 += [job.get("at") or time.time()]
if self.idx_wark(*z2):
del self.registry[ptop][wark]
else:
self.regdrop(ptop, wark)
dupes = self.dupesched.pop(dst, [])
if not dupes:
return
cur = self.cur.get(ptop)
for rd, fn, lmod in dupes:
d2 = os.path.join(ptop, rd, fn)
if os.path.exists(d2):
continue
self._symlink(dst, d2, lmod=lmod)
if cur:
self.db_rm(cur, rd, fn)
self.db_add(cur, wark, rd, fn, *z2[-4:])
if cur:
cur.connection.commit()
def regdrop(self, ptop: str, wark: str) -> None:
olds = self.droppable[ptop]
if wark:
olds.append(wark)
if len(olds) <= self.args.reg_cap:
return
n = len(olds) - int(self.args.reg_cap / 2)
t = "up2k-registry [{}] has {} droppables; discarding {}"
self.log(t.format(ptop, len(olds), n))
for k in olds[:n]:
self.registry[ptop].pop(k, None)
self.droppable[ptop] = olds[n:]
def idx_wark(
self,
ptop: str,
wark: str,
rd: str,
fn: str,
lmod: float,
sz: int,
ip: str,
at: float,
) -> bool:
cur = self.cur.get(ptop)
if not cur:
return False
try:
self.db_rm(cur, rd, fn)
self.db_add(cur, wark, rd, fn, lmod, sz, ip, at)
cur.connection.commit()
except Exception as ex:
x = self.register_vpath(ptop, {})
assert x
db_ex_chk(self.log, ex, x[1])
raise
if "e2t" in self.flags[ptop]:
self.tagq.put((ptop, wark, rd, fn, ip, at))
self.n_tagq += 1
return True
def db_rm(self, db: "sqlite3.Cursor", rd: str, fn: str) -> None:
sql = "delete from up where rd = ? and fn = ?"
try:
db.execute(sql, (rd, fn))
except:
assert self.mem_cur
db.execute(sql, s3enc(self.mem_cur, rd, fn))
def db_add(
self,
db: "sqlite3.Cursor",
wark: str,
rd: str,
fn: str,
ts: float,
sz: int,
ip: str,
at: float,
) -> None:
sql = "insert into up values (?,?,?,?,?,?,?)"
v = (wark, int(ts), sz, rd, fn, ip or "", int(at or 0))
try:
db.execute(sql, v)
except:
assert self.mem_cur
rd, fn = s3enc(self.mem_cur, rd, fn)
v = (wark, int(ts), sz, rd, fn, ip or "", int(at or 0))
db.execute(sql, v)
def handle_rm(self, uname: str, ip: str, vpaths: list[str]) -> str:
n_files = 0
ok = {}
ng = {}
for vp in vpaths:
a, b, c = self._handle_rm(uname, ip, vp)
n_files += a
for k in b:
ok[k] = 1
for k in c:
ng[k] = 1
ng = {k: 1 for k in ng if k not in ok}
iok = len(ok)
ing = len(ng)
return "deleted {} files (and {}/{} folders)".format(n_files, iok, iok + ing)
def _handle_rm(
self, uname: str, ip: str, vpath: str
) -> tuple[int, list[str], list[str]]:
self.db_act = time.time()
try:
permsets = [[True, False, False, True]]
vn, rem = self.asrv.vfs.get(vpath, uname, *permsets[0])
vn, rem = vn.get_dbv(rem)
unpost = False
except:
# unpost with missing permissions? verify with db
if not self.args.unpost:
raise Pebkac(400, "the unpost feature is disabled in server config")
unpost = True
permsets = [[False, True]]
vn, rem = self.asrv.vfs.get(vpath, uname, *permsets[0])
vn, rem = vn.get_dbv(rem)
_, _, _, _, dip, dat = self._find_from_vpath(vn.realpath, rem)
t = "you cannot delete this: "
if not dip:
t += "file not found"
elif dip != ip:
t += "not uploaded by (You)"
elif dat < time.time() - self.args.unpost:
t += "uploaded too long ago"
else:
t = ""
if t:
raise Pebkac(400, t)
ptop = vn.realpath
atop = vn.canonical(rem, False)
adir, fn = os.path.split(atop)
try:
st = bos.lstat(atop)
is_dir = stat.S_ISDIR(st.st_mode)
except:
raise Pebkac(400, "file not found on disk (already deleted?)")
scandir = not self.args.no_scandir
if is_dir:
g = vn.walk("", rem, [], uname, permsets, True, scandir, True)
if unpost:
raise Pebkac(400, "cannot unpost folders")
elif stat.S_ISLNK(st.st_mode) or stat.S_ISREG(st.st_mode):
dbv, vrem = self.asrv.vfs.get(vpath, uname, *permsets[0])
dbv, vrem = dbv.get_dbv(vrem)
voldir = vsplit(vrem)[0]
vpath_dir = vsplit(vpath)[0]
g = [(dbv, voldir, vpath_dir, adir, [(fn, 0)], [], {})] # type: ignore
else:
self.log("rm: skip type-{:x} file [{}]".format(st.st_mode, atop))
return 0, [], []
n_files = 0
for dbv, vrem, _, adir, files, rd, vd in g:
for fn in [x[0] for x in files]:
n_files += 1
abspath = os.path.join(adir, fn)
volpath = "{}/{}".format(vrem, fn).strip("/")
vpath = "{}/{}".format(dbv.vpath, volpath).strip("/")
self.log("rm {}\n {}".format(vpath, abspath))
_ = dbv.get(volpath, uname, *permsets[0])
with self.mutex:
cur = None
try:
ptop = dbv.realpath
cur, wark, _, _, _, _ = self._find_from_vpath(ptop, volpath)
self._forget_file(ptop, volpath, cur, wark, True)
finally:
if cur:
cur.connection.commit()
bos.unlink(abspath)
ok: list[str] = []
ng: list[str] = []
if is_dir:
ok, ng = rmdirs(self.log_func, scandir, True, atop, 1)
ok2, ng2 = rmdirs_up(os.path.dirname(atop))
return n_files, ok + ok2, ng + ng2
def handle_mv(self, uname: str, svp: str, dvp: str) -> str:
self.db_act = time.time()
svn, srem = self.asrv.vfs.get(svp, uname, True, False, True)
svn, srem = svn.get_dbv(srem)
sabs = svn.canonical(srem, False)
if not srem:
raise Pebkac(400, "mv: cannot move a mountpoint")
st = bos.lstat(sabs)
if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
with self.mutex:
return self._mv_file(uname, svp, dvp)
jail = svn.get_dbv(srem)[0]
permsets = [[True, False, True]]
scandir = not self.args.no_scandir
# following symlinks is too scary
g = svn.walk("", srem, [], uname, permsets, True, scandir, True)
for dbv, vrem, _, atop, files, rd, vd in g:
if dbv != jail:
# fail early (prevent partial moves)
raise Pebkac(400, "mv: source folder contains other volumes")
g = svn.walk("", srem, [], uname, permsets, True, scandir, True)
for dbv, vrem, _, atop, files, rd, vd in g:
if dbv != jail:
# the actual check (avoid toctou)
raise Pebkac(400, "mv: source folder contains other volumes")
for fn in files:
svpf = "/".join(x for x in [dbv.vpath, vrem, fn[0]] if x)
if not svpf.startswith(svp + "/"): # assert
raise Pebkac(500, "mv: bug at {}, top {}".format(svpf, svp))
dvpf = dvp + svpf[len(svp) :]
with self.mutex:
self._mv_file(uname, svpf, dvpf)
rmdirs(self.log_func, scandir, True, sabs, 1)
rmdirs_up(os.path.dirname(sabs))
return "k"
def _mv_file(self, uname: str, svp: str, dvp: str) -> str:
svn, srem = self.asrv.vfs.get(svp, uname, True, False, True)
svn, srem = svn.get_dbv(srem)
dvn, drem = self.asrv.vfs.get(dvp, uname, False, True)
dvn, drem = dvn.get_dbv(drem)
sabs = svn.canonical(srem, False)
dabs = dvn.canonical(drem)
drd, dfn = vsplit(drem)
n1 = svp.split("/")[-1]
n2 = dvp.split("/")[-1]
if n1.startswith(".") or n2.startswith("."):
if self.args.no_dot_mv:
raise Pebkac(400, "moving dotfiles is disabled in server config")
elif self.args.no_dot_ren and n1 != n2:
raise Pebkac(400, "renaming dotfiles is disabled in server config")
if bos.path.exists(dabs):
raise Pebkac(400, "mv2: target file exists")
bos.makedirs(os.path.dirname(dabs))
if bos.path.islink(sabs):
dlabs = absreal(sabs)
t = "moving symlink from [{}] to [{}], target [{}]"
self.log(t.format(sabs, dabs, dlabs))
mt = bos.path.getmtime(sabs, False)
bos.unlink(sabs)
self._symlink(dlabs, dabs, False, lmod=mt)
# folders are too scary, schedule rescan of both vols
self.need_rescan.add(svn.vpath)
self.need_rescan.add(dvn.vpath)
with self.rescan_cond:
self.rescan_cond.notify_all()
return "k"
c1, w, ftime_, fsize_, ip, at = self._find_from_vpath(svn.realpath, srem)
c2 = self.cur.get(dvn.realpath)
if ftime_ is None:
st = bos.stat(sabs)
ftime = st.st_mtime
fsize = st.st_size
else:
ftime = ftime_
fsize = fsize_ or 0
if w:
assert c1
if c2 and c2 != c1:
self._copy_tags(c1, c2, w)
self._forget_file(svn.realpath, srem, c1, w, c1 != c2)
self._relink(w, svn.realpath, srem, dabs)
c1.connection.commit()
if c2:
self.db_add(c2, w, drd, dfn, ftime, fsize, ip or "", at or 0)
c2.connection.commit()
else:
self.log("not found in src db: [{}]".format(svp))
try:
atomic_move(sabs, dabs)
except OSError as ex:
            if ex.errno != 18:  # 18 == errno.EXDEV, cross-device link
raise
self.log("cross-device move:\n {}\n {}".format(sabs, dabs))
b1, b2 = fsenc(sabs), fsenc(dabs)
try:
shutil.copy2(b1, b2)
except:
os.unlink(b2)
raise
os.unlink(b1)
return "k"
def _copy_tags(
self, csrc: "sqlite3.Cursor", cdst: "sqlite3.Cursor", wark: str
) -> None:
"""copy all tags for wark from src-db to dst-db"""
w = wark[:16]
if cdst.execute("select * from mt where w=? limit 1", (w,)).fetchone():
return # existing tags in dest db
for _, k, v in csrc.execute("select * from mt where w=?", (w,)):
cdst.execute("insert into mt values(?,?,?)", (w, k, v))
def _find_from_vpath(
self, ptop: str, vrem: str
) -> tuple[
Optional["sqlite3.Cursor"],
Optional[str],
Optional[int],
Optional[int],
Optional[str],
Optional[int],
]:
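        """
        look up vrem in the db for ptop;
        returns (cur, wark, mtime, size, ip, at)
        where everything except cur is None on miss
        """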
cur = self.cur.get(ptop)
if not cur:
return None, None, None, None, None, None
rd, fn = vsplit(vrem)
q = "select w, mt, sz, ip, at from up where rd=? and fn=? limit 1"
try:
c = cur.execute(q, (rd, fn))
except:
assert self.mem_cur
c = cur.execute(q, s3enc(self.mem_cur, rd, fn))
hit = c.fetchone()
if hit:
wark, ftime, fsize, ip, at = hit
return cur, wark, ftime, fsize, ip, at
return cur, None, None, None, None, None
def _forget_file(
self,
ptop: str,
vrem: str,
cur: Optional["sqlite3.Cursor"],
wark: Optional[str],
drop_tags: bool,
) -> None:
"""forgets file in db, fixes symlinks, does not delete"""
srd, sfn = vsplit(vrem)
self.log("forgetting {}".format(vrem))
if wark and cur:
self.log("found {} in db".format(wark))
if drop_tags:
if self._relink(wark, ptop, vrem, ""):
drop_tags = False
if drop_tags:
q = "delete from mt where w=?"
cur.execute(q, (wark[:16],))
self.db_rm(cur, srd, sfn)
reg = self.registry.get(ptop)
if reg:
vdir = vsplit(vrem)[0]
wark = wark or next(
(
x
for x, y in reg.items()
if sfn in [y["name"], y.get("tnam")] and y["prel"] == vdir
),
"",
)
job = reg.get(wark) if wark else None
if job:
t = "forgetting partial upload {} ({})"
p = self._vis_job_progress(job)
self.log(t.format(wark, p))
assert wark
del reg[wark]
def _relink(self, wark: str, sptop: str, srem: str, dabs: str) -> int:
"""
update symlinks from file at svn/srem to dabs (rename),
or to first remaining full if no dabs (delete)
"""
dupes = []
sabs = os.path.join(sptop, srem)
q = "select rd, fn from up where substr(w,1,16)=? and w=?"
for ptop, cur in self.cur.items():
for rd, fn in cur.execute(q, (wark[:16], wark)):
if rd.startswith("//") or fn.startswith("//"):
rd, fn = s3dec(rd, fn)
dvrem = "/".join([rd, fn]).strip("/")
if ptop != sptop or srem != dvrem:
dupes.append([ptop, dvrem])
self.log("found {} dupe: [{}] {}".format(wark, ptop, dvrem))
if not dupes:
return 0
full: dict[str, tuple[str, str]] = {}
links: dict[str, tuple[str, str]] = {}
for ptop, vp in dupes:
ap = os.path.join(ptop, vp)
try:
d = links if bos.path.islink(ap) else full
d[ap] = (ptop, vp)
except:
self.log("relink: not found: [{}]".format(ap))
if not dabs and not full and links:
# deleting final remaining full copy; swap it with a symlink
slabs = list(sorted(links.keys()))[0]
ptop, rem = links.pop(slabs)
self.log("linkswap [{}] and [{}]".format(sabs, slabs))
mt = bos.path.getmtime(slabs, False)
bos.unlink(slabs)
bos.rename(sabs, slabs)
bos.utime(slabs, (int(time.time()), int(mt)), False)
self._symlink(slabs, sabs, False)
full[slabs] = (ptop, rem)
sabs = slabs
if not dabs:
dabs = list(sorted(full.keys()))[0]
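        # retarget every symlink that (directly or via absreal) pointed
        # at the old abspath, preserving lmod where readable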
for alink in links:
lmod = None
try:
if alink != sabs and absreal(alink) != sabs:
continue
self.log("relinking [{}] to [{}]".format(alink, dabs))
lmod = bos.path.getmtime(alink, False)
bos.unlink(alink)
except:
pass
self._symlink(dabs, alink, False, lmod=lmod or 0)
return len(full) + len(links)
def _get_wark(self, cj: dict[str, Any]) -> str:
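        """
        validate the upload descriptor cj and derive its wark;
        content-based if the client provided chunk hashes,
        metadata-based (size/lastmod/path/name) otherwise
        """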
        if len(cj["name"]) > 1024 or len(cj["hash"]) > 512 * 1024:  # 512k hashes * 32 MiB chunks = 16 TiB
raise Pebkac(400, "name or numchunks not according to spec")
for k in cj["hash"]:
if not self.r_hash.match(k):
raise Pebkac(
400, "at least one hash is not according to spec: {}".format(k)
)
# try to use client-provided timestamp, don't care if it fails somehow
try:
cj["lmod"] = int(cj["lmod"])
except:
cj["lmod"] = int(time.time())
if cj["hash"]:
wark = up2k_wark_from_hashlist(self.salt, cj["size"], cj["hash"])
else:
wark = up2k_wark_from_metadata(
self.salt, cj["size"], cj["lmod"], cj["prel"], cj["name"]
)
return wark
def _hashlist_from_file(self, path: str, prefix: str = "") -> list[str]:
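        """
        hash the file at path into up2k chunks;
        each chunk is sha512 truncated to 33 bytes,
        urlsafe-b64 encoded to a 44-char string
        """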
fsz = bos.path.getsize(path)
csz = up2k_chunksize(fsz)
ret = []
suffix = " MB, {}".format(path)
with open(fsenc(path), "rb", 512 * 1024) as f:
if self.mth and fsz >= 1024 * 512:
tlt = self.mth.hash(f, fsz, csz, self.pp, prefix, suffix)
ret = [x[0] for x in tlt]
                fsz = 0  # fully hashed; skip the fallback loop below
while fsz > 0:
# same as `hash_at` except for `imutex` / bufsz
if self.stop:
return []
if self.pp:
mb = int(fsz / 1024 / 1024)
self.pp.msg = prefix + str(mb) + suffix
hashobj = hashlib.sha512()
rem = min(csz, fsz)
fsz -= rem
while rem > 0:
buf = f.read(min(rem, 64 * 1024))
if not buf:
raise Exception("EOF at " + str(f.tell()))
hashobj.update(buf)
rem -= len(buf)
digest = hashobj.digest()[:33]
digest = base64.urlsafe_b64encode(digest)
ret.append(digest.decode("utf-8"))
return ret
def _new_upload(self, job: dict[str, Any]) -> None:
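        """
        reserve the final filename and create the .PARTIAL tempfile;
        also probes whether the filesystem can do sparse files
        (presumably to keep out-of-order chunk writes cheap)
        """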
pdir = djoin(job["ptop"], job["prel"])
if not job["size"] and bos.path.isfile(os.path.join(pdir, job["name"])):
return
self.registry[job["ptop"]][job["wark"]] = job
job["name"] = self._untaken(pdir, job["name"], job["t0"], job["addr"])
tnam = job["name"] + ".PARTIAL"
if self.args.dotpart:
tnam = "." + tnam
if self.args.nw:
job["tnam"] = tnam
if not job["hash"]:
del self.registry[job["ptop"]][job["wark"]]
return
if self.args.plain_ip:
dip = job["addr"].replace(":", ".")
else:
dip = self.hub.iphash.s(job["addr"])
suffix = "-{:.6f}-{}".format(job["t0"], dip)
with ren_open(tnam, "wb", fdir=pdir, suffix=suffix) as zfw:
f, job["tnam"] = zfw["orz"]
abspath = os.path.join(pdir, job["tnam"])
sprs = job["sprs"]
sz = job["size"]
relabel = False
if (
ANYWIN
and sprs
and self.args.sparse
and self.args.sparse * 1024 * 1024 <= sz
):
try:
sp.check_call(["fsutil", "sparse", "setflag", abspath])
except:
self.log("could not sparse [{}]".format(abspath), 3)
relabel = True
sprs = False
if not ANYWIN and sprs and sz > 1024 * 1024:
fs = self.fstab.get(pdir)
if fs != "ok":
relabel = True
f.seek(1024 * 1024 - 1)
f.write(b"e")
f.flush()
try:
nblk = bos.stat(abspath).st_blocks
sprs = nblk < 2048
except:
sprs = False
if relabel:
t = "sparse files {} on {} filesystem at {}"
nv = "ok" if sprs else "ng"
self.log(t.format(nv, self.fstab.get(pdir), pdir))
self.fstab.relabel(pdir, nv)
job["sprs"] = sprs
if job["hash"] and sprs:
f.seek(sz - 1)
f.write(b"e")
if not job["hash"]:
self._finish_upload(job["ptop"], job["wark"])
def _lastmodder(self) -> None:
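        """
        background thread; applies queued utime calls in 5-second
        batches (setting lastmod immediately tends to fail on windows)
        and clears the sparse flag on completed files
        """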
while True:
ready = self.lastmod_q
self.lastmod_q = []
# self.log("lmod: got {}".format(len(ready)))
time.sleep(5)
for path, sz, times, sparse in ready:
self.log("lmod: setting times {} on {}".format(times, path))
try:
bos.utime(path, times, False)
except:
t = "lmod: failed to utime ({}, {}):\n{}"
self.log(t.format(path, times, min_ex()))
if sparse and self.args.sparse and self.args.sparse * 1024 * 1024 <= sz:
try:
sp.check_call(["fsutil", "sparse", "setflag", path, "0"])
except:
self.log("could not unsparse [{}]".format(path), 3)
def _snapshot(self) -> None:
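        """
        background loop; persists the upload registry every
        snap_persist_interval seconds, or every 5s while
        a progress-printer (self.pp) is active
        """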
slp = self.snap_persist_interval
while True:
time.sleep(slp)
if self.pp:
slp = 5
else:
slp = self.snap_persist_interval
self.do_snapshot()
def do_snapshot(self) -> None:
with self.mutex:
for k, reg in self.registry.items():
self._snap_reg(k, reg)
def _snap_reg(self, ptop: str, reg: dict[str, dict[str, Any]]) -> None:
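        """
        write the registry for ptop to up2k.snap in its histpath;
        drops uploads idle for longer than snap_discard_interval,
        and skips the write if nothing changed since last snap (etag)
        """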
now = time.time()
histpath = self.asrv.vfs.histtab.get(ptop)
if not histpath:
return
rm = [
x
for x in reg.values()
if x["need"] and now - x["poke"] > self.snap_discard_interval
]
if rm:
t = "dropping {} abandoned uploads in {}".format(len(rm), ptop)
vis = [self._vis_job_progress(x) for x in rm]
self.log("\n".join([t] + vis))
for job in rm:
del reg[job["wark"]]
try:
# remove the filename reservation
path = os.path.join(job["ptop"], job["prel"], job["name"])
if bos.path.getsize(path) == 0:
bos.unlink(path)
if len(job["hash"]) == len(job["need"]):
                        # no chunks were received, so the .PARTIAL is empty; delete it too
path = os.path.join(job["ptop"], job["prel"], job["tnam"])
bos.unlink(path)
except:
pass
if self.args.nw:
return
path = os.path.join(histpath, "up2k.snap")
if not reg:
if ptop not in self.snap_prev or self.snap_prev[ptop] is not None:
self.snap_prev[ptop] = None
if bos.path.exists(path):
bos.unlink(path)
return
newest = float(max(x["poke"] for _, x in reg.items()) if reg else 0)
etag = (len(reg), newest)
if etag == self.snap_prev.get(ptop):
return
bos.makedirs(histpath)
path2 = "{}.{}".format(path, os.getpid())
body = {"droppable": self.droppable[ptop], "registry": reg}
j = json.dumps(body, indent=2, sort_keys=True).encode("utf-8")
with gzip.GzipFile(path2, "wb") as f:
f.write(j)
atomic_move(path2, path)
self.log("snap: {} |{}|".format(path, len(reg.keys())))
self.snap_prev[ptop] = etag
def _tagger(self) -> None:
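        """worker thread; pops files off tagq, reads their tags, writes them to db"""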
with self.mutex:
self.n_tagq += 1
assert self.mtag
while True:
with self.mutex:
self.n_tagq -= 1
ptop, wark, rd, fn, ip, at = self.tagq.get()
if "e2t" not in self.flags[ptop]:
continue
# self.log("\n " + repr([ptop, rd, fn]))
abspath = os.path.join(ptop, rd, fn)
try:
tags = self.mtag.get(abspath)
ntags1 = len(tags)
parsers = self._get_parsers(ptop, tags, abspath)
if self.args.mtag_vv:
t = "parsers({}): {}\n{} {} tags: {}".format(
ptop, list(parsers.keys()), ntags1, self.mtag.backend, tags
)
self.log(t)
if parsers:
tags["up_ip"] = ip
tags["up_at"] = at
tags.update(self.mtag.get_bin(parsers, abspath, tags))
except Exception as ex:
self._log_tag_err("", abspath, ex)
continue
with self.mutex:
cur = self.cur[ptop]
if not cur:
self.log("no cursor to write tags with??", c=1)
continue
                # TODO: entags is undefined if the volume 404'd on startup
entags = self.entags[ptop]
if not entags:
self.log("no entags okay.jpg", c=3)
continue
self._tag_file(cur, entags, wark, abspath, tags)
cur.connection.commit()
self.log("tagged {} ({}+{})".format(abspath, ntags1, len(tags) - ntags1))
def _hasher(self) -> None:
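        """worker thread; pops files off hashq, hashes them, and indexes the wark"""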
with self.mutex:
self.n_hashq += 1
while True:
with self.mutex:
self.n_hashq -= 1
# self.log("hashq {}".format(self.n_hashq))
ptop, rd, fn, ip, at = self.hashq.get()
# self.log("hashq {} pop {}/{}/{}".format(self.n_hashq, ptop, rd, fn))
if "e2d" not in self.flags[ptop]:
continue
abspath = os.path.join(ptop, rd, fn)
self.log("hashing " + abspath)
inf = bos.stat(abspath)
if not inf.st_size:
wark = up2k_wark_from_metadata(
self.salt, inf.st_size, int(inf.st_mtime), rd, fn
)
else:
hashes = self._hashlist_from_file(abspath)
if not hashes:
return
wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes)
with self.mutex:
self.idx_wark(ptop, wark, rd, fn, inf.st_mtime, inf.st_size, ip, at)
def hash_file(
self, ptop: str, flags: dict[str, Any], rd: str, fn: str, ip: str, at: float
) -> None:
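        """enqueue a file for hashing and indexing by the _hasher threads"""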
with self.mutex:
self.register_vpath(ptop, flags)
self.hashq.put((ptop, rd, fn, ip, at))
self.n_hashq += 1
# self.log("hashq {} push {}/{}/{}".format(self.n_hashq, ptop, rd, fn))
def shutdown(self) -> None:
self.stop = True
if self.mth:
self.mth.stop = True
for x in list(self.spools):
self._unspool(x)
self.log("writing snapshot")
self.do_snapshot()
def up2k_chunksize(filesize: int) -> int:
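    """
    pick the chunksize for a file; grows in steps of
    1, 1.5, 2, 3, 4, 6, 8, 12, 16, 24, 32 MiB (hard cap),
    so files up to 256 MiB get 1 MiB chunks and e.g. a
    1 GiB file gets 4 MiB chunks (256 of them)
    """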
chunksize = 1024 * 1024
stepsize = 512 * 1024
while True:
for mul in [1, 2]:
nchunks = math.ceil(filesize * 1.0 / chunksize)
if nchunks <= 256 or chunksize >= 32 * 1024 * 1024:
return chunksize
chunksize += stepsize
stepsize *= mul
def up2k_wark_from_hashlist(salt: str, filesize: int, hashes: list[str]) -> str:
"""server-reproducible file identifier, independent of name or location"""
values = [salt, str(filesize)] + hashes
vstr = "\n".join(values)
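    # the 33-byte digest b64-encodes to exactly 44 urlsafe chars (no padding),
    # so every content-based wark is 44 chars long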
wark = hashlib.sha512(vstr.encode("utf-8")).digest()[:33]
wark = base64.urlsafe_b64encode(wark)
return wark.decode("ascii")
def up2k_wark_from_metadata(salt: str, sz: int, lastmod: int, rd: str, fn: str) -> str:
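    """
    fallback wark for uploads without a hashlist (e.g. zero-byte files);
    the "#" prefix marks it as metadata-based, and the result is
    clipped to 44 chars to match content-based warks
    """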
ret = fsenc("{}\n{}\n{}\n{}\n{}".format(salt, lastmod, sz, rd, fn))
ret = base64.urlsafe_b64encode(hashlib.sha512(ret).digest())
return "#{}".format(ret.decode("ascii"))[:44]