diff --git a/libs/privilege.py b/libs/privilege.py new file mode 100644 index 0000000..58a037f --- /dev/null +++ b/libs/privilege.py @@ -0,0 +1,433 @@ +#! /usr/bin/python +""" +privilege.py + +Copyright (c) 2009-2010, Travis H. + +License terms: Same as those of Python itself. + +This module is designed to implement the privilege-dropping API +described here: +http://www.cs.berkeley.edu/~daw/papers/setuid-usenix02.pdf +http://www.cs.berkeley.edu/~daw/papers/setuid-login08b.pdf + +This topic is complex so you should read at least the second paper +before trying to understand why I'm doing this. + +NOTE: This code does not (yet) attempt to work on Solaris and AIX. +That is, you must have getres[ug]id and setres[ug]id to use it. + +TODO: +I have added setres[ug]id to python; unsure of when it will +appear; make a future version of this will check for its presence. + +Make interface more OO, less imperative. + +Implement drop_priv_temp, restore_priv. + +Implement on systems without setres[ug]id + +Contributors: +Kevin Gilette +""" + +from ctypes import * +from ctypes.util import find_library +import os +import pwd +import grp + +# Get the name of the Operating System +os_kernel = os.uname()[0] + +class PrivilegeFail(Exception): + pass + +clib = CDLL(find_library("c")) + +# Tested on x86-64 -- all tests pass when run as root +__uid_t = c_uint +__gid_t = c_uint + +_getresuid = clib.getresuid + +def getresuid(): + """Get the real, effective, and saved user IDs""" + # Create some memory locations for getresuid to write to. + r = __uid_t() + e = __uid_t() + s = __uid_t() + # Call getresuid syscall, passing by reference. + res = _getresuid(byref(r), byref(e), byref(s)) + if res < 0: raise pythonapi.PyErr_SetFromErrno(py_object(OSError)) + # Convert to python integers. + return r.value, e.value, s.value + +_getresgid = clib.getresgid + +def getresgid(): + """Get the real, effective, and saved group IDs""" + r = __gid_t() + e = __gid_t() + s = __gid_t() + # Call getresgid syscall, passing by reference. + res = _getresgid(byref(r), byref(e), byref(s)) + if res < 0: raise pythonapi.PyErr_SetFromErrno(py_object(OSError)) + # Convert to python integers. + return r.value, e.value, s.value + +# Import the setresuid system call using ctypes +_setresuid = clib.setresuid +_setresuid.argttypes = [__uid_t, __uid_t, __uid_t] +_setresuid.resttype = c_int + +# Import the setresgid system call using ctypes +_setresgid = clib.setresgid +_setresgid.argttypes = [__gid_t, __gid_t, __gid_t] +_setresgid.resttype = c_int + +def setresuid(ruid, euid, suid): + """Set the real, effective, and saved user IDs""" + res = _setresuid(__uid_t(ruid), __uid_t(euid), __uid_t(suid)) + if res < 0: raise pythonapi.PyErr_SetFromErrno(py_object(OSError)) + +def setresgid(rgid, egid, sgid): + """Set the real, effecive, and saved group IDs""" + res = _setresgid(__gid_t(rgid), __gid_t(egid), __gid_t(sgid)) + if res < 0: raise pythonapi.PyErr_SetFromErrno(py_object(OSError)) + +def sort_uniq(args): + """Sort a sequence, discarding duplicates.""" + return sorted(set(args)) + +class user_credentials: + """ + This represents the credentials associated with a user. + User ID + Group ID + Supplementary Groups + This is used as an argument to drop_permanently + """ + def __init__(self, uid, gid, sups): + # -1 has special meaning for several set*id calls (ignore) + if uid == -1: raise PrivilegeFail + if gid == -1: raise PrivilegeFail + nm = os.sysconf('SC_NGROUPS_MAX') + if nm < 0 or nm < len(sups): raise PrivilegeFail + self.uid = uid + self.gid = gid + self.sups = sort_uniq(sups) + +def eql_sups(current, target): + """ + Compare two supplementary group lists, and ignore if effective GID is in current but not target. + Prerequisite: The supplementary group lists are sorted and filtered for duplicates. + """ + egid = os.getegid() + my_current = sort_uniq(current) + # Instead of tediously ignoring this value, if it's in the current list, then go ahead + # and add it to the target list + if egid in current: + my_target = target + [ egid ] + else: + my_target = target + my_target = sort_uniq(my_target) + return my_current == my_target + +def get_sups(): + """This is here to give us a layer of abstraction relative to system calls""" + return os.getgroups() + +def set_sups(target_sups): + """ + This is designed to give us a layer of abstraction from the system calls. + It also accomodates FreeBSD's idiosyncracy (which is POSIX-compliant) of + keeping the egid in the supplementary groups list. + It also makes an effort to not call the setgroups routine if the target + group list is identical to the current one in force. + """ + global os_kernel + if os_kernel == 'FreeBSD': + target_sups = [ os.getegid() ] + target_sups + if os.geteuid() == 0: + # This will raise an OSError exception if it fails + os.setgroups(target_sups) + else: + cur_sups = get_sups() + # This will probably fail + if not eql_sups(cur_sups, target_sups): + # This will raise an OSError exception if it fails + os.setgroups(target_sups) + return True + +def set_gids(r, e, s): + """This is here to give us a layer of abstraction relative to system calls""" + setresgid(r, e, s) + +def set_uids(r, e, s): + """This is here to give us a layer of abstraction relative to system calls""" + setresuid(r, e, s) + +class res_ids: + """ + This represents the three IDs (group or user) associated with a process. + """ + def __init__(self, real, effective, saved): + self.r = real + self.e = effective + self.s = saved + +class proc_credentials: + """ + This obtains and represents the credentials associated with a process. + """ + def __init__(self): + self.uids = apply(res_ids, getresuid()) + self.gids = apply(res_ids, getresgid()) + self.sups = sort_uniq(os.getgroups()) + +def get_fs_ids(): + """Get filesystem IDs - applies only to Linux""" + uid = None + gid = None + file = open('/proc/self/status', 'r') + for line in file: + fields = line.split() + if fields[0] == 'Uid:': + uid = int(fields[4]) + elif fields[0] == 'Gid:': + gid = int(fields[4]) + return uid, gid + +def coerce_user(user): + if hasattr(user, '__int__'): + return int(user) + return pwd.getpwnam(user).pw_uid + +def coerce_group(group): + if hasattr(group, '__int__'): + return int(group) + return grp.getgrnam(group).gr_gid + +def drop_privileges_permanently(uid, gid, sups): + """ + This routine is designed to permanently drop all privileges to the + user, group, and supplementary groups specified. + """ + + uid = coerce_user(uid) + gid = coerce_group(gid) + sups = map(coerce_group, sups) + + # This does some syntax checking + ucred = user_credentials(uid, gid, sups) + + # This is for our convenience + u = uid + g = gid + + # Order is important in these three calls + set_sups(ucred.sups) + set_gids(g, g, g) # real, effective, saved + set_uids(u, u, u) # real, effective, saved + + # Check that we actually did what we expected or throw exception. + pc = proc_credentials() + # Portably compare the supplementary group list + if not eql_sups(pc.sups, ucred.sups): raise PrivilegeFail + # Check all the gids + if not (g == pc.gids.r and g == pc.gids.e and g == pc.gids.s): + raise PrivilegeFail + # Check all the uids + if not (u == pc.uids.r and u == pc.gids.e and u == pc.uids.s): + raise PrivilegeFail + global os_kernel + if os_kernel == 'Linux': + if get_fs_ids() != (u, g): raise PrivilegeFail + +# This is all test code +# It is run if this script is invoked directly +if __name__ == '__main__': + + import unittest + + class test_getresXid(unittest.TestCase): + """Test the calls to getresXid""" + def test__getresuid(self): + # TODO: is there any way to avoid redefining this here? I tried a global but it didn't + # work. + # Note: the double-leading underscore may be getting special-cased by python. + __uid_t = c_int + r = __uid_t() + e = __uid_t() + s = __uid_t() + ret = _getresuid(byref(r), byref(e), byref(s)) + self.assertEqual(ret, 0) + self.assertEqual(r.value, os.getuid()) + self.assertEqual(e.value, os.geteuid()) + # NOTE: no other portable way to get saved UID + def test_getresuid(self): + (r, e, s) = getresuid() + self.assertEqual(r, os.getuid()) + self.assertEqual(e, os.geteuid()) + # NOTE: no other portable way to get saved UID + def test__getresgid(self): + __gid_t = c_int + r = __gid_t() + e = __gid_t() + s = __gid_t() + ret = _getresgid(byref(r), byref(e), byref(s)) + self.assertEqual(ret, 0) + self.assertEqual(r.value, os.getgid()) + self.assertEqual(e.value, os.getegid()) + # NOTE: no other portable way to get saved GID + def test_getresgid(self): + (r, e, s) = getresgid() + self.assertEqual(r, os.getgid()) + self.assertEqual(e, os.getegid()) + # NOTE: no other portable way to get saved GID + + class test_setresuid(unittest.TestCase): + """Test the call to setresuid""" + def setUp(self): + self.uid = os.geteuid() + def test__setresuid(self): + __uid_t = c_int + r1 = __uid_t(1) + e1 = __uid_t(1) + # Must save root UID so that we can reset UIDs for other tests + s1 = __uid_t(0) + if self.uid == 0: + rv = _setresuid(r1, e1, s1) + self.assertEqual(rv, 0) + (r2, e2, s2) = getresuid() + self.assertEqual(r1.value, r2) + self.assertEqual(e1.value, e2) + self.assertEqual(s1.value, s2) + else: + rv = _setresuid(r1, e1, s1) + self.assertEqual(rv, -1) + def test_setresuid(self): + if self.uid == 0: + setresuid(1,1,0) + (r, e, s) = getresuid() + self.assertEqual(r, 1) + self.assertEqual(e, 1) + self.assertEqual(s, 0) + else: + self.assertRaises(OSError, setresuid, 1, 1, 1) + def tearDown(self): + if self.uid == 0: + # Restore UIDs for next test + _setresuid(0, 0, 0) + + class test_setresgid(unittest.TestCase): + """Test the call to setresgid""" + def setUp(self): + self.uid = os.geteuid() + def test__setresgid(self): + __gid_t = c_int + r1 = __gid_t(1) + e1 = __gid_t(1) + s1 = __gid_t(1) + if self.uid == 0: + rv = _setresgid(r1, e1, s1) + self.assertEqual(rv, 0) + (r2, e2, s2) = getresgid() + self.assertEqual(r1.value, r2) + self.assertEqual(e1.value, e2) + self.assertEqual(s1.value, s2) + else: + rv = _setresgid(r1, e1, s1) + self.assertEqual(rv, -1) + def test_setresgid(self): + if self.uid == 0: + setresgid(1,1,1) + (r, e, s) = getresgid() + self.assertEqual(r, 1) + self.assertEqual(e, 1) + self.assertEqual(s, 1) + else: + self.assertRaises(OSError, setresgid, 1, 1, 1) + + class test_sort_uniq(unittest.TestCase): + def test_sort_uniq(self): + l = [ 'c', 'a', 'b', 'a' ] + self.assertEqual(sort_uniq(l), [ 'a', 'b', 'c']) + + class test_user_credentials(unittest.TestCase): + def test_negatives(self): + self.assertRaises(PrivilegeFail, user_credentials, -1, 0, []) + self.assertRaises(PrivilegeFail, user_credentials, 0, -1, []) + uc = user_credentials(0, 0, [0, 1]) + + class test_eql_sups(unittest.TestCase): + def test_equal(self): + self.assert_(eql_sups([0, 1, 2], [0, 1, 2])) + def test_contains_egid(self): + self.assert_(eql_sups(sort_uniq([0, 1, 2, os.getegid()]), [0, 1, 2])) + self.assert_(eql_sups(sort_uniq([0, 1, os.getegid(), 9999]), [0, 1, 9999])) + + class test_get_sups(unittest.TestCase): + def test(self): + self.assertEqual(os.getgroups(), get_sups()) + + class test_set_sups(unittest.TestCase): + def test_equal(self): + sups = os.getgroups() + set_sups(sups) + self.assertEqual(sups, os.getgroups()) + def test_unequal(self): + old_sups = os.getgroups() + sups = [ 1, 2, 3 ] + if os.geteuid() == 0: + set_sups(sups) + self.assert_(eql_sups(os.getgroups(), sups)) + # Clean up by resetting supplementary groups + set_sups(old_sups) + else: + self.assertRaises(OSError, set_sups, sups) + + class test_res_ids(unittest.TestCase): + def test_res_ids(self): + ids = res_ids(1, 2, 3) + self.assertEqual(ids.r, 1) + self.assertEqual(ids.e, 2) + self.assertEqual(ids.s, 3) + + class test_proc_credentials(unittest.TestCase): + def test_pc(self): + pc = proc_credentials() + self.assertEqual(pc.uids.r, os.getuid()) + self.assertEqual(pc.uids.e, os.geteuid()) + self.assertEqual(pc.uids.s, (getresuid())[2]) + self.assertEqual(pc.gids.r, os.getgid()) + self.assertEqual(pc.gids.e, os.getegid()) + self.assertEqual(pc.gids.s, (getresgid())[2]) + self.assertEqual(pc.sups, sort_uniq(os.getgroups())) + + class test_get_fs_ids(unittest.TestCase): + def test_get_fs_ids(self): + uid, gid = get_fs_ids() + self.assertEqual(uid, os.getuid()) + self.assertEqual(gid, os.getgid()) + + class test_drop_privs(unittest.TestCase): + def test_drop_privs(self): + """This test must be run last""" + if os.geteuid() == 0: + drop_privileges_permanently(1, 1, [1]) + + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(test_getresXid)) + suite.addTest(unittest.makeSuite(test_setresuid)) + suite.addTest(unittest.makeSuite(test_setresgid)) + suite.addTest(unittest.makeSuite(test_sort_uniq)) + suite.addTest(unittest.makeSuite(test_user_credentials)) + suite.addTest(unittest.makeSuite(test_eql_sups)) + suite.addTest(unittest.makeSuite(test_set_sups)) + suite.addTest(unittest.makeSuite(test_res_ids)) + suite.addTest(unittest.makeSuite(test_proc_credentials)) + suite.addTest(unittest.makeSuite(test_get_fs_ids)) + suite.addTest(unittest.makeSuite(test_drop_privs)) + unittest.TextTestRunner().run(suite) diff --git a/settings.py b/settings.py index 245e34e..eef79a5 100644 --- a/settings.py +++ b/settings.py @@ -16,7 +16,6 @@ DEBUG = True # default in projectdirectory/static/content/ # use "/" even under Windows PASTE_FILES_ROOT = os.path.join(STATIC_FILES_ROOT, 'content') - # Port and host the embeded python server should be using in prod and in dev PROD_HOST = "0.0.0.0" PROD_PORT= "80" diff --git a/start.py b/start.py index bc15d78..41f2e18 100644 --- a/start.py +++ b/start.py @@ -7,6 +7,10 @@ import os import hashlib +import thread +import time +import tempfile +import glob from datetime import datetime, timedelta @@ -14,6 +18,8 @@ from src import settings, setup_path, Paste setup_path() +from privilege import drop_privileges_permanently, coerce_user, coerce_group + from bottle import (Bottle, route, run, abort, static_file, debug, view, request) @@ -86,8 +92,32 @@ def server_static(filename): if __name__ == "__main__": + + def drop_privileges(): + time.sleep(5) + if settings.USER: + settings.GROUP = settings.GROUP or settings.USER + try: + user = coerce_user(settings.USER) + group = coerce_group(settings.GROUP) + + lock_files = glob.glob(os.path.join(tempfile.gettempdir(), + 'bottle.*.lock')) + for lock_file in lock_files: + os.chown(lock_file, user, group) + + drop_privileges_permanently(settings.USER, settings.GROUP, ()) + except Exception: + print "Failed to drop privileges. Running with current user." + + thread.start_new_thread(drop_privileges, ()) + if settings.DEBUG: debug(True) - run(app, host='localhost', port=8000, reloader=True, server="cherrypy") + run(app, host=settings.DEV_HOST, port=settings.DEV_PORT, + reloader=True, server="cherrypy") else: - run(app, host='localhost', port=8000, server="cherrypy") + run(app, host=settings.PROD_HOST, + port=settings.PROD_PORT, server="cherrypy") + +