sqlite3 as up2k db + build index on boot + rproxy ip fix

This commit is contained in:
ed 2021-01-10 09:27:11 +01:00
parent de724a1ff3
commit 7bd2b9c23a
10 changed files with 331 additions and 60 deletions

2
.vscode/launch.json vendored
View file

@ -12,6 +12,8 @@
//"-nw", //"-nw",
"-ed", "-ed",
"-emp", "-emp",
"-e2d",
"-e2s",
"-a", "-a",
"ed:wark", "ed:wark",
"-v", "-v",

View file

@ -144,8 +144,7 @@ roughly sorted by priority
* `os.copy_file_range` for up2k cloning * `os.copy_file_range` for up2k cloning
* support pillow-simd * support pillow-simd
* cache sha512 chunks on client * cache sha512 chunks on client
* ~~symlink existing files on upload~~ * persist unfinished up2k uploads too
* ok at runtime, up2k db still not persisted
* comment field * comment field
* ~~look into android thumbnail cache file format~~ bad idea * ~~look into android thumbnail cache file format~~ bad idea
* figure out the deal with pixel3a not being connectable as hotspot * figure out the deal with pixel3a not being connectable as hotspot

View file

@ -105,17 +105,22 @@ def main():
epilog=dedent( epilog=dedent(
""" """
-a takes username:password, -a takes username:password,
-v takes src:dst:permset:permset:... where "permset" is -v takes src:dst:permset:permset:cflag:cflag:...
accesslevel followed by username (no separator) where "permset" is accesslevel followed by username (no separator)
and "cflag" is config flags to set on this volume
list of cflags:
cnodupe rejects existing files (instead of symlinking them)
example:\033[35m example:\033[35m
-a ed:hunter2 -v .::r:aed -v ../inc:dump:w:aed \033[36m -a ed:hunter2 -v .::r:aed -v ../inc:dump:w:aed:cnodupe \033[36m
mount current directory at "/" with mount current directory at "/" with
* r (read-only) for everyone * r (read-only) for everyone
* a (read+write) for ed * a (read+write) for ed
mount ../inc at "/dump" with mount ../inc at "/dump" with
* w (write-only) for everyone * w (write-only) for everyone
* a (read+write) for ed \033[0m * a (read+write) for ed
* reject duplicate files \033[0m
if no accounts or volumes are configured, if no accounts or volumes are configured,
current folder will be read/write for everyone current folder will be read/write for everyone
@ -125,6 +130,7 @@ def main():
""" """
), ),
) )
# fmt: off
ap.add_argument("-c", metavar="PATH", type=str, action="append", help="add config file") ap.add_argument("-c", metavar="PATH", type=str, action="append", help="add config file")
ap.add_argument("-i", metavar="IP", type=str, default="0.0.0.0", help="ip to bind") ap.add_argument("-i", metavar="IP", type=str, default="0.0.0.0", help="ip to bind")
ap.add_argument("-p", metavar="PORT", type=int, default=3923, help="port to bind") ap.add_argument("-p", metavar="PORT", type=int, default=3923, help="port to bind")
@ -135,12 +141,15 @@ def main():
ap.add_argument("-q", action="store_true", help="quiet") ap.add_argument("-q", action="store_true", help="quiet")
ap.add_argument("-ed", action="store_true", help="enable ?dots") ap.add_argument("-ed", action="store_true", help="enable ?dots")
ap.add_argument("-emp", action="store_true", help="enable markdown plugins") ap.add_argument("-emp", action="store_true", help="enable markdown plugins")
ap.add_argument("-e2d", action="store_true", help="enable up2k database")
ap.add_argument("-e2s", action="store_true", help="enable up2k db-scanner")
ap.add_argument("-mcr", metavar="SEC", type=int, default=60, help="md-editor mod-chk rate") ap.add_argument("-mcr", metavar="SEC", type=int, default=60, help="md-editor mod-chk rate")
ap.add_argument("-nw", action="store_true", help="disable writes (benchmark)") ap.add_argument("-nw", action="store_true", help="disable writes (benchmark)")
ap.add_argument("-nih", action="store_true", help="no info hostname") ap.add_argument("-nih", action="store_true", help="no info hostname")
ap.add_argument("-nid", action="store_true", help="no info disk-usage") ap.add_argument("-nid", action="store_true", help="no info disk-usage")
ap.add_argument("--no-sendfile", action="store_true", help="disable sendfile") ap.add_argument("--no-sendfile", action="store_true", help="disable sendfile")
al = ap.parse_args() al = ap.parse_args()
# fmt: on
SvcHub(al).run() SvcHub(al).run()

View file

@ -258,6 +258,7 @@ class AuthSrv(object):
with open(cfg_fn, "rb") as f: with open(cfg_fn, "rb") as f:
self._parse_config_file(f, user, mread, mwrite, mflags, mount) self._parse_config_file(f, user, mread, mwrite, mflags, mount)
self.all_writable = []
if not mount: if not mount:
# -h says our defaults are CWD at root and read/write for everyone # -h says our defaults are CWD at root and read/write for everyone
vfs = VFS(os.path.abspath("."), "", ["*"], ["*"]) vfs = VFS(os.path.abspath("."), "", ["*"], ["*"])
@ -280,6 +281,11 @@ class AuthSrv(object):
v.uread = mread[dst] v.uread = mread[dst]
v.uwrite = mwrite[dst] v.uwrite = mwrite[dst]
v.flags = mflags[dst] v.flags = mflags[dst]
if v.uwrite:
self.all_writable.append(v)
if vfs.uwrite and vfs not in self.all_writable:
self.all_writable.append(vfs)
missing_users = {} missing_users = {}
for d in [mread, mwrite]: for d in [mread, mwrite]:

View file

@ -28,6 +28,7 @@ class HttpCli(object):
self.conn = conn self.conn = conn
self.s = conn.s self.s = conn.s
self.sr = conn.sr self.sr = conn.sr
self.ip = conn.addr[0]
self.addr = conn.addr self.addr = conn.addr
self.args = conn.args self.args = conn.args
self.auth = conn.auth self.auth = conn.auth
@ -42,7 +43,7 @@ class HttpCli(object):
self.log_func(self.log_src, msg) self.log_func(self.log_src, msg)
def _check_nonfatal(self, ex): def _check_nonfatal(self, ex):
return ex.code in [404] return ex.code < 400 or ex.code == 404
def _assert_safe_rem(self, rem): def _assert_safe_rem(self, rem):
# sanity check to prevent any disasters # sanity check to prevent any disasters
@ -85,7 +86,8 @@ class HttpCli(object):
v = self.headers.get("x-forwarded-for", None) v = self.headers.get("x-forwarded-for", None)
if v is not None and self.conn.addr[0] in ["127.0.0.1", "::1"]: if v is not None and self.conn.addr[0] in ["127.0.0.1", "::1"]:
self.log_src = self.conn.set_rproxy(v.split(",")[0]) self.ip = v.split(",")[0]
self.log_src = self.conn.set_rproxy(self.ip)
self.uname = "*" self.uname = "*"
if "cookie" in self.headers: if "cookie" in self.headers:
@ -305,7 +307,7 @@ class HttpCli(object):
vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True) vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True)
fdir = os.path.join(vfs.realpath, rem) fdir = os.path.join(vfs.realpath, rem)
addr = self.conn.addr[0].replace(":", ".") addr = self.ip.replace(":", ".")
fn = "put-{:.6f}-{}.bin".format(time.time(), addr) fn = "put-{:.6f}-{}.bin".format(time.time(), addr)
path = os.path.join(fdir, fn) path = os.path.join(fdir, fn)
@ -384,9 +386,10 @@ class HttpCli(object):
vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True) vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True)
body["vdir"] = self.vpath body["vtop"] = vfs.vpath
body["rdir"] = os.path.join(vfs.realpath, rem) body["ptop"] = vfs.realpath
body["addr"] = self.addr[0] body["prel"] = rem
body["addr"] = self.ip
body["flag"] = vfs.flags body["flag"] = vfs.flags
x = self.conn.hsrv.broker.put(True, "up2k.handle_json", body) x = self.conn.hsrv.broker.put(True, "up2k.handle_json", body)
@ -409,7 +412,10 @@ class HttpCli(object):
except KeyError: except KeyError:
raise Pebkac(400, "need hash and wark headers for binary POST") raise Pebkac(400, "need hash and wark headers for binary POST")
x = self.conn.hsrv.broker.put(True, "up2k.handle_chunk", wark, chash) vfs, _ = self.conn.auth.vfs.get(self.vpath, self.uname, False, True)
ptop = vfs.realpath
x = self.conn.hsrv.broker.put(True, "up2k.handle_chunk", ptop, wark, chash)
response = x.get() response = x.get()
chunksize, cstart, path, lastmod = response chunksize, cstart, path, lastmod = response
@ -454,7 +460,7 @@ class HttpCli(object):
self.log("clone {} done".format(cstart[0])) self.log("clone {} done".format(cstart[0]))
x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", wark, chash) x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", ptop, wark, chash)
num_left = x.get() num_left = x.get()
if not WINDOWS and num_left == 0: if not WINDOWS and num_left == 0:
@ -576,7 +582,7 @@ class HttpCli(object):
if not os.path.isdir(fsenc(fdir)): if not os.path.isdir(fsenc(fdir)):
raise Pebkac(404, "that folder does not exist") raise Pebkac(404, "that folder does not exist")
suffix = ".{:.6f}-{}".format(time.time(), self.addr[0]) suffix = ".{:.6f}-{}".format(time.time(), self.ip)
open_args = {"fdir": fdir, "suffix": suffix} open_args = {"fdir": fdir, "suffix": suffix}
else: else:
open_args = {} open_args = {}
@ -638,7 +644,7 @@ class HttpCli(object):
"\n".join( "\n".join(
unicode(x) unicode(x)
for x in [ for x in [
":".join(unicode(x) for x in self.addr), ":".join(unicode(x) for x in [self.ip, self.addr[1]]),
msg.rstrip(), msg.rstrip(),
] ]
) )
@ -895,7 +901,7 @@ class HttpCli(object):
open_func = open open_func = open
# 512 kB is optimal for huge files, use 64k # 512 kB is optimal for huge files, use 64k
open_args = [fsenc(fs_path), "rb", 64 * 1024] open_args = [fsenc(fs_path), "rb", 64 * 1024]
if hasattr(os, 'sendfile'): if hasattr(os, "sendfile"):
use_sendfile = not self.args.no_sendfile use_sendfile = not self.args.no_sendfile
# #
@ -1021,6 +1027,9 @@ class HttpCli(object):
if abspath.endswith(".md") and "raw" not in self.uparam: if abspath.endswith(".md") and "raw" not in self.uparam:
return self.tx_md(abspath) return self.tx_md(abspath)
if abspath.endswith("{0}.hist{0}up2k.db".format(os.sep)):
raise Pebkac(403)
return self.tx_file(abspath) return self.tx_file(abspath)
fsroot, vfs_ls, vfs_virt = vn.ls(rem, self.uname) fsroot, vfs_ls, vfs_virt = vn.ls(rem, self.uname)

