From 17fa4906871f10b1b8e0d0fe110e601a61866339 Mon Sep 17 00:00:00 2001 From: ed Date: Sat, 14 Jun 2025 21:13:14 +0000 Subject: [PATCH] add ?tail --- copyparty/__main__.py | 11 ++++ copyparty/cfg.py | 10 ++++ copyparty/httpcli.py | 124 +++++++++++++++++++++++++++++++++++++++++- docs/devnotes.md | 3 + tests/util.py | 6 +- 5 files changed, 149 insertions(+), 5 deletions(-) diff --git a/copyparty/__main__.py b/copyparty/__main__.py index f346bc5f..4d6417bc 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -1270,6 +1270,7 @@ def add_optouts(ap): ap2.add_argument("--no-tarcmp", action="store_true", help="disable download as compressed tar (?tar=gz, ?tar=bz2, ?tar=xz, ?tar=gz:9, ...)") ap2.add_argument("--no-lifetime", action="store_true", help="do not allow clients (or server config) to schedule an upload to be deleted after a given time") ap2.add_argument("--no-pipe", action="store_true", help="disable race-the-beam (lockstep download of files which are currently being uploaded) (volflag=nopipe)") + ap2.add_argument("--no-tail", action="store_true", help="disable streaming a growing files with ?tail (volflag=notail)") ap2.add_argument("--no-db-ip", action="store_true", help="do not write uploader-IP into the database; will also disable unpost, you may want \033[32m--forget-ip\033[0m instead (volflag=no_db_ip)") @@ -1399,6 +1400,15 @@ def add_transcoding(ap): ap2.add_argument("--ac-maxage", metavar="SEC", type=int, default=86400, help="delete cached transcode output after \033[33mSEC\033[0m seconds") +def add_tail(ap): + ap2 = ap.add_argument_group('tailing options (realtime streaming of a growing file)') + ap2.add_argument("--tail-who", metavar="LVL", type=int, default=2, help="who can tail? [\033[32m0\033[0m]=nobody, [\033[32m1\033[0m]=admins, [\033[32m2\033[0m]=authenticated-with-read-access, [\033[32m3\033[0m]=everyone-with-read-access (volflag=tail_who)") + ap2.add_argument("--tail-cmax", metavar="N", type=int, default=64, help="do not allow starting a new tail if more than \033[33mN\033[0m active downloads") + ap2.add_argument("--tail-rate", metavar="SEC", type=float, default=0.2, help="check for new data every \033[33mSEC\033[0m seconds (volflag=tail_rate)") + ap2.add_argument("--tail-ka", metavar="SEC", type=float, default=3.0, help="send a zerobyte if connection is idle for \033[33mSEC\033[0m seconds to prevent disconnect") + ap2.add_argument("--tail-fd", metavar="SEC", type=float, default=1.0, help="check if file was replaced (new fd) if idle for \033[33mSEC\033[0m seconds (volflag=tail_fd)") + + def add_rss(ap): ap2 = ap.add_argument_group('RSS options') ap2.add_argument("--rss", action="store_true", help="enable RSS output (experimental) (volflag=rss)") @@ -1590,6 +1600,7 @@ def run_argparse( add_db_metadata(ap) add_thumbnail(ap) add_transcoding(ap) + add_tail(ap) add_rss(ap) add_ftp(ap) add_webdav(ap) diff --git a/copyparty/cfg.py b/copyparty/cfg.py index a01ba03b..81e46792 100644 --- a/copyparty/cfg.py +++ b/copyparty/cfg.py @@ -22,6 +22,7 @@ def vf_bmap() -> dict[str, str]: "no_forget": "noforget", "no_pipe": "nopipe", "no_robots": "norobots", + "no_tail": "notail", "no_thumb": "dthumb", "no_vthumb": "dvthumb", "no_athumb": "dathumb", @@ -101,6 +102,9 @@ def vf_vmap() -> dict[str, str]: "mv_retry", "rm_retry", "sort", + "tail_fd", + "tail_rate", + "tail_who", "tcolor", "unlist", "u2abort", @@ -304,6 +308,12 @@ flagcats = { "exp_md": "placeholders to expand in markdown files; see --help", "exp_lg": "placeholders to expand in prologue/epilogue; see --help", }, + "tailing": { + "notail": "disable ?tail (download a growing file continuously)", + "tail_fd=1": "interval for checking if file was replaced (new fd)", + "tail_rate=0.2": "interval for checking for new data", + "tail_who=2": "restrict ?tail access (1=admins,2=authed,3=everyone)", + }, "others": { "dots": "allow all users with read-access to\nenable the option to show dotfiles in listings", "fk=8": 'generates per-file accesskeys,\nwhich are then required at the "g" permission;\nkeys are invalidated if filesize or inode changes', diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 4b69eb6a..0bc91f64 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -3814,6 +3814,16 @@ class HttpCli(object): return txt + def _can_tail(self, volflags: dict[str, Any]) -> bool: + lvl = volflags["tail_who"] + if "notail" in volflags or not lvl: + raise Pebkac(400, "tail is disabled in server config") + elif lvl <= 1 and not self.can_admin: + raise Pebkac(400, "tail is admin-only on this server") + elif lvl <= 2 and self.uname in ("", "*"): + raise Pebkac(400, "you must be authenticated to use ?tail on this server") + return True + def _can_zip(self, volflags: dict[str, Any]) -> str: lvl = volflags["zip_who"] if self.args.no_zip or not lvl: @@ -3958,6 +3968,8 @@ class HttpCli(object): logmsg = "{:4} {} ".format("", self.req) logtail = "" + is_tail = "tail" in self.uparam and self._can_tail(self.vn.flags) + if ptop is not None: ap_data = "<%s>" % (req_path,) try: @@ -4071,6 +4083,7 @@ class HttpCli(object): and can_range and file_sz and "," not in hrange + and not is_tail ): try: if not hrange.lower().startswith("bytes"): @@ -4156,13 +4169,18 @@ class HttpCli(object): return True dls = self.conn.hsrv.dls + if is_tail: + upper = 1 << 30 + if len(dls) > self.args.tail_cmax: + raise Pebkac(400, "too many active downloads to start a new tail") + if upper - lower > 0x400000: # 4m now = time.time() self.dl_id = "%s:%s" % (self.ip, self.addr[1]) dls[self.dl_id] = (now, 0) self.conn.hsrv.dli[self.dl_id] = ( now, - upper - lower, + 0 if is_tail else upper - lower, self.vn, self.vpath, self.uname, @@ -4173,6 +4191,9 @@ class HttpCli(object): return self.tx_pipe( ptop, req_path, ap_data, job, lower, upper, status, mime, logmsg ) + elif is_tail: + self.tx_tail(open_args, status, mime) + return False ret = True with open_func(*open_args) as f: @@ -4202,6 +4223,106 @@ class HttpCli(object): return ret + def tx_tail( + self, + open_args: list[Any], + status: int, + mime: str, + ) -> None: + self.send_headers(length=None, status=status, mime=mime) + abspath: bytes = open_args[0] + sec_rate = self.args.tail_rate + sec_ka = self.args.tail_ka + sec_fd = self.args.tail_fd + wr_slp = self.args.s_wr_slp + wr_sz = self.args.s_wr_sz + dls = self.conn.hsrv.dls + dl_id = self.dl_id + + # non-numeric = full file from start + # positive = absolute offset from start + # negative = start that many bytes from eof + try: + ofs = int(self.uparam["tail"]) + except: + ofs = 0 + + f = None + try: + st = os.stat(abspath) + f = open(*open_args) + f.seek(0, os.SEEK_END) + eof = f.tell() + f.seek(0) + if ofs < 0: + ofs = max(0, ofs + eof) + + self.log("tailing from byte %d: %r" % (ofs, abspath), 6) + + # send initial data asap + remains = sendfile_py( + self.log, # d/c + ofs, + eof, + f, + self.s, + wr_sz, + wr_slp, + False, # d/c + dls, + dl_id, + ) + sent = (eof - ofs) - remains + ofs = eof - remains + f.seek(ofs) + + gone = 0 + t_fd = t_ka = time.time() + while True: + assert f # !rm + buf = f.read(4096) + now = time.time() + if buf: + t_fd = t_ka = now + self.s.sendall(buf) + sent += len(buf) + dls[dl_id] = (time.time(), sent) + continue + + time.sleep(sec_rate) + if t_ka < now - sec_ka: + t_ka = now + self.s.send(b"\x00") + if t_fd < now - sec_fd: + try: + st2 = os.stat(open_args[0]) + if st2.st_ino != st.st_ino or st2.st_size < sent: + assert f # !rm + # open new file before closing previous to avoid toctous (open may fail; cannot null f before) + f2 = open(*open_args) + f.close() + f = f2 + f.seek(0, os.SEEK_END) + if f.tell() < sent: + ofs = sent = 0 # shrunk; send from start + else: + ofs = sent # just new fd? resume from same ofs + f.seek(ofs) + self.log("reopened at byte %d: %r" % (ofs, abspath), 6) + gone = 0 + st = st2 + except: + gone += 1 + if gone > 3: + self.log("file deleted; disconnecting") + break + except IOError as ex: + if ex.errno not in (errno.EPIPE, errno.ESHUTDOWN, errno.EBADFD): + raise + finally: + if f: + f.close() + def tx_pipe( self, ptop: str, @@ -4762,7 +4883,6 @@ class HttpCli(object): if zi == 2 or (zi == 1 and self.avol): dl_list = self.get_dls() for t0, t1, sent, sz, vp, dl_id, uname in dl_list: - rem = sz - sent td = max(0.1, now - t0) rd, fn = vsplit(vp) if not rd: diff --git a/docs/devnotes.md b/docs/devnotes.md index 2debce96..b86213ad 100644 --- a/docs/devnotes.md +++ b/docs/devnotes.md @@ -191,6 +191,9 @@ authenticate using header `Cookie: cppwd=foo` or url param `&pw=foo` | GET | `?v` | open image/video/audio in mediaplayer | | GET | `?txt` | get file at URL as plaintext | | GET | `?txt=iso-8859-1` | ...with specific charset | +| GET | `?tail` | continuously stream a growing file | +| GET | `?tail=1024` | ...starting from byte 1024 | +| GET | `?tail=-128` | ...starting 128 bytes from the end | | GET | `?th` | get image/video at URL as thumbnail | | GET | `?th=opus` | convert audio file to 128kbps opus | | GET | `?th=caf` | ...in the iOS-proprietary container | diff --git a/tests/util.py b/tests/util.py index f938aa55..4d1236d8 100644 --- a/tests/util.py +++ b/tests/util.py @@ -143,7 +143,7 @@ class Cfg(Namespace): def __init__(self, a=None, v=None, c=None, **ka0): ka = {} - ex = "chpw daw dav_auth dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic hardlink_only nid nih no_acode no_athumb no_bauth no_clone no_cp no_dav no_db_ip no_del no_dirsz no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nsort nw og og_no_head og_s_title ohead q rand re_dirsz rss smb srch_dbg srch_excl stats uqe vague_403 vc ver wo_up_readme write_uplog xdev xlink xvol zipmaxu zs" + ex = "chpw daw dav_auth dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic hardlink_only nid nih no_acode no_athumb no_bauth no_clone no_cp no_dav no_db_ip no_del no_dirsz no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tail no_tarcmp no_thumb no_vthumb no_zip nrand nsort nw og og_no_head og_s_title ohead q rand re_dirsz rss smb srch_dbg srch_excl stats uqe vague_403 vc ver wo_up_readme write_uplog xdev xlink xvol zipmaxu zs" ka.update(**{k: False for k in ex.split()}) ex = "dav_inf dedup dotpart dotsrch hook_v no_dhash no_fastboot no_fpool no_htp no_rescan no_sendfile no_ses no_snap no_up_list no_voldump re_dhash see_dots plain_ip" @@ -152,10 +152,10 @@ class Cfg(Namespace): ex = "ah_cli ah_gen css_browser dbpath hist ipu js_browser js_other mime mimes no_forget no_hash no_idx nonsus_urls og_tpl og_ua ua_nodoc ua_nozip" ka.update(**{k: None for k in ex.split()}) - ex = "hash_mt hsortn safe_dedup srch_time u2abort u2j u2sz" + ex = "hash_mt hsortn safe_dedup srch_time tail_fd tail_rate u2abort u2j u2sz" ka.update(**{k: 1 for k in ex.split()}) - ex = "au_vol dl_list mtab_age reg_cap s_thead s_tbody th_convt ups_who zip_who" + ex = "au_vol dl_list mtab_age reg_cap s_thead s_tbody tail_who th_convt ups_who zip_who" ka.update(**{k: 9 for k in ex.split()}) ex = "db_act forget_ip k304 loris no304 nosubtle re_maxage rproxy rsp_jtr rsp_slp s_wr_slp snap_wri theme themes turbo u2ow zipmaxn zipmaxs"