diff --git a/bin/copyparty-fuse.py b/bin/copyparty-fuse.py new file mode 100755 index 00000000..e199b6a9 --- /dev/null +++ b/bin/copyparty-fuse.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python3 +from __future__ import print_function, unicode_literals + +"""copyparty-fuse: remote copyparty as a local filesystem""" +__author__ = "ed " +__copyright__ = 2019 +__license__ = "MIT" +__url__ = "https://github.com/9001/copyparty/" + +import re +import sys +import time +import stat +import errno +import threading +import http.client # py2: httplib +import urllib.parse +from datetime import datetime +from fuse import FUSE, FuseOSError, Operations +from urllib.parse import quote_from_bytes as quote + + +""" +mount a copyparty server (local or remote) as a filesystem + +expect ~32 MiB/s on LAN, should probably read larger chunks + +usage: + python copyparty-fuse.py ./music http://192.168.1.69:1234/ + +dependencies: + sudo apk add fuse-dev + python3 -m venv ~/pe/ve.fusepy + . ~/pe/ve.fusepy/bin/activate + pip install fusepy +""" + + +def log(msg): + msg = "{:012x} {}\n".format(threading.current_thread().ident, msg) + print(msg[4:], end="") + + +class CacheNode(object): + def __init__(self, name, data): + self.name = name + self.data = data + self.ts = time.time() + + +class Gateway(object): + def __init__(self, base_url): + self.base_url = base_url + + ui = urllib.parse.urlparse(base_url) + self.web_root = ui.path.strip("/") + try: + self.web_host, self.web_port = ui.netloc.split(":") + self.web_port = int(self.web_port) + except: + self.web_host = ui.netloc + if ui.scheme == "http": + self.web_port = 80 + elif ui.scheme == "https": + raise Exception("todo") + else: + raise Exception("bad url?") + + self.conns = {} + + def quotep(self, path): + # TODO: mojibake support + path = path.encode("utf-8", "ignore") + path = path.replace(b" ", b"+") + return quote(path) + + def getconn(self, tid=None): + tid = tid or threading.current_thread().ident + try: + return self.conns[tid] + except: + log("new conn [{}] [{}]".format(self.web_host, self.web_port)) + + conn = http.client.HTTPConnection(self.web_host, self.web_port, timeout=260) + + self.conns[tid] = conn + return conn + + def closeconn(self, tid=None): + tid = tid or threading.current_thread().ident + try: + self.conns[tid].close() + del self.conns[tid] + except: + pass + + def sendreq(self, *args, **kwargs): + tid = threading.current_thread().ident + try: + c = self.getconn(tid) + c.request(*list(args), **kwargs) + except: + self.closeconn(tid) + c = self.getconn(tid) + c.request(*list(args), **kwargs) + + return c + + def listdir(self, path): + web_path = "/" + "/".join([self.web_root, path]) + + c = self.sendreq("GET", self.quotep(web_path)) + + r = c.getresponse() + if r.status != 200: + self.closeconn() + raise Exception("http error {}".format(r.status)) + + return self.parse_html(r) + + def getfile(self, path, ofs1, ofs2): + web_path = "/" + "/".join([self.web_root, path]) + hdr_range = "bytes={}-{}".format(ofs1, ofs2) + + c = self.sendreq("GET", self.quotep(web_path), headers={"Range": hdr_range}) + + r = c.getresponse() + if r.status != http.client.PARTIAL_CONTENT: + self.closeconn() + raise Exception("http error {}".format(r.status)) + + return r.read() + + def parse_html(self, datasrc): + ret = [] + remainder = b"" + ptn = re.compile( + r"^(-|DIR)]+>([^<]+)([^<]+)([^<]+)$" + ) + + while True: + buf = remainder + datasrc.read(4096) + # print('[{}]'.format(buf.decode('utf-8'))) + if not buf: + break + + remainder = b"" + endpos = buf.rfind(b"\n") + if endpos >= 0: + remainder = buf[endpos + 1 :] + buf = buf[:endpos] + + lines = buf.decode("utf-8").split("\n") + for line in lines: + m = ptn.match(line) + if not m: + # print(line) + continue + + ftype, fname, fsize, fdate = m.groups() + ts = datetime.strptime(fdate, "%Y-%m-%d %H:%M:%S").timestamp() + sz = int(fsize) + if ftype == "-": + ret.append([fname, self.stat_file(ts, sz), 0]) + else: + ret.append([fname, self.stat_dir(ts, sz), 0]) + + return ret + + def stat_dir(self, ts, sz=4096): + return { + "st_mode": 0o555 | stat.S_IFDIR, + "st_uid": 1000, + "st_gid": 1000, + "st_size": sz, + "st_atime": ts, + "st_mtime": ts, + "st_ctime": ts, + "st_blocks": int((sz + 511) / 512), + } + + def stat_file(self, ts, sz): + return { + "st_mode": 0o444 | stat.S_IFREG, + "st_uid": 1000, + "st_gid": 1000, + "st_size": sz, + "st_atime": ts, + "st_mtime": ts, + "st_ctime": ts, + "st_blocks": int((sz + 511) / 512), + } + + +class CPPF(Operations): + def __init__(self, base_url): + self.gw = Gateway(base_url) + + self.dircache = [] + self.dircache_mtx = threading.Lock() + + log("up") + + def clean_dircache(self): + """not threadsafe""" + now = time.time() + cutoff = 0 + for cn in self.dircache: + if cn.ts - now > 1: + cutoff += 1 + else: + break + + if cutoff > 0: + self.dircache = self.dircache[cutoff:] + + def get_cached_dir(self, dirpath): + with self.dircache_mtx: + self.clean_dircache() + for cn in self.dircache: + if cn.name == dirpath: + return cn + + return None + + def readdir(self, path, fh=None): + path = path.strip("/") + log("readdir {}".format(path)) + + ret = self.gw.listdir(path) + + with self.dircache_mtx: + cn = CacheNode(path, ret) + self.dircache.append(cn) + self.clean_dircache() + + return ret + + def read(self, path, length, offset, fh=None): + path = path.strip("/") + + ofs2 = offset + length - 1 + log("read {} @ {} len {} end {}".format(path, offset, length, ofs2)) + + file_sz = self.getattr(path)["st_size"] + if ofs2 >= file_sz: + ofs2 = file_sz - 1 + log("truncate to len {} end {}".format((ofs2 - offset) + 1, ofs2)) + + return self.gw.getfile(path, offset, ofs2) + + def getattr(self, path, fh=None): + path = path.strip("/") + try: + dirpath, fname = path.rsplit("/", 1) + except: + dirpath = "" + fname = path + + log("getattr {}".format(path)) + + if not path: + return self.gw.stat_dir(time.time()) + + cn = self.get_cached_dir(dirpath) + if cn: + # log('cache ok') + dents = cn.data + else: + log("cache miss") + dents = self.readdir(dirpath) + + for cache_name, cache_stat, _ in dents: + if cache_name == fname: + return cache_stat + + raise FuseOSError(errno.ENOENT) + + access = None + flush = None + getxattr = None + listxattr = None + open = None + opendir = None + release = None + releasedir = None + statfs = None + + +def main(): + try: + local, remote = sys.argv[1:] + except: + print("need arg 1: local directory") + print("need arg 2: root url") + return + + FUSE(CPPF(remote), local, foreground=True) + + +if __name__ == "__main__": + main()