1
0
mirror of https://github.com/Tygs/0bin.git synced 2023-08-10 21:13:00 +03:00
This commit is contained in:
sam 2012-04-29 05:20:54 +07:00
commit 30cce72678
3 changed files with 465 additions and 3 deletions

433
libs/privilege.py Normal file
View File

@ -0,0 +1,433 @@
#! /usr/bin/python
"""
privilege.py
Copyright (c) 2009-2010, Travis H.
License terms: Same as those of Python itself.
This module is designed to implement the privilege-dropping API
described here:
http://www.cs.berkeley.edu/~daw/papers/setuid-usenix02.pdf
http://www.cs.berkeley.edu/~daw/papers/setuid-login08b.pdf
This topic is complex so you should read at least the second paper
before trying to understand why I'm doing this.
NOTE: This code does not (yet) attempt to work on Solaris and AIX.
That is, you must have getres[ug]id and setres[ug]id to use it.
TODO:
I have added setres[ug]id to python; unsure of when it will
appear; make a future version of this will check for its presence.
Make interface more OO, less imperative.
Implement drop_priv_temp, restore_priv.
Implement on systems without setres[ug]id
Contributors:
Kevin Gilette
"""
from ctypes import *
from ctypes.util import find_library
import os
import pwd
import grp
# Get the name of the Operating System
os_kernel = os.uname()[0]
class PrivilegeFail(Exception):
pass
clib = CDLL(find_library("c"))
# Tested on x86-64 -- all tests pass when run as root
__uid_t = c_uint
__gid_t = c_uint
_getresuid = clib.getresuid
def getresuid():
"""Get the real, effective, and saved user IDs"""
# Create some memory locations for getresuid to write to.
r = __uid_t()
e = __uid_t()
s = __uid_t()
# Call getresuid syscall, passing by reference.
res = _getresuid(byref(r), byref(e), byref(s))
if res < 0: raise pythonapi.PyErr_SetFromErrno(py_object(OSError))
# Convert to python integers.
return r.value, e.value, s.value
_getresgid = clib.getresgid
def getresgid():
"""Get the real, effective, and saved group IDs"""
r = __gid_t()
e = __gid_t()
s = __gid_t()
# Call getresgid syscall, passing by reference.
res = _getresgid(byref(r), byref(e), byref(s))
if res < 0: raise pythonapi.PyErr_SetFromErrno(py_object(OSError))
# Convert to python integers.
return r.value, e.value, s.value
# Import the setresuid system call using ctypes
_setresuid = clib.setresuid
_setresuid.argttypes = [__uid_t, __uid_t, __uid_t]
_setresuid.resttype = c_int
# Import the setresgid system call using ctypes
_setresgid = clib.setresgid
_setresgid.argttypes = [__gid_t, __gid_t, __gid_t]
_setresgid.resttype = c_int
def setresuid(ruid, euid, suid):
"""Set the real, effective, and saved user IDs"""
res = _setresuid(__uid_t(ruid), __uid_t(euid), __uid_t(suid))
if res < 0: raise pythonapi.PyErr_SetFromErrno(py_object(OSError))
def setresgid(rgid, egid, sgid):
"""Set the real, effecive, and saved group IDs"""
res = _setresgid(__gid_t(rgid), __gid_t(egid), __gid_t(sgid))
if res < 0: raise pythonapi.PyErr_SetFromErrno(py_object(OSError))
def sort_uniq(args):
"""Sort a sequence, discarding duplicates."""
return sorted(set(args))
class user_credentials:
"""
This represents the credentials associated with a user.
User ID
Group ID
Supplementary Groups
This is used as an argument to drop_permanently
"""
def __init__(self, uid, gid, sups):
# -1 has special meaning for several set*id calls (ignore)
if uid == -1: raise PrivilegeFail
if gid == -1: raise PrivilegeFail
nm = os.sysconf('SC_NGROUPS_MAX')
if nm < 0 or nm < len(sups): raise PrivilegeFail
self.uid = uid
self.gid = gid
self.sups = sort_uniq(sups)
def eql_sups(current, target):
"""
Compare two supplementary group lists, and ignore if effective GID is in current but not target.
Prerequisite: The supplementary group lists are sorted and filtered for duplicates.
"""
egid = os.getegid()
my_current = sort_uniq(current)
# Instead of tediously ignoring this value, if it's in the current list, then go ahead
# and add it to the target list
if egid in current:
my_target = target + [ egid ]
else:
my_target = target
my_target = sort_uniq(my_target)
return my_current == my_target
def get_sups():
"""This is here to give us a layer of abstraction relative to system calls"""
return os.getgroups()
def set_sups(target_sups):
"""
This is designed to give us a layer of abstraction from the system calls.
It also accomodates FreeBSD's idiosyncracy (which is POSIX-compliant) of
keeping the egid in the supplementary groups list.
It also makes an effort to not call the setgroups routine if the target
group list is identical to the current one in force.
"""
global os_kernel
if os_kernel == 'FreeBSD':
target_sups = [ os.getegid() ] + target_sups
if os.geteuid() == 0:
# This will raise an OSError exception if it fails
os.setgroups(target_sups)
else:
cur_sups = get_sups()
# This will probably fail
if not eql_sups(cur_sups, target_sups):
# This will raise an OSError exception if it fails
os.setgroups(target_sups)
return True
def set_gids(r, e, s):
"""This is here to give us a layer of abstraction relative to system calls"""
setresgid(r, e, s)
def set_uids(r, e, s):
"""This is here to give us a layer of abstraction relative to system calls"""
setresuid(r, e, s)
class res_ids:
"""
This represents the three IDs (group or user) associated with a process.
"""
def __init__(self, real, effective, saved):
self.r = real
self.e = effective
self.s = saved
class proc_credentials:
"""
This obtains and represents the credentials associated with a process.
"""
def __init__(self):
self.uids = apply(res_ids, getresuid())
self.gids = apply(res_ids, getresgid())
self.sups = sort_uniq(os.getgroups())
def get_fs_ids():
"""Get filesystem IDs - applies only to Linux"""
uid = None
gid = None
file = open('/proc/self/status', 'r')
for line in file:
fields = line.split()
if fields[0] == 'Uid:':
uid = int(fields[4])
elif fields[0] == 'Gid:':
gid = int(fields[4])
return uid, gid
def coerce_user(user):
if hasattr(user, '__int__'):
return int(user)
return pwd.getpwnam(user).pw_uid
def coerce_group(group):
if hasattr(group, '__int__'):
return int(group)
return grp.getgrnam(group).gr_gid
def drop_privileges_permanently(uid, gid, sups):
"""
This routine is designed to permanently drop all privileges to the
user, group, and supplementary groups specified.
"""
uid = coerce_user(uid)
gid = coerce_group(gid)
sups = map(coerce_group, sups)
# This does some syntax checking
ucred = user_credentials(uid, gid, sups)
# This is for our convenience
u = uid
g = gid
# Order is important in these three calls
set_sups(ucred.sups)
set_gids(g, g, g) # real, effective, saved
set_uids(u, u, u) # real, effective, saved
# Check that we actually did what we expected or throw exception.
pc = proc_credentials()
# Portably compare the supplementary group list
if not eql_sups(pc.sups, ucred.sups): raise PrivilegeFail
# Check all the gids
if not (g == pc.gids.r and g == pc.gids.e and g == pc.gids.s):
raise PrivilegeFail
# Check all the uids
if not (u == pc.uids.r and u == pc.gids.e and u == pc.uids.s):
raise PrivilegeFail
global os_kernel
if os_kernel == 'Linux':
if get_fs_ids() != (u, g): raise PrivilegeFail
# This is all test code
# It is run if this script is invoked directly
if __name__ == '__main__':
import unittest
class test_getresXid(unittest.TestCase):
"""Test the calls to getresXid"""
def test__getresuid(self):
# TODO: is there any way to avoid redefining this here? I tried a global but it didn't
# work.
# Note: the double-leading underscore may be getting special-cased by python.
__uid_t = c_int
r = __uid_t()
e = __uid_t()
s = __uid_t()
ret = _getresuid(byref(r), byref(e), byref(s))
self.assertEqual(ret, 0)
self.assertEqual(r.value, os.getuid())
self.assertEqual(e.value, os.geteuid())
# NOTE: no other portable way to get saved UID
def test_getresuid(self):
(r, e, s) = getresuid()
self.assertEqual(r, os.getuid())
self.assertEqual(e, os.geteuid())
# NOTE: no other portable way to get saved UID
def test__getresgid(self):
__gid_t = c_int
r = __gid_t()
e = __gid_t()
s = __gid_t()
ret = _getresgid(byref(r), byref(e), byref(s))
self.assertEqual(ret, 0)
self.assertEqual(r.value, os.getgid())
self.assertEqual(e.value, os.getegid())
# NOTE: no other portable way to get saved GID
def test_getresgid(self):
(r, e, s) = getresgid()
self.assertEqual(r, os.getgid())
self.assertEqual(e, os.getegid())
# NOTE: no other portable way to get saved GID
class test_setresuid(unittest.TestCase):
"""Test the call to setresuid"""
def setUp(self):
self.uid = os.geteuid()
def test__setresuid(self):
__uid_t = c_int
r1 = __uid_t(1)
e1 = __uid_t(1)
# Must save root UID so that we can reset UIDs for other tests
s1 = __uid_t(0)
if self.uid == 0:
rv = _setresuid(r1, e1, s1)
self.assertEqual(rv, 0)
(r2, e2, s2) = getresuid()
self.assertEqual(r1.value, r2)
self.assertEqual(e1.value, e2)
self.assertEqual(s1.value, s2)
else:
rv = _setresuid(r1, e1, s1)
self.assertEqual(rv, -1)
def test_setresuid(self):
if self.uid == 0:
setresuid(1,1,0)
(r, e, s) = getresuid()
self.assertEqual(r, 1)
self.assertEqual(e, 1)
self.assertEqual(s, 0)
else:
self.assertRaises(OSError, setresuid, 1, 1, 1)
def tearDown(self):
if self.uid == 0:
# Restore UIDs for next test
_setresuid(0, 0, 0)
class test_setresgid(unittest.TestCase):
"""Test the call to setresgid"""
def setUp(self):
self.uid = os.geteuid()
def test__setresgid(self):
__gid_t = c_int
r1 = __gid_t(1)
e1 = __gid_t(1)
s1 = __gid_t(1)
if self.uid == 0:
rv = _setresgid(r1, e1, s1)
self.assertEqual(rv, 0)
(r2, e2, s2) = getresgid()
self.assertEqual(r1.value, r2)
self.assertEqual(e1.value, e2)
self.assertEqual(s1.value, s2)
else:
rv = _setresgid(r1, e1, s1)
self.assertEqual(rv, -1)
def test_setresgid(self):
if self.uid == 0:
setresgid(1,1,1)
(r, e, s) = getresgid()
self.assertEqual(r, 1)
self.assertEqual(e, 1)
self.assertEqual(s, 1)
else:
self.assertRaises(OSError, setresgid, 1, 1, 1)
class test_sort_uniq(unittest.TestCase):
def test_sort_uniq(self):
l = [ 'c', 'a', 'b', 'a' ]
self.assertEqual(sort_uniq(l), [ 'a', 'b', 'c'])
class test_user_credentials(unittest.TestCase):
def test_negatives(self):
self.assertRaises(PrivilegeFail, user_credentials, -1, 0, [])
self.assertRaises(PrivilegeFail, user_credentials, 0, -1, [])
uc = user_credentials(0, 0, [0, 1])
class test_eql_sups(unittest.TestCase):
def test_equal(self):
self.assert_(eql_sups([0, 1, 2], [0, 1, 2]))
def test_contains_egid(self):
self.assert_(eql_sups(sort_uniq([0, 1, 2, os.getegid()]), [0, 1, 2]))
self.assert_(eql_sups(sort_uniq([0, 1, os.getegid(), 9999]), [0, 1, 9999]))
class test_get_sups(unittest.TestCase):
def test(self):
self.assertEqual(os.getgroups(), get_sups())
class test_set_sups(unittest.TestCase):
def test_equal(self):
sups = os.getgroups()
set_sups(sups)
self.assertEqual(sups, os.getgroups())
def test_unequal(self):
old_sups = os.getgroups()
sups = [ 1, 2, 3 ]
if os.geteuid() == 0:
set_sups(sups)
self.assert_(eql_sups(os.getgroups(), sups))
# Clean up by resetting supplementary groups
set_sups(old_sups)
else:
self.assertRaises(OSError, set_sups, sups)
class test_res_ids(unittest.TestCase):
def test_res_ids(self):
ids = res_ids(1, 2, 3)
self.assertEqual(ids.r, 1)
self.assertEqual(ids.e, 2)
self.assertEqual(ids.s, 3)
class test_proc_credentials(unittest.TestCase):
def test_pc(self):
pc = proc_credentials()
self.assertEqual(pc.uids.r, os.getuid())
self.assertEqual(pc.uids.e, os.geteuid())
self.assertEqual(pc.uids.s, (getresuid())[2])
self.assertEqual(pc.gids.r, os.getgid())
self.assertEqual(pc.gids.e, os.getegid())
self.assertEqual(pc.gids.s, (getresgid())[2])
self.assertEqual(pc.sups, sort_uniq(os.getgroups()))
class test_get_fs_ids(unittest.TestCase):
def test_get_fs_ids(self):
uid, gid = get_fs_ids()
self.assertEqual(uid, os.getuid())
self.assertEqual(gid, os.getgid())
class test_drop_privs(unittest.TestCase):
def test_drop_privs(self):
"""This test must be run last"""
if os.geteuid() == 0:
drop_privileges_permanently(1, 1, [1])
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(test_getresXid))
suite.addTest(unittest.makeSuite(test_setresuid))
suite.addTest(unittest.makeSuite(test_setresgid))
suite.addTest(unittest.makeSuite(test_sort_uniq))
suite.addTest(unittest.makeSuite(test_user_credentials))
suite.addTest(unittest.makeSuite(test_eql_sups))
suite.addTest(unittest.makeSuite(test_set_sups))
suite.addTest(unittest.makeSuite(test_res_ids))
suite.addTest(unittest.makeSuite(test_proc_credentials))
suite.addTest(unittest.makeSuite(test_get_fs_ids))
suite.addTest(unittest.makeSuite(test_drop_privs))
unittest.TextTestRunner().run(suite)

