add cache (pointless on lan)

This commit is contained in:
ed 2019-09-19 21:43:14 +00:00
parent 01a8409d69
commit 565417453f

View file

@ -12,6 +12,7 @@ import sys
import time import time
import stat import stat
import errno import errno
import struct
import threading import threading
import http.client # py2: httplib import http.client # py2: httplib
import urllib.parse import urllib.parse
@ -23,8 +24,6 @@ from urllib.parse import quote_from_bytes as quote
""" """
mount a copyparty server (local or remote) as a filesystem mount a copyparty server (local or remote) as a filesystem
expect ~32 MiB/s on LAN, should probably read larger chunks
usage: usage:
python copyparty-fuse.py ./music http://192.168.1.69:1234/ python copyparty-fuse.py ./music http://192.168.1.69:1234/
@ -33,21 +32,53 @@ dependencies:
python3 -m venv ~/pe/ve.fusepy python3 -m venv ~/pe/ve.fusepy
. ~/pe/ve.fusepy/bin/activate . ~/pe/ve.fusepy/bin/activate
pip install fusepy pip install fusepy
MB/s
28 cache NOthread
24 cache thread
29 cache NOthread NOmutex
67 NOcache NOthread NOmutex ( ´ω) nyoro~n
10 NOcache thread NOmutex
""" """
def log(msg): def threadless_log(msg):
msg = "{:012x} {}\n".format(threading.current_thread().ident, msg) print(msg + "\n", end="")
def boring_log(msg):
msg = "\033[36m{:012x}\033[0m {}\n".format(threading.current_thread().ident, msg)
print(msg[4:], end="") print(msg[4:], end="")
def rice_tid():
tid = threading.current_thread().ident
c = struct.unpack("B" * 5, struct.pack(">Q", tid)[-5:])
return "".join("\033[1;37;48;5;{}m{:02x}".format(x, x) for x in c)
def fancy_log(msg):
print("{}\033[0m {}\n".format(rice_tid(), msg), end="")
def null_log(msg):
pass
log = boring_log
log = fancy_log
log = threadless_log
dbg = null_log
def get_tid(): def get_tid():
return threading.current_thread().ident return threading.current_thread().ident
class CacheNode(object): class CacheNode(object):
def __init__(self, name, data): def __init__(self, tag, data):
self.name = name self.tag = tag
self.data = data self.data = data
self.ts = time.time() self.ts = time.time()
@ -117,22 +148,23 @@ class Gateway(object):
self.closeconn() self.closeconn()
raise Exception( raise Exception(
"http error {} reading dir {} in {:x}".format( "http error {} reading dir {} in {:x}".format(
r.status, web_path, get_tid() r.status, web_path, rice_tid()
) )
) )
return self.parse_html(r) return self.parse_html(r)
def getfile(self, path, ofs1, ofs2): def download_file_range(self, path, ofs1, ofs2):
web_path = "/" + "/".join([self.web_root, path]) web_path = "/" + "/".join([self.web_root, path])
hdr_range = "bytes={}-{}".format(ofs1, ofs2) hdr_range = "bytes={}-{}".format(ofs1, ofs2)
log("downloading {}".format(hdr_range))
r = self.sendreq("GET", self.quotep(web_path), headers={"Range": hdr_range}) r = self.sendreq("GET", self.quotep(web_path), headers={"Range": hdr_range})
if r.status != http.client.PARTIAL_CONTENT: if r.status != http.client.PARTIAL_CONTENT:
self.closeconn() self.closeconn()
raise Exception( raise Exception(
"http error {} reading file {} range {} in {:x}".format( "http error {} reading file {} range {} in {:x}".format(
r.status, web_path, hdr_range, get_tid() r.status, web_path, hdr_range, rice_tid()
) )
) )
@ -206,6 +238,9 @@ class CPPF(Operations):
self.dircache = [] self.dircache = []
self.dircache_mtx = threading.Lock() self.dircache_mtx = threading.Lock()
self.filecache = []
self.filecache_mtx = threading.Lock()
log("up") log("up")
def clean_dircache(self): def clean_dircache(self):
@ -222,21 +257,203 @@ class CPPF(Operations):
self.dircache = self.dircache[cutoff:] self.dircache = self.dircache[cutoff:]
def get_cached_dir(self, dirpath): def get_cached_dir(self, dirpath):
with self.dircache_mtx: # with self.dircache_mtx:
if True:
self.clean_dircache() self.clean_dircache()
for cn in self.dircache: for cn in self.dircache:
if cn.name == dirpath: if cn.tag == dirpath:
return cn return cn
return None return None
"""
,-------------------------------, g1>=c1, g2<=c2
|cache1 cache2| buf[g1-c1:(g1-c1)+(g2-g1)]
`-------------------------------'
,---------------,
|get1 get2|
`---------------'
__________________________________________________________________________
,-------------------------------, g2<=c2, (g2>=c1)
|cache1 cache2| cdr=buf[:g2-c1]
`-------------------------------' dl car; g1-512K:c1
,---------------,
|get1 get2|
`---------------'
__________________________________________________________________________
,-------------------------------, g1>=c1, (g1<=c2)
|cache1 cache2| car=buf[c2-g1:]
`-------------------------------' dl cdr; c2:c2+1M
,---------------,
|get1 get2|
`---------------'
"""
def get_cached_file(self, path, get1, get2, file_sz):
car = None
cdr = None
ncn = -1
# with self.filecache_mtx:
if True:
dbg("cache request from {} to {}, size {}".format(get1, get2, file_sz))
for cn in self.filecache:
ncn += 1
cache_path, cache1 = cn.tag
if cache_path != path:
continue
cache2 = cache1 + len(cn.data)
if get2 <= cache1 or get1 >= cache2:
continue
if get1 >= cache1 and get2 <= cache2:
# keep cache entry alive by moving it to the end
self.filecache = (
self.filecache[:ncn] + self.filecache[ncn + 1 :] + [cn]
)
buf_ofs = get1 - cache1
buf_end = buf_ofs + (get2 - get1)
dbg(
"found all ({}, {} to {}, len {}) [{}:{}] = {}".format(
ncn,
cache1,
cache2,
len(cn.data),
buf_ofs,
buf_end,
buf_end - buf_ofs,
)
)
return cn.data[buf_ofs:buf_end]
if get2 < cache2:
x = cn.data[: get2 - cache1]
if not cdr or len(cdr) < len(x):
dbg(
"found car ({}, {} to {}, len {}) [:{}-{}] = [:{}] = {}".format(
ncn,
cache1,
cache2,
len(cn.data),
get2,
cache1,
get2 - cache1,
len(x),
)
)
cdr = x
continue
if get1 > cache1:
x = cn.data[-(cache2 - get1) :]
if not car or len(car) < len(x):
dbg(
"found cdr ({}, {} to {}, len {}) [-({}-{}):] = [-{}:] = {}".format(
ncn,
cache1,
cache2,
len(cn.data),
cache2,
get1,
cache2 - get1,
len(x),
)
)
car = x
continue
raise Exception("what")
if car and cdr:
dbg("<cache> have both")
ret = car + cdr
if len(ret) == get2 - get1:
return ret
raise Exception("{} + {} != {} - {}".format(len(car), len(cdr), get2, get1))
elif cdr:
h_end = get1 + (get2 - get1) - len(cdr)
h_ofs = h_end - 512 * 1024
if h_ofs < 0:
h_ofs = 0
buf_ofs = (get2 - get1) - len(cdr)
dbg(
"<cache> cdr {}, car {}-{}={} [-{}:]".format(
len(cdr), h_ofs, h_end, h_end - h_ofs, buf_ofs
)
)
buf = self.gw.download_file_range(path, h_ofs, h_end - 1)
ret = buf[-buf_ofs:] + cdr
elif car:
h_ofs = get1 + len(car)
h_end = h_ofs + 1024 * 1024
if h_end > file_sz:
h_end = file_sz
buf_ofs = (get2 - get1) - len(car)
dbg(
"<cache> car {}, cdr {}-{}={} [:{}]".format(
len(car), h_ofs, h_end, h_end - h_ofs, buf_ofs
)
)
buf = self.gw.download_file_range(path, h_ofs, h_end - 1)
ret = car + buf[:buf_ofs]
else:
h_ofs = get1 - 256 * 1024
h_end = get2 + 1024 * 1024
if h_ofs < 0:
h_ofs = 0
if h_end > file_sz:
h_end = file_sz
buf_ofs = get1 - h_ofs
buf_end = buf_ofs + get2 - get1
dbg(
"<cache> {}-{}={} [{}:{}]".format(
h_ofs, h_end, h_end - h_ofs, buf_ofs, buf_end
)
)
buf = self.gw.download_file_range(path, h_ofs, h_end - 1)
ret = buf[buf_ofs:buf_end]
cn = CacheNode([path, h_ofs], buf)
# with self.filecache_mtx:
if True:
if len(self.filecache) > 6:
self.filecache = self.filecache[1:] + [cn]
else:
self.filecache.append(cn)
return ret
def readdir(self, path, fh=None): def readdir(self, path, fh=None):
path = path.strip("/") path = path.strip("/")
log("readdir {}".format(path)) log("readdir {}".format(path))
ret = self.gw.listdir(path) ret = self.gw.listdir(path)
with self.dircache_mtx: # with self.dircache_mtx:
if True:
cn = CacheNode(path, ret) cn = CacheNode(path, ret)
self.dircache.append(cn) self.dircache.append(cn)
self.clean_dircache() self.clean_dircache()
@ -246,7 +463,7 @@ class CPPF(Operations):
def read(self, path, length, offset, fh=None): def read(self, path, length, offset, fh=None):
path = path.strip("/") path = path.strip("/")
ofs2 = offset + length - 1 ofs2 = offset + length
log("read {} @ {} len {} end {}".format(path, offset, length, ofs2)) log("read {} @ {} len {} end {}".format(path, offset, length, ofs2))
file_sz = self.getattr(path)["st_size"] file_sz = self.getattr(path)["st_size"]
@ -254,7 +471,9 @@ class CPPF(Operations):
ofs2 = file_sz - 1 ofs2 = file_sz - 1
log("truncate to len {} end {}".format((ofs2 - offset) + 1, ofs2)) log("truncate to len {} end {}".format((ofs2 - offset) + 1, ofs2))
return self.gw.getfile(path, offset, ofs2) # toggle cache here i suppose
# return self.get_cached_file(path, offset, ofs2, file_sz)
return self.gw.download_file_range(path, offset, ofs2 - 1)
def getattr(self, path, fh=None): def getattr(self, path, fh=None):
path = path.strip("/") path = path.strip("/")
@ -302,7 +521,8 @@ def main():
print("need arg 2: root url") print("need arg 2: root url")
return return
FUSE(CPPF(remote), local, foreground=True) FUSE(CPPF(remote), local, foreground=True, nothreads=True)
# if nothreads=False also uncomment the `with *_mtx` things
if __name__ == "__main__": if __name__ == "__main__":