#!/usr/bin/env python3
# This is run by the "run-tests" script.
import unittest
import socket
import signal
import re
import os
import random

WWWROOT = "tmp.httpd.tests"
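
# WWWROOT is the document root the tests create files under; the "run-tests"
# wrapper is presumably what creates it and points darkhttpd at it.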

def random_bytes(n):
    return bytes([random.randint(0,255) for _ in range(n)])

def between(s, start, end):
    assert start in s, s
    p = s.index(start) + len(start)
    s = s[p:]
    assert end in s, s
    p = s.index(end)
    return s[:p]

assert between("hello world", "hell", "world") == "o "
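
# between() is also used on raw bytes further down (get_keepalive pulls the
# Content-Length value out of the response header block), for example:
assert between(b"Content-Length: 17\r\n", b"Content-Length: ", b"\r\n") == b"17"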

class Conn:
    def __init__(self):
        self.port = 12346
        self.s = socket.socket()
        self.s.connect(("0.0.0.0", self.port))
        # connect throws socket.error on connection refused

    def close(self):
        self.s.close()

    def get(self, url, http_ver="1.0", endl="\n", req_hdrs={}, method="GET"):
        req = method+" "+url
        if http_ver is not None:
            req += " HTTP/"+http_ver
        req += endl
        if http_ver is not None:
            req_hdrs["User-Agent"] = "test.py"
            req_hdrs["Connection"] = "close"
            for k,v in req_hdrs.items():
                req += k+": "+v+endl
        req += endl # end of request
        self.s.send(req.encode('utf-8'))
        ret = b''
        while True:
            signal.alarm(1) # don't wait forever
            r = self.s.recv(65536)
            signal.alarm(0)
            if r == b'':
                break
            else:
                ret += r
        return ret

    def get_keepalive(self, url, endl="\n", req_hdrs={}, method="GET"):
        req = method+" "+url+" HTTP/1.1"+endl
        req_hdrs["User-Agent"] = "test.py"
        req_hdrs["Connection"] = "keep-alive"
        for k,v in req_hdrs.items():
            req += k+": "+v+endl
        req += endl # end of request
        self.s.send(req.encode('utf-8'))
        signal.alarm(1) # don't wait forever
        ret = b''
        while True:
            ret += self.s.recv(65536)
            if b'\r\n\r\n' not in ret:
                # Don't have headers yet.
                continue
            if method == "HEAD":
                # We're done.
                break
            if b'Content-Length: ' in ret:
                cl = between(ret, b'Content-Length: ', b'\r\n')
                cl = int(cl)
            else:
                cl = 0
            p = ret.index(b'\r\n\r\n') + 4
            assert len(ret) - p <= cl, [ret, p, cl]
            if len(ret) == p + cl:
                # Complete response.
                break
        signal.alarm(0)
        return ret
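
# Conn talks to the server over a raw TCP socket so the tests control every
# byte on the wire (line endings, header order, HTTP/0.9-style requests).
# It assumes something is already listening on port 12346; starting darkhttpd
# there is left to the "run-tests" wrapper.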

def parse(resp):
    """
    Parse response into status line, headers and body.
    """
    pos = resp.find(b'\r\n\r\n')
    assert pos != -1, 'response is %s' % repr(resp)
    head = resp[:pos]
    body = resp[pos+4:]
    status,head = head.split(b'\r\n', 1)
    hdrs = {}
    for line in head.split(b'\r\n'):
        k, v = line.split(b': ', 1)
        k = k.decode('utf-8')
        v = v.decode('utf-8')
        hdrs[k] = v
    return (status, hdrs, body)
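
# A quick sanity check in the same spirit as the between() check above:
assert parse(b'HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nhi') == \
    (b'HTTP/1.1 200 OK', {'Content-Length': '2'}, b'hi')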

class TestHelper(unittest.TestCase):
    def get(self, url, http_ver="1.0", endl="\n", req_hdrs={}, method="GET"):
        c = Conn()
        r = c.get(url, http_ver, endl, req_hdrs, method)
        c.close()
        return r

    def assertContains(self, body, *strings):
        if type(body) is not bytes:
            body = body.encode('utf-8')
        for s in strings:
            self.assertTrue(s.encode('utf-8') in body,
                msg="\nExpected: %s\nIn response: %s" % (
                    repr(s), repr(body)))

    def assertIsIndex(self, body, path):
        self.assertContains(body,
            "<title>%s</title>\n"%path,
            "<h1>%s</h1>\n"%path,
            '<a href="..">..</a>/',
            'Generated by darkhttpd')

    def assertIsInvalid(self, body, path):
        self.assertContains(body,
            "<title>400 Bad Request</title>",
            "<h1>Bad Request</h1>\n",
            "You requested an invalid URL: %s\n"%path,
            'Generated by darkhttpd')

    def assertNotFound(self, body, path):
        self.assertContains(body,
            "<title>404 Not Found</title>",
            "<h1>Not Found</h1>\n",
            "The URL you requested (%s) was not found.\n"%path,
            'Generated by darkhttpd')

    def assertForbidden(self, body, path):
        self.assertContains(body,
            "<title>403 Forbidden</title>",
            "<h1>Forbidden</h1>\n",
            "You don't have permission to access (%s).\n"%path,
            'Generated by darkhttpd')

    def assertUnreadable(self, body, path):
        self.assertContains(body,
            "Couldn't list directory: Permission denied\n",
            'Generated by darkhttpd')

    def drive_range(self, range_in, range_out, len_out, data_out,
            status_out = "206 Partial Content"):
        resp = self.get(self.url, req_hdrs = {"Range": "bytes="+range_in})
        status, hdrs, body = parse(resp)
        self.assertContains(status, status_out)
        self.assertEqual(hdrs["Accept-Ranges"], "bytes")
        self.assertEqual(hdrs["Content-Range"], "bytes "+range_out)
        self.assertEqual(hdrs["Content-Length"], str(len_out))
        self.assertEqual(body, data_out)
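
# drive_range() (above) is the workhorse for the Range tests further down: it
# sends "Range: bytes=<range_in>" to self.url and expects <status_out> along
# with "Content-Range: bytes <range_out>", a Content-Length of <len_out>, and
# exactly <data_out> as the body.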

class TestDirList(TestHelper):
    def setUp(self):
        self.fn = WWWROOT+"/escape(this)name"
        with open(self.fn, "w") as f:
            f.write("x"*12345)

    def tearDown(self):
        os.unlink(self.fn)

    def test_dirlist_escape(self):
        resp = self.get("/")
        status, hdrs, body = parse(resp)
        self.assertEqual(ord("("), 0x28)
        self.assertContains(body, "escape%28this%29name", "12345")

class TestCases(TestHelper):
    pass # these get autogenerated in setUpModule()

def nerf(s):
    return re.sub("[^a-zA-Z0-9]", "_", s)

def makeCase(name, url, hdr_checker=None, body_checker=None,
        req_hdrs={"User-Agent": "test.py"},
        http_ver=None, endl="\n"):
    def do_test(self):
        resp = self.get(url, http_ver, endl, req_hdrs)
        if http_ver is None:
            status = ""
            hdrs = {}
            body = resp
        else:
            status, hdrs, body = parse(resp)

        if hdr_checker is not None and http_ver is not None:
            hdr_checker(self, hdrs)

        if body_checker is not None:
            body_checker(self, body)

        # FIXME: check status
        if http_ver is not None:
            prefix = b'HTTP/1.1 ' # should 1.0 stay 1.0?
            self.assertTrue(status.startswith(prefix),
                msg="%s at start of %s"%(repr(prefix), repr(status)))

    v = http_ver
    if v is None:
        v = "0.9"
    test_name = "_".join([
        "test",
        nerf(name),
        nerf("HTTP"+v),
        {"\n":"LF", "\r\n":"CRLF"}[endl],
    ])
    do_test.__name__ = test_name # hax
    setattr(TestCases, test_name, do_test)

def makeCases(name, url, hdr_checker=None, body_checker=None,
        req_hdrs={"User-Agent": "test.py"}):
    for http_ver in [None, "1.0", "1.1"]:
        for endl in ["\n", "\r\n"]:
            makeCase(name, url, hdr_checker, body_checker,
                req_hdrs, http_ver, endl)

def makeSimpleCases(name, url, assert_name):
    makeCases(name, url, None,
        lambda self,body: getattr(self, assert_name)(body, url))
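
# For example, makeSimpleCases("up dir", "/dir/../", "assertIsIndex") attaches
# six tests to TestCases, one per HTTP-version/line-ending combination:
# test_up_dir_HTTP0_9_LF, test_up_dir_HTTP0_9_CRLF, test_up_dir_HTTP1_0_LF,
# test_up_dir_HTTP1_0_CRLF, test_up_dir_HTTP1_1_LF and test_up_dir_HTTP1_1_CRLF.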

def setUpModule():
    for args in [
        ["index", "/", "assertIsIndex"],
        ["up dir", "/dir/../", "assertIsIndex"],
        ["extra slashes", "//dir///..////", "assertIsIndex"],
        ["no trailing slash", "/dir/..", "assertIsIndex"],
        ["no leading slash", "dir/../", "assertIsInvalid"],
        ["invalid up dir", "/../", "assertIsInvalid"],
        ["fancy invalid up dir", "/./dir/./../../", "assertIsInvalid"],
        ["not found", "/not_found.txt", "assertNotFound"],
        ["forbidden", "/forbidden/x", "assertForbidden"],
        ["unreadable", "/unreadable/", "assertUnreadable"],
    ]:
        makeSimpleCases(*args)

class TestDirRedirect(TestHelper):
    def setUp(self):
        self.url = "/mydir"
        self.fn = WWWROOT + self.url
        os.mkdir(self.fn)

    def tearDown(self):
        os.rmdir(self.fn)

    def test_dir_redirect(self):
        resp = self.get(self.url)
        status, hdrs, body = parse(resp)
        self.assertContains(status, "301 Moved Permanently")
        self.assertEqual(hdrs["Location"], self.url+"/") # trailing slash

class TestFileGet(TestHelper):
    def setUp(self):
        self.datalen = 2345
        self.data = random_bytes(self.datalen)
        self.url = '/data.jpeg'
        self.fn = WWWROOT + self.url
        with open(self.fn, 'wb') as f:
            f.write(self.data)
        self.qurl = '/what%3f.jpg'
        self.qfn = WWWROOT + '/what?.jpg'
        if os.path.exists(self.qfn):
            os.unlink(self.qfn)
        os.link(self.fn, self.qfn)

    def tearDown(self):
        os.unlink(self.fn)
        os.unlink(self.qfn)
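
    # setUp lays out the same random bytes under two names: data.jpeg, and a
    # hard link literally called "what?.jpg" so the %3f tests below can check
    # that an escaped '?' is decoded into the filename rather than being
    # treated as the start of a query string.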

    def get_helper(self, url):
        resp = self.get(url)
        status, hdrs, body = parse(resp)
        self.assertContains(status, "200 OK")
        self.assertEqual(hdrs["Accept-Ranges"], "bytes")
        self.assertEqual(hdrs["Content-Length"], str(self.datalen))
        self.assertEqual(hdrs["Content-Type"], "image/jpeg")
        self.assertContains(hdrs["Server"], "darkhttpd/")
        assert body == self.data, [url, resp, status, hdrs, body]
        self.assertEqual(body, self.data)

    def test_file_get(self):
        self.get_helper(self.url)

    def test_file_get_urldecode(self):
        self.get_helper(''.join(['%%%02x' % ord(x) for x in self.url]))

    def test_file_get_redundant_dots(self):
        self.get_helper("/././." + self.url)

    def test_file_get_with_empty_query(self):
        self.get_helper(self.url + "?")

    def test_file_get_with_query(self):
        self.get_helper(self.url + "?action=Submit")

    def test_file_get_esc_question(self):
        self.get_helper(self.qurl)

    def test_file_get_esc_question_with_query(self):
        self.get_helper(self.qurl + '?hello=world')

    def test_file_head(self):
        resp = self.get(self.url, method="HEAD")
        status, hdrs, body = parse(resp)
        self.assertContains(status, "200 OK")
        self.assertEqual(hdrs["Accept-Ranges"], "bytes")
        self.assertEqual(hdrs["Content-Length"], str(self.datalen))
        self.assertEqual(hdrs["Content-Type"], "image/jpeg")

    def test_if_modified_since(self):
        resp1 = self.get(self.url, method="HEAD")
        status, hdrs, body = parse(resp1)
        lastmod = hdrs["Last-Modified"]

        resp2 = self.get(self.url, method="GET", req_hdrs =
            {"If-Modified-Since": lastmod })
        status, hdrs, body = parse(resp2)
        self.assertContains(status, "304 Not Modified")
        self.assertEqual(hdrs["Accept-Ranges"], "bytes")
        self.assertFalse("Last-Modified" in hdrs)
        self.assertFalse("Content-Length" in hdrs)
        self.assertFalse("Content-Type" in hdrs)

    def test_range_single(self):
        self.drive_range("5-5", "5-5/%d" % self.datalen,
            1, self.data[5:6])

    def test_range_single_first(self):
        self.drive_range("0-0", "0-0/%d" % self.datalen,
            1, self.data[0:1])

    def test_range_single_last(self):
        self.drive_range("%d-%d"%(self.datalen-1, self.datalen-1),
            "%d-%d/%d"%(self.datalen-1, self.datalen-1, self.datalen),
            1, self.data[-1:])

    def test_range_single_bad(self):
        resp = self.get(self.url, req_hdrs = {"Range":
            "bytes=%d-%d"%(self.datalen, self.datalen)})
        status, hdrs, body = parse(resp)
        self.assertContains(status, "416 Requested Range Not Satisfiable")

    def test_range_reasonable(self):
        self.drive_range("10-20", "10-20/%d" % self.datalen,
            20-10+1, self.data[10:20+1])

    def test_range_start_given(self):
        self.drive_range("10-", "10-%d/%d" % (self.datalen-1, self.datalen),
            self.datalen-10, self.data[10:])

    def test_range_end_given(self):
        self.drive_range("-25",
            "%d-%d/%d"%(self.datalen-25, self.datalen-1, self.datalen),
            25, self.data[-25:])

    def test_range_beyond_end(self):
        # expecting same result as test_range_end_given
        self.drive_range("%d-%d"%(self.datalen-25, self.datalen*2),
            "%d-%d/%d"%(self.datalen-25, self.datalen-1, self.datalen),
            25, self.data[-25:])

    def test_range_end_given_oversize(self):
        # expecting full file
        self.drive_range("-%d"%(self.datalen*3),
            "0-%d/%d"%(self.datalen-1, self.datalen),
            self.datalen, self.data)

    def test_range_bad_start(self):
        resp = self.get(self.url, req_hdrs = {"Range": "bytes=%d-"%(
            self.datalen*2)})
        status, hdrs, body = parse(resp)
        self.assertContains(status, "416 Requested Range Not Satisfiable")

    def test_range_backwards(self):
        resp = self.get(self.url, req_hdrs = {"Range": "bytes=20-10"})
        status, hdrs, body = parse(resp)
        self.assertContains(status, "416 Requested Range Not Satisfiable")
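
    # The Range tests above cover the three byte-range forms ("first-last",
    # "first-" to end of file, and "-suffix" for the last N bytes) plus
    # unsatisfiable and backwards ranges, which are expected to yield 416.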

class TestKeepAlive(TestFileGet):
    """
    Run all of TestFileGet but with a single long-lived connection.
    """
    def setUp(self):
        TestFileGet.setUp(self)
        self.conn = Conn()

    def tearDown(self):
        self.conn.close()

    def get(self, url, endl="\n", req_hdrs={}, method="GET"):
        return self.conn.get_keepalive(url, endl, req_hdrs, method)

def make_large_file(fn, boundary, data):
    with open(fn, 'wb') as f:
        pos = boundary - (len(data) // 2)
        f.seek(pos)
        assert f.tell() == pos
        assert f.tell() < boundary
        f.write(data)
        filesize = f.tell()
        assert filesize == pos + len(data), (filesize, pos, len(data))
        assert filesize > boundary
    return (pos, filesize)
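
# make_large_file() seeks to just under `boundary` before writing, so the
# resulting multi-gigabyte file is sparse on filesystems that support holes
# and only a tiny amount of real disk space is consumed.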

class TestLargeFile2G(TestHelper):
    BOUNDARY = 1<<31

    def setUp(self):
        self.datalen = 4096
        self.data = random_bytes(self.datalen)
        self.url = "/big.jpeg"
        self.fn = WWWROOT + self.url
        self.filepos, self.filesize = make_large_file(
            self.fn, self.BOUNDARY, self.data)

    def tearDown(self):
        os.unlink(self.fn)

    def drive_start(self, ofs):
        req_start = self.BOUNDARY + ofs
        req_end = req_start + self.datalen//4 - 1
        range_in = "%d-%d"%(req_start, req_end)
        range_out = "%s/%d"%(range_in, self.filesize)

        data_start = req_start - self.filepos
        data_end = data_start + self.datalen//4

        self.drive_range(range_in, range_out, self.datalen//4,
            self.data[data_start:data_end])

    def test_largefile_head(self):
        resp = self.get(self.url, method="HEAD")
        status, hdrs, body = parse(resp)
        self.assertContains(status, "200 OK")
        self.assertEqual(hdrs["Accept-Ranges"], "bytes")
        self.assertEqual(hdrs["Content-Length"], str(self.filesize))
        self.assertEqual(hdrs["Content-Type"], "image/jpeg")

    def test_largefile__3(self): self.drive_start(-3)
    def test_largefile__2(self): self.drive_start(-2)
    def test_largefile__1(self): self.drive_start(-1)
    def test_largefile_0(self): self.drive_start(0)
    def test_largefile_1(self): self.drive_start(1)
    def test_largefile_2(self): self.drive_start(2)
    def test_largefile_3(self): self.drive_start(3)

class TestLargeFile4G(TestLargeFile2G):
    BOUNDARY = 1<<32
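
# The 2 GiB and 4 GiB boundaries are the limits of signed and unsigned 32-bit
# byte offsets; drive_start() requests ranges straddling each boundary by a
# few bytes, presumably to catch truncation or sign bugs in the server's
# offset handling.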

if __name__ == '__main__':
    # Call setUpModule() explicitly so the generated TestCases methods exist
    # before unittest.main() collects the tests.
    setUpModule()
    unittest.main()

# vim:set ts=4 sw=4 et: