#!/usr/bin/env python import unittest import socket import signal import re import os import random WWWROOT = "tmp.httpd.tests" class Conn: def __init__(self): self.port = 12346 self.s = socket.socket() self.s.connect(("0.0.0.0", self.port)) # connect throws socket.error on connection refused def get(self, url, http_ver="1.0", endl="\n", req_hdrs={}, method="GET"): req = method+" "+url if http_ver is not None: req += " HTTP/"+http_ver req += endl if http_ver is not None: req_hdrs["User-Agent"] = "test.py" req_hdrs["Connection"] = "close" for k,v in req_hdrs.items(): req += k+": "+v+endl req += endl # end of request self.s.send(req) ret = "" while True: signal.alarm(1) # don't wait forever r = self.s.recv(65536) signal.alarm(0) if r == "": break else: ret += r return ret def parse(resp): """ Parse response into status line, headers and body. """ pos = resp.index("\r\n\r\n") # throws exception on failure head = resp[:pos] body = resp[pos+4:] status,head = head.split("\r\n", 1) hdrs = {} for line in head.split("\r\n"): k, v = line.split(": ", 1) hdrs[k] = v return (status, hdrs, body) class TestHelper(unittest.TestCase): def assertContains(self, body, *strings): for s in strings: self.assertTrue(s in body, msg="expected %s in %s"%(repr(s), repr(body))) def assertIsIndex(self, body, path): self.assertContains(body, "%s\n"%path, "

%s

\n"%path, '../', 'Generated by darkhttpd') def assertIsInvalid(self, body, path): self.assertContains(body, "400 Bad Request", "

Bad Request

