u2cli: add multithreading

This commit is contained in:
ed 2021-10-01 00:33:45 +02:00
parent 7bc8196821
commit 088d08963f
3 changed files with 213 additions and 29 deletions

View file

@ -770,8 +770,8 @@ interact with copyparty using non-browser clients
`chunk <movie.mkv` `chunk <movie.mkv`
* python: [up2k.py](https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py) is a command-line up2k client * python: [up2k.py](https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py) is a command-line up2k client
* early beta, only does single-threaded uploads so far * file uploads, file-search, autoresume of aborted/broken uploads
* autoresume of broken uploads * see [./bin/README.md#up2kpy](bin/README.md#up2kpy)
* FUSE: mount a copyparty server as a local filesystem * FUSE: mount a copyparty server as a local filesystem
* cross-platform python client available in [./bin/](bin/) * cross-platform python client available in [./bin/](bin/)

View file

@ -1,3 +1,9 @@
# [`up2k.py`](up2k.py)
* command-line up2k client
* file uploads, file-search, autoresume of aborted/broken uploads
* faster than browsers
* early beta, if something breaks just restart it
# [`copyparty-fuse.py`](copyparty-fuse.py) # [`copyparty-fuse.py`](copyparty-fuse.py)
* mount a copyparty server as a local filesystem (read-only) * mount a copyparty server as a local filesystem (read-only)
* **supports Windows!** -- expect `194 MiB/s` sequential read * **supports Windows!** -- expect `194 MiB/s` sequential read

View file

@ -3,13 +3,12 @@ from __future__ import print_function, unicode_literals
""" """
up2k.py: upload to copyparty up2k.py: upload to copyparty
2021-09-30, v0.3, ed <irc.rizon.net>, MIT-Licensed 2021-09-30, v0.4, ed <irc.rizon.net>, MIT-Licensed
https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py
- dependencies: requests - dependencies: requests
- supports python 2.7 and 3.3 through 3.10 - supports python 2.7 and 3.3 through 3.10
- no parallel hashing / uploads yet, so browsers are faster
- almost zero error-handling - almost zero error-handling
- but if something breaks just try again and it'll autoresume - but if something breaks just try again and it'll autoresume
""" """
@ -54,37 +53,40 @@ class File(object):
"""an up2k upload task; represents a single file""" """an up2k upload task; represents a single file"""
def __init__(self, top, rel, size, lmod): def __init__(self, top, rel, size, lmod):
self.top = top self.top = top # type: bytes
self.rel = rel.replace(b"\\", b"/") self.rel = rel.replace(b"\\", b"/") # type: bytes
self.size = size self.size = size # type: int
self.lmod = lmod self.lmod = lmod # type: float
self.abs = os.path.join(top, rel) self.abs = os.path.join(top, rel) # type: bytes
self.name = self.rel.split(b"/")[-1].decode("utf-8", "replace") self.name = self.rel.split(b"/")[-1].decode("utf-8", "replace") # type: str
# set by get_hashlist # set by get_hashlist
self.cids = [] # [ hash, ofs, sz ] self.cids = [] # type: list[tuple[str, int, int]] # [ hash, ofs, sz ]
self.kchunks = {} # hash: [ ofs, sz ] self.kchunks = {} # type: dict[str, tuple[int, int]] # hash: [ ofs, sz ]
# set by handshake # set by handshake
self.ucids = [] # chunks which need to be uploaded self.ucids = [] # type: list[str] # chunks which need to be uploaded
self.wark = None self.wark = None # type: str
self.url = None self.url = None # type: str
# set by upload # set by upload
self.uploading = [] # chunks currently being uploaded self.up_b = 0 # type: int
self.up_c = 0 # type: int
# m = "size({}) lmod({}) top({}) rel({}) abs({}) name({})" # m = "size({}) lmod({}) top({}) rel({}) abs({}) name({})"
# print(m.format(self.size, self.lmod, self.top, self.rel, self.abs, self.name)) # eprint(m.format(self.size, self.lmod, self.top, self.rel, self.abs, self.name))
class FileSlice(object): class FileSlice(object):
"""file-like object providing a fixed window into a file""" """file-like object providing a fixed window into a file"""
def __init__(self, file, cid): def __init__(self, file, cid):
# type: (File, str) -> FileSlice
self.car, self.len = file.kchunks[cid] self.car, self.len = file.kchunks[cid]
self.cdr = self.car + self.len self.cdr = self.car + self.len
self.ofs = 0 self.ofs = 0 # type: int
self.f = open(file.abs, "rb", 512 * 1024) self.f = open(file.abs, "rb", 512 * 1024)
self.f.seek(self.car) self.f.seek(self.car)
@ -121,6 +123,16 @@ class FileSlice(object):
return ret return ret
def eprint(*a, **ka):
ka["file"] = sys.stderr
if not PY2:
ka["flush"] = True
print(*a, **ka)
if PY2:
sys.stderr.flush()
def statdir(top): def statdir(top):
"""non-recursive listing of directory contents, along with stat() info""" """non-recursive listing of directory contents, along with stat() info"""
if hasattr(os, "scandir"): if hasattr(os, "scandir"):
@ -215,6 +227,9 @@ def get_hashlist(file, pcb):
file_ofs += chunk_sz file_ofs += chunk_sz
file_rem -= chunk_sz file_rem -= chunk_sz
if pcb:
pcb(file, file_ofs)
file.cids = ret file.cids = ret
file.kchunks = {k: [v1, v2] for k, v1, v2 in ret} file.kchunks = {k: [v1, v2] for k, v1, v2 in ret}
@ -291,8 +306,8 @@ def upload(req_ses, file, cid, pw):
class Daemon(threading.Thread): class Daemon(threading.Thread):
def __init__(self): def __init__(self, *a, **ka):
threading.Thread.__init__(self) threading.Thread.__init__(self, *a, **ka)
self.daemon = True self.daemon = True
@ -309,7 +324,7 @@ class Ctl(object):
os.path.abspath(os.path.realpath(x.encode("utf-8"))) for x in ar.files os.path.abspath(os.path.realpath(x.encode("utf-8"))) for x in ar.files
] ]
print("\nscanning {} locations".format(len(ar.files))) eprint("\nscanning {} locations".format(len(ar.files)))
nfiles = 0 nfiles = 0
nbytes = 0 nbytes = 0
@ -317,7 +332,7 @@ class Ctl(object):
nfiles += 1 nfiles += 1
nbytes += inf.st_size nbytes += inf.st_size
print("found {} files, {}\n".format(nfiles, humansize(nbytes))) eprint("found {} files, {}\n".format(nfiles, humansize(nbytes)))
self.nfiles = nfiles self.nfiles = nfiles
self.nbytes = nbytes self.nbytes = nbytes
@ -327,9 +342,10 @@ class Ctl(object):
req_ses.verify = ar.te req_ses.verify = ar.te
self.filegen = walkdirs(ar.files) self.filegen = walkdirs(ar.files)
ar.safe = True
if ar.safe: if ar.safe:
return self.safe() self.safe()
else:
self.fancy()
def safe(self): def safe(self):
"""minimal basic slow boring fallback codepath""" """minimal basic slow boring fallback codepath"""
@ -364,6 +380,166 @@ class Ctl(object):
print(" ok!") print(" ok!")
def fancy(self):
self.hash_f = 0
self.hash_c = 0
self.hash_b = 0
self.up_f = 0
self.up_c = 0
self.up_b = 0
self.hasher_busy = 1
self.handshaker_busy = 0
self.uploader_busy = 0
self.mutex = threading.Lock()
self.q_handshake = Queue() # type: Queue[File]
self.q_recheck = Queue() # type: Queue[File] # partial upload exists [...]
self.q_upload = Queue() # type: Queue[tuple[File, str]]
Daemon(target=self.hasher).start()
for _ in range(self.ar.j):
Daemon(target=self.handshaker).start()
Daemon(target=self.uploader).start()
while True:
time.sleep(0.1)
with self.mutex:
if (
self.q_handshake.empty()
and self.q_upload.empty()
and not self.hasher_busy
and not self.handshaker_busy
and not self.uploader_busy
):
break
def cb_hasher(self, file, ofs):
eprint(".", end="")
def hasher(self):
for nf, (top, rel, inf) in enumerate(self.filegen):
file = File(top, rel, inf.st_size, inf.st_mtime)
upath = file.abs.decode("utf-8", "replace")
while True:
with self.mutex:
if (
self.hash_b - self.up_b < 1024 * 1024 * 128
and self.hash_c - self.up_c < 64
and (
not self.ar.nh
or (
self.q_upload.empty()
and self.q_handshake.empty()
and not self.uploader_busy
)
)
):
break
time.sleep(0.05)
eprint("\n{:6d} hash {}\n".format(self.nfiles - nf, upath), end="")
get_hashlist(file, self.cb_hasher)
with self.mutex:
self.hash_f += 1
self.hash_c += len(file.cids)
self.hash_b += file.size
self.q_handshake.put(file)
self.hasher_busy = 0
def handshaker(self):
search = self.ar.s
q = self.q_handshake
while True:
file = q.get()
if not file:
if q == self.q_handshake:
q = self.q_recheck
q.put(None)
continue
self.q_upload.put(None)
break
with self.mutex:
self.handshaker_busy += 1
upath = file.abs.decode("utf-8", "replace")
eprint("\n handshake {}\n".format(upath), end="")
try:
hs = handshake(req_ses, self.ar.url, file, self.ar.a, search)
except Exception as ex:
if q == self.q_handshake and "<pre>partial upload exists" in str(ex):
self.q_recheck.put(file)
hs = []
else:
raise
if search:
if hs:
for hit in hs:
eprint(" found: {}{}".format(self.ar.url, hit["rp"]))
else:
eprint(" NOT found {}".format(upath))
with self.mutex:
self.up_f += 1
self.up_c += len(file.cids)
self.up_b += file.size
self.handshaker_busy -= 1
continue
with self.mutex:
if not hs:
# all chunks done
self.up_f += 1
self.up_c += len(file.cids) - file.up_c
self.up_b += file.size - file.up_b
if hs and self.up_c:
# some chunks failed
self.up_c -= len(hs)
file.up_c -= len(hs)
for cid in hs:
sz = file.kchunks[cid][1]
self.up_b -= sz
file.up_b -= sz
file.ucids = hs
self.handshaker_busy -= 1
for cid in hs:
self.q_upload.put([file, cid])
def uploader(self):
while True:
task = self.q_upload.get()
if not task:
break
eprint("*", end="")
with self.mutex:
self.uploader_busy += 1
file, cid = task
upload(req_ses, file, cid, self.ar.a)
with self.mutex:
sz = file.kchunks[cid][1]
file.ucids = [x for x in file.ucids if x != cid]
if not file.ucids:
self.q_handshake.put(file)
file.up_b += sz
self.up_b += sz
file.up_c += 1
self.up_c += 1
self.uploader_busy -= 1
def main(): def main():
time.strptime("19970815", "%Y%m%d") # python#7980 time.strptime("19970815", "%Y%m%d") # python#7980
@ -371,19 +547,21 @@ def main():
os.system("rem") # enables colors os.system("rem") # enables colors
# fmt: off # fmt: off
ap = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) ap = app = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
ap.add_argument("url", type=unicode, help="server url, including destination folder") ap.add_argument("url", type=unicode, help="server url, including destination folder")
ap.add_argument("files", type=unicode, nargs="+", help="files and/or folders to process") ap.add_argument("files", type=unicode, nargs="+", help="files and/or folders to process")
ap.add_argument("-a", metavar="PASSWORD", help="password") ap.add_argument("-a", metavar="PASSWORD", help="password")
ap.add_argument("-s", action="store_true", help="file-search (disables upload)") ap.add_argument("-s", action="store_true", help="file-search (disables upload)")
ap = app.add_argument_group("performance tweaks")
ap.add_argument("-j", type=int, metavar="THREADS", default=4, help="parallel connections")
ap.add_argument("-nh", action="store_true", help="disable hashing while uploading")
ap.add_argument("--safe", action="store_true", help="use simple fallback approach")
ap = app.add_argument_group("tls")
ap.add_argument("-te", metavar="PEM_FILE", help="certificate to expect/verify") ap.add_argument("-te", metavar="PEM_FILE", help="certificate to expect/verify")
ap.add_argument("-td", action="store_true", help="disable certificate check") ap.add_argument("-td", action="store_true", help="disable certificate check")
ap.add_argument("--safe", action="store_true", help="use simple fallback approach")
# ap.add_argument("-j", type=int, default=2, help="parallel connections")
# ap.add_argument("-nh", action="store_true", help="disable hashing while uploading")
# fmt: on # fmt: on
Ctl(ap.parse_args()) Ctl(app.parse_args())
if __name__ == "__main__": if __name__ == "__main__":