diff --git a/README.md b/README.md index d4885ece..5e30b251 100644 --- a/README.md +++ b/README.md @@ -1167,7 +1167,11 @@ if you want to entirely replace the copyparty response with your own jinja2 temp enable symlink-based upload deduplication globally with `--dedup` or per-volume with volflag `dedup` -when someone tries to upload a file that already exists on the server, the upload will be politely declined and a symlink is created instead, pointing to the nearest copy on disk, thus reducinc disk space usage +by default, when someone tries to upload a file that already exists on the server, the upload will be politely declined, and the server will copy the existing file over to where the upload would have gone + +if you enable deduplication with `--dedup` then it'll create a symlink instead of a full copy, thus reducing disk space usage + +* on the contrary, if your server is hooked up to s3-glacier or similar storage where reading is expensive, and you cannot use `--safe-dedup=1` because you have other software tampering with your files, so you want to entirely disable detection of duplicate data instead, then you can specify `--no-clone` globally or `noclone` as a volflag **warning:** when enabling dedup, you should also: * enable indexing with `-e2dsa` or volflag `e2dsa` (see [file indexing](#file-indexing) section below); strongly recommended diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 72e3ccfe..8f2360de 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -1005,6 +1005,7 @@ def add_upload(ap): ap2.add_argument("--hardlink", action="store_true", help="enable hardlink-based dedup; will fallback on symlinks when that is impossible (across filesystems) (volflag=hardlink)") ap2.add_argument("--hardlink-only", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made (volflag=hardlinkonly)") ap2.add_argument("--no-dupe", action="store_true", help="reject duplicate files during upload; only matches within the same volume (volflag=nodupe)") + ap2.add_argument("--no-clone", action="store_true", help="do not use existing data on disk to satisfy dupe uploads; reduces server HDD reads in exchange for much more network load (volflag=noclone)") ap2.add_argument("--no-snap", action="store_true", help="disable snapshots -- forget unfinished uploads on shutdown; don't create .hist/up2k.snap files -- abandoned/interrupted uploads must be cleaned up manually") ap2.add_argument("--snap-wri", metavar="SEC", type=int, default=300, help="write upload state to ./hist/up2k.snap every \033[33mSEC\033[0m seconds; allows resuming incomplete uploads after a server crash") ap2.add_argument("--snap-drop", metavar="MIN", type=float, default=1440.0, help="forget unfinished uploads after \033[33mMIN\033[0m minutes; impossible to resume them after that (360=6h, 1440=24h)") diff --git a/copyparty/cfg.py b/copyparty/cfg.py index 4c584dc5..ebb8cc9c 100644 --- a/copyparty/cfg.py +++ b/copyparty/cfg.py @@ -13,6 +13,7 @@ def vf_bmap() -> dict[str, str]: "dav_rt": "davrt", "ed": "dots", "hardlink_only": "hardlinkonly", + "no_clone": "noclone", "no_dirsz": "nodirsz", "no_dupe": "nodupe", "no_forget": "noforget", @@ -135,7 +136,8 @@ flagcats = { "hardlink": "enable hardlink-based file deduplication,\nwith fallback on symlinks when that is impossible", "hardlinkonly": "dedup with hardlink only, never symlink;\nmake a full copy if hardlink is impossible", "safededup": "verify on-disk data before using it for dedup", - "nodupe": "rejects existing files (instead of symlinking them)", + "noclone": "take dupe data from clients, even if available on HDD", + "nodupe": "rejects existing files (instead of linking/cloning them)", "sparse": "force use of sparse files, mainly for s3-backed storage", "daw": "enable full WebDAV write support (dangerous);\nPUT-operations will now \033[1;31mOVERWRITE\033[0;35m existing files", "nosub": "forces all uploads into the top folder of the vfs", diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 1df0be1e..20ea0e01 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -2156,11 +2156,17 @@ class HttpCli(object): except UnrecvEOF: raise Pebkac(422, "client disconnected while posting JSON") - self.log("decoding {} bytes of {} json".format(len(json_buf), enc)) try: body = json.loads(json_buf.decode(enc, "replace")) + try: + zds = {k: v for k, v in body.items()} + zds["hash"] = "%d chunks" % (len(body["hash"])) + except: + zds = body + t = "POST len=%d type=%s ip=%s user=%s req=%r json=%s" + self.log(t % (len(json_buf), enc, self.ip, self.uname, self.req, zds)) except: - raise Pebkac(422, "you POSTed invalid json") + raise Pebkac(422, "you POSTed %d bytes of invalid json" % (len(json_buf),)) # self.reply(b"cloudflare", 503) # return True diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 165f1a7b..36d4c890 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -1545,7 +1545,7 @@ class Up2k(object): at = 0 # skip upload hooks by not providing vflags - self.db_add(db.c, {}, rd, fn, lmod, sz, "", "", wark, "", "", ip, at) + self.db_add(db.c, {}, rd, fn, lmod, sz, "", "", wark, wark, "", "", ip, at) db.n += 1 tfa += 1 td = time.time() - db.t @@ -2779,9 +2779,10 @@ class Up2k(object): cj["name"] = sanitize_fn(cj["name"], "") cj["poke"] = now = self.db_act = self.vol_act[ptop] = time.time() - wark = self._get_wark(cj) + wark = dwark = self._get_wark(cj) job = None pdir = djoin(ptop, cj["prel"]) + inc_ap = djoin(pdir, cj["name"]) try: dev = bos.stat(pdir).st_dev except: @@ -2796,6 +2797,7 @@ class Up2k(object): reg = self.registry[ptop] vfs = self.asrv.vfs.all_vols[cj["vtop"]] n4g = bool(vfs.flags.get("noforget")) + noclone = bool(vfs.flags.get("noclone")) rand = vfs.flags.get("rand") or cj.get("rand") lost: list[tuple["sqlite3.Cursor", str, str]] = [] @@ -2805,6 +2807,12 @@ class Up2k(object): vols = [(ptop, jcur)] if jcur else [] if vfs.flags.get("xlink"): vols += [(k, v) for k, v in self.cur.items() if k != ptop] + + if noclone: + wark = up2k_wark_from_metadata( + self.salt, cj["size"], cj["lmod"], cj["prel"], cj["name"] + ) + if vfs.flags.get("up_ts", "") == "fu" or not cj["lmod"]: # force upload time rather than last-modified cj["lmod"] = int(time.time()) @@ -2817,10 +2825,10 @@ class Up2k(object): if self.no_expr_idx: q = r"select * from up where w = ?" - argv = [wark] + argv = [dwark] else: q = r"select * from up where substr(w,1,16)=? and +w=?" - argv = [wark[:16], wark] + argv = [dwark[:16], dwark] c2 = cur.execute(q, tuple(argv)) for _, dtime, dsize, dp_dir, dp_fn, ip, at in c2: @@ -2828,6 +2836,9 @@ class Up2k(object): dp_dir, dp_fn = s3dec(dp_dir, dp_fn) dp_abs = djoin(ptop, dp_dir, dp_fn) + if noclone and dp_abs != inc_ap: + continue + try: st = bos.stat(dp_abs) if stat.S_ISLNK(st.st_mode): @@ -2836,7 +2847,7 @@ class Up2k(object): if st.st_size != dsize: t = "candidate ignored (db/fs desync): {}, size fs={} db={}, mtime fs={} db={}, file: {}" t = t.format( - wark, st.st_size, dsize, st.st_mtime, dtime, dp_abs + dwark, st.st_size, dsize, st.st_mtime, dtime, dp_abs ) self.log(t) raise Exception() @@ -2883,7 +2894,6 @@ class Up2k(object): alts.append((score, -len(alts), j, cur, dp_dir, dp_fn)) job = None - inc_ap = djoin(cj["ptop"], cj["prel"], cj["name"]) for dupe in sorted(alts, reverse=True): rj = dupe[2] orig_ap = djoin(rj["ptop"], rj["prel"], rj["name"]) @@ -2893,11 +2903,11 @@ class Up2k(object): break else: self.log("asserting contents of %s" % (orig_ap,)) - dhashes, st = self._hashlist_from_file(orig_ap) - dwark = up2k_wark_from_hashlist(self.salt, st.st_size, dhashes) - if wark != dwark: + hashes2, st = self._hashlist_from_file(orig_ap) + wark2 = up2k_wark_from_hashlist(self.salt, st.st_size, hashes2) + if dwark != wark2: t = "will not dedup (fs index desync): fs=%s, db=%s, file: %s" - self.log(t % (dwark, wark, orig_ap)) + self.log(t % (wark2, dwark, orig_ap)) lost.append(dupe[3:]) continue data_ok = True @@ -2962,11 +2972,11 @@ class Up2k(object): elif inc_ap != orig_ap and not data_ok and "done" in reg[wark]: self.log("asserting contents of %s" % (orig_ap,)) - dhashes, _ = self._hashlist_from_file(orig_ap) - dwark = up2k_wark_from_hashlist(self.salt, st.st_size, dhashes) - if wark != dwark: + hashes2, _ = self._hashlist_from_file(orig_ap) + wark2 = up2k_wark_from_hashlist(self.salt, st.st_size, hashes2) + if wark != wark2: t = "will not dedup (fs index desync): fs=%s, idx=%s, file: %s" - self.log(t % (dwark, wark, orig_ap)) + self.log(t % (wark2, wark, orig_ap)) del reg[wark] if job or wark in reg: @@ -3023,6 +3033,7 @@ class Up2k(object): job = deepcopy(job) job["wark"] = wark + job["dwrk"] = dwark job["at"] = cj.get("at") or now zs = "vtop ptop prel name lmod host user addr poke" for k in zs.split(): @@ -3093,7 +3104,7 @@ class Up2k(object): raise if cur and not self.args.nw: - zs = "prel name lmod size ptop vtop wark host user addr at" + zs = "prel name lmod size ptop vtop wark dwrk host user addr at" a = [job[x] for x in zs.split()] self.db_add(cur, vfs.flags, *a) cur.connection.commit() @@ -3123,6 +3134,7 @@ class Up2k(object): job = { "wark": wark, + "dwrk": dwark, "t0": now, "sprs": sprs, "hash": deepcopy(cj["hash"]), @@ -3165,6 +3177,7 @@ class Up2k(object): "lmod": job["lmod"], "sprs": job.get("sprs", sprs), "hash": job["need"], + "dwrk": dwark, "wark": wark, } @@ -3191,7 +3204,7 @@ class Up2k(object): ): sql = "update up set mt=? where substr(w,1,16)=? and +rd=? and +fn=?" try: - cur.execute(sql, (cj["lmod"], wark[:16], job["prel"], job["name"])) + cur.execute(sql, (cj["lmod"], dwark[:16], job["prel"], job["name"])) cur.connection.commit() ap = djoin(job["ptop"], job["prel"], job["name"]) @@ -3513,7 +3526,7 @@ class Up2k(object): except: self.log("failed to utime ({}, {})".format(dst, times)) - zs = "prel name lmod size ptop vtop wark host user addr" + zs = "prel name lmod size ptop vtop wark dwrk host user addr" z2 = [job[x] for x in zs.split()] wake_sr = False try: @@ -3586,6 +3599,7 @@ class Up2k(object): ptop: str, vtop: str, wark: str, + dwark: str, host: str, usr: str, ip: str, @@ -3608,6 +3622,7 @@ class Up2k(object): ptop, vtop, wark, + dwark, host, usr, ip, @@ -3622,7 +3637,7 @@ class Up2k(object): raise if "e2t" in self.flags[ptop]: - self.tagq.put((ptop, wark, rd, fn, sz, ip, at)) + self.tagq.put((ptop, dwark, rd, fn, sz, ip, at)) self.n_tagq += 1 return True @@ -3650,6 +3665,7 @@ class Up2k(object): ptop: str, vtop: str, wark: str, + dwark: str, host: str, usr: str, ip: str, @@ -3666,13 +3682,13 @@ class Up2k(object): db_ip = "1.1.1.1" if self.args.no_db_ip else ip sql = "insert into up values (?,?,?,?,?,?,?)" - v = (wark, int(ts), sz, rd, fn, db_ip, int(at or 0)) + v = (dwark, int(ts), sz, rd, fn, db_ip, int(at or 0)) try: db.execute(sql, v) except: assert self.mem_cur # !rm rd, fn = s3enc(self.mem_cur, rd, fn) - v = (wark, int(ts), sz, rd, fn, db_ip, int(at or 0)) + v = (dwark, int(ts), sz, rd, fn, db_ip, int(at or 0)) db.execute(sql, v) self.volsize[db] += sz @@ -3716,11 +3732,11 @@ class Up2k(object): for cd in cds: # one for each unique cooldown duration try: - db.execute(q, (cd, wark[:16], rd, fn)) + db.execute(q, (cd, dwark[:16], rd, fn)) except: assert self.mem_cur # !rm rd, fn = s3enc(self.mem_cur, rd, fn) - db.execute(q, (cd, wark[:16], rd, fn)) + db.execute(q, (cd, dwark[:16], rd, fn)) if self.xiu_asleep: self.xiu_asleep = False @@ -4175,6 +4191,7 @@ class Up2k(object): dvn.realpath, dvn.vpath, w, + w, "", "", ip or "", @@ -4857,6 +4874,7 @@ class Up2k(object): ptop, vtop, wark, + wark, "", usr, ip, diff --git a/tests/util.py b/tests/util.py index c2438fd7..5719fd3e 100644 --- a/tests/util.py +++ b/tests/util.py @@ -122,7 +122,7 @@ class Cfg(Namespace): def __init__(self, a=None, v=None, c=None, **ka0): ka = {} - ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic hardlink_only nid nih no_acode no_athumb no_dav no_db_ip no_del no_dirsz no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand re_dirsz smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol zs" + ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic hardlink_only nid nih no_acode no_athumb no_clone no_dav no_db_ip no_del no_dirsz no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand re_dirsz smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol zs" ka.update(**{k: False for k in ex.split()}) ex = "dedup dotpart dotsrch hook_v no_dhash no_fastboot no_fpool no_htp no_rescan no_sendfile no_ses no_snap no_up_list no_voldump re_dhash plain_ip"