# coding: utf-8 from __future__ import print_function, unicode_literals import re import os import time import math import json import gzip import stat import shutil import base64 import hashlib import threading import traceback import subprocess as sp from copy import deepcopy from .__init__ import WINDOWS, ANYWIN from .util import ( Pebkac, Queue, ProgressPrinter, fsdec, fsenc, sanitize_fn, ren_open, atomic_move, s3enc, s3dec, statdir, s2hms, ) from .mtag import MTag, MParser try: HAVE_SQLITE3 = True import sqlite3 except: HAVE_SQLITE3 = False class Up2k(object): """ TODO: * documentation * registry persistence * ~/.config flatfiles for active jobs """ def __init__(self, hub, all_vols): self.hub = hub self.args = hub.args self.log_func = hub.log self.all_vols = all_vols # config self.salt = self.args.salt # state self.mutex = threading.Lock() self.hashq = Queue() self.tagq = Queue() self.registry = {} self.entags = {} self.flags = {} self.cur = {} self.mtag = None self.pending_tags = None self.mem_cur = None self.sqlite_ver = None self.no_expr_idx = False if HAVE_SQLITE3: # mojibake detector self.mem_cur = self._orz(":memory:") self.mem_cur.execute(r"create table a (b text)") self.sqlite_ver = tuple([int(x) for x in sqlite3.sqlite_version.split(".")]) if self.sqlite_ver < (3, 9): self.no_expr_idx = True if ANYWIN: # usually fails to set lastmod too quickly self.lastmod_q = Queue() thr = threading.Thread(target=self._lastmodder) thr.daemon = True thr.start() # static self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$") if not HAVE_SQLITE3: self.log("could not initialize sqlite3, will use in-memory registry only") have_e2d = self.init_indexes() if have_e2d: thr = threading.Thread(target=self._snapshot) thr.daemon = True thr.start() thr = threading.Thread(target=self._hasher) thr.daemon = True thr.start() if self.mtag: thr = threading.Thread(target=self._tagger) thr.daemon = True thr.start() thr = threading.Thread(target=self._run_all_mtp) thr.daemon = True thr.start() def log(self, msg, c=0): self.log_func("up2k", msg + "\033[K", c) def _vis_job_progress(self, job): perc = 100 - (len(job["need"]) * 100.0 / len(job["hash"])) path = os.path.join(job["ptop"], job["prel"], job["name"]) return "{:5.1f}% {}".format(perc, path) def _vis_reg_progress(self, reg): ret = [] for _, job in reg.items(): ret.append(self._vis_job_progress(job)) return ret def _expr_idx_filter(self, flags): if not self.no_expr_idx: return False, flags ret = {k: v for k, v in flags.items() if not k.startswith("e2t")} if ret.keys() == flags.keys(): return False, flags return True, ret def init_indexes(self): self.pp = ProgressPrinter() vols = self.all_vols.values() t0 = time.time() have_e2d = False if self.no_expr_idx: modified = False for vol in vols: m, f = self._expr_idx_filter(vol.flags) if m: vol.flags = f modified = True if modified: msg = "disabling -e2t because your sqlite belongs in a museum" self.log(msg, c=3) live_vols = [] for vol in vols: try: os.listdir(vol.realpath) live_vols.append(vol) except: self.log("cannot access " + vol.realpath, c=1) vols = live_vols need_mtag = False for vol in vols: if "e2t" in vol.flags: need_mtag = True if need_mtag: self.mtag = MTag(self.log_func, self.args) if not self.mtag.usable: self.mtag = None # e2ds(a) volumes first, # also covers tags where e2ts is set for vol in vols: en = {} if "mte" in vol.flags: en = {k: True for k in vol.flags["mte"].split(",")} self.entags[vol.realpath] = en if "e2d" in vol.flags: have_e2d = True if "e2ds" in vol.flags: r = self._build_file_index(vol, vols) if not r: needed_mutagen = True # open the rest + do any e2ts(a) needed_mutagen = False for vol in vols: r = self.register_vpath(vol.realpath, vol.flags) if not r or "e2ts" not in vol.flags: continue cur, db_path, sz0 = r n_add, n_rm, success = self._build_tags_index(vol.realpath) if not success: needed_mutagen = True if n_add or n_rm: self.vac(cur, db_path, n_add, n_rm, sz0) self.pp.end = True msg = "{} volumes in {:.2f} sec" self.log(msg.format(len(vols), time.time() - t0)) if needed_mutagen: msg = "could not read tags because no backends are available (mutagen or ffprobe)" self.log(msg, c=1) return have_e2d def register_vpath(self, ptop, flags): with self.mutex: if ptop in self.registry: return None _, flags = self._expr_idx_filter(flags) ft = "\033[0;32m{}{:.0}" ff = "\033[0;35m{}{:.0}" fv = "\033[0;36m{}:\033[1;30m{}" a = [ (ft if v is True else ff if v is False else fv).format(k, str(v)) for k, v in flags.items() ] if a: self.log(" ".join(sorted(a)) + "\033[0m") reg = {} path = os.path.join(ptop, ".hist", "up2k.snap") if "e2d" in flags and os.path.exists(path): with gzip.GzipFile(path, "rb") as f: j = f.read().decode("utf-8") reg = json.loads(j) for _, job in reg.items(): job["poke"] = time.time() m = "loaded snap {} |{}|".format(path, len(reg.keys())) m = [m] + self._vis_reg_progress(reg) self.log("\n".join(m)) self.flags[ptop] = flags self.registry[ptop] = reg if not HAVE_SQLITE3 or "e2d" not in flags or "d2d" in flags: return None try: os.mkdir(os.path.join(ptop, ".hist")) except: pass db_path = os.path.join(ptop, ".hist", "up2k.db") if ptop in self.cur: return None try: sz0 = 0 if os.path.exists(db_path): sz0 = os.path.getsize(db_path) // 1024 cur = self._open_db(db_path) self.cur[ptop] = cur return [cur, db_path, sz0] except: msg = "cannot use database at [{}]:\n{}" self.log(msg.format(ptop, traceback.format_exc())) return None def _build_file_index(self, vol, all_vols): do_vac = False top = vol.realpath reg = self.register_vpath(top, vol.flags) if not reg: return _, db_path, sz0 = reg dbw = [reg[0], 0, time.time()] self.pp.n = next(dbw[0].execute("select count(w) from up"))[0] excl = [ vol.realpath + "/" + d.vpath[len(vol.vpath) :].lstrip("/") for d in all_vols if d != vol and (d.vpath.startswith(vol.vpath + "/") or not vol.vpath) ] n_add = self._build_dir(dbw, top, set(excl), top) n_rm = self._drop_lost(dbw[0], top) if dbw[1]: self.log("commit {} new files".format(dbw[1])) dbw[0].connection.commit() n_add, n_rm, success = self._build_tags_index(vol.realpath) dbw[0].connection.commit() if n_add or n_rm or do_vac: self.vac(dbw[0], db_path, n_add, n_rm, sz0) return success def vac(self, cur, db_path, n_add, n_rm, sz0): sz1 = os.path.getsize(db_path) // 1024 cur.execute("vacuum") sz2 = os.path.getsize(db_path) // 1024 msg = "{} new, {} del, {} kB vacced, {} kB gain, {} kB now".format( n_add, n_rm, sz1 - sz2, sz2 - sz0, sz2 ) self.log(msg) def _build_dir(self, dbw, top, excl, cdir): self.pp.msg = "a{} {}".format(self.pp.n, cdir) histdir = os.path.join(top, ".hist") ret = 0 for iname, inf in statdir(self.log, not self.args.no_scandir, False, cdir): abspath = os.path.join(cdir, iname) lmod = int(inf.st_mtime) if stat.S_ISDIR(inf.st_mode): if abspath in excl or abspath == histdir: continue # self.log(" dir: {}".format(abspath)) ret += self._build_dir(dbw, top, excl, abspath) else: # self.log("file: {}".format(abspath)) rp = abspath[len(top) :].replace("\\", "/").strip("/") rd, fn = rp.rsplit("/", 1) if "/" in rp else ["", rp] sql = "select * from up where rd = ? and fn = ?" try: c = dbw[0].execute(sql, (rd, fn)) except: c = dbw[0].execute(sql, s3enc(self.mem_cur, rd, fn)) in_db = list(c.fetchall()) if in_db: self.pp.n -= 1 _, dts, dsz, _, _ = in_db[0] if len(in_db) > 1: m = "WARN: multiple entries: [{}] => [{}] |{}|\n{}" rep_db = "\n".join([repr(x) for x in in_db]) self.log(m.format(top, rp, len(in_db), rep_db)) dts = -1 if dts == lmod and dsz == inf.st_size: continue m = "reindex [{}] => [{}] ({}/{}) ({}/{})".format( top, rp, dts, lmod, dsz, inf.st_size ) self.log(m) self.db_rm(dbw[0], rd, fn) ret += 1 dbw[1] += 1 in_db = None self.pp.msg = "a{} {}".format(self.pp.n, abspath) if inf.st_size > 1024 * 1024: self.log("file: {}".format(abspath)) try: hashes = self._hashlist_from_file(abspath) except Exception as ex: self.log("hash: {} @ [{}]".format(repr(ex), abspath)) continue wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes) self.db_add(dbw[0], wark, rd, fn, lmod, inf.st_size) dbw[1] += 1 ret += 1 td = time.time() - dbw[2] if dbw[1] >= 4096 or td >= 60: self.log("commit {} new files".format(dbw[1])) dbw[0].connection.commit() dbw[1] = 0 dbw[2] = time.time() return ret def _drop_lost(self, cur, top): rm = [] nchecked = 0 nfiles = next(cur.execute("select count(w) from up"))[0] c = cur.execute("select * from up") for dwark, dts, dsz, drd, dfn in c: nchecked += 1 if drd.startswith("//") or dfn.startswith("//"): drd, dfn = s3dec(drd, dfn) abspath = os.path.join(top, drd, dfn) # almost zero overhead dw self.pp.msg = "b{} {}".format(nfiles - nchecked, abspath) try: if not os.path.exists(fsenc(abspath)): rm.append([drd, dfn]) except Exception as ex: self.log("stat-rm: {} @ [{}]".format(repr(ex), abspath)) if rm: self.log("forgetting {} deleted files".format(len(rm))) for rd, fn in rm: # self.log("{} / {}".format(rd, fn)) self.db_rm(cur, rd, fn) return len(rm) def _build_tags_index(self, ptop): entags = self.entags[ptop] flags = self.flags[ptop] cur = self.cur[ptop] n_add = 0 n_rm = 0 n_buf = 0 last_write = time.time() if "e2tsr" in flags: n_rm = cur.execute("select count(w) from mt").fetchone()[0] if n_rm: self.log("discarding {} media tags for a full rescan".format(n_rm)) cur.execute("delete from mt") else: self.log("volume has e2tsr but there are no media tags to discard") # integrity: drop tags for tracks that were deleted if "e2t" in flags: drops = [] c2 = cur.connection.cursor() up_q = "select w from up where substr(w,1,16) = ?" for (w,) in cur.execute("select w from mt"): if not c2.execute(up_q, (w,)).fetchone(): drops.append(w[:16]) c2.close() if drops: msg = "discarding media tags for {} deleted files" self.log(msg.format(len(drops))) n_rm += len(drops) for w in drops: cur.execute("delete from mt where w = ?", (w,)) # bail if a volume flag disables indexing if "d2t" in flags or "d2d" in flags: return n_add, n_rm, True # add tags for new files if "e2ts" in flags: if not self.mtag: return n_add, n_rm, False mpool = False if self.mtag.prefer_mt and not self.args.no_mtag_mt: mpool = self._start_mpool() c2 = cur.connection.cursor() c3 = cur.connection.cursor() n_left = cur.execute("select count(w) from up").fetchone()[0] for w, rd, fn in cur.execute("select w, rd, fn from up"): n_left -= 1 q = "select w from mt where w = ?" if c2.execute(q, (w[:16],)).fetchone(): continue if "mtp" in flags: q = "insert into mt values (?,'t:mtp','a')" c2.execute(q, (w[:16],)) if rd.startswith("//") or fn.startswith("//"): rd, fn = s3dec(rd, fn) abspath = os.path.join(ptop, rd, fn) self.pp.msg = "c{} {}".format(n_left, abspath) args = [entags, w, abspath] if not mpool: n_tags = self._tag_file(c3, *args) else: mpool.put(["mtag"] + args) n_tags = len(self._flush_mpool(c3)) n_add += n_tags n_buf += n_tags td = time.time() - last_write if n_buf >= 4096 or td >= 60: self.log("commit {} new tags".format(n_buf)) cur.connection.commit() last_write = time.time() n_buf = 0 self._stop_mpool(mpool, c3) c3.close() c2.close() return n_add, n_rm, True def _flush_mpool(self, wcur): with self.mutex: ret = [] for x in self.pending_tags: self._tag_file(wcur, *x) ret.append(x[1]) self.pending_tags = [] return ret def _run_all_mtp(self): t0 = time.time() self.mtp_parsers = {} for ptop, flags in self.flags.items(): if "mtp" in flags: self._run_one_mtp(ptop) td = time.time() - t0 msg = "mtp finished in {:.2f} sec ({})" self.log(msg.format(td, s2hms(td, True))) def _run_one_mtp(self, ptop): db_path = os.path.join(ptop, ".hist", "up2k.db") sz0 = os.path.getsize(db_path) // 1024 entags = self.entags[ptop] parsers = {} for parser in self.flags[ptop]["mtp"]: try: parser = MParser(parser) except: self.log("invalid argument: " + parser, 1) return for tag in entags: if tag in parser.tags: parsers[parser.tag] = parser self.mtp_parsers[ptop] = parsers q = "select count(w) from mt where k = 't:mtp'" with self.mutex: cur = self.cur[ptop] cur = cur.connection.cursor() wcur = cur.connection.cursor() n_left = cur.execute(q).fetchone()[0] mpool = self._start_mpool() batch_sz = mpool.maxsize * 3 t_prev = time.time() n_prev = n_left n_done = 0 to_delete = {} in_progress = {} while True: with self.mutex: q = "select w from mt where k = 't:mtp' limit ?" warks = cur.execute(q, (batch_sz,)).fetchall() warks = [x[0] for x in warks] jobs = [] for w in warks: q = "select rd, fn from up where substr(w,1,16)=? limit 1" rd, fn = cur.execute(q, (w,)).fetchone() rd, fn = s3dec(rd, fn) abspath = os.path.join(ptop, rd, fn) q = "select k from mt where w = ?" have = cur.execute(q, (w,)).fetchall() have = [x[0] for x in have] parsers = self._get_parsers(ptop, have, abspath) if not parsers: to_delete[w] = True n_left -= 1 continue if w in in_progress: continue jobs.append([parsers, None, w, abspath]) in_progress[w] = True done = self._flush_mpool(wcur) with self.mutex: for w in done: to_delete[w] = True in_progress.pop(w) n_done += 1 for w in to_delete.keys(): q = "delete from mt where w = ? and k = 't:mtp'" cur.execute(q, (w,)) to_delete = {} if not warks: break if not jobs: continue try: now = time.time() s = ((now - t_prev) / (n_prev - n_left)) * n_left h, s = divmod(s, 3600) m, s = divmod(s, 60) n_prev = n_left t_prev = now except: h = 1 m = 1 msg = "mtp: {} done, {} left, eta {}h {:02d}m" with self.mutex: msg = msg.format(n_done, n_left, int(h), int(m)) self.log(msg, c=6) for j in jobs: n_left -= 1 mpool.put(j) with self.mutex: cur.connection.commit() done = self._stop_mpool(mpool, wcur) with self.mutex: for w in done: q = "delete from mt where w = ? and k = 't:mtp'" cur.execute(q, (w,)) cur.connection.commit() if n_done: self.vac(cur, db_path, n_done, 0, sz0) wcur.close() cur.close() def _get_parsers(self, ptop, have, abspath): try: all_parsers = self.mtp_parsers[ptop] except: return {} entags = self.entags[ptop] parsers = {} for k, v in all_parsers.items(): if "ac" in entags or ".aq" in entags: if "ac" in have or ".aq" in have: # is audio, require non-audio? if v.audio == "n": continue # is not audio, require audio? elif v.audio == "y": continue if v.ext: match = False for ext in v.ext: if abspath.lower().endswith("." + ext): match = True break if not match: continue parsers[k] = v parsers = {k: v for k, v in parsers.items() if v.force or k not in have} return parsers def _start_mpool(self): # mp.pool.ThreadPool and concurrent.futures.ThreadPoolExecutor # both do crazy runahead so lets reinvent another wheel nw = os.cpu_count() if hasattr(os, "cpu_count") else 4 if self.args.no_mtag_mt: nw = 1 if self.pending_tags is None: self.log("using {}x {}".format(nw, self.mtag.backend)) self.pending_tags = [] mpool = Queue(nw) for _ in range(nw): thr = threading.Thread(target=self._tag_thr, args=(mpool,)) thr.daemon = True thr.start() return mpool def _stop_mpool(self, mpool, wcur): if not mpool: return for _ in range(mpool.maxsize): mpool.put(None) mpool.join() done = self._flush_mpool(wcur) return done def _tag_thr(self, q): while True: task = q.get() if not task: q.task_done() return try: parser, entags, wark, abspath = task if parser == "mtag": tags = self.mtag.get(abspath) else: tags = self.mtag.get_bin(parser, abspath) vtags = [ "\033[36m{} \033[33m{}".format(k, v) for k, v in tags.items() ] if vtags: self.log("{}\033[0m [{}]".format(" ".join(vtags), abspath)) with self.mutex: self.pending_tags.append([entags, wark, abspath, tags]) except: ex = traceback.format_exc() if parser == "mtag": parser = self.mtag.backend msg = "{} failed to read tags from {}:\n{}" self.log(msg.format(parser, abspath, ex), c=3) q.task_done() def _tag_file(self, write_cur, entags, wark, abspath, tags=None): if tags is None: tags = self.mtag.get(abspath) if entags: tags = {k: v for k, v in tags.items() if k in entags} if not tags: # indicate scanned without tags tags = {"x": 0} if not tags: return 0 for k in tags.keys(): q = "delete from mt where w = ? and ({})".format( " or ".join(["k = ?"] * len(tags)) ) args = [wark[:16]] + list(tags.keys()) write_cur.execute(q, tuple(args)) ret = 0 for k, v in tags.items(): q = "insert into mt values (?,?,?)" write_cur.execute(q, (wark[:16], k, v)) ret += 1 return ret def _orz(self, db_path): return sqlite3.connect(db_path, check_same_thread=False).cursor() # x.set_trace_callback(trace) def _open_db(self, db_path): existed = os.path.exists(db_path) cur = self._orz(db_path) ver = self._read_ver(cur) if not existed and ver is None: return self._create_db(db_path, cur) orig_ver = ver if not ver or ver < 3: bak = "{}.bak.{:x}.v{}".format(db_path, int(time.time()), ver) db = cur.connection cur.close() db.close() msg = "creating new DB (old is bad); backup: {}" if ver: msg = "creating backup before upgrade: {}" self.log(msg.format(bak)) shutil.copy2(db_path, bak) cur = self._orz(db_path) if ver == 1: cur = self._upgrade_v1(cur, db_path) if cur: ver = 2 if ver == 2: cur = self._create_v3(cur) ver = self._read_ver(cur) if cur else None if ver == 3: if orig_ver != ver: cur.connection.commit() cur.execute("vacuum") cur.connection.commit() try: nfiles = next(cur.execute("select count(w) from up"))[0] self.log("OK: {} |{}|".format(db_path, nfiles)) return cur except Exception as ex: self.log("WARN: could not list files, DB corrupt?\n " + repr(ex)) if cur: db = cur.connection cur.close() db.close() return self._create_db(db_path, None) def _create_db(self, db_path, cur): if not cur: cur = self._orz(db_path) self._create_v2(cur) self._create_v3(cur) cur.connection.commit() self.log("created DB at {}".format(db_path)) return cur def _read_ver(self, cur): for tab in ["ki", "kv"]: try: c = cur.execute(r"select v from {} where k = 'sver'".format(tab)) except: continue rows = c.fetchall() if rows: return int(rows[0][0]) def _create_v2(self, cur): for cmd in [ r"create table up (w text, mt int, sz int, rd text, fn text)", r"create index up_rd on up(rd)", r"create index up_fn on up(fn)", ]: cur.execute(cmd) return cur def _create_v3(self, cur): """ collision in 2^(n/2) files where n = bits (6 bits/ch) 10*6/2 = 2^30 = 1'073'741'824, 24.1mb idx 12*6/2 = 2^36 = 68'719'476'736, 24.8mb idx 16*6/2 = 2^48 = 281'474'976'710'656, 26.1mb idx """ for c, ks in [["drop table k", "isv"], ["drop index up_", "w"]]: for k in ks: try: cur.execute(c + k) except: pass idx = r"create index up_w on up(substr(w,1,16))" if self.no_expr_idx: idx = r"create index up_w on up(w)" for cmd in [ idx, r"create table mt (w text, k text, v int)", r"create index mt_w on mt(w)", r"create index mt_k on mt(k)", r"create index mt_v on mt(v)", r"create table kv (k text, v int)", r"insert into kv values ('sver', 3)", ]: cur.execute(cmd) return cur def _upgrade_v1(self, odb, db_path): npath = db_path + ".next" if os.path.exists(npath): os.unlink(npath) ndb = self._orz(npath) self._create_v2(ndb) c = odb.execute("select * from up") for wark, ts, sz, rp in c: rd, fn = rp.rsplit("/", 1) if "/" in rp else ["", rp] v = (wark, ts, sz, rd, fn) ndb.execute("insert into up values (?,?,?,?,?)", v) ndb.connection.commit() ndb.connection.close() odb.connection.close() atomic_move(npath, db_path) return self._orz(db_path) def handle_json(self, cj): if not self.register_vpath(cj["ptop"], cj["vcfg"]): if cj["ptop"] not in self.registry: raise Pebkac(410, "location unavailable") cj["name"] = sanitize_fn(cj["name"], bad=[".prologue.html", ".epilogue.html"]) cj["poke"] = time.time() wark = self._get_wark(cj) now = time.time() job = None with self.mutex: cur = self.cur.get(cj["ptop"], None) reg = self.registry[cj["ptop"]] if cur: if self.no_expr_idx: q = r"select * from up where w = ?" argv = (wark,) else: q = r"select * from up where substr(w,1,16) = ? and w = ?" argv = (wark[:16], wark) cur = cur.execute(q, argv) for _, dtime, dsize, dp_dir, dp_fn in cur: if dp_dir.startswith("//") or dp_fn.startswith("//"): dp_dir, dp_fn = s3dec(dp_dir, dp_fn) dp_abs = os.path.join(cj["ptop"], dp_dir, dp_fn).replace("\\", "/") # relying on path.exists to return false on broken symlinks if os.path.exists(fsenc(dp_abs)): job = { "name": dp_fn, "prel": dp_dir, "vtop": cj["vtop"], "ptop": cj["ptop"], "size": dsize, "lmod": dtime, "hash": [], "need": [], } break if job and wark in reg: del reg[wark] if job or wark in reg: job = job or reg[wark] if job["prel"] == cj["prel"] and job["name"] == cj["name"]: # ensure the files haven't been deleted manually names = [job[x] for x in ["name", "tnam"] if x in job] for fn in names: path = os.path.join(job["ptop"], job["prel"], fn) try: if os.path.getsize(path) > 0: # upload completed or both present break except: # missing; restart job = None break else: # file contents match, but not the path src = os.path.join(job["ptop"], job["prel"], job["name"]) dst = os.path.join(cj["ptop"], cj["prel"], cj["name"]) vsrc = os.path.join(job["vtop"], job["prel"], job["name"]) vsrc = vsrc.replace("\\", "/") # just for prints anyways if job["need"]: self.log("unfinished:\n {0}\n {1}".format(src, dst)) err = "partial upload exists at a different location; please resume uploading here instead:\n" err += "/" + vsrc + " " raise Pebkac(400, err) elif "nodupe" in self.flags[job["ptop"]]: self.log("dupe-reject:\n {0}\n {1}".format(src, dst)) err = "upload rejected, file already exists:\n/" + vsrc + " " raise Pebkac(400, err) else: # symlink to the client-provided name, # returning the previous upload info job = deepcopy(job) for k in ["ptop", "vtop", "prel"]: job[k] = cj[k] pdir = os.path.join(cj["ptop"], cj["prel"]) job["name"] = self._untaken(pdir, cj["name"], now, cj["addr"]) dst = os.path.join(job["ptop"], job["prel"], job["name"]) os.unlink(fsenc(dst)) # TODO ed pls self._symlink(src, dst) if not job: job = { "wark": wark, "t0": now, "hash": deepcopy(cj["hash"]), "need": [], } # client-provided, sanitized by _get_wark: name, size, lmod for k in [ "addr", "vtop", "ptop", "prel", "name", "size", "lmod", "poke", ]: job[k] = cj[k] # one chunk may occur multiple times in a file; # filter to unique values for the list of missing chunks # (preserve order to reduce disk thrashing) lut = {} for k in cj["hash"]: if k not in lut: job["need"].append(k) lut[k] = 1 self._new_upload(job) return { "name": job["name"], "size": job["size"], "lmod": job["lmod"], "hash": job["need"], "wark": wark, } def _untaken(self, fdir, fname, ts, ip): # TODO broker which avoid this race and # provides a new filename if taken (same as bup) suffix = ".{:.6f}-{}".format(ts, ip) with ren_open(fname, "wb", fdir=fdir, suffix=suffix) as f: return f["orz"][1] def _symlink(self, src, dst): # TODO store this in linktab so we never delete src if there are links to it self.log("linking dupe:\n {0}\n {1}".format(src, dst)) try: lsrc = src ldst = dst fs1 = os.stat(fsenc(os.path.split(src)[0])).st_dev fs2 = os.stat(fsenc(os.path.split(dst)[0])).st_dev if fs1 == 0: # py2 on winxp or other unsupported combination raise OSError() elif fs1 == fs2: # same fs; make symlink as relative as possible v = [] for p in [src, dst]: if WINDOWS: p = p.replace("\\", "/") v.append(p.split("/")) nsrc, ndst = v nc = 0 for a, b in zip(nsrc, ndst): if a != b: break nc += 1 if nc > 1: lsrc = nsrc[nc:] hops = len(ndst[nc:]) - 1 lsrc = "../" * hops + "/".join(lsrc) os.symlink(fsenc(lsrc), fsenc(ldst)) except (AttributeError, OSError) as ex: self.log("cannot symlink; creating copy: " + repr(ex)) shutil.copy2(fsenc(src), fsenc(dst)) def handle_chunk(self, ptop, wark, chash): with self.mutex: job = self.registry[ptop].get(wark, None) if not job: known = " ".join([x for x in self.registry[ptop].keys()]) self.log("unknown wark [{}], known: {}".format(wark, known)) raise Pebkac(400, "unknown wark") if chash not in job["need"]: raise Pebkac(400, "already got that but thanks??") nchunk = [n for n, v in enumerate(job["hash"]) if v == chash] if not nchunk: raise Pebkac(400, "unknown chunk") job["poke"] = time.time() chunksize = up2k_chunksize(job["size"]) ofs = [chunksize * x for x in nchunk] path = os.path.join(job["ptop"], job["prel"], job["tnam"]) return [chunksize, ofs, path, job["lmod"]] def confirm_chunk(self, ptop, wark, chash): with self.mutex: try: job = self.registry[ptop][wark] pdir = os.path.join(job["ptop"], job["prel"]) src = os.path.join(pdir, job["tnam"]) dst = os.path.join(pdir, job["name"]) except Exception as ex: return "confirm_chunk, wark, " + repr(ex) try: job["need"].remove(chash) except Exception as ex: return "confirm_chunk, chash, " + repr(ex) ret = len(job["need"]) if ret > 0: return ret, src atomic_move(src, dst) if ANYWIN: a = [dst, job["size"], (int(time.time()), int(job["lmod"]))] self.lastmod_q.put(a) # legit api sware 2 me mum if self.idx_wark( job["ptop"], job["wark"], job["prel"], job["name"], job["lmod"], job["size"], ): del self.registry[ptop][wark] # in-memory registry is reserved for unfinished uploads return ret, dst def idx_wark(self, ptop, wark, rd, fn, lmod, sz): cur = self.cur.get(ptop, None) if not cur: return False self.db_rm(cur, rd, fn) self.db_add(cur, wark, rd, fn, int(lmod), sz) cur.connection.commit() if "e2t" in self.flags[ptop]: self.tagq.put([ptop, wark, rd, fn]) return True def db_rm(self, db, rd, fn): sql = "delete from up where rd = ? and fn = ?" try: db.execute(sql, (rd, fn)) except: db.execute(sql, s3enc(self.mem_cur, rd, fn)) def db_add(self, db, wark, rd, fn, ts, sz): sql = "insert into up values (?,?,?,?,?)" v = (wark, int(ts), sz, rd, fn) try: db.execute(sql, v) except: rd, fn = s3enc(self.mem_cur, rd, fn) v = (wark, ts, sz, rd, fn) db.execute(sql, v) def _get_wark(self, cj): if len(cj["name"]) > 1024 or len(cj["hash"]) > 512 * 1024: # 16TiB raise Pebkac(400, "name or numchunks not according to spec") for k in cj["hash"]: if not self.r_hash.match(k): raise Pebkac( 400, "at least one hash is not according to spec: {}".format(k) ) # try to use client-provided timestamp, don't care if it fails somehow try: cj["lmod"] = int(cj["lmod"]) except: cj["lmod"] = int(time.time()) wark = up2k_wark_from_hashlist(self.salt, cj["size"], cj["hash"]) return wark def _hashlist_from_file(self, path): fsz = os.path.getsize(path) csz = up2k_chunksize(fsz) ret = [] with open(path, "rb", 512 * 1024) as f: while fsz > 0: self.pp.msg = "{} MB, {}".format(int(fsz / 1024 / 1024), path) hashobj = hashlib.sha512() rem = min(csz, fsz) fsz -= rem while rem > 0: buf = f.read(min(rem, 64 * 1024)) if not buf: raise Exception("EOF at " + str(f.tell())) hashobj.update(buf) rem -= len(buf) digest = hashobj.digest()[:32] digest = base64.urlsafe_b64encode(digest) ret.append(digest.decode("utf-8").rstrip("=")) return ret def _new_upload(self, job): self.registry[job["ptop"]][job["wark"]] = job pdir = os.path.join(job["ptop"], job["prel"]) job["name"] = self._untaken(pdir, job["name"], job["t0"], job["addr"]) # if len(job["name"].split(".")) > 8: # raise Exception("aaa") tnam = job["name"] + ".PARTIAL" if self.args.dotpart: tnam = "." + tnam suffix = ".{:.6f}-{}".format(job["t0"], job["addr"]) with ren_open(tnam, "wb", fdir=pdir, suffix=suffix) as f: f, job["tnam"] = f["orz"] if ( ANYWIN and self.args.sparse and self.args.sparse * 1024 * 1024 <= job["size"] ): fp = os.path.join(pdir, job["tnam"]) try: sp.check_call(["fsutil", "sparse", "setflag", fp]) except: self.log("could not sparse [{}]".format(fp), 3) f.seek(job["size"] - 1) f.write(b"e") def _lastmodder(self): while True: ready = [] while not self.lastmod_q.empty(): ready.append(self.lastmod_q.get()) # self.log("lmod: got {}".format(len(ready))) time.sleep(5) for path, sz, times in ready: self.log("lmod: setting times {} on {}".format(times, path)) try: os.utime(fsenc(path), times) except: self.log("lmod: failed to utime ({}, {})".format(path, times)) if self.args.sparse and self.args.sparse * 1024 * 1024 <= sz: try: sp.check_call(["fsutil", "sparse", "setflag", path, "0"]) except: self.log("could not unsparse [{}]".format(path), 3) def _snapshot(self): persist_interval = 30 # persist unfinished uploads index every 30 sec discard_interval = 21600 # drop unfinished uploads after 6 hours inactivity prev = {} while True: time.sleep(persist_interval) with self.mutex: for k, reg in self.registry.items(): self._snap_reg(prev, k, reg, discard_interval) def _snap_reg(self, prev, k, reg, discard_interval): now = time.time() rm = [x for x in reg.values() if now - x["poke"] > discard_interval] if rm: m = "dropping {} abandoned uploads in {}".format(len(rm), k) vis = [self._vis_job_progress(x) for x in rm] self.log("\n".join([m] + vis)) for job in rm: del reg[job["wark"]] try: # remove the filename reservation path = os.path.join(job["ptop"], job["prel"], job["name"]) if os.path.getsize(path) == 0: os.unlink(path) if len(job["hash"]) == len(job["need"]): # PARTIAL is empty, delete that too path = os.path.join(job["ptop"], job["prel"], job["tnam"]) os.unlink(path) except: pass path = os.path.join(k, ".hist", "up2k.snap") if not reg: if k not in prev or prev[k] is not None: prev[k] = None if os.path.exists(path): os.unlink(path) return newest = max(x["poke"] for _, x in reg.items()) if reg else 0 etag = [len(reg), newest] if etag == prev.get(k, None): return try: os.mkdir(os.path.join(k, ".hist")) except: pass path2 = "{}.{}".format(path, os.getpid()) j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8") with gzip.GzipFile(path2, "wb") as f: f.write(j) atomic_move(path2, path) self.log("snap: {} |{}|".format(path, len(reg.keys()))) prev[k] = etag def _tagger(self): while True: ptop, wark, rd, fn = self.tagq.get() if "e2t" not in self.flags[ptop]: continue abspath = os.path.join(ptop, rd, fn) tags = self.mtag.get(abspath) ntags1 = len(tags) parsers = self._get_parsers(ptop, tags, abspath) if parsers: tags.update(self.mtag.get_bin(parsers, abspath)) with self.mutex: cur = self.cur[ptop] if not cur: self.log("no cursor to write tags with??", c=1) continue # TODO is undef if vol 404 on startup entags = self.entags[ptop] if not entags: self.log("no entags okay.jpg", c=3) continue self._tag_file(cur, entags, wark, abspath, tags) cur.connection.commit() self.log("tagged {} ({}+{})".format(abspath, ntags1, len(tags) - ntags1)) def _hasher(self): while True: ptop, rd, fn = self.hashq.get() if "e2d" not in self.flags[ptop]: continue abspath = os.path.join(ptop, rd, fn) self.log("hashing " + abspath) inf = os.stat(fsenc(abspath)) hashes = self._hashlist_from_file(abspath) wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes) with self.mutex: self.idx_wark(ptop, wark, rd, fn, inf.st_mtime, inf.st_size) def hash_file(self, ptop, flags, rd, fn): self.register_vpath(ptop, flags) self.hashq.put([ptop, rd, fn]) def up2k_chunksize(filesize): chunksize = 1024 * 1024 stepsize = 512 * 1024 while True: for mul in [1, 2]: nchunks = math.ceil(filesize * 1.0 / chunksize) if nchunks <= 256 or chunksize >= 32 * 1024 * 1024: return chunksize chunksize += stepsize stepsize *= mul def up2k_wark_from_hashlist(salt, filesize, hashes): """ server-reproducible file identifier, independent of name or location """ ident = [salt, str(filesize)] ident.extend(hashes) ident = "\n".join(ident) hasher = hashlib.sha512() hasher.update(ident.encode("utf-8")) digest = hasher.digest()[:32] wark = base64.urlsafe_b64encode(digest) return wark.decode("utf-8").rstrip("=")