#!/usr/bin/env python
"""Black-box HTTP tests run against a server listening on port 12346.

The assertions look for pages "Generated by darkhttpd".  Test methods are
generated at runtime by ``setUpModule`` and attached to ``TestCases``: one per
(case, HTTP version, line ending) combination.

NOTE(review): the server is presumably started separately with ``WWWROOT`` as
its document root -- nothing in this file starts it; confirm with the harness.
"""
import unittest
import socket
import signal
import re
import os

# Directory in which test_dirlist_escape creates its temporary file.
WWWROOT = "tmp.httpd.tests"


class Conn:
    """One-shot client connection to the server under test."""

    def __init__(self):
        self.port = 12346
        self.s = socket.socket()
        # connect throws socket.error on connection refused
        self.s.connect(("0.0.0.0", self.port))

    def get(self, url, http_ver="1.0", endl="\n", req_hdrs=None):
        """Send a GET request and return everything the server sends back.

        Args:
            url: Request target, sent verbatim.
            http_ver: Version string appended as ``HTTP/<ver>``; ``None``
                sends a bare ``GET <url>`` request line with no version.
            endl: Line terminator used for the request ("\\n" or "\\r\\n").
            req_hdrs: Optional dict of extra request headers.  It is copied,
                never modified.  "User-Agent" and "Connection" are always
                overridden for versioned requests.

        Returns:
            The raw response as a ``str``, read until the peer closes the
            connection.

        The socket is closed before returning, so each ``Conn`` serves
        exactly one request.
        """
        # Work on a copy: the original mutated the caller's dict (and the
        # shared default), leaking headers between unrelated requests.
        hdrs = dict(req_hdrs) if req_hdrs is not None else {}

        req = "GET " + url
        if http_ver is not None:
            req += " HTTP/" + http_ver
        req += endl
        if http_ver is not None:
            hdrs["User-Agent"] = "test.py"
            hdrs["Connection"] = "close"
            # NOTE(review): indentation was lost in the reviewed copy of this
            # file; headers are assumed to be sent only with versioned
            # requests -- confirm.
            for k, v in hdrs.items():
                req += k + ": " + v + endl
        req += endl  # end of request

        # Sockets carry bytes; latin-1 maps code points 0-255 one-to-one.
        # On Python 2 ``str`` already is ``bytes`` and this is a no-op.
        if not isinstance(req, bytes):
            req = req.encode("latin-1")

        chunks = []
        try:
            # sendall, not send: send() may transmit only part of the buffer.
            self.s.sendall(req)
            while True:
                # don't wait forever; no SIGALRM handler is installed in this
                # file, so the default signal disposition applies.
                signal.alarm(1)
                try:
                    r = self.s.recv(65536)
                finally:
                    signal.alarm(0)
                if not r:  # empty read: peer closed the connection
                    break
                chunks.append(r)
        finally:
            self.s.close()

        ret = b"".join(chunks)
        if not isinstance(ret, str):
            ret = ret.decode("latin-1")
        return ret


def parse(resp):
    """Parse response into status line, headers and body.

    Returns a ``(status, hdrs, body)`` tuple where ``hdrs`` maps header name
    to value.  Raises ValueError if ``resp`` has no blank line separating
    head from body, or if a header line lacks the ": " separator.
    """
    pos = resp.index("\r\n\r\n")  # throws ValueError on failure
    head = resp[:pos]
    body = resp[pos + 4:]
    # partition() tolerates a response that has a status line but no headers.
    status, _, head = head.partition("\r\n")
    hdrs = {}
    if head:
        for line in head.split("\r\n"):
            k, v = line.split(": ", 1)
            hdrs[k] = v
    return (status, hdrs, body)