View file

@ -65,6 +65,7 @@ class HttpConn(object):
color = 34 color = 34
self.rproxy = ip self.rproxy = ip
self.ip = ip
self.log_src = "{} \033[{}m{}".format(ip, color, self.addr[1]).ljust(26) self.log_src = "{} \033[{}m{}".format(ip, color, self.addr[1]).ljust(26)
return self.log_src return self.log_src

View file

@ -9,6 +9,7 @@ from datetime import datetime, timedelta
import calendar import calendar
from .__init__ import PY2, WINDOWS, MACOS, VT100 from .__init__ import PY2, WINDOWS, MACOS, VT100
from .authsrv import AuthSrv
from .tcpsrv import TcpSrv from .tcpsrv import TcpSrv
from .up2k import Up2k from .up2k import Up2k
from .util import mp from .util import mp
@ -38,6 +39,10 @@ class SvcHub(object):
self.tcpsrv = TcpSrv(self) self.tcpsrv = TcpSrv(self)
self.up2k = Up2k(self) self.up2k = Up2k(self)
if self.args.e2d and self.args.e2s:
auth = AuthSrv(self.args, self.log)
self.up2k.build_indexes(auth.all_writable)
# decide which worker impl to use # decide which worker impl to use
if self.check_mp_enable(): if self.check_mp_enable():
from .broker_mp import BrokerMp as Broker from .broker_mp import BrokerMp as Broker

