mirror of
https://github.com/emikulic/darkhttpd.git
synced 2023-08-10 21:13:08 +03:00
141 lines
4.2 KiB
Python
Executable File
141 lines
4.2 KiB
Python
Executable File
#!/usr/bin/env python
|
|
import unittest
|
|
import socket
|
|
import signal
|
|
import re
|
|
|
|
class Conn:
    """A single client connection to the HTTP server under test.

    The constructor connects immediately to port 12346 on address
    "0.0.0.0"; the connected socket is kept in ``self.s``.
    """

    def __init__(self):
        self.port = 12346
        self.s = socket.socket()
        # connect() raises socket.error if the connection is refused.
        self.s.connect(("0.0.0.0", self.port))

    def get(self, url, http_ver=None, endl="\n", req_hdrs=None):
        """Send a GET request and return everything the server sends back.

        url      -- request target, placed verbatim after "GET ".
        http_ver -- version string such as "1.0" or "1.1"; when None no
                    " HTTP/x.y" suffix is added to the request line.
        endl     -- line terminator used for EVERY line of the request,
                    including the request line itself.
        req_hdrs -- mapping of header name to value; defaults to
                    {"User-Agent": "test.py"} when None.

        Returns the raw response as a str, read until the peer closes the
        connection.  Bytes are mapped to str via latin-1 so that no byte
        value is lost on interpreters where bytes and str differ.
        """
        if req_hdrs is None:
            req_hdrs = {"User-Agent": "test.py"}

        request_line = "GET " + url
        if http_ver is not None:
            request_line += " HTTP/" + http_ver

        lines = [request_line]
        lines.extend(k + ": " + v for k, v in req_hdrs.items())
        lines.append("")  # empty line marks the end of the request
        req = endl.join(lines) + endl

        # Sockets carry bytes; only encode when req is not already bytes
        # (on Python 2 a plain str is bytes and is sent unchanged).
        payload = req if isinstance(req, bytes) else req.encode("latin-1")
        # sendall() keeps writing until the whole request has been sent.
        self.s.sendall(payload)

        chunks = []
        while True:
            # Don't wait forever: SIGALRM fires if recv() blocks for more
            # than a second (with no handler installed, the default action
            # ends the process).
            signal.alarm(1)
            try:
                chunk = self.s.recv(65536)
            finally:
                # Always cancel the pending alarm, even if recv() raised.
                signal.alarm(0)
            if not chunk:  # empty read: peer closed the connection
                break
            chunks.append(chunk)

        raw = b"".join(chunks)
        if not isinstance(raw, str):
            raw = raw.decode("latin-1")
        return raw
|
|
|
|
def parse(resp):
    """
    Parse response into status line, headers and body.

    resp is the complete response text.  The head is everything before
    the first blank line ("\\r\\n\\r\\n"); the body is everything after it.

    Returns a (status, hdrs, body) tuple where status is the first line
    of the head and hdrs maps each header name to its value with the
    surrounding whitespace removed.  A response that has a status line
    but no header lines yields an empty dict.

    Raises ValueError if resp contains no blank line, or if a header
    line contains no colon.
    """
    pos = resp.index("\r\n\r\n")  # raises ValueError when there is no blank line
    head = resp[:pos]
    body = resp[pos + 4:]

    # partition() never fails, so a head consisting of only a status line
    # simply leaves header_block empty.
    status, _, header_block = head.partition("\r\n")

    hdrs = {}
    if header_block:
        for line in header_block.split("\r\n"):
            name, colon, value = line.partition(":")
            if not colon:
                raise ValueError("malformed header line: %r" % (line,))
            # Whitespace around the value is optional and not part of it.
            hdrs[name] = value.strip()
    return (status, hdrs, body)
|
|
|
|
class TestCases(unittest.TestCase):
    """Container for the generated tests plus shared body assertions."""

    def assertContains(self, body, *strings):
        """Fail unless every one of *strings* is found in *body*."""
        for needle in strings:
            found = needle in body
            failure = "expected %s in %s" % (repr(needle), repr(body))
            self.assertTrue(found, msg=failure)

    def assertIsIndex(self, body, path):
        """Check that *body* contains the directory-listing markers for *path*."""
        expected = (
            "<title>%s</title>\n" % path,
            "<h1>%s</h1>\n" % path,
            '<a href="..">..</a>/',
            'Generated by darkhttpd',
        )
        self.assertContains(body, *expected)

    def assertIsInvalid(self, body, path):
        """Check that *body* contains the 400 Bad Request markers for *path*."""
        expected = (
            "<title>400 Bad Request</title>",
            "<h1>Bad Request</h1>\n",
            "You requested an invalid URI: %s\n" % path,
            'Generated by darkhttpd',
        )
        self.assertContains(body, *expected)