class TestCases(unittest.TestCase):
    """Container for the hand-written and the generated test methods."""

    def assertContains(self, body, *strings):
        """Assert that every one of ``strings`` occurs in ``body``."""
        for s in strings:
            self.assertTrue(s in body,
                            msg="expected %s in %s" % (repr(s), repr(body)))

    def assertIsIndex(self, body, path):
        """Assert that ``body`` looks like the directory listing for ``path``."""
        # NOTE(review): in the reviewed copy the second literal was split
        # across blank lines (markup apparently stripped), which is a syntax
        # error.  It is restored here as an <h1> element -- confirm against
        # the server's generated HTML.  The neighbouring literals may have
        # lost tags the same way and are left exactly as found.
        self.assertContains(body,
                            "%s\n" % path,
                            "<h1>%s</h1>\n" % path,
                            '../',
                            'Generated by darkhttpd')

    def assertIsInvalid(self, body, path):
        """Assert that ``body`` is the 400 error page for ``path``."""
        # NOTE(review): the heading literal was garbled the same way as in
        # assertIsIndex and is restored as an <h1> element -- confirm.
        self.assertContains(body,
                            "400 Bad Request",
                            "<h1>Bad Request</h1>\n",
                            "You requested an invalid URL: %s\n" % path,
                            'Generated by darkhttpd')

    def test_dirlist_escape(self):
        """A '#' in a file name must be percent-escaped in the listing."""
        fn = WWWROOT + "/escape#this"
        with open(fn, "w") as f:
            f.write("x" * 12345)
        try:
            resp = Conn().get("/")
        finally:
            os.unlink(fn)
        status, hdrs, body = parse(resp)
        self.assertEqual(ord("#"), 0x23)
        self.assertContains(body, "escape%23this", "12345")


def nerf(s):
    """Replace every non-alphanumeric character of ``s`` with "_"."""
    return re.sub(r"[^a-zA-Z0-9]", "_", s)


def makeCase(name, url, hdr_checker=None, body_checker=None,
             req_hdrs=None, http_ver=None, endl="\n"):
    """Generate one test method and attach it to ``TestCases``.

    Args:
        name: Human-readable case name; sanitised into the method name.
        url: Request target passed to ``Conn.get``.
        hdr_checker: Optional ``f(testcase, hdrs)``; only called when
            ``http_ver`` is not None.
        body_checker: Optional ``f(testcase, body)``.
        req_hdrs: Request headers; defaults to ``{"User-Agent": "test.py"}``.
        http_ver: "1.0", "1.1", or None for a version-less request, in which
            case the whole response is treated as the body.
        endl: Request line terminator; must be "\\n" or "\\r\\n".
    """
    if req_hdrs is None:
        req_hdrs = {"User-Agent": "test.py"}

    def do_test(self):
        resp = Conn().get(url, http_ver, endl, req_hdrs)
        if http_ver is None:
            status = ""
            hdrs = {}
            body = resp
        else:
            status, hdrs, body = parse(resp)

        if hdr_checker is not None and http_ver is not None:
            hdr_checker(self, hdrs)

        if body_checker is not None:
            body_checker(self, body)

        # FIXME: check status
        if http_ver is not None:
            prefix = "HTTP/1.1 "  # should 1.0 stay 1.0?
            self.assertTrue(status.startswith(prefix),
                            msg="%s at start of %s" % (repr(prefix),
                                                       repr(status)))

    v = http_ver
    if v is None:
        v = "0.9"
    test_name = "_".join([
        "test",
        nerf(name),
        nerf("HTTP" + v),
        {"\n": "LF", "\r\n": "CRLF"}[endl],
    ])
    do_test.__name__ = test_name  # hax
    setattr(TestCases, test_name, do_test)


def makeCases(name, url, hdr_checker=None, body_checker=None,
              req_hdrs=None):
    """Generate the case for every HTTP version / line-ending combination."""
    # FIXME: 0.9 is broken
    for http_ver in [None, "1.0", "1.1"]:
    #for http_ver in ["1.0", "1.1"]:
        for endl in ["\n", "\r\n"]:
            makeCase(name, url, hdr_checker, body_checker,
                     req_hdrs, http_ver, endl)


def makeSimpleCases(name, url, assert_name):
    """Generate cases whose body check is ``TestCases.<assert_name>(body, url)``."""
    makeCases(name, url, None,
              lambda self, body: getattr(self, assert_name)(body, url))


def setUpModule():
    """Register the generated URL-handling cases on ``TestCases``."""
    for args in [
        ["index",                "/",                "assertIsIndex"],
        ["up dir",               "/dir/../",         "assertIsIndex"],
        ["extra slashes",        "//dir///..////",   "assertIsIndex"],
        ["no trailing slash",    "/dir/..",          "assertIsIndex"],
        ["no leading slash",     "dir/../",          "assertIsInvalid"],
        ["invalid up dir",       "/../",             "assertIsInvalid"],
        ["fancy invalid up dir", "/./dir/./../../",  "assertIsInvalid"],
    ]:
        makeSimpleCases(*args)


if __name__ == '__main__':
    # The generated methods must exist before unittest collects the class.
    setUpModule()
    unittest.main()
    #x = Conn().get("/xyz/../", "1.0")
    #y = parse(x)
    #print repr(y)

# vim:set ts=4 sw=4 et: