add option for cross-volume dedupe

This commit is contained in:
ed 2022-12-02 17:25:37 +00:00
parent f107497a94
commit 89c9f45fd0
4 changed files with 48 additions and 21 deletions

View file

@ -828,6 +828,7 @@ through arguments:
* `-e2v` verifies file integrity at startup, comparing hashes from the db
* `-e2vu` patches the database with the new hashes from the filesystem
* `-e2vp` panics and kills copyparty instead
* `--xlink` enables deduplication across volumes
the same arguments can be set as volflags, in addition to `d2d`, `d2ds`, `d2t`, `d2ts`, `d2v` for disabling:
* `-v ~/music::r:c,e2dsa,e2tsr` does a full reindex of everything on startup

View file

@ -540,6 +540,7 @@ def run_argparse(
\033[36mnohash=\\.iso$\033[35m skips hashing file contents if path matches *.iso
\033[36mnoidx=\\.iso$\033[35m fully ignores the contents at paths matching *.iso
\033[36mnoforget$\033[35m don't forget files when deleted from disk
\033[36mxlink$\033[35m cross-volume dupe detection / linking
\033[36mxdev\033[35m do not descend into other filesystems
\033[36mxvol\033[35m skip symlinks leaving the volume root
@ -812,6 +813,7 @@ def run_argparse(
ap2.add_argument("--no-idx", metavar="PTN", type=u, help="regex: disable indexing of matching paths during e2ds folder scans (volflag=noidx)")
ap2.add_argument("--no-dhash", action="store_true", help="disable rescan acceleration; do full database integrity check -- makes the db ~5%% smaller and bootup/rescans 3~10x slower")
ap2.add_argument("--no-forget", action="store_true", help="never forget indexed files, even when deleted from disk -- makes it impossible to ever upload the same file twice (volflag=noforget)")
ap2.add_argument("--xlink", action="store_true", help="on upload: check all volumes for dupes, not just the target volume (volflag=xlink)")
ap2.add_argument("--xdev", action="store_true", help="do not descend into other filesystems (symlink or bind-mount to another HDD, ...) (volflag=xdev)")
ap2.add_argument("--xvol", action="store_true", help="skip symlinks leaving the volume root (volflag=xvol)")
ap2.add_argument("--hash-mt", metavar="CORES", type=int, default=hcores, help="num cpu cores to use for file hashing; set 0 or 1 for single-core hashing")

View file

@ -1124,6 +1124,7 @@ class AuthSrv(object):
("no_forget", "noforget"),
("no_dupe", "nodupe"),
("magic", "magic"),
("xlink", "xlink"),
):
if getattr(self.args, ga):
vol.flags[vf] = True

View file

@ -1901,12 +1901,23 @@ class Up2k(object):
sprs = self.fstab.get(pdir) != "ng"
with self.mutex:
cur = self.cur.get(cj["ptop"])
reg = self.registry[cj["ptop"]]
ptop = cj["ptop"]
jcur = self.cur.get(ptop)
reg = self.registry[ptop]
vfs = self.asrv.vfs.all_vols[cj["vtop"]]
n4g = vfs.flags.get("noforget")
lost: list[tuple[str, str]] = []
if cur:
lost: list[tuple["sqlite3.Cursor", str, str]] = []
vols = [(ptop, jcur)]
if vfs.flags.get("xlink"):
vols += [(k, v) for k, v in self.cur.items() if k != ptop]
alts: list[tuple[int, int, dict[str, Any]]] = []
for ptop, cur in vols:
allv = self.asrv.vfs.all_vols
cvfs = next((v for v in allv.values() if v.realpath == ptop), vfs)
vtop = cj["vtop"] if cur == jcur else cvfs.vpath
if self.no_expr_idx:
q = r"select * from up where w = ?"
argv = [wark]
@ -1914,13 +1925,12 @@ class Up2k(object):
q = r"select * from up where substr(w,1,16) = ? and w = ?"
argv = [wark[:16], wark]
alts: list[tuple[int, int, dict[str, Any]]] = []
cur = cur.execute(q, tuple(argv))
for _, dtime, dsize, dp_dir, dp_fn, ip, at in cur:
c2 = cur.execute(q, tuple(argv))
for _, dtime, dsize, dp_dir, dp_fn, ip, at in c2:
if dp_dir.startswith("//") or dp_fn.startswith("//"):
dp_dir, dp_fn = s3dec(dp_dir, dp_fn)
dp_abs = "/".join([cj["ptop"], dp_dir, dp_fn])
dp_abs = "/".join([ptop, dp_dir, dp_fn])
try:
st = bos.stat(dp_abs)
if stat.S_ISLNK(st.st_mode):
@ -1930,14 +1940,14 @@ class Up2k(object):
if n4g:
st = os.stat_result((0, -1, -1, 0, 0, 0, 0, 0, 0, 0))
else:
lost.append((dp_dir, dp_fn))
lost.append((cur, dp_dir, dp_fn))
continue
j = {
"name": dp_fn,
"prel": dp_dir,
"vtop": cj["vtop"],
"ptop": cj["ptop"],
"vtop": vtop,
"ptop": ptop,
"sprs": sprs, # dontcare; finished anyways
"size": dsize,
"lmod": dtime,
@ -1958,20 +1968,33 @@ class Up2k(object):
)
alts.append((score, -len(alts), j))
job = sorted(alts, reverse=True)[0][2] if alts else None
if job and wark in reg:
# self.log("pop " + wark + " " + job["name"] + " handle_json db", 4)
del reg[wark]
job = sorted(alts, reverse=True)[0][2] if alts else None
if job and wark in reg:
# self.log("pop " + wark + " " + job["name"] + " handle_json db", 4)
del reg[wark]
if lost:
for dp_dir, dp_fn in lost:
self.db_rm(cur, dp_dir, dp_fn)
if lost:
c2 = None
for cur, dp_dir, dp_fn in lost:
self.db_rm(cur, dp_dir, dp_fn)
if c2 and c2 != cur:
c2.connection.commit()
cur.connection.commit()
c2 = cur
assert c2
c2.connection.commit()
cur = jcur
ptop = None # use cj or job as appropriate
if job or wark in reg:
job = job or reg[wark]
if job["prel"] == cj["prel"] and job["name"] == cj["name"]:
if (
job["ptop"] == cj["ptop"]
and job["prel"] == cj["prel"]
and job["name"] == cj["name"]
):
# ensure the files haven't been deleted manually
names = [job[x] for x in ["name", "tnam"] if x in job]
for fn in names:
@ -2007,7 +2030,7 @@ class Up2k(object):
raise Pebkac(422, err)
elif "nodupe" in self.flags[job["ptop"]]:
elif "nodupe" in self.flags[cj["ptop"]]:
self.log("dupe-reject:\n {0}\n {1}".format(src, dst))
err = "upload rejected, file already exists:\n"
err += "/" + quotep(vsrc) + " "