View file

@ -6,6 +6,7 @@ import os
import re import re
import time import time
import math import math
import stat
import shutil import shutil
import base64 import base64
import hashlib import hashlib
@ -13,7 +14,15 @@ import threading
from copy import deepcopy from copy import deepcopy
from .__init__ import WINDOWS from .__init__ import WINDOWS
from .util import Pebkac, Queue, fsenc, sanitize_fn, ren_open from .util import Pebkac, Queue, fsdec, fsenc, sanitize_fn, ren_open
HAVE_SQLITE3 = False
try:
import sqlite3
HAVE_SQLITE3 = True
except:
pass
class Up2k(object): class Up2k(object):
@ -22,20 +31,21 @@ class Up2k(object):
* documentation * documentation
* registry persistence * registry persistence
* ~/.config flatfiles for active jobs * ~/.config flatfiles for active jobs
* wark->path database for finished uploads
""" """
def __init__(self, broker): def __init__(self, broker):
self.broker = broker self.broker = broker
self.args = broker.args self.args = broker.args
self.log = broker.log self.log = broker.log
self.persist = self.args.e2d
# config # config
self.salt = "hunter2" # TODO: config self.salt = "hunter2" # TODO: config
# state # state
self.registry = {}
self.mutex = threading.Lock() self.mutex = threading.Lock()
self.registry = {}
self.db = {}
if WINDOWS: if WINDOWS:
# usually fails to set lastmod too quickly # usually fails to set lastmod too quickly
@ -47,57 +57,234 @@ class Up2k(object):
# static # static
self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$") self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$")
if self.persist and not HAVE_SQLITE3:
m = "could not initialize sqlite3, will use in-memory registry only"
self.log("up2k", m)
def register_vpath(self, ptop):
with self.mutex:
if ptop in self.registry:
return None
self.registry[ptop] = {}
if not self.persist or not HAVE_SQLITE3:
return None
try:
os.mkdir(os.path.join(ptop, ".hist"))
except:
pass
db_path = os.path.join(ptop, ".hist", "up2k.db")
if ptop in self.db:
# self.db[ptop].close()
return None
try:
db = self._open_db(db_path)
self.db[ptop] = db
return db
except Exception as ex:
m = "failed to open [{}]: {}".format(ptop, repr(ex))
self.log("up2k", m)
return None
def build_indexes(self, writeables):
tops = [d.realpath for d in writeables]
for top in tops:
db = self.register_vpath(top)
if db:
# can be symlink so don't `and d.startswith(top)``
excl = set([d for d in tops if d != top])
self._build_dir([db, 0], top, excl, top)
self._drop_lost(db, top)
db.commit()
def _build_dir(self, dbw, top, excl, cdir):
histdir = os.path.join(top, ".hist")
for inode in [fsdec(x) for x in os.listdir(fsenc(cdir))]:
abspath = os.path.join(cdir, inode)
inf = os.stat(fsenc(abspath))
if stat.S_ISDIR(inf.st_mode):
if abspath in excl or abspath == histdir:
continue
self.log("up2k", "dir: {}".format(abspath))
self._build_dir(dbw, top, excl, abspath)
else:
# self.log("up2k", "file: {}".format(abspath))
rp = abspath[len(top) :].replace("\\", "/").strip("/")
c = dbw[0].execute("select * from up where rp = ?", (rp,))
in_db = list(c.fetchall())
if in_db:
_, dts, dsz, _ = in_db[0]
if len(in_db) > 1:
m = "WARN: multiple entries: [{}] => [{}] ({})"
self.log("up2k", m.format(top, rp, len(in_db)))
dts = -1
if dts == inf.st_mtime and dsz == inf.st_size:
continue
m = "reindex [{}] => [{}] ({}/{}) ({}/{})".format(
top, rp, dts, inf.st_mtime, dsz, inf.st_size
)
self.log("up2k", m)
self.db_rm(dbw[0], rp)
dbw[1] += 1
in_db = None
self.log("up2k", "file: {}".format(abspath))
hashes = self._hashlist_from_file(abspath)
wark = self._wark_from_hashlist(inf.st_size, hashes)
self.db_add(dbw[0], wark, rp, inf.st_mtime, inf.st_size)
dbw[1] += 1
if dbw[1] > 1024:
dbw[0].commit()
dbw[1] = 0
def _drop_lost(self, db, top):
rm = []
c = db.execute("select * from up")
for dwark, dts, dsz, drp in c:
abspath = os.path.join(top, drp)
if not os.path.exists(abspath):
rm.append(drp)
if not rm:
return
self.log("up2k", "forgetting {} deleted files".format(len(rm)))
for rp in rm:
self.db_rm(db, rp)
def _open_db(self, db_path):
conn = sqlite3.connect(db_path, check_same_thread=False)
try:
c = conn.execute(r"select * from kv where k = 'sver'")
rows = c.fetchall()
if rows:
ver = rows[0][1]
else:
self.log("up2k", "WARN: no sver in kv, DB corrupt?")
ver = "unknown"
if ver == "1":
try:
nfiles = next(conn.execute("select count(w) from up"))[0]
self.log("up2k", "found DB at {} |{}|".format(db_path, nfiles))
return conn
except Exception as ex:
m = "WARN: could not list files, DB corrupt?\n " + repr(ex)
self.log("up2k", m)
m = "REPLACING unsupported DB (v.{}) at {}".format(ver, db_path)
self.log("up2k", m)
conn.close()
os.unlink(db_path)
conn = sqlite3.connect(db_path, check_same_thread=False)
except:
pass
# sqlite is variable-width only, no point in using char/nchar/varchar
for cmd in [
r"create table kv (k text, v text)",
r"create table up (w text, mt int, sz int, rp text)",
r"insert into kv values ('sver', '1')",
r"create index up_w on up(w)",
]:
conn.execute(cmd)
conn.commit()
self.log("up2k", "created DB at {}".format(db_path))
return conn
def handle_json(self, cj): def handle_json(self, cj):
self.register_vpath(cj["ptop"])
cj["name"] = sanitize_fn(cj["name"]) cj["name"] = sanitize_fn(cj["name"])
wark = self._get_wark(cj) wark = self._get_wark(cj)
now = time.time() now = time.time()
job = None
with self.mutex: with self.mutex:
# TODO use registry persistence here to symlink any matching wark db = self.db.get(cj["ptop"], None)
if wark in self.registry: reg = self.registry[cj["ptop"]]
job = self.registry[wark] if wark not in reg and db:
if job["rdir"] != cj["rdir"] or job["name"] != cj["name"]: cur = db.execute(r"select * from up where w = ?", (wark,))
src = os.path.join(job["rdir"], job["name"]) for _, dtime, dsize, dp_rel in cur:
dst = os.path.join(cj["rdir"], cj["name"]) dp_abs = os.path.join(cj["ptop"], dp_rel).replace("\\", "/")
# relying on path.exists to return false on broken symlinks
if os.path.exists(dp_abs):
try:
prel, name = dp_rel.rsplit("/", 1)
except:
prel = ""
name = dp_rel
job = {
"name": name,
"prel": prel,
"vtop": cj["vtop"],
"ptop": cj["ptop"],
"flag": cj["flag"],
"size": dsize,
"lmod": dtime,
"hash": [],
"need": [],
}
break
if job or wark in reg:
job = job or reg[wark]
if job["prel"] != cj["prel"] or job["name"] != cj["name"]:
src = os.path.join(job["ptop"], job["prel"], job["name"])
dst = os.path.join(cj["ptop"], cj["prel"], cj["name"])
vsrc = os.path.join(job["vtop"], job["prel"], job["name"])
vsrc = vsrc.replace("\\", "/") # just for prints anyways
if job["need"]: if job["need"]:
self.log("up2k", "unfinished:\n {0}\n {1}".format(src, dst)) self.log("up2k", "unfinished:\n {0}\n {1}".format(src, dst))
err = "partial upload exists at a different location; please resume uploading here instead:\n{0}{1} ".format( err = "partial upload exists at a different location; please resume uploading here instead:\n"
job["vdir"], job["name"] err += vsrc + " "
)
raise Pebkac(400, err) raise Pebkac(400, err)
elif "nodupe" in job["flag"]: elif "nodupe" in job["flag"]:
self.log("up2k", "dupe-reject:\n {0}\n {1}".format(src, dst)) self.log("up2k", "dupe-reject:\n {0}\n {1}".format(src, dst))
err = "upload rejected, file already exists:\n{0}{1} ".format( err = "upload rejected, file already exists:\n " + vsrc + " "
job["vdir"], job["name"]
)
raise Pebkac(400, err) raise Pebkac(400, err)
else: else:
# symlink to the client-provided name, # symlink to the client-provided name,
# returning the previous upload info # returning the previous upload info
job = deepcopy(job) job = deepcopy(job)
job["rdir"] = cj["rdir"] for k in ["ptop", "vtop", "prel"]:
job["name"] = self._untaken(cj["rdir"], cj["name"], now, cj["addr"]) job[k] = cj[k]
dst = os.path.join(job["rdir"], job["name"])
pdir = os.path.join(cj["ptop"], cj["prel"])
job["name"] = self._untaken(pdir, cj["name"], now, cj["addr"])
dst = os.path.join(job["ptop"], job["prel"], job["name"])
os.unlink(fsenc(dst)) # TODO ed pls os.unlink(fsenc(dst)) # TODO ed pls
self._symlink(src, dst) self._symlink(src, dst)
else:
if not job:
job = { job = {
"wark": wark, "wark": wark,
"t0": now, "t0": now,
"addr": cj["addr"],
"vdir": cj["vdir"],
"rdir": cj["rdir"],
"flag": cj["flag"],
# client-provided, sanitized by _get_wark:
"name": cj["name"],
"size": cj["size"],
"lmod": cj["lmod"],
"hash": deepcopy(cj["hash"]), "hash": deepcopy(cj["hash"]),
"need": [],
} }
# client-provided, sanitized by _get_wark: name, size, lmod
for k in [
"addr",
"vtop",
"ptop",
"prel",
"flag",
"name",
"size",
"lmod",
]:
job[k] = cj[k]
# one chunk may occur multiple times in a file; # one chunk may occur multiple times in a file;
# filter to unique values for the list of missing chunks # filter to unique values for the list of missing chunks
# (preserve order to reduce disk thrashing) # (preserve order to reduce disk thrashing)
job["need"] = []
lut = {} lut = {}
for k in cj["hash"]: for k in cj["hash"]:
if k not in lut: if k not in lut:
@ -149,36 +336,47 @@ class Up2k(object):
self.log("up2k", "cannot symlink; creating copy: " + repr(ex)) self.log("up2k", "cannot symlink; creating copy: " + repr(ex))
shutil.copy2(fsenc(src), fsenc(dst)) shutil.copy2(fsenc(src), fsenc(dst))
def handle_chunk(self, wark, chash): def handle_chunk(self, ptop, wark, chash):
with self.mutex: with self.mutex:
job = self.registry.get(wark) job = self.registry[ptop].get(wark, None)
if not job: if not job:
raise Pebkac(404, "unknown wark") raise Pebkac(400, "unknown wark")
if chash not in job["need"]: if chash not in job["need"]:
raise Pebkac(200, "already got that but thanks??") raise Pebkac(200, "already got that but thanks??")
nchunk = [n for n, v in enumerate(job["hash"]) if v == chash] nchunk = [n for n, v in enumerate(job["hash"]) if v == chash]
if not nchunk: if not nchunk:
raise Pebkac(404, "unknown chunk") raise Pebkac(400, "unknown chunk")
chunksize = self._get_chunksize(job["size"]) chunksize = self._get_chunksize(job["size"])
ofs = [chunksize * x for x in nchunk] ofs = [chunksize * x for x in nchunk]
path = os.path.join(job["rdir"], job["name"]) path = os.path.join(job["ptop"], job["prel"], job["name"])
return [chunksize, ofs, path, job["lmod"]] return [chunksize, ofs, path, job["lmod"]]
def confirm_chunk(self, wark, chash): def confirm_chunk(self, ptop, wark, chash):
with self.mutex: with self.mutex:
job = self.registry[wark] job = self.registry[ptop][wark]
job["need"].remove(chash) job["need"].remove(chash)
ret = len(job["need"]) ret = len(job["need"])
if ret > 0:
return ret
if WINDOWS and ret == 0: if WINDOWS:
path = os.path.join(job["rdir"], job["name"]) path = os.path.join(job["ptop"], job["prel"], job["name"])
self.lastmod_q.put([path, (int(time.time()), int(job["lmod"]))]) self.lastmod_q.put([path, (int(time.time()), int(job["lmod"]))])
db = self.db.get(job["ptop"], None)
if db:
rp = os.path.join(job["prel"], job["name"]).replace("\\", "/")
self.db_rm(db, rp)
self.db_add(db, job["wark"], rp, job["lmod"], job["size"])
db.commit()
del self.registry[ptop][wark]
# in-memory registry is reserved for unfinished uploads
return ret return ret
def _get_chunksize(self, filesize): def _get_chunksize(self, filesize):
@ -193,6 +391,14 @@ class Up2k(object):
chunksize += stepsize chunksize += stepsize
stepsize *= mul stepsize *= mul
def db_rm(self, db, rp):
db.execute("delete from up where rp = ?", (rp,))
def db_add(self, db, wark, rp, ts, sz):
db.execute(
"insert into up values (?,?,?,?)", (wark, ts, sz, rp,),
)
def _get_wark(self, cj): def _get_wark(self, cj):
if len(cj["name"]) > 1024 or len(cj["hash"]) > 512 * 1024: # 16TiB if len(cj["name"]) > 1024 or len(cj["hash"]) > 512 * 1024: # 16TiB
raise Pebkac(400, "name or numchunks not according to spec") raise Pebkac(400, "name or numchunks not according to spec")
@ -209,9 +415,13 @@ class Up2k(object):
except: except:
cj["lmod"] = int(time.time()) cj["lmod"] = int(time.time())
# server-reproducible file identifier, independent of name or location wark = self._wark_from_hashlist(cj["size"], cj["hash"])
ident = [self.salt, str(cj["size"])] return wark
ident.extend(cj["hash"])
def _wark_from_hashlist(self, filesize, hashes):
""" server-reproducible file identifier, independent of name or location """
ident = [self.salt, str(filesize)]
ident.extend(hashes)
ident = "\n".join(ident) ident = "\n".join(ident)
hasher = hashlib.sha512() hasher = hashlib.sha512()
@ -221,10 +431,34 @@ class Up2k(object):
wark = base64.urlsafe_b64encode(digest) wark = base64.urlsafe_b64encode(digest)
return wark.decode("utf-8").rstrip("=") return wark.decode("utf-8").rstrip("=")
def _hashlist_from_file(self, path):
fsz = os.path.getsize(path)
csz = self._get_chunksize(fsz)
ret = []
with open(path, "rb", 512 * 1024) as f:
while fsz > 0:
hashobj = hashlib.sha512()
rem = min(csz, fsz)
fsz -= rem
while rem > 0:
buf = f.read(min(rem, 64 * 1024))
if not buf:
raise Exception("EOF at " + str(f.tell()))
hashobj.update(buf)
rem -= len(buf)
digest = hashobj.digest()[:32]
digest = base64.urlsafe_b64encode(digest)
ret.append(digest.decode("utf-8").rstrip("="))
return ret
def _new_upload(self, job): def _new_upload(self, job):
self.registry[job["wark"]] = job self.registry[job["ptop"]][job["wark"]] = job
suffix = ".{:.6f}-{}".format(job["t0"], job["addr"]) suffix = ".{:.6f}-{}".format(job["t0"], job["addr"])
with ren_open(job["name"], "wb", fdir=job["rdir"], suffix=suffix) as f: pdir = os.path.join(job["ptop"], job["prel"])
with ren_open(job["name"], "wb", fdir=pdir, suffix=suffix) as f:
f, job["name"] = f["orz"] f, job["name"] = f["orz"]
f.seek(job["size"] - 1) f.seek(job["size"] - 1)
f.write(b"e") f.write(b"e")

View file

@ -712,3 +712,6 @@ class Pebkac(Exception):
def __init__(self, code, msg=None): def __init__(self, code, msg=None):
super(Pebkac, self).__init__(msg or HTTPCODE[code]) super(Pebkac, self).__init__(msg or HTTPCODE[code])
self.code = code self.code = code
def __repr__(self):
return "Pebkac({}, {})".format(self.code, repr(self.args))

View file

@ -672,7 +672,10 @@ function up2k_init(have_crypto) {
var rsp = (xhr.responseText + ''); var rsp = (xhr.responseText + '');
if (rsp.indexOf('partial upload exists') !== -1 || if (rsp.indexOf('partial upload exists') !== -1 ||
rsp.indexOf('file already exists') !== -1) { rsp.indexOf('file already exists') !== -1) {
err = rsp.slice(5); err = rsp;
var ofs = err.lastIndexOf(' : ');
if (ofs > 0)
err = err.slice(0, ofs);
} }
if (err != "") { if (err != "") {
ebi('f{0}t'.format(t.n)).innerHTML = "ERROR"; ebi('f{0}t'.format(t.n)).innerHTML = "ERROR";