add option to entirely disable dedup

global-option `--no-clone` / volflag `noclone` entirely disables
serverside deduplication; clients will then fully upload dupe files

can be useful when `--safe-dedup=1` is not an option due to other
software tampering with the on-disk files, and your filesystem has
prohibitively slow or expensive reads
This commit is contained in:
ed 2024-10-08 21:27:19 +00:00
parent eaee1f2cab
commit 3d7facd774
6 changed files with 58 additions and 27 deletions

View file

@ -1167,7 +1167,11 @@ if you want to entirely replace the copyparty response with your own jinja2 temp
enable symlink-based upload deduplication globally with `--dedup` or per-volume with volflag `dedup` enable symlink-based upload deduplication globally with `--dedup` or per-volume with volflag `dedup`
when someone tries to upload a file that already exists on the server, the upload will be politely declined and a symlink is created instead, pointing to the nearest copy on disk, thus reducing disk space usage by default, when someone tries to upload a file that already exists on the server, the upload will be politely declined, and the server will copy the existing file over to where the upload would have gone
if you enable deduplication with `--dedup` then it'll create a symlink instead of a full copy, thus reducing disk space usage
* on the contrary, if your server is hooked up to s3-glacier or similar storage where reading is expensive, and you cannot use `--safe-dedup=1` because you have other software tampering with your files, so you want to entirely disable detection of duplicate data instead, then you can specify `--no-clone` globally or `noclone` as a volflag
**warning:** when enabling dedup, you should also: **warning:** when enabling dedup, you should also:
* enable indexing with `-e2dsa` or volflag `e2dsa` (see [file indexing](#file-indexing) section below); strongly recommended * enable indexing with `-e2dsa` or volflag `e2dsa` (see [file indexing](#file-indexing) section below); strongly recommended

View file

@ -1005,6 +1005,7 @@ def add_upload(ap):
ap2.add_argument("--hardlink", action="store_true", help="enable hardlink-based dedup; will fallback on symlinks when that is impossible (across filesystems) (volflag=hardlink)") ap2.add_argument("--hardlink", action="store_true", help="enable hardlink-based dedup; will fallback on symlinks when that is impossible (across filesystems) (volflag=hardlink)")
ap2.add_argument("--hardlink-only", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made (volflag=hardlinkonly)") ap2.add_argument("--hardlink-only", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made (volflag=hardlinkonly)")
ap2.add_argument("--no-dupe", action="store_true", help="reject duplicate files during upload; only matches within the same volume (volflag=nodupe)") ap2.add_argument("--no-dupe", action="store_true", help="reject duplicate files during upload; only matches within the same volume (volflag=nodupe)")
ap2.add_argument("--no-clone", action="store_true", help="do not use existing data on disk to satisfy dupe uploads; reduces server HDD reads in exchange for much more network load (volflag=noclone)")
ap2.add_argument("--no-snap", action="store_true", help="disable snapshots -- forget unfinished uploads on shutdown; don't create .hist/up2k.snap files -- abandoned/interrupted uploads must be cleaned up manually") ap2.add_argument("--no-snap", action="store_true", help="disable snapshots -- forget unfinished uploads on shutdown; don't create .hist/up2k.snap files -- abandoned/interrupted uploads must be cleaned up manually")
ap2.add_argument("--snap-wri", metavar="SEC", type=int, default=300, help="write upload state to ./hist/up2k.snap every \033[33mSEC\033[0m seconds; allows resuming incomplete uploads after a server crash") ap2.add_argument("--snap-wri", metavar="SEC", type=int, default=300, help="write upload state to ./hist/up2k.snap every \033[33mSEC\033[0m seconds; allows resuming incomplete uploads after a server crash")
ap2.add_argument("--snap-drop", metavar="MIN", type=float, default=1440.0, help="forget unfinished uploads after \033[33mMIN\033[0m minutes; impossible to resume them after that (360=6h, 1440=24h)") ap2.add_argument("--snap-drop", metavar="MIN", type=float, default=1440.0, help="forget unfinished uploads after \033[33mMIN\033[0m minutes; impossible to resume them after that (360=6h, 1440=24h)")

View file

@ -13,6 +13,7 @@ def vf_bmap() -> dict[str, str]:
"dav_rt": "davrt", "dav_rt": "davrt",
"ed": "dots", "ed": "dots",
"hardlink_only": "hardlinkonly", "hardlink_only": "hardlinkonly",
"no_clone": "noclone",
"no_dirsz": "nodirsz", "no_dirsz": "nodirsz",
"no_dupe": "nodupe", "no_dupe": "nodupe",
"no_forget": "noforget", "no_forget": "noforget",
@ -135,7 +136,8 @@ flagcats = {
"hardlink": "enable hardlink-based file deduplication,\nwith fallback on symlinks when that is impossible", "hardlink": "enable hardlink-based file deduplication,\nwith fallback on symlinks when that is impossible",
"hardlinkonly": "dedup with hardlink only, never symlink;\nmake a full copy if hardlink is impossible", "hardlinkonly": "dedup with hardlink only, never symlink;\nmake a full copy if hardlink is impossible",
"safededup": "verify on-disk data before using it for dedup", "safededup": "verify on-disk data before using it for dedup",
"nodupe": "rejects existing files (instead of symlinking them)", "noclone": "take dupe data from clients, even if available on HDD",
"nodupe": "rejects existing files (instead of linking/cloning them)",
"sparse": "force use of sparse files, mainly for s3-backed storage", "sparse": "force use of sparse files, mainly for s3-backed storage",
"daw": "enable full WebDAV write support (dangerous);\nPUT-operations will now \033[1;31mOVERWRITE\033[0;35m existing files", "daw": "enable full WebDAV write support (dangerous);\nPUT-operations will now \033[1;31mOVERWRITE\033[0;35m existing files",
"nosub": "forces all uploads into the top folder of the vfs", "nosub": "forces all uploads into the top folder of the vfs",

View file

@ -2156,11 +2156,17 @@ class HttpCli(object):
except UnrecvEOF: except UnrecvEOF:
raise Pebkac(422, "client disconnected while posting JSON") raise Pebkac(422, "client disconnected while posting JSON")
self.log("decoding {} bytes of {} json".format(len(json_buf), enc))
try: try:
body = json.loads(json_buf.decode(enc, "replace")) body = json.loads(json_buf.decode(enc, "replace"))
try:
zds = {k: v for k, v in body.items()}
zds["hash"] = "%d chunks" % (len(body["hash"]))
except:
zds = body
t = "POST len=%d type=%s ip=%s user=%s req=%r json=%s"
self.log(t % (len(json_buf), enc, self.ip, self.uname, self.req, zds))
except: except:
raise Pebkac(422, "you POSTed invalid json") raise Pebkac(422, "you POSTed %d bytes of invalid json" % (len(json_buf),))
# self.reply(b"cloudflare", 503) # self.reply(b"cloudflare", 503)
# return True # return True

View file

@ -1545,7 +1545,7 @@ class Up2k(object):
at = 0 at = 0
# skip upload hooks by not providing vflags # skip upload hooks by not providing vflags
self.db_add(db.c, {}, rd, fn, lmod, sz, "", "", wark, "", "", ip, at) self.db_add(db.c, {}, rd, fn, lmod, sz, "", "", wark, wark, "", "", ip, at)
db.n += 1 db.n += 1
tfa += 1 tfa += 1
td = time.time() - db.t td = time.time() - db.t
@ -2779,9 +2779,10 @@ class Up2k(object):
cj["name"] = sanitize_fn(cj["name"], "") cj["name"] = sanitize_fn(cj["name"], "")
cj["poke"] = now = self.db_act = self.vol_act[ptop] = time.time() cj["poke"] = now = self.db_act = self.vol_act[ptop] = time.time()
wark = self._get_wark(cj) wark = dwark = self._get_wark(cj)
job = None job = None
pdir = djoin(ptop, cj["prel"]) pdir = djoin(ptop, cj["prel"])
inc_ap = djoin(pdir, cj["name"])
try: try:
dev = bos.stat(pdir).st_dev dev = bos.stat(pdir).st_dev
except: except:
@ -2796,6 +2797,7 @@ class Up2k(object):
reg = self.registry[ptop] reg = self.registry[ptop]
vfs = self.asrv.vfs.all_vols[cj["vtop"]] vfs = self.asrv.vfs.all_vols[cj["vtop"]]
n4g = bool(vfs.flags.get("noforget")) n4g = bool(vfs.flags.get("noforget"))
noclone = bool(vfs.flags.get("noclone"))
rand = vfs.flags.get("rand") or cj.get("rand") rand = vfs.flags.get("rand") or cj.get("rand")
lost: list[tuple["sqlite3.Cursor", str, str]] = [] lost: list[tuple["sqlite3.Cursor", str, str]] = []
@ -2805,6 +2807,12 @@ class Up2k(object):
vols = [(ptop, jcur)] if jcur else [] vols = [(ptop, jcur)] if jcur else []
if vfs.flags.get("xlink"): if vfs.flags.get("xlink"):
vols += [(k, v) for k, v in self.cur.items() if k != ptop] vols += [(k, v) for k, v in self.cur.items() if k != ptop]
if noclone:
wark = up2k_wark_from_metadata(
self.salt, cj["size"], cj["lmod"], cj["prel"], cj["name"]
)
if vfs.flags.get("up_ts", "") == "fu" or not cj["lmod"]: if vfs.flags.get("up_ts", "") == "fu" or not cj["lmod"]:
# force upload time rather than last-modified # force upload time rather than last-modified
cj["lmod"] = int(time.time()) cj["lmod"] = int(time.time())
@ -2817,10 +2825,10 @@ class Up2k(object):
if self.no_expr_idx: if self.no_expr_idx:
q = r"select * from up where w = ?" q = r"select * from up where w = ?"
argv = [wark] argv = [dwark]
else: else:
q = r"select * from up where substr(w,1,16)=? and +w=?" q = r"select * from up where substr(w,1,16)=? and +w=?"
argv = [wark[:16], wark] argv = [dwark[:16], dwark]
c2 = cur.execute(q, tuple(argv)) c2 = cur.execute(q, tuple(argv))
for _, dtime, dsize, dp_dir, dp_fn, ip, at in c2: for _, dtime, dsize, dp_dir, dp_fn, ip, at in c2:
@ -2828,6 +2836,9 @@ class Up2k(object):
dp_dir, dp_fn = s3dec(dp_dir, dp_fn) dp_dir, dp_fn = s3dec(dp_dir, dp_fn)
dp_abs = djoin(ptop, dp_dir, dp_fn) dp_abs = djoin(ptop, dp_dir, dp_fn)
if noclone and dp_abs != inc_ap:
continue
try: try:
st = bos.stat(dp_abs) st = bos.stat(dp_abs)
if stat.S_ISLNK(st.st_mode): if stat.S_ISLNK(st.st_mode):
@ -2836,7 +2847,7 @@ class Up2k(object):
if st.st_size != dsize: if st.st_size != dsize:
t = "candidate ignored (db/fs desync): {}, size fs={} db={}, mtime fs={} db={}, file: {}" t = "candidate ignored (db/fs desync): {}, size fs={} db={}, mtime fs={} db={}, file: {}"
t = t.format( t = t.format(
wark, st.st_size, dsize, st.st_mtime, dtime, dp_abs dwark, st.st_size, dsize, st.st_mtime, dtime, dp_abs
) )
self.log(t) self.log(t)
raise Exception() raise Exception()
@ -2883,7 +2894,6 @@ class Up2k(object):
alts.append((score, -len(alts), j, cur, dp_dir, dp_fn)) alts.append((score, -len(alts), j, cur, dp_dir, dp_fn))
job = None job = None
inc_ap = djoin(cj["ptop"], cj["prel"], cj["name"])
for dupe in sorted(alts, reverse=True): for dupe in sorted(alts, reverse=True):
rj = dupe[2] rj = dupe[2]
orig_ap = djoin(rj["ptop"], rj["prel"], rj["name"]) orig_ap = djoin(rj["ptop"], rj["prel"], rj["name"])
@ -2893,11 +2903,11 @@ class Up2k(object):
break break
else: else:
self.log("asserting contents of %s" % (orig_ap,)) self.log("asserting contents of %s" % (orig_ap,))
dhashes, st = self._hashlist_from_file(orig_ap) hashes2, st = self._hashlist_from_file(orig_ap)
dwark = up2k_wark_from_hashlist(self.salt, st.st_size, dhashes) wark2 = up2k_wark_from_hashlist(self.salt, st.st_size, hashes2)
if wark != dwark: if dwark != wark2:
t = "will not dedup (fs index desync): fs=%s, db=%s, file: %s" t = "will not dedup (fs index desync): fs=%s, db=%s, file: %s"
self.log(t % (dwark, wark, orig_ap)) self.log(t % (wark2, dwark, orig_ap))
lost.append(dupe[3:]) lost.append(dupe[3:])
continue continue
data_ok = True data_ok = True
@ -2962,11 +2972,11 @@ class Up2k(object):
elif inc_ap != orig_ap and not data_ok and "done" in reg[wark]: elif inc_ap != orig_ap and not data_ok and "done" in reg[wark]:
self.log("asserting contents of %s" % (orig_ap,)) self.log("asserting contents of %s" % (orig_ap,))
dhashes, _ = self._hashlist_from_file(orig_ap) hashes2, _ = self._hashlist_from_file(orig_ap)
dwark = up2k_wark_from_hashlist(self.salt, st.st_size, dhashes) wark2 = up2k_wark_from_hashlist(self.salt, st.st_size, hashes2)
if wark != dwark: if wark != wark2:
t = "will not dedup (fs index desync): fs=%s, idx=%s, file: %s" t = "will not dedup (fs index desync): fs=%s, idx=%s, file: %s"
self.log(t % (dwark, wark, orig_ap)) self.log(t % (wark2, wark, orig_ap))
del reg[wark] del reg[wark]
if job or wark in reg: if job or wark in reg:
@ -3023,6 +3033,7 @@ class Up2k(object):
job = deepcopy(job) job = deepcopy(job)
job["wark"] = wark job["wark"] = wark
job["dwrk"] = dwark
job["at"] = cj.get("at") or now job["at"] = cj.get("at") or now
zs = "vtop ptop prel name lmod host user addr poke" zs = "vtop ptop prel name lmod host user addr poke"
for k in zs.split(): for k in zs.split():
@ -3093,7 +3104,7 @@ class Up2k(object):
raise raise
if cur and not self.args.nw: if cur and not self.args.nw:
zs = "prel name lmod size ptop vtop wark host user addr at" zs = "prel name lmod size ptop vtop wark dwrk host user addr at"
a = [job[x] for x in zs.split()] a = [job[x] for x in zs.split()]
self.db_add(cur, vfs.flags, *a) self.db_add(cur, vfs.flags, *a)
cur.connection.commit() cur.connection.commit()
@ -3123,6 +3134,7 @@ class Up2k(object):
job = { job = {
"wark": wark, "wark": wark,
"dwrk": dwark,
"t0": now, "t0": now,
"sprs": sprs, "sprs": sprs,
"hash": deepcopy(cj["hash"]), "hash": deepcopy(cj["hash"]),
@ -3165,6 +3177,7 @@ class Up2k(object):
"lmod": job["lmod"], "lmod": job["lmod"],
"sprs": job.get("sprs", sprs), "sprs": job.get("sprs", sprs),
"hash": job["need"], "hash": job["need"],
"dwrk": dwark,
"wark": wark, "wark": wark,
} }
@ -3191,7 +3204,7 @@ class Up2k(object):
): ):
sql = "update up set mt=? where substr(w,1,16)=? and +rd=? and +fn=?" sql = "update up set mt=? where substr(w,1,16)=? and +rd=? and +fn=?"
try: try:
cur.execute(sql, (cj["lmod"], wark[:16], job["prel"], job["name"])) cur.execute(sql, (cj["lmod"], dwark[:16], job["prel"], job["name"]))
cur.connection.commit() cur.connection.commit()
ap = djoin(job["ptop"], job["prel"], job["name"]) ap = djoin(job["ptop"], job["prel"], job["name"])
@ -3513,7 +3526,7 @@ class Up2k(object):
except: except:
self.log("failed to utime ({}, {})".format(dst, times)) self.log("failed to utime ({}, {})".format(dst, times))
zs = "prel name lmod size ptop vtop wark host user addr" zs = "prel name lmod size ptop vtop wark dwrk host user addr"
z2 = [job[x] for x in zs.split()] z2 = [job[x] for x in zs.split()]
wake_sr = False wake_sr = False
try: try:
@ -3586,6 +3599,7 @@ class Up2k(object):
ptop: str, ptop: str,
vtop: str, vtop: str,
wark: str, wark: str,
dwark: str,
host: str, host: str,
usr: str, usr: str,
ip: str, ip: str,
@ -3608,6 +3622,7 @@ class Up2k(object):
ptop, ptop,
vtop, vtop,
wark, wark,
dwark,
host, host,
usr, usr,
ip, ip,
@ -3622,7 +3637,7 @@ class Up2k(object):
raise raise
if "e2t" in self.flags[ptop]: if "e2t" in self.flags[ptop]:
self.tagq.put((ptop, wark, rd, fn, sz, ip, at)) self.tagq.put((ptop, dwark, rd, fn, sz, ip, at))
self.n_tagq += 1 self.n_tagq += 1
return True return True
@ -3650,6 +3665,7 @@ class Up2k(object):
ptop: str, ptop: str,
vtop: str, vtop: str,
wark: str, wark: str,
dwark: str,
host: str, host: str,
usr: str, usr: str,
ip: str, ip: str,
@ -3666,13 +3682,13 @@ class Up2k(object):
db_ip = "1.1.1.1" if self.args.no_db_ip else ip db_ip = "1.1.1.1" if self.args.no_db_ip else ip
sql = "insert into up values (?,?,?,?,?,?,?)" sql = "insert into up values (?,?,?,?,?,?,?)"
v = (wark, int(ts), sz, rd, fn, db_ip, int(at or 0)) v = (dwark, int(ts), sz, rd, fn, db_ip, int(at or 0))
try: try:
db.execute(sql, v) db.execute(sql, v)
except: except:
assert self.mem_cur # !rm assert self.mem_cur # !rm
rd, fn = s3enc(self.mem_cur, rd, fn) rd, fn = s3enc(self.mem_cur, rd, fn)
v = (wark, int(ts), sz, rd, fn, db_ip, int(at or 0)) v = (dwark, int(ts), sz, rd, fn, db_ip, int(at or 0))
db.execute(sql, v) db.execute(sql, v)
self.volsize[db] += sz self.volsize[db] += sz
@ -3716,11 +3732,11 @@ class Up2k(object):
for cd in cds: for cd in cds:
# one for each unique cooldown duration # one for each unique cooldown duration
try: try:
db.execute(q, (cd, wark[:16], rd, fn)) db.execute(q, (cd, dwark[:16], rd, fn))
except: except:
assert self.mem_cur # !rm assert self.mem_cur # !rm
rd, fn = s3enc(self.mem_cur, rd, fn) rd, fn = s3enc(self.mem_cur, rd, fn)
db.execute(q, (cd, wark[:16], rd, fn)) db.execute(q, (cd, dwark[:16], rd, fn))
if self.xiu_asleep: if self.xiu_asleep:
self.xiu_asleep = False self.xiu_asleep = False
@ -4175,6 +4191,7 @@ class Up2k(object):
dvn.realpath, dvn.realpath,
dvn.vpath, dvn.vpath,
w, w,
w,
"", "",
"", "",
ip or "", ip or "",
@ -4857,6 +4874,7 @@ class Up2k(object):
ptop, ptop,
vtop, vtop,
wark, wark,
wark,
"", "",
usr, usr,
ip, ip,

View file

@ -122,7 +122,7 @@ class Cfg(Namespace):
def __init__(self, a=None, v=None, c=None, **ka0): def __init__(self, a=None, v=None, c=None, **ka0):
ka = {} ka = {}
ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic hardlink_only nid nih no_acode no_athumb no_dav no_db_ip no_del no_dirsz no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand re_dirsz smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol zs" ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic hardlink_only nid nih no_acode no_athumb no_clone no_dav no_db_ip no_del no_dirsz no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand re_dirsz smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol zs"
ka.update(**{k: False for k in ex.split()}) ka.update(**{k: False for k in ex.split()})
ex = "dedup dotpart dotsrch hook_v no_dhash no_fastboot no_fpool no_htp no_rescan no_sendfile no_ses no_snap no_up_list no_voldump re_dhash plain_ip" ex = "dedup dotpart dotsrch hook_v no_dhash no_fastboot no_fpool no_htp no_rescan no_sendfile no_ses no_snap no_up_list no_voldump re_dhash plain_ip"