mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 00:52:16 -06:00
509 lines
12 KiB
Python
509 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
# coding: latin-1
|
|
from __future__ import print_function, unicode_literals
|
|
import re, os, sys, time, shutil, signal, threading, tarfile, hashlib, platform, tempfile, traceback
|
|
import subprocess as sp
|
|
|
|
|
|
"""
|
|
to edit this file, use HxD or "vim -b"
|
|
(there is compressed stuff at the end)
|
|
|
|
run me with python 2.7 or 3.3+ to unpack and run copyparty
|
|
|
|
there's zero binaries! just plaintext python scripts all the way down
|
|
so you can easily unpack the archive and inspect it for shady stuff
|
|
|
|
the archive data is attached after the b"\n# eof\n" archive marker,
|
|
b"?0" decodes to b"\x00"
|
|
b"?n" decodes to b"\n"
|
|
b"?r" decodes to b"\r"
|
|
b"??" decodes to b"?"
|
|
"""
|
|
|
|
|
|
# set by make-sfx.sh
|
|
VER = None
|
|
SIZE = None
|
|
CKSUM = None
|
|
STAMP = None
|
|
|
|
PY2 = sys.version_info < (3,)
|
|
PY37 = sys.version_info > (3, 7)
|
|
WINDOWS = sys.platform in ["win32", "msys"]
|
|
sys.dont_write_bytecode = True
|
|
me = os.path.abspath(os.path.realpath(__file__))
|
|
|
|
|
|
def eprint(*a, **ka):
|
|
ka["file"] = sys.stderr
|
|
print(*a, **ka)
|
|
|
|
|
|
def msg(*a, **ka):
|
|
if a:
|
|
a = ["[SFX]", a[0]] + list(a[1:])
|
|
|
|
eprint(*a, **ka)
|
|
|
|
|
|
# skip 1
|
|
|
|
|
|
def testptn1():
|
|
"""test: creates a test-pattern for encode()"""
|
|
import struct
|
|
|
|
buf = b""
|
|
for c in range(256):
|
|
buf += struct.pack("B", c)
|
|
|
|
yield buf
|
|
|
|
|
|
def testptn2():
|
|
import struct
|
|
|
|
for a in range(256):
|
|
if a % 16 == 0:
|
|
msg(a)
|
|
|
|
for b in range(256):
|
|
buf = b""
|
|
for c in range(256):
|
|
buf += struct.pack("BBBB", a, b, c, b)
|
|
yield buf
|
|
|
|
|
|
def testptn3():
|
|
with open("C:/Users/ed/Downloads/python-3.8.1-amd64.exe", "rb", 512 * 1024) as f:
|
|
while True:
|
|
buf = f.read(512 * 1024)
|
|
if not buf:
|
|
break
|
|
|
|
yield buf
|
|
|
|
|
|
testptn = testptn2
|
|
|
|
|
|
def testchk(cdata):
|
|
"""test: verifies that `data` yields testptn"""
|
|
import struct
|
|
|
|
cbuf = b""
|
|
mbuf = b""
|
|
checked = 0
|
|
t0 = time.time()
|
|
mdata = testptn()
|
|
while True:
|
|
if not mbuf:
|
|
try:
|
|
mbuf += next(mdata)
|
|
except:
|
|
break
|
|
|
|
if not cbuf:
|
|
try:
|
|
cbuf += next(cdata)
|
|
except:
|
|
expect = mbuf[:8]
|
|
expect = "".join(
|
|
" {:02x}".format(x)
|
|
for x in struct.unpack("B" * len(expect), expect)
|
|
)
|
|
raise Exception(
|
|
"truncated at {}, expected{}".format(checked + len(cbuf), expect)
|
|
)
|
|
|
|
ncmp = min(len(cbuf), len(mbuf))
|
|
# msg("checking {:x}H bytes, {:x}H ok so far".format(ncmp, checked))
|
|
for n in range(ncmp):
|
|
checked += 1
|
|
if cbuf[n] != mbuf[n]:
|
|
expect = mbuf[n : n + 8]
|
|
expect = "".join(
|
|
" {:02x}".format(x)
|
|
for x in struct.unpack("B" * len(expect), expect)
|
|
)
|
|
cc = struct.unpack(b"B", cbuf[n : n + 1])[0]
|
|
raise Exception(
|
|
"byte {:x}H bad, got {:02x}, expected{}".format(checked, cc, expect)
|
|
)
|
|
|
|
cbuf = cbuf[ncmp:]
|
|
mbuf = mbuf[ncmp:]
|
|
|
|
td = time.time() - t0
|
|
txt = "all {}d bytes OK in {:.3f} sec, {:.3f} MB/s".format(
|
|
checked, td, (checked / (1024 * 1024.0)) / td
|
|
)
|
|
msg(txt)
|
|
|
|
|
|
def encode(data, size, cksum, ver, ts):
|
|
"""creates a new sfx; `data` should yield bufs to attach"""
|
|
nb = 0
|
|
nin = 0
|
|
nout = 0
|
|
skip = False
|
|
with open(me, "rb") as fi:
|
|
unpk = ""
|
|
src = fi.read().replace(b"\r", b"").rstrip(b"\n").decode("utf-8")
|
|
for ln in src.split("\n"):
|
|
if ln.endswith("# skip 0"):
|
|
skip = False
|
|
nb = 9
|
|
continue
|
|
|
|
if ln.endswith("# skip 1") or skip:
|
|
skip = True
|
|
continue
|
|
|
|
if ln.strip().startswith("# fmt: "):
|
|
continue
|
|
|
|
if ln:
|
|
nb = 0
|
|
else:
|
|
nb += 1
|
|
if nb > 2:
|
|
continue
|
|
|
|
unpk += ln + "\n"
|
|
|
|
for k, v in [
|
|
["VER", '"' + ver + '"'],
|
|
["SIZE", size],
|
|
["CKSUM", '"' + cksum + '"'],
|
|
["STAMP", ts],
|
|
]:
|
|
v1 = "\n{} = None\n".format(k)
|
|
v2 = "\n{} = {}\n".format(k, v)
|
|
unpk = unpk.replace(v1, v2)
|
|
|
|
unpk = unpk.replace("\n ", "\n\t")
|
|
for _ in range(16):
|
|
unpk = unpk.replace("\t ", "\t\t")
|
|
|
|
with open("sfx.out", "wb") as f:
|
|
f.write(unpk.encode("utf-8").rstrip(b"\n") + b"\n\n\n# eof")
|
|
for buf in data:
|
|
ebuf = (
|
|
buf.replace(b"?", b"??")
|
|
.replace(b"\x00", b"?0")
|
|
.replace(b"\r", b"?r")
|
|
.replace(b"\n", b"?n")
|
|
)
|
|
nin += len(buf)
|
|
nout += len(ebuf)
|
|
while ebuf:
|
|
ep = 4090
|
|
while True:
|
|
a = ebuf.rfind(b"?", 0, ep)
|
|
if a < 0 or ep - a > 2:
|
|
break
|
|
ep = a
|
|
buf = ebuf[:ep]
|
|
ebuf = ebuf[ep:]
|
|
f.write(b"\n#" + buf)
|
|
|
|
f.write(b"\n\n")
|
|
|
|
msg("wrote {:x}H bytes ({:x}H after encode)".format(nin, nout))
|
|
|
|
|
|
def makesfx(tar_src, ver, ts):
|
|
sz = os.path.getsize(tar_src)
|
|
cksum = hashfile(tar_src)
|
|
encode(yieldfile(tar_src), sz, cksum, ver, ts)
|
|
|
|
|
|
# skip 0
|
|
|
|
|
|
def u8(gen):
|
|
try:
|
|
for s in gen:
|
|
yield s.decode("utf-8", "ignore")
|
|
except:
|
|
yield s
|
|
for s in gen:
|
|
yield s
|
|
|
|
|
|
def yieldfile(fn):
|
|
with open(fn, "rb") as f:
|
|
for block in iter(lambda: f.read(64 * 1024), b""):
|
|
yield block
|
|
|
|
|
|
def hashfile(fn):
|
|
h = hashlib.sha1()
|
|
for block in yieldfile(fn):
|
|
h.update(block)
|
|
|
|
return h.hexdigest()[:24]
|
|
|
|
|
|
def unpack():
|
|
"""unpacks the tar yielded by `data`"""
|
|
name = "pe-copyparty"
|
|
try:
|
|
name += "." + str(os.geteuid())
|
|
except:
|
|
pass
|
|
|
|
tag = "v" + str(STAMP)
|
|
top = tempfile.gettempdir()
|
|
opj = os.path.join
|
|
ofe = os.path.exists
|
|
final = opj(top, name)
|
|
san = opj(final, "copyparty/up2k.py")
|
|
for suf in range(0, 9001):
|
|
withpid = "{}.{}.{}".format(name, os.getpid(), suf)
|
|
mine = opj(top, withpid)
|
|
if not ofe(mine):
|
|
break
|
|
|
|
tar = opj(mine, "tar")
|
|
|
|
try:
|
|
if tag in os.listdir(final) and ofe(san):
|
|
msg("found early")
|
|
return final
|
|
except:
|
|
pass
|
|
|
|
sz = 0
|
|
os.mkdir(mine)
|
|
with open(tar, "wb") as f:
|
|
for buf in get_payload():
|
|
sz += len(buf)
|
|
f.write(buf)
|
|
|
|
ck = hashfile(tar)
|
|
if ck != CKSUM:
|
|
t = "\n\nexpected {} ({} byte)\nobtained {} ({} byte)\nsfx corrupt"
|
|
raise Exception(t.format(CKSUM, SIZE, ck, sz))
|
|
|
|
with tarfile.open(tar, "r:bz2") as tf:
|
|
# this is safe against traversal
|
|
# skip 1
|
|
# since it will never process user-provided data;
|
|
# the only possible input is a single tar.bz2
|
|
# which gets hardcoded into this script at build stage
|
|
# skip 0
|
|
tf.extractall(mine)
|
|
|
|
os.remove(tar)
|
|
|
|
with open(opj(mine, tag), "wb") as f:
|
|
f.write(b"h\n")
|
|
|
|
try:
|
|
if tag in os.listdir(final) and ofe(san):
|
|
msg("found late")
|
|
return final
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
if os.path.islink(final):
|
|
os.remove(final)
|
|
else:
|
|
shutil.rmtree(final)
|
|
except:
|
|
pass
|
|
|
|
for fn in u8(os.listdir(top)):
|
|
if fn.startswith(name) and fn != withpid:
|
|
try:
|
|
old = opj(top, fn)
|
|
if time.time() - os.path.getmtime(old) > 86400:
|
|
shutil.rmtree(old)
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
os.symlink(mine, final)
|
|
except:
|
|
try:
|
|
os.rename(mine, final)
|
|
return final
|
|
except:
|
|
msg("reloc fail,", mine)
|
|
|
|
return mine
|
|
|
|
|
|
def get_payload():
|
|
"""yields the binary data attached to script"""
|
|
with open(me, "rb") as f:
|
|
buf = f.read().rstrip(b"\r\n")
|
|
|
|
ptn = b"\n# eof\n#"
|
|
a = buf.find(ptn)
|
|
if a < 0:
|
|
raise Exception("could not find archive marker")
|
|
|
|
esc = {b"??": b"?", b"?r": b"\r", b"?n": b"\n", b"?0": b"\x00"}
|
|
buf = buf[a + len(ptn) :].replace(b"\n#", b"")
|
|
p = 0
|
|
while buf:
|
|
a = buf.find(b"?", p)
|
|
if a < 0:
|
|
yield buf[p:]
|
|
break
|
|
elif a == p:
|
|
yield esc[buf[p : p + 2]]
|
|
p += 2
|
|
else:
|
|
yield buf[p:a]
|
|
p = a
|
|
|
|
|
|
def utime(top):
|
|
# avoid cleaners
|
|
files = [os.path.join(dp, p) for dp, dd, df in os.walk(top) for p in dd + df]
|
|
while True:
|
|
t = int(time.time())
|
|
for f in [top] + files:
|
|
os.utime(f, (t, t))
|
|
|
|
time.sleep(78123)
|
|
|
|
|
|
def confirm(rv):
|
|
msg()
|
|
msg("retcode", rv if rv else traceback.format_exc())
|
|
if WINDOWS:
|
|
msg("*** hit enter to exit ***")
|
|
try:
|
|
raw_input() if PY2 else input()
|
|
except:
|
|
pass
|
|
|
|
sys.exit(rv or 1)
|
|
|
|
|
|
def run(tmp, j2, ftp):
|
|
msg("jinja2:", j2 or "bundled")
|
|
msg("pyftpd:", ftp or "bundled")
|
|
msg("sfxdir:", tmp)
|
|
msg()
|
|
|
|
t = threading.Thread(target=utime, args=(tmp,))
|
|
t.daemon = True
|
|
t.start()
|
|
|
|
ld = (("", ""), (j2, "j2"), (ftp, "ftp"), (not PY2, "py2"), (PY37, "py37"))
|
|
ld = [os.path.join(tmp, b) for a, b in ld if not a]
|
|
|
|
# skip 1
|
|
# enable this to dynamically remove type hints at startup,
|
|
# in case a future python version can use them for performance
|
|
if sys.version_info < (3, 10) and False:
|
|
sys.path.insert(0, ld[0])
|
|
|
|
from strip_hints.a import uh
|
|
|
|
uh(tmp + "/copyparty")
|
|
# skip 0
|
|
|
|
if any([re.match(r"^-.*j[0-9]", x) for x in sys.argv]):
|
|
run_s(ld)
|
|
else:
|
|
run_i(ld)
|
|
|
|
|
|
def run_i(ld):
|
|
for x in ld:
|
|
sys.path.insert(0, x)
|
|
|
|
from copyparty.__main__ import main as p
|
|
|
|
p()
|
|
|
|
|
|
def run_s(ld):
|
|
# fmt: off
|
|
c = "import sys,runpy;" + "".join(['sys.path.insert(0,r"' + x.replace("\\", "/") + '");' for x in ld]) + 'runpy.run_module("copyparty",run_name="__main__")'
|
|
c = [str(x) for x in [sys.executable, "-c", c] + list(sys.argv[1:])]
|
|
# fmt: on
|
|
msg("\n", c, "\n")
|
|
p = sp.Popen(c)
|
|
|
|
def bye(*a):
|
|
p.send_signal(signal.SIGINT)
|
|
|
|
signal.signal(signal.SIGTERM, bye)
|
|
p.wait()
|
|
|
|
raise SystemExit(p.returncode)
|
|
|
|
|
|
def main():
|
|
sysver = str(sys.version).replace("\n", "\n" + " " * 18)
|
|
pktime = time.strftime("%Y-%m-%d, %H:%M:%S", time.gmtime(STAMP))
|
|
msg()
|
|
msg(" this is: copyparty", VER)
|
|
msg(" packed at:", pktime, "UTC,", STAMP)
|
|
msg("archive is:", me)
|
|
msg("python bin:", sys.executable)
|
|
msg("python ver:", platform.python_implementation(), sysver)
|
|
msg()
|
|
|
|
arg = ""
|
|
try:
|
|
arg = sys.argv[1]
|
|
except:
|
|
pass
|
|
|
|
# skip 1
|
|
|
|
if arg == "--sfx-testgen":
|
|
return encode(testptn(), 1, "x", "x", 1)
|
|
|
|
if arg == "--sfx-testchk":
|
|
return testchk(get_payload())
|
|
|
|
if arg == "--sfx-make":
|
|
tar, ver, ts = sys.argv[2:]
|
|
return makesfx(tar, ver, ts)
|
|
|
|
# skip 0
|
|
|
|
tmp = os.path.realpath(unpack())
|
|
|
|
try:
|
|
from jinja2 import __version__ as j2
|
|
except:
|
|
j2 = None
|
|
|
|
try:
|
|
from pyftpdlib.__init__ import __ver__ as ftp
|
|
except:
|
|
ftp = None
|
|
|
|
try:
|
|
run(tmp, j2, ftp)
|
|
except SystemExit as ex:
|
|
c = ex.code
|
|
if c not in [0, -15]:
|
|
confirm(ex.code)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
except:
|
|
confirm(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|
|
|
|
# skip 1
|
|
# python sfx.py --sfx-testgen && python test.py --sfx-testchk
|
|
# c:\Python27\python.exe sfx.py --sfx-testgen && c:\Python27\python.exe test.py --sfx-testchk
|