Merge "Use policy based rule to define context.is_admin"

This commit is contained in:
Jenkins 2012-10-05 04:32:15 +00:00 committed by Gerrit Code Review
commit 12678167d1
5 changed files with 68 additions and 5 deletions

View File

@ -24,6 +24,7 @@ import copy
from cinder.openstack.common import log as logging from cinder.openstack.common import log as logging
from cinder.openstack.common import local from cinder.openstack.common import local
from cinder.openstack.common import timeutils from cinder.openstack.common import timeutils
from cinder import policy
from cinder import utils from cinder import utils
@ -65,7 +66,7 @@ class RequestContext(object):
self.roles = roles or [] self.roles = roles or []
self.is_admin = is_admin self.is_admin = is_admin
if self.is_admin is None: if self.is_admin is None:
self.is_admin = 'admin' in [x.lower() for x in self.roles] self.is_admin = policy.check_is_admin(self.roles)
elif self.is_admin and 'admin' not in self.roles: elif self.is_admin and 'admin' not in self.roles:
self.roles.append('admin') self.roles.append('admin')
self.read_deleted = read_deleted self.read_deleted = read_deleted

View File

@ -86,3 +86,21 @@ def enforce(context, action, target):
policy.enforce(match_list, target, credentials, policy.enforce(match_list, target, credentials,
exception.PolicyNotAuthorized, action=action) exception.PolicyNotAuthorized, action=action)
def check_is_admin(roles):
"""Whether or not roles contains 'admin' role according to policy setting.
"""
init()
action = 'context_is_admin'
match_list = ('rule:%s' % action,)
# include project_id on target to avoid KeyError if context_is_admin
# policy definition is missing, and default admin_or_owner rule
# attempts to apply. Since our credentials dict does not include a
# project_id, this target can never match as a generic rule.
target = {'project_id': ''}
credentials = {'roles': roles}
return policy.enforce(match_list, target, credentials)

View File

@ -1,5 +1,6 @@
{ {
"admin_api": [["role:admin"]], "context_is_admin": [["role:admin"]],
"admin_api": [["is_admin:True"]],
"volume:create": [], "volume:create": [],
"volume:get": [], "volume:get": [],

View File

@ -36,8 +36,9 @@ FLAGS = flags.FLAGS
class PolicyFileTestCase(test.TestCase): class PolicyFileTestCase(test.TestCase):
def setUp(self): def setUp(self):
super(PolicyFileTestCase, self).setUp() super(PolicyFileTestCase, self).setUp()
policy.reset() # since is_admin is defined by policy, create context before reset
self.context = context.RequestContext('fake', 'fake') self.context = context.RequestContext('fake', 'fake')
policy.reset()
self.target = {} self.target = {}
def tearDown(self): def tearDown(self):
@ -188,3 +189,44 @@ class DefaultPolicyTestCase(test.TestCase):
self._set_brain("default_noexist") self._set_brain("default_noexist")
self.assertRaises(exception.PolicyNotAuthorized, policy.enforce, self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
self.context, "example:noexist", {}) self.context, "example:noexist", {})
class ContextIsAdminPolicyTestCase(test.TestCase):
def setUp(self):
super(ContextIsAdminPolicyTestCase, self).setUp()
policy.reset()
policy.init()
def test_default_admin_role_is_admin(self):
ctx = context.RequestContext('fake', 'fake', roles=['johnny-admin'])
self.assertFalse(ctx.is_admin)
ctx = context.RequestContext('fake', 'fake', roles=['admin'])
self.assert_(ctx.is_admin)
def test_custom_admin_role_is_admin(self):
# define explict rules for context_is_admin
rules = {
'context_is_admin': [["role:administrator"], ["role:johnny-admin"]]
}
brain = common_policy.Brain(rules, FLAGS.policy_default_rule)
common_policy.set_brain(brain)
ctx = context.RequestContext('fake', 'fake', roles=['johnny-admin'])
self.assert_(ctx.is_admin)
ctx = context.RequestContext('fake', 'fake', roles=['administrator'])
self.assert_(ctx.is_admin)
# default rule no longer applies
ctx = context.RequestContext('fake', 'fake', roles=['admin'])
self.assertFalse(ctx.is_admin)
def test_context_is_admin_undefined(self):
rules = {
"admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]],
"default": [["rule:admin_or_owner"]],
}
brain = common_policy.Brain(rules, FLAGS.policy_default_rule)
common_policy.set_brain(brain)
ctx = context.RequestContext('fake', 'fake')
self.assertFalse(ctx.is_admin)
ctx = context.RequestContext('fake', 'fake', roles=['admin'])
self.assert_(ctx.is_admin)

View File

@ -1,8 +1,9 @@
{ {
"admin_or_owner": [["role:admin"], ["project_id:%(project_id)s"]], "context_is_admin": [["role:admin"]],
"admin_or_owner": [["is_admin:True"], ["project_id:%(project_id)s"]],
"default": [["rule:admin_or_owner"]], "default": [["rule:admin_or_owner"]],
"admin_api": [["role:admin"]], "admin_api": [["is_admin:True"]],
"volume:create": [], "volume:create": [],
"volume:get_all": [], "volume:get_all": [],