|
|
|
|
def nerf(s):
    """Return *s* with every character other than a-z, A-Z, 0-9 replaced by "_"."""
    unsafe_char = re.compile("[^a-zA-Z0-9]")
    return unsafe_char.sub("_", s)
|
|
|
|
def makeCase(name, url, hdr_checker=None, body_checker=None,
             req_hdrs={"User-Agent": "test.py"},
             http_ver=None, endl="\n"):
    """Build one test method and attach it to TestCases.

    name         -- human-readable case name; sanitised with nerf() and
                    used as part of the generated method name.
    url          -- request target passed to Conn.get().
    hdr_checker  -- optional callable(testcase, hdrs); only invoked when
                    http_ver is not None.
    body_checker -- optional callable(testcase, body).
    req_hdrs     -- request headers passed through to Conn.get(); the
                    default dict is never modified here.
    http_ver     -- "1.0", "1.1", or None; None is labelled "0.9" in the
                    test name and the response is not parsed.
    endl         -- "\\n" or "\\r\\n"; any other value raises KeyError when
                    the test name is built.

    The method is named test_<name>_HTTP<ver>_<LF|CRLF>.
    """
    def do_test(self):
        conn = Conn()
        try:
            resp = conn.get(url, http_ver, endl, req_hdrs)
        finally:
            # Release the socket deterministically rather than leaving it
            # to garbage collection.
            conn.s.close()

        if http_ver is None:
            # No version was sent, so the whole response is treated as body.
            status = ""
            hdrs = {}
            body = resp
        else:
            status, hdrs, body = parse(resp)

        if hdr_checker is not None and http_ver is not None:
            hdr_checker(self, hdrs)

        if body_checker is not None:
            body_checker(self, body)

        # FIXME: check status
        if http_ver is not None:
            prefix = "HTTP/1.1 "  # should 1.0 stay 1.0?
            self.assertTrue(status.startswith(prefix),
                msg="%s at start of %s"%(repr(prefix), repr(status)))

    v = http_ver
    if v is None:
        v = "0.9"
    test_name = "_".join([
        "test",
        nerf(name),
        nerf("HTTP"+v),
        {"\n":"LF", "\r\n":"CRLF"}[endl],
    ])
    do_test.__name__ = test_name  # hax
    setattr(TestCases, test_name, do_test)
|
|
|
|
def makeCases(name, url, hdr_checker=None, body_checker=None,
              req_hdrs={"User-Agent": "test.py"}):
    """Call makeCase() once per HTTP version / line-ending combination.

    Versions are None, "1.0" and "1.1"; line endings are "\\n" and
    "\\r\\n".  The remaining arguments are passed to makeCase() unchanged.
    """
    # FIXME: 0.9 is broken
    # (drop None from the versions below to skip the 0.9 cases)
    versions = [None, "1.0", "1.1"]
    line_endings = ["\n", "\r\n"]
    combos = [(ver, eol) for ver in versions for eol in line_endings]
    for ver, eol in combos:
        makeCase(name, url, hdr_checker, body_checker, req_hdrs, ver, eol)
|
|
|
|
def makeSimpleCases(name, url, assert_name):
    """Generate cases whose only check is a named body assertion.

    assert_name is the name of a method on the test instance; it is
    looked up when the test runs and called with (body, url).
    """
    def check_body(self, body):
        assertion = getattr(self, assert_name)
        return assertion(body, url)

    makeCases(name, url, None, check_body)
|
|
|
|
def setUpModule():
    """Generate the simple URL cases via makeSimpleCases().

    Each entry is (case name, request URL, name of the body assertion).
    """
    simple_cases = (
        ("index", "/", "assertIsIndex"),
        ("up dir", "/dir/../", "assertIsIndex"),
        ("extra slashes", "//dir///..////", "assertIsIndex"),
        ("no trailing slash", "/dir/..", "assertIsIndex"),
        ("no leading slash", "dir/../", "assertIsInvalid"),
        ("invalid up dir", "/../", "assertIsInvalid"),
        ("fancy invalid up dir", "/./dir/./../../", "assertIsInvalid"),
    )
    for case_name, url, assert_name in simple_cases:
        makeSimpleCases(case_name, url, assert_name)
|
|
|
|
if __name__ == "__main__":
    # The test methods are generated at runtime, so create them before
    # handing control to the unittest runner.
    setUpModule()
    unittest.main()
|
|
#x = Conn().get("/xyz/../", "1.0")
|
|
#y = parse(x)
|
|
#print repr(y)
|
|
|
|
# vim:set ts=4 sw=4 et:
|