mirror of
https://github.com/9001/copyparty.git
synced 2026-06-22 22:12:52 -06:00
439 lines
14 KiB
Python
439 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
# coding: utf-8
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
|
|
from copyparty.authsrv import AuthSrv
|
|
from copyparty.httpcli import HttpCli
|
|
from tests import util as tu
|
|
from tests.util import Cfg
|
|
|
|
|
|
def hdr(query):
    """Return the raw bytes of a GET request for ``/<query>``.

    The request carries the cookie ``cppwd=o`` and asks the server to close
    the connection after replying.

    Args:
        query: path and query string, without the leading slash.

    Returns:
        The complete request head, UTF-8 encoded.
    """
    template = (
        "GET /{} HTTP/1.1\r\n"
        "Cookie: cppwd=o\r\n"
        "Connection: close\r\n"
        "\r\n"
    )
    return template.format(query).encode("utf-8")
|
|
|
|
|
|
def build_multipart_form(files, boundary="XD"):
    """Build a multipart/form-data request body for testing uploads.

    The body starts with an ``act`` field set to ``bput``, followed by one
    ``f`` file part per entry in *files*, and ends with the closing
    delimiter.

    Args:
        files: iterable of ``(filename, content)`` pairs. *content* may be
            text (encoded as UTF-8) or a bytes-like object (used as-is).
        boundary: multipart boundary string, without the leading dashes.

    Returns:
        The complete request body as bytes.
    """
    delim = "--{}\r\n".format(boundary).encode("utf-8")
    file_head = 'Content-Disposition: form-data; name="f"; filename="{}"\r\n\r\n'

    parts = [
        delim,
        'Content-Disposition: form-data; name="act"\r\n\r\n'.encode("utf-8"),
        "bput\r\n".encode("utf-8"),
    ]

    for filename, content in files:
        # Check for "already bytes" instead of "is str": with unicode_literals
        # on Python 2 a text literal is `unicode`, which an isinstance(..., str)
        # test would miss and leave unencoded in the joined body.
        if not isinstance(content, (bytes, bytearray, memoryview)):
            content = content.encode("utf-8")

        parts.append(delim)
        parts.append(file_head.format(filename).encode("utf-8"))
        parts.append(content)
        parts.append("\r\n".encode("utf-8"))

    parts.append("--{}--\r\n".format(boundary).encode("utf-8"))
    return b"".join(parts)