\n", "You requested an invalid URL: %s\n"%path, 'Generated by darkhttpd') def drive_range(self, range_in, range_out, len_out, data_out, status_out = "206 Partial Content"): resp = Conn().get(self.url, req_hdrs = {"Range": "bytes="+range_in}) status, hdrs, body = parse(resp) self.assertContains(status, status_out) self.assertEquals(hdrs["Accept-Ranges"], "bytes") self.assertEquals(hdrs["Content-Range"], "bytes "+range_out) self.assertEquals(hdrs["Content-Length"], str(len_out)) self.assertEquals(body, data_out) class TestDirList(TestHelper): def setUp(self): self.fn = WWWROOT+"/escape#this" open(self.fn, "w").write("x"*12345) def tearDown(self): os.unlink(self.fn) def test_dirlist_escape(self): resp = Conn().get("/") status, hdrs, body = parse(resp) self.assertEquals(ord("#"), 0x23) self.assertContains(body, "escape%23this", "12345") class TestCases(TestHelper): pass # these get autogenerated in setUpModule() def nerf(s): return re.sub("[^a-zA-Z0-9]", "_", s) def makeCase(name, url, hdr_checker=None, body_checker=None, req_hdrs={"User-Agent": "test.py"}, http_ver=None, endl="\n"): def do_test(self): resp = Conn().get(url, http_ver, endl, req_hdrs) if http_ver is None: status = "" hdrs = {} body = resp else: status, hdrs, body = parse(resp) if hdr_checker is not None and http_ver is not None: hdr_checker(self, hdrs) if body_checker is not None: body_checker(self, body) # FIXME: check status if http_ver is not None: prefix = "HTTP/1.1 " # should 1.0 stay 1.0? self.assertTrue(status.startswith(prefix), msg="%s at start of %s"%(repr(prefix), repr(status))) v = http_ver if v is None: v = "0.9" test_name = "_".join([ "test", nerf(name), nerf("HTTP"+v), {"\n":"LF", "\r\n":"CRLF"}[endl], ]) do_test.__name__ = test_name # hax setattr(TestCases, test_name, do_test) def makeCases(name, url, hdr_checker=None, body_checker=None, req_hdrs={"User-Agent": "test.py"}): for http_ver in [None, "1.0", "1.1"]: for endl in ["\n", "\r\n"]: makeCase(name, url, hdr_checker, body_checker, req_hdrs, http_ver, endl) def makeSimpleCases(name, url, assert_name): makeCases(name, url, None, lambda self,body: getattr(self, assert_name)(body, url)) def setUpModule(): for args in [ ["index", "/", "assertIsIndex"], ["up dir", "/dir/../", "assertIsIndex"], ["extra slashes", "//dir///..////", "assertIsIndex"], ["no trailing slash", "/dir/..", "assertIsIndex"], ["no leading slash", "dir/../", "assertIsInvalid"], ["invalid up dir", "/../", "assertIsInvalid"], ["fancy invalid up dir", "/./dir/./../../", "assertIsInvalid"], ]: makeSimpleCases(*args) class TestDirRedirect(TestHelper): def setUp(self): self.url = "/mydir" self.fn = WWWROOT + self.url os.mkdir(self.fn) def tearDown(self): os.rmdir(self.fn) def test_dir_redirect(self): resp = Conn().get(self.url) status, hdrs, body = parse(resp) self.assertContains(status, "301 Moved Permanently") self.assertEquals(hdrs["Location"], self.url+"/") # trailing slash class TestFileGet(TestHelper): def setUp(self): self.datalen = 2345 self.data = "".join( [chr(random.randint(0,255)) for _ in xrange(self.datalen)]) self.url = "/data.jpeg" self.fn = WWWROOT + self.url open(self.fn, "w").write(self.data) def tearDown(self): os.unlink(self.fn) def test_file_get(self): resp = Conn().get(self.url) status, hdrs, body = parse(resp) self.assertContains(status, "200 OK") self.assertEquals(hdrs["Accept-Ranges"], "bytes") self.assertEquals(hdrs["Content-Length"], str(self.datalen)) self.assertEquals(hdrs["Content-Type"], "image/jpeg") self.assertEquals(body, self.data) def test_file_head(self): resp = Conn().get(self.url, method="HEAD") status, hdrs, body = parse(resp) self.assertContains(status, "200 OK") self.assertEquals(hdrs["Accept-Ranges"], "bytes") self.assertEquals(hdrs["Content-Length"], str(self.datalen)) self.assertEquals(hdrs["Content-Type"], "image/jpeg") def test_if_modified_since(self): resp1 = Conn().get(self.url, method="HEAD") status, hdrs, body = parse(resp1) lastmod = hdrs["Last-Modified"] resp2 = Conn().get(self.url, method="GET", req_hdrs = {"If-Modified-Since": lastmod }) status, hdrs, body = parse(resp2) self.assertContains(status, "304 Not Modified") self.assertEquals(hdrs["Accept-Ranges"], "bytes") self.assertFalse(hdrs.has_key("Last-Modified")) self.assertFalse(hdrs.has_key("Content-Length")) self.assertFalse(hdrs.has_key("Content-Type")) def test_range_single(self): self.drive_range("5-5", "5-5/%d" % self.datalen, 1, self.data[5]) def test_range_single_first(self): self.drive_range("0-0", "0-0/%d" % self.datalen, 1, self.data[0]) def test_range_single_last(self): self.drive_range("%d-%d"%(self.datalen-1, self.datalen-1), "%d-%d/%d"%(self.datalen-1, self.datalen-1, self.datalen), 1, self.data[-1]) def test_range_single_bad(self): resp = Conn().get(self.url, req_hdrs = {"Range": "bytes=%d-%d"%(self.datalen, self.datalen)}) status, hdrs, body = parse(resp) self.assertContains(status, "416 Requested Range Not Satisfiable") def test_range_reasonable(self): self.drive_range("10-20", "10-20/%d" % self.datalen, 20-10+1, self.data[10:20+1]) def test_range_start_given(self): self.drive_range("10-", "10-%d/%d" % (self.datalen-1, self.datalen), self.datalen-10, self.data[10:]) def test_range_end_given(self): self.drive_range("-25", "%d-%d/%d"%(self.datalen-25, self.datalen-1, self.datalen), 25, self.data[-25:]) def test_range_beyond_end(self): # expecting same result as test_range_end_given self.drive_range("%d-%d"%(self.datalen-25, self.datalen*2), "%d-%d/%d"%(self.datalen-25, self.datalen-1, self.datalen), 25, self.data[-25:]) def test_range_end_given_oversize(self): # expecting full file self.drive_range("-%d"%(self.datalen*3), "0-%d/%d"%(self.datalen-1, self.datalen), self.datalen, self.data) def test_range_bad_start(self): resp = Conn().get(self.url, req_hdrs = {"Range": "bytes=%d-"%( self.datalen*2)}) status, hdrs, body = parse(resp) self.assertContains(status, "416 Requested Range Not Satisfiable") def test_range_backwards(self): resp = Conn().get(self.url, req_hdrs = {"Range": "bytes=20-10"}) status, hdrs, body = parse(resp) self.assertContains(status, "416 Requested Range Not Satisfiable") def make_large_file(fn, boundary, data): big = 1<<33 assert big == 8589934592L assert str(big) == "8589934592" f = open(fn, "w") pos = boundary - len(data)/2 f.seek(pos) assert f.tell() == pos assert f.tell() < boundary f.write(data) filesize = f.tell() assert filesize == pos + len(data) assert filesize > boundary f.close() return (pos, filesize) class TestLargeFile2G(TestHelper): BOUNDARY = 1<<31 def setUp(self): self.datalen = 4096 self.data = "".join( [chr(random.randint(0,255)) for _ in xrange(self.datalen)]) self.url = "/big.jpeg" self.fn = WWWROOT + self.url self.filepos, self.filesize = make_large_file( self.fn, self.BOUNDARY, self.data) def tearDown(self): os.unlink(self.fn) def drive_start(self, ofs): req_start = self.BOUNDARY + ofs req_end = req_start + self.datalen/4 - 1 range_in = "%d-%d"%(req_start, req_end) range_out = "%s/%d"%(range_in, self.filesize) data_start = req_start - self.filepos data_end = data_start + self.datalen/4 self.drive_range(range_in, range_out, self.datalen/4, self.data[data_start:data_end]) def test_largefile_head(self): resp = Conn().get(self.url, method="HEAD") status, hdrs, body = parse(resp) self.assertContains(status, "200 OK") self.assertEquals(hdrs["Accept-Ranges"], "bytes") self.assertEquals(hdrs["Content-Length"], str(self.filesize)) self.assertEquals(hdrs["Content-Type"], "image/jpeg") def test_largefile__3(self): self.drive_start(-3) def test_largefile__2(self): self.drive_start(-2) def test_largefile__1(self): self.drive_start(-1) def test_largefile_0(self): self.drive_start(0) def test_largefile_1(self): self.drive_start(1) def test_largefile_2(self): self.drive_start(2) def test_largefile_3(self): self.drive_start(3) class TestLargeFile4G(TestLargeFile2G): BOUNDARY = 1<<32 if __name__ == '__main__': setUpModule() unittest.main() # vim:set ts=4 sw=4 et: