From 565417453fc00823d6161939653133ca938e70c5 Mon Sep 17 00:00:00 2001 From: ed Date: Thu, 19 Sep 2019 21:43:14 +0000 Subject: [PATCH] add cache (pointless on lan) --- bin/copyparty-fuse.py | 250 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 235 insertions(+), 15 deletions(-) diff --git a/bin/copyparty-fuse.py b/bin/copyparty-fuse.py index 64bc062e..2640047d 100755 --- a/bin/copyparty-fuse.py +++ b/bin/copyparty-fuse.py @@ -12,6 +12,7 @@ import sys import time import stat import errno +import struct import threading import http.client # py2: httplib import urllib.parse @@ -23,8 +24,6 @@ from urllib.parse import quote_from_bytes as quote """ mount a copyparty server (local or remote) as a filesystem -expect ~32 MiB/s on LAN, should probably read larger chunks - usage: python copyparty-fuse.py ./music http://192.168.1.69:1234/ @@ -33,21 +32,53 @@ dependencies: python3 -m venv ~/pe/ve.fusepy . ~/pe/ve.fusepy/bin/activate pip install fusepy + + +MB/s + 28 cache NOthread + 24 cache thread + 29 cache NOthread NOmutex + 67 NOcache NOthread NOmutex ( ´・ω・) nyoro~n + 10 NOcache thread NOmutex """ -def log(msg): - msg = "{:012x} {}\n".format(threading.current_thread().ident, msg) +def threadless_log(msg): + print(msg + "\n", end="") + + +def boring_log(msg): + msg = "\033[36m{:012x}\033[0m {}\n".format(threading.current_thread().ident, msg) print(msg[4:], end="") +def rice_tid(): + tid = threading.current_thread().ident + c = struct.unpack("B" * 5, struct.pack(">Q", tid)[-5:]) + return "".join("\033[1;37;48;5;{}m{:02x}".format(x, x) for x in c) + + +def fancy_log(msg): + print("{}\033[0m {}\n".format(rice_tid(), msg), end="") + + +def null_log(msg): + pass + + +log = boring_log +log = fancy_log +log = threadless_log +dbg = null_log + + def get_tid(): return threading.current_thread().ident class CacheNode(object): - def __init__(self, name, data): - self.name = name + def __init__(self, tag, data): + self.tag = tag self.data = data self.ts = time.time() @@ -117,22 +148,23 @@ class Gateway(object): self.closeconn() raise Exception( "http error {} reading dir {} in {:x}".format( - r.status, web_path, get_tid() + r.status, web_path, rice_tid() ) ) return self.parse_html(r) - def getfile(self, path, ofs1, ofs2): + def download_file_range(self, path, ofs1, ofs2): web_path = "/" + "/".join([self.web_root, path]) hdr_range = "bytes={}-{}".format(ofs1, ofs2) + log("downloading {}".format(hdr_range)) r = self.sendreq("GET", self.quotep(web_path), headers={"Range": hdr_range}) if r.status != http.client.PARTIAL_CONTENT: self.closeconn() raise Exception( "http error {} reading file {} range {} in {:x}".format( - r.status, web_path, hdr_range, get_tid() + r.status, web_path, hdr_range, rice_tid() ) ) @@ -206,6 +238,9 @@ class CPPF(Operations): self.dircache = [] self.dircache_mtx = threading.Lock() + self.filecache = [] + self.filecache_mtx = threading.Lock() + log("up") def clean_dircache(self): @@ -222,21 +257,203 @@ class CPPF(Operations): self.dircache = self.dircache[cutoff:] def get_cached_dir(self, dirpath): - with self.dircache_mtx: + # with self.dircache_mtx: + if True: self.clean_dircache() for cn in self.dircache: - if cn.name == dirpath: + if cn.tag == dirpath: return cn return None + """ + ,-------------------------------, g1>=c1, g2<=c2 + |cache1 cache2| buf[g1-c1:(g1-c1)+(g2-g1)] + `-------------------------------' + ,---------------, + |get1 get2| + `---------------' + __________________________________________________________________________ + + ,-------------------------------, g2<=c2, (g2>=c1) + |cache1 cache2| cdr=buf[:g2-c1] + `-------------------------------' dl car; g1-512K:c1 + ,---------------, + |get1 get2| + `---------------' + __________________________________________________________________________ + + ,-------------------------------, g1>=c1, (g1<=c2) + |cache1 cache2| car=buf[c2-g1:] + `-------------------------------' dl cdr; c2:c2+1M + ,---------------, + |get1 get2| + `---------------' + """ + + def get_cached_file(self, path, get1, get2, file_sz): + car = None + cdr = None + ncn = -1 + # with self.filecache_mtx: + if True: + dbg("cache request from {} to {}, size {}".format(get1, get2, file_sz)) + for cn in self.filecache: + ncn += 1 + + cache_path, cache1 = cn.tag + if cache_path != path: + continue + + cache2 = cache1 + len(cn.data) + if get2 <= cache1 or get1 >= cache2: + continue + + if get1 >= cache1 and get2 <= cache2: + # keep cache entry alive by moving it to the end + self.filecache = ( + self.filecache[:ncn] + self.filecache[ncn + 1 :] + [cn] + ) + buf_ofs = get1 - cache1 + buf_end = buf_ofs + (get2 - get1) + dbg( + "found all ({}, {} to {}, len {}) [{}:{}] = {}".format( + ncn, + cache1, + cache2, + len(cn.data), + buf_ofs, + buf_end, + buf_end - buf_ofs, + ) + ) + return cn.data[buf_ofs:buf_end] + + if get2 < cache2: + x = cn.data[: get2 - cache1] + if not cdr or len(cdr) < len(x): + dbg( + "found car ({}, {} to {}, len {}) [:{}-{}] = [:{}] = {}".format( + ncn, + cache1, + cache2, + len(cn.data), + get2, + cache1, + get2 - cache1, + len(x), + ) + ) + cdr = x + + continue + + if get1 > cache1: + x = cn.data[-(cache2 - get1) :] + if not car or len(car) < len(x): + dbg( + "found cdr ({}, {} to {}, len {}) [-({}-{}):] = [-{}:] = {}".format( + ncn, + cache1, + cache2, + len(cn.data), + cache2, + get1, + cache2 - get1, + len(x), + ) + ) + car = x + + continue + + raise Exception("what") + + if car and cdr: + dbg(" have both") + + ret = car + cdr + if len(ret) == get2 - get1: + return ret + + raise Exception("{} + {} != {} - {}".format(len(car), len(cdr), get2, get1)) + + elif cdr: + h_end = get1 + (get2 - get1) - len(cdr) + h_ofs = h_end - 512 * 1024 + + if h_ofs < 0: + h_ofs = 0 + + buf_ofs = (get2 - get1) - len(cdr) + + dbg( + " cdr {}, car {}-{}={} [-{}:]".format( + len(cdr), h_ofs, h_end, h_end - h_ofs, buf_ofs + ) + ) + + buf = self.gw.download_file_range(path, h_ofs, h_end - 1) + ret = buf[-buf_ofs:] + cdr + + elif car: + h_ofs = get1 + len(car) + h_end = h_ofs + 1024 * 1024 + + if h_end > file_sz: + h_end = file_sz + + buf_ofs = (get2 - get1) - len(car) + + dbg( + " car {}, cdr {}-{}={} [:{}]".format( + len(car), h_ofs, h_end, h_end - h_ofs, buf_ofs + ) + ) + + buf = self.gw.download_file_range(path, h_ofs, h_end - 1) + ret = car + buf[:buf_ofs] + + else: + h_ofs = get1 - 256 * 1024 + h_end = get2 + 1024 * 1024 + + if h_ofs < 0: + h_ofs = 0 + + if h_end > file_sz: + h_end = file_sz + + buf_ofs = get1 - h_ofs + buf_end = buf_ofs + get2 - get1 + + dbg( + " {}-{}={} [{}:{}]".format( + h_ofs, h_end, h_end - h_ofs, buf_ofs, buf_end + ) + ) + + buf = self.gw.download_file_range(path, h_ofs, h_end - 1) + ret = buf[buf_ofs:buf_end] + + cn = CacheNode([path, h_ofs], buf) + # with self.filecache_mtx: + if True: + if len(self.filecache) > 6: + self.filecache = self.filecache[1:] + [cn] + else: + self.filecache.append(cn) + + return ret + def readdir(self, path, fh=None): path = path.strip("/") log("readdir {}".format(path)) ret = self.gw.listdir(path) - with self.dircache_mtx: + # with self.dircache_mtx: + if True: cn = CacheNode(path, ret) self.dircache.append(cn) self.clean_dircache() @@ -246,7 +463,7 @@ class CPPF(Operations): def read(self, path, length, offset, fh=None): path = path.strip("/") - ofs2 = offset + length - 1 + ofs2 = offset + length log("read {} @ {} len {} end {}".format(path, offset, length, ofs2)) file_sz = self.getattr(path)["st_size"] @@ -254,7 +471,9 @@ class CPPF(Operations): ofs2 = file_sz - 1 log("truncate to len {} end {}".format((ofs2 - offset) + 1, ofs2)) - return self.gw.getfile(path, offset, ofs2) + # toggle cache here i suppose + # return self.get_cached_file(path, offset, ofs2, file_sz) + return self.gw.download_file_range(path, offset, ofs2 - 1) def getattr(self, path, fh=None): path = path.strip("/") @@ -302,7 +521,8 @@ def main(): print("need arg 2: root url") return - FUSE(CPPF(remote), local, foreground=True) + FUSE(CPPF(remote), local, foreground=True, nothreads=True) + # if nothreads=False also uncomment the `with *_mtx` things if __name__ == "__main__":