mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
delinting
This commit is contained in:
parent
5414591362
commit
fe0330f6f7
15
.vscode/settings.json
vendored
15
.vscode/settings.json
vendored
|
@ -33,6 +33,17 @@
|
|||
"test_*.py"
|
||||
],
|
||||
"python.linting.pylintEnabled": true,
|
||||
"python.linting.enabled": true,
|
||||
"python.pythonPath": "/usr/bin/python3",
|
||||
"python.linting.flake8Enabled": true,
|
||||
"python.linting.banditEnabled": true,
|
||||
"python.linting.flake8Args": [
|
||||
"--max-line-length=120",
|
||||
"--ignore=E722,F405,E203,W503,W293",
|
||||
],
|
||||
"python.formatting.provider": "black",
|
||||
"editor.formatOnSave": true,
|
||||
//
|
||||
// things you may wanna edit:
|
||||
//
|
||||
"python.pythonPath": ".env/bin/python",
|
||||
//"python.linting.enabled": true,
|
||||
}
|
12
README.md
12
README.md
|
@ -25,3 +25,15 @@ turn your phone or raspi into a portable file server with resumable uploads/down
|
|||
* [ ] accounts
|
||||
|
||||
conclusion: don't bother
|
||||
|
||||
## dependencies
|
||||
|
||||
* jinja2
|
||||
* markupsafe
|
||||
|
||||
## dev env
|
||||
|
||||
python3 -v venv .env
|
||||
. .env/bin/activate
|
||||
pip install jinja2 # dependencies
|
||||
pip install black bandit pylint flake8 # vscode tooling
|
||||
|
|
|
@ -14,8 +14,8 @@ import threading
|
|||
from textwrap import dedent
|
||||
import multiprocessing as mp
|
||||
|
||||
from .__version__ import *
|
||||
from .tcpsrv import *
|
||||
from .__version__ import S_VERSION, S_BUILD_DT
|
||||
from .tcpsrv import TcpSrv
|
||||
|
||||
|
||||
class RiceFormatter(argparse.HelpFormatter):
|
||||
|
@ -41,7 +41,7 @@ def main():
|
|||
try:
|
||||
# support vscode debugger (bonus: same behavior as on windows)
|
||||
mp.set_start_method("spawn", True)
|
||||
except:
|
||||
except AttributeError:
|
||||
# py2.7 probably
|
||||
pass
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ from __future__ import print_function
|
|||
import os
|
||||
import threading
|
||||
|
||||
from .__init__ import *
|
||||
from .__init__ import PY2
|
||||
from .util import undot
|
||||
|
||||
|
||||
|
@ -21,8 +21,8 @@ class VFS(object):
|
|||
|
||||
def add(self, src, dst):
|
||||
"""get existing, or add new path to the vfs"""
|
||||
assert not src.endswith("/")
|
||||
assert not dst.endswith("/")
|
||||
assert not src.endswith("/") # nosec
|
||||
assert not dst.endswith("/") # nosec
|
||||
|
||||
if "/" in dst:
|
||||
# requires breadth-first population (permissions trickle down)
|
||||
|
@ -93,7 +93,7 @@ class VFS(object):
|
|||
for name in virt_all:
|
||||
try:
|
||||
real.remove(name)
|
||||
except:
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
absreal = []
|
||||
|
@ -227,14 +227,14 @@ class AuthSrv(object):
|
|||
if not mount:
|
||||
# -h says our defaults are CWD at root and read/write for everyone
|
||||
vfs = VFS(os.path.abspath("."), "", ["*"], ["*"])
|
||||
elif not "" in mount:
|
||||
elif "" not in mount:
|
||||
# there's volumes but no root; make root inaccessible
|
||||
vfs = VFS(os.path.abspath("."), "", [], [])
|
||||
|
||||
maxdepth = 0
|
||||
for dst in sorted(mount.keys(), key=lambda x: (x.count("/"), len(x))):
|
||||
depth = dst.count("/")
|
||||
assert maxdepth <= depth
|
||||
assert maxdepth <= depth # nosec
|
||||
maxdepth = depth
|
||||
|
||||
if dst == "":
|
||||
|
|
|
@ -2,13 +2,12 @@
|
|||
# coding: utf-8
|
||||
from __future__ import print_function
|
||||
|
||||
import os
|
||||
import time
|
||||
import hashlib
|
||||
import mimetypes
|
||||
import jinja2
|
||||
|
||||
from .__init__ import *
|
||||
from .util import *
|
||||
from .__init__ import E, PY2
|
||||
from .util import * # noqa # pylint: disable=unused-wildcard-import
|
||||
|
||||
if not PY2:
|
||||
unicode = str
|
||||
|
@ -72,7 +71,7 @@ class HttpCli(object):
|
|||
|
||||
# split req into vpath + args
|
||||
args = {}
|
||||
if not "?" in self.req:
|
||||
if "?" not in self.req:
|
||||
vpath = undot(self.req)
|
||||
else:
|
||||
vpath, arglist = self.req.split("?", 1)
|
||||
|
@ -164,7 +163,7 @@ class HttpCli(object):
|
|||
try:
|
||||
if self.headers["expect"].lower() == "100-continue":
|
||||
self.s.send(b"HTTP/1.1 100 Continue\r\n\r\n")
|
||||
except:
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
self.parser = MultipartParser(self.log, self.sr, self.headers)
|
||||
|
@ -190,7 +189,7 @@ class HttpCli(object):
|
|||
msg = u"login ok"
|
||||
else:
|
||||
msg = u"naw dude"
|
||||
pwd = u"x"
|
||||
pwd = u"x" # nosec
|
||||
|
||||
h = ["Set-Cookie: cppwd={}; Path=/".format(pwd)]
|
||||
html = u'<h1>{}<h2><a href="/">ack'.format(msg)
|
||||
|
@ -281,4 +280,3 @@ class HttpCli(object):
|
|||
|
||||
def tx_browser(self):
|
||||
self.loud_reply("TODO browser {}".format(self.vpath))
|
||||
|
||||
|
|
|
@ -3,10 +3,9 @@
|
|||
from __future__ import print_function
|
||||
|
||||
import os
|
||||
import time
|
||||
import jinja2
|
||||
|
||||
from .__init__ import *
|
||||
from .__init__ import E
|
||||
from .util import Unrecv
|
||||
from .httpcli import HttpCli
|
||||
|
||||
|
@ -42,4 +41,3 @@ class HttpConn(object):
|
|||
if not cli.run():
|
||||
self.s.close()
|
||||
return
|
||||
|
||||
|
|
|
@ -2,10 +2,11 @@
|
|||
# coding: utf-8
|
||||
from __future__ import print_function
|
||||
|
||||
import time
|
||||
import threading
|
||||
|
||||
from .httpconn import *
|
||||
from .authsrv import *
|
||||
from .httpconn import HttpConn
|
||||
from .authsrv import AuthSrv
|
||||
|
||||
|
||||
class HttpSrv(object):
|
||||
|
@ -60,7 +61,7 @@ class HttpSrv(object):
|
|||
del self.clients[cli]
|
||||
|
||||
if self.disconnect_func:
|
||||
self.disconnect_func(addr)
|
||||
self.disconnect_func(addr) # pylint: disable=not-callable
|
||||
|
||||
def thr_workload(self):
|
||||
"""indicates the python interpreter workload caused by this HttpSrv"""
|
||||
|
|
|
@ -2,18 +2,19 @@
|
|||
# coding: utf-8
|
||||
from __future__ import print_function
|
||||
|
||||
import sys
|
||||
import time
|
||||
import signal
|
||||
import threading
|
||||
import multiprocessing as mp
|
||||
|
||||
from .__init__ import *
|
||||
from .httpsrv import *
|
||||
from .__init__ import PY2, WINDOWS
|
||||
from .httpsrv import HttpSrv
|
||||
|
||||
if PY2 and not WINDOWS:
|
||||
from multiprocessing.reduction import ForkingPickler
|
||||
from StringIO import StringIO as MemesIO
|
||||
import pickle
|
||||
from StringIO import StringIO as MemesIO # pylint: disable=import-error
|
||||
import pickle # nosec
|
||||
|
||||
|
||||
class MpWorker(object):
|
||||
|
@ -71,7 +72,7 @@ class MpWorker(object):
|
|||
|
||||
sck = d[1]
|
||||
if PY2:
|
||||
sck = pickle.loads(sck)
|
||||
sck = pickle.loads(sck) # nosec
|
||||
|
||||
self.httpsrv.accept(sck, d[2])
|
||||
|
||||
|
@ -182,7 +183,7 @@ class MpSrv(object):
|
|||
proc.workload = 0
|
||||
|
||||
if self.disconnect_func:
|
||||
self.disconnect_func(addr)
|
||||
self.disconnect_func(addr) # pylint: disable=not-callable
|
||||
|
||||
def accept(self, sck, addr):
|
||||
proc = sorted(self.procs, key=lambda x: x.workload)[0]
|
||||
|
|
|
@ -2,13 +2,14 @@
|
|||
# coding: utf-8
|
||||
from __future__ import print_function
|
||||
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import threading
|
||||
from datetime import datetime, timedelta
|
||||
import calendar
|
||||
|
||||
from .__init__ import *
|
||||
from .__init__ import PY2, WINDOWS
|
||||
|
||||
|
||||
class TcpSrv(object):
|
||||
|
@ -30,7 +31,7 @@ class TcpSrv(object):
|
|||
try:
|
||||
s.connect(("10.255.255.255", 1))
|
||||
ip = s.getsockname()[0]
|
||||
except:
|
||||
except OSError:
|
||||
pass
|
||||
s.close()
|
||||
|
||||
|
|
|
@ -6,12 +6,11 @@ import os
|
|||
import json
|
||||
import shutil
|
||||
import unittest
|
||||
import subprocess as sp
|
||||
import subprocess as sp # nosec
|
||||
|
||||
from io import StringIO
|
||||
from textwrap import dedent
|
||||
from argparse import Namespace
|
||||
from copyparty.authsrv import *
|
||||
from copyparty.authsrv import AuthSrv
|
||||
|
||||
|
||||
class TestVFS(unittest.TestCase):
|
||||
|
@ -36,12 +35,8 @@ class TestVFS(unittest.TestCase):
|
|||
def runcmd(self, *argv):
|
||||
p = sp.Popen(argv, stdout=sp.PIPE, stderr=sp.PIPE)
|
||||
stdout, stderr = p.communicate()
|
||||
try:
|
||||
stdout = stdout.decode("utf-8")
|
||||
stderr = stderr.decode("utf-8")
|
||||
except:
|
||||
pass
|
||||
|
||||
stdout = stdout.decode("utf-8")
|
||||
stderr = stderr.decode("utf-8")
|
||||
return [p.returncode, stdout, stderr]
|
||||
|
||||
def chkcmd(self, *argv):
|
||||
|
@ -49,8 +44,10 @@ class TestVFS(unittest.TestCase):
|
|||
if ok != 0:
|
||||
raise Exception(serr)
|
||||
|
||||
return sout, serr
|
||||
|
||||
def get_ramdisk(self):
|
||||
for vol in ["/dev/shm", "/Volumes/cptd"]:
|
||||
for vol in ["/dev/shm", "/Volumes/cptd"]: # nosec (singleton test)
|
||||
if os.path.exists(vol):
|
||||
return vol
|
||||
|
||||
|
@ -65,7 +62,7 @@ class TestVFS(unittest.TestCase):
|
|||
td = self.get_ramdisk() + "/vfs"
|
||||
try:
|
||||
shutil.rmtree(td)
|
||||
except:
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
os.mkdir(td)
|
||||
|
|
Loading…
Reference in a new issue