mirror of
				https://github.com/9001/copyparty.git
				synced 2025-10-31 12:43:13 -06:00 
			
		
		
		
	
		
			
				
	
	
		
			125 lines
		
	
	
		
			4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			125 lines
		
	
	
		
			4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| # coding: utf-8
 | |
| from __future__ import print_function, unicode_literals
 | |
| 
 | |
| import os
 | |
| import shutil
 | |
| import tempfile
 | |
| import unittest
 | |
| 
 | |
| from copyparty.authsrv import AuthSrv
 | |
| from copyparty.httpcli import HttpCli
 | |
| from tests import util as tu
 | |
| from tests.util import Cfg
 | |
| 
 | |
# `typing` is only needed for annotations; tolerate interpreters that lack it.
try:
    from typing import Optional
except ImportError:
    pass
 | |
| 
 | |
| 
 | |
def hdr(query):
    """Return the raw bytes of a GET request for ``/<query>``.

    The request carries the ``PW: o`` header and asks the server to close
    the connection after replying.
    """
    request = "GET /{} HTTP/1.1\r\nPW: o\r\nConnection: close\r\n\r\n".format(query)
    return request.encode("utf-8")
 | |
| 
 | |
| 
 | |
class TestHooks(tu.TC):
    """Check that upload hooks can relocate an uploaded file.

    For every scenario a hook script (``h.py``) is written that prints a JSON
    ``reloc`` instruction. A file is then uploaded (plain PUT and multipart
    POST, with the hook registered as ``xbu`` and as ``xau``) and must be
    downloadable from the expected destination URL.
    """

    def setUp(self):
        """Obtain a scratch directory; the connection is created per scenario."""
        self.conn: Optional[tu.VHttpConn] = None
        self.td = tu.get_ramdisk()

    def tearDown(self):
        """Remove the scratch directory."""
        # reset() chdir's into the scratch tree, so step out before deleting it
        os.chdir(tempfile.gettempdir())
        shutil.rmtree(self.td)

    def reset(self):
        """Recreate an empty ``vfs`` directory, chdir into it, and return its path."""
        td = os.path.join(self.td, "vfs")
        if os.path.exists(td):
            shutil.rmtree(td)
        os.mkdir(td)
        os.chdir(td)
        return td

    def cinit(self):
        """Replace any existing connection with a fresh ``tu.VHttpConn``."""
        if self.conn:
            self.conn.shutdown()
            self.conn = None
        self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"")

    def test(self):
        """Run every relocation scenario for each hook type and upload method."""
        # presumably "fs-path:url-path:permissions" volume specs -- TODO confirm
        vcfg = ["a/b/c/d:c/d:A", "a:a:r"]

        # (reloc JSON printed by the hook, upload URL, URL the file must end up at)
        scenarios = (
            ('{"vp":"x/y"}', "c/d/a.png", "c/d/x/y/a.png"),
            ('{"vp":"x/y"}', "c/d/e/a.png", "c/d/e/x/y/a.png"),
            ('{"vp":"../x/y"}', "c/d/e/a.png", "c/d/x/y/a.png"),
            ('{"ap":"x/y"}', "c/d/a.png", "c/d/x/y/a.png"),
            ('{"ap":"x/y"}', "c/d/e/a.png", "c/d/e/x/y/a.png"),
            ('{"ap":"../x/y"}', "c/d/e/a.png", "c/d/x/y/a.png"),
            ('{"ap":"../x/y"}', "c/d/a.png", "a/b/c/x/y/a.png"),
            ('{"fn":"b.png"}', "c/d/a.png", "c/d/b.png"),
            ('{"vp":"x","fn":"b.png"}', "c/d/a.png", "c/d/x/b.png"),
        )

        for scenario in scenarios:
            print("\n\n\n", scenario)
            hooktxt, url_up, url_dl = scenario
            for hooktype in ("xbu", "xau"):
                for upfun in (self.put, self.bup):
                    # start every combination from an empty tree and a new server
                    self.reset()
                    self.makehook("""print('{"reloc":%s}')""" % (hooktxt,))
                    # NOTE(review): "j,c1" look like hook flags preceding the
                    # script name -- confirm against the hook documentation
                    ka = {hooktype: ["j,c1,h.py"]}
                    self.args = Cfg(v=vcfg, a=["o:o"], e2d=True, **ka)
                    self.asrv = AuthSrv(self.args, self.log)
                    self.cinit()

                    h, b = upfun(url_up)
                    self.assertStart("HTTP/1.1 201 Created\r", h)
                    # the uploaded body names the *upload* URL, so finding it
                    # at url_dl shows the file was relocated there
                    h, b = self.curl(url_dl)
                    self.assertEqual(b, "ok %s\n" % (url_up))

    def makehook(self, hs):
        """Write the hook script source ``hs`` to ``h.py`` in the current directory."""
        with open("h.py", "wb") as f:
            f.write(hs.encode("utf-8"))

    def _roundtrip(self, buf):
        """Send the raw request ``buf`` through ``HttpCli`` and return the raw reply bytes."""
        conn = self.conn.setbuf(buf)
        HttpCli(conn).run()
        return conn.s._reply

    def put(self, url):
        """Upload ``"ok <url>\\n"`` to ``/<url>`` with a PUT request.

        Returns the reply decoded as UTF-8 and split at the first blank line
        (``[headers, body]`` when the reply contains one).
        """
        body = ("ok %s\n" % (url,)).encode("utf-8")
        head = "PUT /{} HTTP/1.1\r\nPW: o\r\nConnection: close\r\nContent-Length: {}\r\n\r\n"
        # Content-Length counts bytes, so measure the encoded body
        buf = head.format(url, len(body)).encode("utf-8") + body
        print("PUT -->", buf)
        ret = self._roundtrip(buf).decode("utf-8").split("\r\n\r\n", 1)
        print("PUT <--", ret)
        return ret

    def bup(self, url):
        """Upload ``"ok <url>\\n"`` as a multipart/form-data POST (``act=bput``).

        The last path component of ``url`` becomes the form filename and the
        remainder is the URL posted to; a ``url`` without ``/`` is posted to
        the root. Returns the reply decoded as UTF-8 and split at the first
        blank line (``[headers, body]`` when the reply contains one).
        """
        head = "POST /%s HTTP/1.1\r\nPW: o\r\nConnection: close\r\nContent-Type: multipart/form-data; boundary=XD\r\nContent-Length: %d\r\n\r\n"
        bdy = '--XD\r\nContent-Disposition: form-data; name="act"\r\n\r\nbput\r\n--XD\r\nContent-Disposition: form-data; name="f"; filename="%s"\r\n\r\n'
        ftr = "\r\n--XD--\r\n"

        # rpartition yields an empty directory part when there is no slash
        vdir, _, fn = url.rpartition("/")

        # file content is the full upload URL, same as put() produces
        payload = (bdy % (fn,) + "ok %s\n" % (url,) + ftr).encode("utf-8")
        buf = (head % (vdir, len(payload))).encode("utf-8") + payload
        print("PoST -->", buf)
        ret = self._roundtrip(buf).decode("utf-8").split("\r\n\r\n", 1)
        print("POST <--", ret)
        return ret

    def curl(self, url, binary=False):
        """GET ``/<url>`` and return the reply split at the first blank line.

        With ``binary=True`` the headers are decoded as UTF-8 and the body is
        left as bytes; otherwise the whole reply is decoded before splitting.
        """
        reply = self._roundtrip(hdr(url))
        if binary:
            h, b = reply.split(b"\r\n\r\n", 1)
            return [h.decode("utf-8"), b]

        return reply.decode("utf-8").split("\r\n\r\n", 1)

    def log(self, src, msg, c=0):
        """Logger callback handed to the server objects; prints ``msg`` only."""
        print(msg)
 |