View File

@ -16,7 +16,6 @@ DEBUG = True
# default in projectdirectory/static/content/
# use "/" even under Windows
PASTE_FILES_ROOT = os.path.join(STATIC_FILES_ROOT, 'content')
# Port and host the embeded python server should be using in prod and in dev
PROD_HOST = "0.0.0.0"
PROD_PORT= "80"

View File

@ -7,6 +7,10 @@
import os
import hashlib
import thread
import time
import tempfile
import glob
from datetime import datetime, timedelta
@ -14,6 +18,8 @@ from src import settings, setup_path, Paste
setup_path()
from privilege import drop_privileges_permanently, coerce_user, coerce_group
from bottle import (Bottle, route, run, abort,
static_file, debug, view, request)
@ -86,8 +92,32 @@ def server_static(filename):
if __name__ == "__main__":
def drop_privileges():
time.sleep(5)
if settings.USER:
settings.GROUP = settings.GROUP or settings.USER
try:
user = coerce_user(settings.USER)
group = coerce_group(settings.GROUP)
lock_files = glob.glob(os.path.join(tempfile.gettempdir(),
'bottle.*.lock'))
for lock_file in lock_files:
os.chown(lock_file, user, group)
drop_privileges_permanently(settings.USER, settings.GROUP, ())
except Exception:
print "Failed to drop privileges. Running with current user."
thread.start_new_thread(drop_privileges, ())
if settings.DEBUG:
debug(True)
run(app, host='localhost', port=8000, reloader=True, server="cherrypy")
run(app, host=settings.DEV_HOST, port=settings.DEV_PORT,
reloader=True, server="cherrypy")
else:
run(app, host='localhost', port=8000, server="cherrypy")
run(app, host=settings.PROD_HOST,
port=settings.PROD_PORT, server="cherrypy")