mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
browser getting close
This commit is contained in:
parent
7aba7737d5
commit
2414766678
3
.vscode/settings.json
vendored
3
.vscode/settings.json
vendored
|
@ -41,6 +41,9 @@
|
|||
],
|
||||
"python.formatting.provider": "black",
|
||||
"editor.formatOnSave": true,
|
||||
"[html]": {
|
||||
"editor.formatOnSave": false,
|
||||
}
|
||||
//
|
||||
// things you may wanna edit:
|
||||
//
|
||||
|
|
|
@ -67,12 +67,17 @@ class VFS(object):
|
|||
|
||||
return [self, vpath]
|
||||
|
||||
def can_access(self, vpath, user):
|
||||
"""return [readable,writable]"""
|
||||
vn, _ = self._find(vpath)
|
||||
return [user in vn.uread, user in vn.uwrite]
|
||||
|
||||
def ls(self, vpath, user):
|
||||
"""return user-readable [virt,real] items at vpath"""
|
||||
"""return user-readable [fsdir,real,virt] items at vpath"""
|
||||
vn, rem = self._find(vpath)
|
||||
|
||||
if user not in vn.uread:
|
||||
return [[], []]
|
||||
return [vn.realpath, [], []]
|
||||
|
||||
rp = vn.realpath
|
||||
if rem:
|
||||
|
@ -96,11 +101,7 @@ class VFS(object):
|
|||
except ValueError:
|
||||
pass
|
||||
|
||||
absreal = []
|
||||
for p in real:
|
||||
absreal.append("{}/{}".format(rp, p).replace("//", "/"))
|
||||
|
||||
return [absreal, virt_vis]
|
||||
return [rp, real, virt_vis]
|
||||
|
||||
def user_tree(self, uname, readable=False, writable=False):
|
||||
ret = []
|
||||
|
|
|
@ -3,14 +3,20 @@
|
|||
from __future__ import print_function
|
||||
|
||||
import os
|
||||
import stat
|
||||
import time
|
||||
from datetime import datetime
|
||||
import mimetypes
|
||||
import cgi
|
||||
|
||||
from .__init__ import E, PY2
|
||||
from .util import * # noqa # pylint: disable=unused-wildcard-import
|
||||
|
||||
if not PY2:
|
||||
unicode = str
|
||||
from urllib.parse import unquote_plus
|
||||
else:
|
||||
from urllib import unquote_plus
|
||||
|
||||
|
||||
class HttpCli(object):
|
||||
|
@ -30,6 +36,8 @@ class HttpCli(object):
|
|||
|
||||
self.ok = True
|
||||
self.bufsz = 1024 * 32
|
||||
self.absolute_urls = False
|
||||
self.out_headers = {}
|
||||
|
||||
def log(self, msg):
|
||||
self.log_func(self.log_src, msg)
|
||||
|
@ -72,9 +80,15 @@ class HttpCli(object):
|
|||
# split req into vpath + args
|
||||
args = {}
|
||||
if "?" not in self.req:
|
||||
if not self.req.endswith("/"):
|
||||
self.absolute_urls = True
|
||||
|
||||
vpath = undot(self.req)
|
||||
else:
|
||||
vpath, arglist = self.req.split("?", 1)
|
||||
if not vpath.endswith("/"):
|
||||
self.absolute_urls = True
|
||||
|
||||
vpath = undot(vpath)
|
||||
for k in arglist.split("&"):
|
||||
if "=" in k:
|
||||
|
@ -108,6 +122,9 @@ class HttpCli(object):
|
|||
u"Content-Type: " + mime,
|
||||
u"Content-Length: " + str(len(body)),
|
||||
]
|
||||
for k, v in self.out_headers.items():
|
||||
response.append("{}: {}".format(k, v))
|
||||
|
||||
response.extend(headers)
|
||||
response_str = u"\r\n".join(response).encode("utf-8")
|
||||
if self.ok:
|
||||
|
@ -140,9 +157,10 @@ class HttpCli(object):
|
|||
else:
|
||||
self.vpath = self.wvol[0]
|
||||
|
||||
self.absolute_urls = True
|
||||
|
||||
# go home if verboten
|
||||
readable = self.vpath in self.rvol
|
||||
writable = self.vpath in self.wvol
|
||||
readable, writable = self.conn.auth.vfs.can_access(self.vpath, self.uname)
|
||||
if not readable and not writable:
|
||||
self.log("inaccessible: {}".format(self.vpath))
|
||||
self.args = {"h": True}
|
||||
|
@ -154,7 +172,7 @@ class HttpCli(object):
|
|||
if readable:
|
||||
return self.tx_browser()
|
||||
else:
|
||||
return self.tx_jupper()
|
||||
return self.tx_upper()
|
||||
|
||||
def handle_post(self):
|
||||
self.log("")
|
||||
|
@ -275,8 +293,48 @@ class HttpCli(object):
|
|||
html = self.conn.tpl_mounts.render(this=self)
|
||||
self.reply(html.encode("utf-8"))
|
||||
|
||||
def tx_jupper(self):
|
||||
def tx_upper(self):
|
||||
# return html for basic uploader;
|
||||
# js rewrites to up2k unless args['b']
|
||||
self.loud_reply("TODO jupper {}".format(self.vpath))
|
||||
|
||||
def tx_browser(self):
|
||||
self.loud_reply("TODO browser {}".format(self.vpath))
|
||||
vpath = u""
|
||||
vpnodes = [[u"/", u"/"]]
|
||||
for node in self.vpath.split("/"):
|
||||
vpath += u"/" + node
|
||||
vpnodes.append([cgi.escape(vpath) + "/", cgi.escape(node)])
|
||||
|
||||
fsroot, vfs_ls, vfs_virt = self.auth.vfs.ls(self.vpath, self.uname)
|
||||
vfs_ls.extend(vfs_virt)
|
||||
|
||||
dirs = []
|
||||
files = []
|
||||
for fn in vfs_ls:
|
||||
href = fn
|
||||
if self.absolute_urls:
|
||||
href = vpath + "/" + fn
|
||||
|
||||
fspath = fsroot + "/" + fn
|
||||
inf = os.stat(fspath)
|
||||
|
||||
is_dir = stat.S_ISDIR(inf.st_mode)
|
||||
if is_dir:
|
||||
margin = "DIR"
|
||||
href += "/"
|
||||
else:
|
||||
margin = "-"
|
||||
|
||||
sz = inf.st_size
|
||||
dt = datetime.utcfromtimestamp(inf.st_mtime)
|
||||
dt = dt.strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
item = [margin, cgi.escape(href), cgi.escape(fn), sz, dt]
|
||||
if is_dir:
|
||||
dirs.append(item)
|
||||
else:
|
||||
files.append(item)
|
||||
|
||||
dirs.extend(files)
|
||||
html = self.conn.tpl_browser.render(vpnodes=vpnodes, files=dirs)
|
||||
self.reply(html.encode("utf-8"))
|
||||
|
|
|
@ -12,8 +12,8 @@
|
|||
<body>
|
||||
<h1 id="path">
|
||||
{% for n in vpnodes[:-1] %}
|
||||
<a href="{{ n[0] }}">/{{ n[1] }}</a>
|
||||
{% endfor %}
|
||||
<a href="{{ n[0] }}">{{ n[1] }}</a>
|
||||
{%- endfor %}
|
||||
<span>{{ vpnodes[-1][1] }}</li>
|
||||
</h1>
|
||||
<table id="files">
|
||||
|
@ -26,14 +26,11 @@
|
|||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for f in files %}
|
||||
<tr>
|
||||
<td>{{ f[0] }}</td>
|
||||
<td><a href="{{ f[1] }}">{{ f[2] }}</a></td>
|
||||
<td>{{ f[3] }}</td>
|
||||
<td>{{ f[4] }}</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
|
||||
{% for f in files %}
|
||||
<tr><td>{{ f[0] }}</td><td><a href="{{ f[1] }}">{{ f[2] }}</a></td><td>{{ f[3] }}</td><td>{{ f[4] }}</td></tr>
|
||||
{%- endfor %}
|
||||
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
|
|
|
@ -11,6 +11,7 @@ import subprocess as sp # nosec
|
|||
from textwrap import dedent
|
||||
from argparse import Namespace
|
||||
from copyparty.authsrv import AuthSrv
|
||||
from copyparty import util
|
||||
|
||||
|
||||
class TestVFS(unittest.TestCase):
|
||||
|
@ -24,10 +25,10 @@ class TestVFS(unittest.TestCase):
|
|||
return foo
|
||||
|
||||
def undot(self, vfs, query, response):
|
||||
self.assertEqual(vfs.undot(query), response)
|
||||
self.assertEqual(util.undot(query), response)
|
||||
query = self.unfoo(query)
|
||||
response = self.unfoo(response)
|
||||
self.assertEqual(vfs.undot(query), response)
|
||||
self.assertEqual(util.undot(query), response)
|
||||
|
||||
def absify(self, root, names):
|
||||
return ["{}/{}".format(root, x).replace("//", "/") for x in names]
|
||||
|
@ -130,34 +131,39 @@ class TestVFS(unittest.TestCase):
|
|||
self.assertEqual(n.uread, ["k"])
|
||||
self.assertEqual(n.uwrite, ["*", "k"])
|
||||
|
||||
real, virt = vfs.ls("/", "*")
|
||||
self.assertEqual(real, self.absify(td, ["b", "c"]))
|
||||
fsdir, real, virt = vfs.ls("/", "*")
|
||||
self.assertEqual(fsdir, td)
|
||||
self.assertEqual(real, ["b", "c"])
|
||||
self.assertEqual(virt, ["a"])
|
||||
|
||||
real, virt = vfs.ls("a", "*")
|
||||
self.assertEqual(real, self.absify(td, ["a/aa", "a/ab"]))
|
||||
fsdir, real, virt = vfs.ls("a", "*")
|
||||
self.assertEqual(fsdir, td + "/a")
|
||||
self.assertEqual(real, ["aa", "ab"])
|
||||
self.assertEqual(virt, ["ac"])
|
||||
|
||||
real, virt = vfs.ls("a/ab", "*")
|
||||
self.assertEqual(real, self.absify(td, ["a/ab/aba", "a/ab/abb", "a/ab/abc"]))
|
||||
fsdir, real, virt = vfs.ls("a/ab", "*")
|
||||
self.assertEqual(fsdir, td + "/a/ab")
|
||||
self.assertEqual(real, ["aba", "abb", "abc"])
|
||||
self.assertEqual(virt, [])
|
||||
|
||||
real, virt = vfs.ls("a/ac", "*")
|
||||
self.assertEqual(real, self.absify(td, ["a/ac/aca", "a/ac/acc"]))
|
||||
fsdir, real, virt = vfs.ls("a/ac", "*")
|
||||
self.assertEqual(fsdir, td + "/a/ac")
|
||||
self.assertEqual(real, ["aca", "acc"])
|
||||
self.assertEqual(virt, [])
|
||||
|
||||
real, virt = vfs.ls("a/ac", "k")
|
||||
self.assertEqual(real, self.absify(td, ["a/ac/aca", "a/ac/acc"]))
|
||||
fsdir, real, virt = vfs.ls("a/ac", "k")
|
||||
self.assertEqual(fsdir, td + "/a/ac")
|
||||
self.assertEqual(real, ["aca", "acc"])
|
||||
self.assertEqual(virt, ["acb"])
|
||||
|
||||
real, virt = vfs.ls("a/ac/acb", "*")
|
||||
fsdir, real, virt = vfs.ls("a/ac/acb", "*")
|
||||
self.assertEqual(fsdir, td + "/a/ac/acb")
|
||||
self.assertEqual(real, [])
|
||||
self.assertEqual(virt, [])
|
||||
|
||||
real, virt = vfs.ls("a/ac/acb", "k")
|
||||
self.assertEqual(
|
||||
real, self.absify(td, ["a/ac/acb/acba", "a/ac/acb/acbb", "a/ac/acb/acbc"])
|
||||
)
|
||||
fsdir, real, virt = vfs.ls("a/ac/acb", "k")
|
||||
self.assertEqual(fsdir, td + "/a/ac/acb")
|
||||
self.assertEqual(real, ["acba", "acbb", "acbc"])
|
||||
self.assertEqual(virt, [])
|
||||
|
||||
# breadth-first construction
|
||||
|
@ -187,17 +193,21 @@ class TestVFS(unittest.TestCase):
|
|||
# shadowing
|
||||
vfs = AuthSrv(Namespace(c=None, a=[], v=[".::r", "b:a/ac:r"]), None).vfs
|
||||
|
||||
r1, v1 = vfs.ls("", "*")
|
||||
self.assertEqual(r1, self.absify(td, ["b", "c"]))
|
||||
fsp, r1, v1 = vfs.ls("", "*")
|
||||
self.assertEqual(fsp, td)
|
||||
self.assertEqual(r1, ["b", "c"])
|
||||
self.assertEqual(v1, ["a"])
|
||||
|
||||
r1, v1 = vfs.ls("a", "*")
|
||||
self.assertEqual(r1, self.absify(td, ["a/aa", "a/ab"]))
|
||||
fsp, r1, v1 = vfs.ls("a", "*")
|
||||
self.assertEqual(fsp, td + "/a")
|
||||
self.assertEqual(r1, ["aa", "ab"])
|
||||
self.assertEqual(v1, ["ac"])
|
||||
|
||||
r1, v1 = vfs.ls("a/ac", "*")
|
||||
r2, v2 = vfs.ls("b", "*")
|
||||
self.assertEqual(r1, self.absify(td, ["b/ba", "b/bb", "b/bc"]))
|
||||
fsp1, r1, v1 = vfs.ls("a/ac", "*")
|
||||
fsp2, r2, v2 = vfs.ls("b", "*")
|
||||
self.assertEqual(fsp1, td + "/b")
|
||||
self.assertEqual(fsp2, td + "/b")
|
||||
self.assertEqual(r1, ["ba", "bb", "bc"])
|
||||
self.assertEqual(r1, r2)
|
||||
self.assertEqual(v1, v2)
|
||||
|
||||
|
|
Loading…
Reference in a new issue