mirror of
https://github.com/emikulic/darkhttpd.git
synced 2023-08-10 21:13:08 +03:00
Convert test.py to Python 3.
Unicode was a mistake.
This commit is contained in:
parent
1990aee864
commit
67c506b620
@ -1,4 +1,4 @@
|
||||
#!/usr/bin/env python
|
||||
#!/usr/bin/env python3
|
||||
# This is run by the "run-tests" script.
|
||||
import unittest
|
||||
import socket
|
||||
@ -9,6 +9,9 @@ import random
|
||||
|
||||
WWWROOT = "tmp.httpd.tests"
|
||||
|
||||
def random_bytes(n):
|
||||
return bytes([random.randint(0,255) for _ in range(n)])
|
||||
|
||||
def between(s, start, end):
|
||||
assert start in s, s
|
||||
p = s.index(start) + len(start)
|
||||
@ -37,13 +40,13 @@ class Conn:
|
||||
for k,v in req_hdrs.items():
|
||||
req += k+": "+v+endl
|
||||
req += endl # end of request
|
||||
self.s.send(req)
|
||||
ret = ""
|
||||
self.s.send(req.encode('utf_8'))
|
||||
ret = b''
|
||||
while True:
|
||||
signal.alarm(1) # don't wait forever
|
||||
r = self.s.recv(65536)
|
||||
signal.alarm(0)
|
||||
if r == "":
|
||||
if r == b'':
|
||||
break
|
||||
else:
|
||||
ret += r
|
||||
@ -56,23 +59,23 @@ class Conn:
|
||||
for k,v in req_hdrs.items():
|
||||
req += k+": "+v+endl
|
||||
req += endl # end of request
|
||||
self.s.send(req)
|
||||
self.s.send(req.encode('utf-8'))
|
||||
signal.alarm(1) # don't wait forever
|
||||
ret = ""
|
||||
ret = b''
|
||||
while True:
|
||||
ret += self.s.recv(65536)
|
||||
if "\r\n\r\n" not in ret:
|
||||
if b'\r\n\r\n' not in ret:
|
||||
# Don't have headers yet.
|
||||
continue
|
||||
if method == "HEAD":
|
||||
# We're done.
|
||||
break
|
||||
if "Content-Length: " in ret:
|
||||
cl = between(ret, "Content-Length: ", "\r\n")
|
||||
if b'Content-Length: ' in ret:
|
||||
cl = between(ret, b'Content-Length: ', b'\r\n')
|
||||
cl = int(cl)
|
||||
else:
|
||||
cl = 0
|
||||
p = ret.index("\r\n\r\n") + 4
|
||||
p = ret.index(b'\r\n\r\n') + 4
|
||||
assert len(ret) - p <= cl, [ret, p, cl]
|
||||
if len(ret) == p + cl:
|
||||
# Complete response.
|
||||
@ -84,14 +87,16 @@ def parse(resp):
|
||||
"""
|
||||
Parse response into status line, headers and body.
|
||||
"""
|
||||
pos = resp.find("\r\n\r\n")
|
||||
pos = resp.find(b'\r\n\r\n')
|
||||
assert pos != -1, 'response is %s' % repr(resp)
|
||||
head = resp[:pos]
|
||||
body = resp[pos+4:]
|
||||
status,head = head.split("\r\n", 1)
|
||||
status,head = head.split(b'\r\n', 1)
|
||||
hdrs = {}
|
||||
for line in head.split("\r\n"):
|
||||
k, v = line.split(": ", 1)
|
||||
for line in head.split(b'\r\n'):
|
||||
k, v = line.split(b': ', 1)
|
||||
k = k.decode('utf-8')
|
||||
v = v.decode('utf-8')
|
||||
hdrs[k] = v
|
||||
return (status, hdrs, body)
|
||||
|
||||
@ -101,8 +106,10 @@ class TestHelper(unittest.TestCase):
|
||||
return self.curr_conn.get(url, http_ver, endl, req_hdrs, method)
|
||||
|
||||
def assertContains(self, body, *strings):
|
||||
if type(body) is not bytes:
|
||||
body = body.encode('utf-8')
|
||||
for s in strings:
|
||||
self.assertTrue(s in body,
|
||||
self.assertTrue(s.encode('utf-8') in body,
|
||||
msg="\nExpected: %s\nIn response: %s" % (
|
||||
repr(s), repr(body)))
|
||||
|
||||
@ -152,10 +159,12 @@ class TestHelper(unittest.TestCase):
|
||||
class TestDirList(TestHelper):
|
||||
def setUp(self):
|
||||
self.fn = WWWROOT+"/escape(this)name"
|
||||
open(self.fn, "w").write("x"*12345)
|
||||
with open(self.fn, "w") as f:
|
||||
f.write("x"*12345)
|
||||
|
||||
def tearDown(self):
|
||||
os.unlink(self.fn)
|
||||
self.curr_conn.s.close()
|
||||
|
||||
def test_dirlist_escape(self):
|
||||
resp = self.get("/")
|
||||
@ -189,7 +198,7 @@ def makeCase(name, url, hdr_checker=None, body_checker=None,
|
||||
|
||||
# FIXME: check status
|
||||
if http_ver is not None:
|
||||
prefix = "HTTP/1.1 " # should 1.0 stay 1.0?
|
||||
prefix = b'HTTP/1.1 ' # should 1.0 stay 1.0?
|
||||
self.assertTrue(status.startswith(prefix),
|
||||
msg="%s at start of %s"%(repr(prefix), repr(status)))
|
||||
|
||||
@ -239,6 +248,7 @@ class TestDirRedirect(TestHelper):
|
||||
|
||||
def tearDown(self):
|
||||
os.rmdir(self.fn)
|
||||
self.curr_conn.s.close()
|
||||
|
||||
def test_dir_redirect(self):
|
||||
resp = self.get(self.url)
|
||||
@ -249,11 +259,10 @@ class TestDirRedirect(TestHelper):
|
||||
class TestFileGet(TestHelper):
|
||||
def setUp(self):
|
||||
self.datalen = 2345
|
||||
self.data = ''.join(
|
||||
[chr(random.randint(0,255)) for _ in xrange(self.datalen)])
|
||||
self.data = random_bytes(self.datalen)
|
||||
self.url = '/data.jpeg'
|
||||
self.fn = WWWROOT + self.url
|
||||
with open(self.fn, "w") as f:
|
||||
with open(self.fn, 'wb') as f:
|
||||
f.write(self.data)
|
||||
self.qurl = '/what%3f.jpg'
|
||||
self.qfn = WWWROOT + '/what?.jpg'
|
||||
@ -313,22 +322,22 @@ class TestFileGet(TestHelper):
|
||||
status, hdrs, body = parse(resp2)
|
||||
self.assertContains(status, "304 Not Modified")
|
||||
self.assertEquals(hdrs["Accept-Ranges"], "bytes")
|
||||
self.assertFalse(hdrs.has_key("Last-Modified"))
|
||||
self.assertFalse(hdrs.has_key("Content-Length"))
|
||||
self.assertFalse(hdrs.has_key("Content-Type"))
|
||||
self.assertFalse("Last-Modified" in hdrs)
|
||||
self.assertFalse("Content-Length" in hdrs)
|
||||
self.assertFalse("Content-Type" in hdrs)
|
||||
|
||||
def test_range_single(self):
|
||||
self.drive_range("5-5", "5-5/%d" % self.datalen,
|
||||
1, self.data[5])
|
||||
1, self.data[5:6])
|
||||
|
||||
def test_range_single_first(self):
|
||||
self.drive_range("0-0", "0-0/%d" % self.datalen,
|
||||
1, self.data[0])
|
||||
1, self.data[0:1])
|
||||
|
||||
def test_range_single_last(self):
|
||||
self.drive_range("%d-%d"%(self.datalen-1, self.datalen-1),
|
||||
"%d-%d/%d"%(self.datalen-1, self.datalen-1, self.datalen),
|
||||
1, self.data[-1])
|
||||
1, self.data[-1:])
|
||||
|
||||
def test_range_single_bad(self):
|
||||
resp = self.get(self.url, req_hdrs = {"Range":
|
||||
@ -381,20 +390,15 @@ class TestKeepAlive(TestFileGet):
|
||||
return self.conn.get_keepalive(url, endl, req_hdrs, method)
|
||||
|
||||
def make_large_file(fn, boundary, data):
|
||||
big = 1<<33
|
||||
assert big == 8589934592L
|
||||
assert str(big) == "8589934592"
|
||||
|
||||
f = open(fn, "w")
|
||||
pos = boundary - len(data)/2
|
||||
with open(fn, 'wb') as f:
|
||||
pos = boundary - (len(data) // 2)
|
||||
f.seek(pos)
|
||||
assert f.tell() == pos
|
||||
assert f.tell() < boundary
|
||||
f.write(data)
|
||||
filesize = f.tell()
|
||||
assert filesize == pos + len(data)
|
||||
assert filesize == pos + len(data), (filesize, pos, len(data))
|
||||
assert filesize > boundary
|
||||
f.close()
|
||||
return (pos, filesize)
|
||||
|
||||
class TestLargeFile2G(TestHelper):
|
||||
@ -402,8 +406,7 @@ class TestLargeFile2G(TestHelper):
|
||||
|
||||
def setUp(self):
|
||||
self.datalen = 4096
|
||||
self.data = "".join(
|
||||
[chr(random.randint(0,255)) for _ in xrange(self.datalen)])
|
||||
self.data = random_bytes(self.datalen)
|
||||
self.url = "/big.jpeg"
|
||||
self.fn = WWWROOT + self.url
|
||||
self.filepos, self.filesize = make_large_file(
|
||||
@ -414,14 +417,14 @@ class TestLargeFile2G(TestHelper):
|
||||
|
||||
def drive_start(self, ofs):
|
||||
req_start = self.BOUNDARY + ofs
|
||||
req_end = req_start + self.datalen/4 - 1
|
||||
req_end = req_start + self.datalen//4 - 1
|
||||
range_in = "%d-%d"%(req_start, req_end)
|
||||
range_out = "%s/%d"%(range_in, self.filesize)
|
||||
|
||||
data_start = req_start - self.filepos
|
||||
data_end = data_start + self.datalen/4
|
||||
data_end = data_start + self.datalen//4
|
||||
|
||||
self.drive_range(range_in, range_out, self.datalen/4,
|
||||
self.drive_range(range_in, range_out, self.datalen//4,
|
||||
self.data[data_start:data_end])
|
||||
|
||||
def test_largefile_head(self):
|
||||
|
Loading…
Reference in New Issue
Block a user