|
|
|
|
|
|
class TestUpload(unittest.TestCase):
    """Tests for the basic multipart upload ("bput") request handling.

    Every test creates a fresh volume folder below the directory returned by
    ``tu.get_ramdisk()``, pushes one raw HTTP POST through ``HttpCli`` and
    then inspects the reply and/or the files found in the volume folder.
    """

    def setUp(self):
        self.td = tu.get_ramdisk()
        self.maxDiff = 99999
        # set by _prep(); tearDown() shuts it down when present
        self.conn = None

    def tearDown(self):
        # Shut the connection down here rather than at the end of each test,
        # so it is also released when an assertion fails halfway through.
        try:
            if self.conn is not None:
                self.conn.shutdown()
                self.conn = None
        finally:
            # step out of the tree before deleting it
            os.chdir(tempfile.gettempdir())
            shutil.rmtree(self.td)

    def _prep(self, vol, perms, seed=None, **cfg_kw):
        """Create the volume folder and the server objects used by a test.

        Makes ``<td>/vfs`` the working directory, creates the folder *vol*
        inside it, writes any *seed* files, and then sets ``self.args``,
        ``self.asrv`` and ``self.conn``.

        Args:
            vol: volume name; used for the folder and in the volume spec.
            perms: permission field of the volume spec, e.g. ``"rw"``
                (presumably the access granted to account "o").
            seed: optional ``{filename: text}`` mapping of files to create
                in the volume before the server objects are built.
            **cfg_kw: extra keyword arguments forwarded to ``Cfg``.
        """
        root = os.path.join(self.td, "vfs")
        os.mkdir(root)
        os.chdir(root)
        os.makedirs(vol)

        for name, text in (seed or {}).items():
            with open(os.path.join(vol, name), "w") as f:
                f.write(text)

        vcfg = ["{0}/:{0}:{1},o".format(vol, perms)]
        self.args = Cfg(v=vcfg, a=["o:o", "x:x"], **cfg_kw)
        self.asrv = AuthSrv(self.args, self.log)
        self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"")

    def test_basic_upload(self):
        """Test basic upload functionality (bput)"""
        self._prep("ao", "rw")

        h, _ = self.bup("ao", [("test.txt", "Hello, World!")])
        self.assertIn("HTTP/1.1 201", h)

        uploaded = [f for f in os.listdir("ao") if f.startswith("test")]
        self.assertTrue(uploaded, "Uploaded file should exist")

    def test_basic_upload_json_response(self):
        """Test basic upload with JSON response"""
        self._prep("ao", "rw")

        h, body = self.bup("ao?j", [("test.txt", "Hello, JSON!")])
        self.assertIn("HTTP/1.1 201", h)
        self.assertIn("application/json", h)

        response = json.loads(body)
        self.assertEqual(response["status"], "OK")

    def test_large_file_upload(self):
        """Test uploading a relatively large file"""
        self._prep("ao", "rw")

        large_content = b"A" * (128 * 1024)
        h, _ = self.bup("ao", [("large.bin", large_content)])
        self.assertIn("HTTP/1.1 201", h)

    def test_upload_without_write_permission(self):
        """Test upload without write permission - should fail"""
        self._prep("ro", "r")

        h, _ = self.bup("ro", [("test.txt", "This should fail")])
        self.assertIn("HTTP/1.1 403", h)

        leftovers = [f for f in os.listdir("ro") if f.startswith("test")]
        self.assertEqual(len(leftovers), 0, "No files should be created")

    def test_upload_existing_file_no_overwrite(self):
        """Test that upload does NOT overwrite existing files without ?replace

        This verifies the old file protection mechanism implemented in:
        - util.py:1816-1915 (ren_open function) - adds suffix if file exists
        - httpcli.py:3733-3742 (replace parameter handling) - requires ?replace to overwrite
        """
        new_content = "New content that should NOT overwrite"
        self._prep("ao", "rw", seed={"existing.txt": "Original content"})

        h, _ = self.bup("ao?j", [("existing.txt", new_content)])
        self.assertIn("HTTP/1.1 201", h)

        with open("ao/existing.txt", "r") as f:
            self.assertEqual(f.read(), "Original content", "Old file should NOT be overwritten")

        existing_files = [f for f in os.listdir("ao") if "existing" in f]
        self.assertGreaterEqual(len(existing_files), 2, "Should have both original and new file")

        # the upload must have landed in one of the differently-named files
        new_content_found = False
        for fname in existing_files:
            if fname == "existing.txt":
                continue
            with open(os.path.join("ao", fname), "r") as f:
                if f.read() == new_content:
                    new_content_found = True
                    break

        self.assertTrue(new_content_found, "New file should be created with different name")

    def test_upload_with_replace_param_overwrites(self):
        """Test that upload with ?replace parameter overwrites existing files

        Requires:
        1. URL contains ?replace parameter
        2. User has delete permission
        """
        # NOTE(review): the volume is created with "rw" only and this test
        # asserts nothing but the status code -- TODO confirm whether a delete
        # permission should be granted and the file content verified.
        self._prep("ao", "rw", seed={"existing.txt": "Original content"})

        h, _ = self.bup("ao?j&replace", [("existing.txt", "New content that SHOULD overwrite")])
        self.assertIn("HTTP/1.1 201", h)

    def test_upload_replace_without_delete_permission(self):
        """Test that ?replace fails without delete permission"""
        self._prep("wo", "w", seed={"existing.txt": "Original content"})

        # only the on-disk result matters here; the reply is not inspected
        self.bup("wo?j&replace", [("existing.txt", "New content")])

        with open("wo/existing.txt", "r") as f:
            self.assertEqual(f.read(), "Original content", "Old file should NOT be overwritten without delete permission")

    def test_upload_empty_file(self):
        """Test uploading an empty file"""
        self._prep("ao", "rw")

        h, _ = self.bup("ao?j", [("empty.txt", "")])
        self.assertIn("HTTP/1.1 201", h)

    def test_upload_error_response_contains_error_info(self):
        """Test that upload errors return proper error information"""
        self._prep("ro", "r")

        h, _ = self.bup("ro?j", [("test.txt", "This should fail")])
        self.assertIn("HTTP/1.1 403", h)

    def test_upload_large_file_integrity(self):
        """Test that uploaded files maintain integrity (SHA256 check)"""
        self._prep("ao", "rw")

        test_content = os.urandom(256 * 1024)
        original_hash = hashlib.sha256(test_content).hexdigest()

        h, _ = self.bup("ao", [("integrity_test.bin", test_content)])
        self.assertIn("HTTP/1.1 201", h)

        test_files = [f for f in os.listdir("ao") if "integrity_test" in f]
        self.assertTrue(test_files)

        # pass as soon as any candidate file hashes to the uploaded payload
        for fname in test_files:
            with open(os.path.join("ao", fname), "rb") as f:
                uploaded_hash = hashlib.sha256(f.read()).hexdigest()
            if uploaded_hash == original_hash:
                break
        else:
            self.fail("No file with matching hash found")

    def test_upload_multiple_files(self):
        """Test uploading multiple files"""
        self._prep("ao", "rw")

        files = [
            ("file1.txt", "Content 1"),
            ("file2.txt", "Content 2"),
        ]
        h, _ = self.bup("ao?j", files)
        self.assertIn("HTTP/1.1 201", h)

    def test_upload_special_characters_filename(self):
        """Test upload with special characters in filename"""
        self._prep("ao", "rw")

        h, _ = self.bup("ao?j", [("my file 123.txt", "Content with spaces")])
        self.assertIn("HTTP/1.1 201", h)

    def test_partial_file_cleanup_on_size_limit(self):
        """Test that partial uploads are handled properly

        Backend implementation in:
        - httpcli.py:3803-3857 - creates .PARTIAL file during upload
        - httpcli.py:3851-3855 - cleanup on size limit exceeded
        - httpcli.py:3931-3940 - exception handling
        """
        self._prep("ao", "rw", smax=100)

        # payload is twice the smax value passed to Cfg above
        self.bup("ao", [("too_large.txt", b"A" * 200)])

        for fname in os.listdir("ao"):
            self.assertNotIn(".PARTIAL", fname, "No partial files should remain")

    def bup(self, url, files):
        """
        Perform a basic upload (bput) request.

        Args:
            url: The URL path to upload to
            files: List of (filename, content) tuples

        Returns:
            (headers, body) tuple; both are text, and body is an empty
            string when the reply has no blank-line separator
        """
        boundary = "XD"
        payload = build_multipart_form(files, boundary)

        hdr_fmt = "POST /{} HTTP/1.1\r\nPW: o\r\nConnection: close\r\nContent-Type: multipart/form-data; boundary={}\r\nContent-Length: {}\r\n\r\n"
        full_request = hdr_fmt.format(url, boundary, len(payload)).encode("utf-8") + payload
        print("POST -->", full_request[:200], "...")

        conn = self.conn.setbuf(full_request)
        HttpCli(conn).run()

        reply = conn.s._reply
        if isinstance(reply, bytes):
            # the separator is pure ASCII, so decoding before splitting gives
            # the same head/body as splitting the raw bytes first
            reply = reply.decode("utf-8")

        head, _, body = reply.partition("\r\n\r\n")

        print("POST <--", head[:200], "...")
        return head, body

    def log(self, src, msg, c=0):
        """Logger callback handed to AuthSrv and VHttpConn; prints msg only."""
        print(msg)
|
|
|
|
|
|
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|