From 81f3014760b63074a09ab326ae9afa26893a651d Mon Sep 17 00:00:00 2001 From: Yikun Jiang Date: Thu, 8 Nov 2018 15:57:19 +0800 Subject: [PATCH] Add policy test for TENANT_ATTRIBUTE_POLICY Add policy test for TENANT_ATTRIBUTE_POLICY, and remove it from test policy file, the default of TENANT_ATTRIBUTE_POLICY is 'RULE_ADMIN_OR_OWNER', that means only admin and owner are authorized to show 'tenant_id'. This is one of the serious patches of policy-in-code test, see more information on [1] and [2]. [1] 530fb9319ce21b7ff99e55f095c04f13f0785842 [2] f207bac80924ffaf6d4c2a500c295d0e2e71966e Change-Id: Ibc674d05eb391e2bc83bfc83bfdce99e5e692ab5 --- .../contrib/test_volume_tenant_attribute.py | 48 +++++++++++-- .../unit/api/v3/test_volume_protection.py | 70 +++++++++++++++++++ cinder/tests/unit/policy.json | 1 - 3 files changed, 111 insertions(+), 8 deletions(-) diff --git a/cinder/tests/unit/api/contrib/test_volume_tenant_attribute.py b/cinder/tests/unit/api/contrib/test_volume_tenant_attribute.py index 47ca4248f2c..c71d66b3e6b 100644 --- a/cinder/tests/unit/api/contrib/test_volume_tenant_attribute.py +++ b/cinder/tests/unit/api/contrib/test_volume_tenant_attribute.py @@ -14,11 +14,14 @@ import uuid +from oslo_policy import policy as oslo_policy from oslo_serialization import jsonutils import webob from cinder import context from cinder import objects +from cinder.policies.volumes import TENANT_ATTRIBUTE_POLICY +from cinder import policy from cinder import test from cinder.tests.unit.api import fakes from cinder.tests.unit import fake_constants as fake @@ -57,8 +60,13 @@ class VolumeTenantAttributeTest(test.TestCase): self.mock_object(volume.api.API, 'get', fake_volume_get) self.mock_object(volume.api.API, 'get_all', fake_volume_get_all) self.UUID = uuid.uuid4() + policy.reset() + policy.init() + self.addCleanup(policy.reset) - def test_get_volume_allowed(self): + def test_get_volume_includes_tenant_id(self): + allow_all = {TENANT_ATTRIBUTE_POLICY: oslo_policy._checks.TrueCheck()} + policy._ENFORCER.set_rules(allow_all, overwrite=False) ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True) req = webob.Request.blank('/v2/%s/volumes/%s' % ( fake.PROJECT_ID, self.UUID)) @@ -67,19 +75,26 @@ class VolumeTenantAttributeTest(test.TestCase): res = req.get_response(app()) vol = jsonutils.loads(res.body)['volume'] self.assertEqual(PROJECT_ID, vol['os-vol-tenant-attr:tenant_id']) + self.assertIn('os-vol-tenant-attr:tenant_id', vol) - def test_get_volume_unallowed(self): - ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, False) + def test_get_volume_excludes_tenant_id(self): + allow_none = {TENANT_ATTRIBUTE_POLICY: + oslo_policy._checks.FalseCheck()} + policy._ENFORCER.set_rules(allow_none, overwrite=False) + ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True) req = webob.Request.blank('/v2/%s/volumes/%s' % ( fake.PROJECT_ID, self.UUID)) req.method = 'GET' req.environ['cinder.context'] = ctx res = req.get_response(app()) vol = jsonutils.loads(res.body)['volume'] + self.assertEqual(fake.VOLUME_ID, vol['id']) self.assertNotIn('os-vol-tenant-attr:tenant_id', vol) - def test_list_detail_volumes_allowed(self): - ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True) + def test_list_detail_volumes_includes_tenant_id(self): + allow_all = {TENANT_ATTRIBUTE_POLICY: oslo_policy._checks.TrueCheck()} + policy._ENFORCER.set_rules(allow_all, overwrite=False) + ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, False) req = webob.Request.blank('/v2/%s/volumes/detail' % fake.PROJECT_ID) req.method = 'GET' req.environ['cinder.context'] = ctx @@ -87,20 +102,39 @@ class VolumeTenantAttributeTest(test.TestCase): vol = jsonutils.loads(res.body)['volumes'] self.assertEqual(PROJECT_ID, vol[0]['os-vol-tenant-attr:tenant_id']) - def test_list_detail_volumes_unallowed(self): + def test_list_detail_volumes_excludes_tenant_id(self): + allow_none = {TENANT_ATTRIBUTE_POLICY: + oslo_policy._checks.FalseCheck()} + policy._ENFORCER.set_rules(allow_none, overwrite=False) ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, False) req = webob.Request.blank('/v2/%s/volumes/detail' % fake.PROJECT_ID) req.method = 'GET' req.environ['cinder.context'] = ctx res = req.get_response(app()) vol = jsonutils.loads(res.body)['volumes'] + self.assertEqual(fake.VOLUME_ID, vol[0]['id']) self.assertNotIn('os-vol-tenant-attr:tenant_id', vol[0]) - def test_list_simple_volumes_no_tenant_id(self): + def test_list_simple_volumes_never_has_tenant_id(self): + allow_all = {TENANT_ATTRIBUTE_POLICY: oslo_policy._checks.TrueCheck()} + policy._ENFORCER.set_rules(allow_all, overwrite=False) ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True) req = webob.Request.blank('/v2/%s/volumes' % fake.PROJECT_ID) req.method = 'GET' req.environ['cinder.context'] = ctx res = req.get_response(app()) vol = jsonutils.loads(res.body)['volumes'] + self.assertEqual(fake.VOLUME_ID, vol[0]['id']) + self.assertNotIn('os-vol-tenant-attr:tenant_id', vol[0]) + + allow_none = {TENANT_ATTRIBUTE_POLICY: + oslo_policy._checks.FalseCheck()} + policy._ENFORCER.set_rules(allow_none, overwrite=False) + ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True) + req = webob.Request.blank('/v2/%s/volumes' % fake.PROJECT_ID) + req.method = 'GET' + req.environ['cinder.context'] = ctx + res = req.get_response(app()) + vol = jsonutils.loads(res.body)['volumes'] + self.assertEqual(fake.VOLUME_ID, vol[0]['id']) self.assertNotIn('os-vol-tenant-attr:tenant_id', vol[0]) diff --git a/cinder/tests/unit/api/v3/test_volume_protection.py b/cinder/tests/unit/api/v3/test_volume_protection.py index 42c90921804..a8fb52c4e4a 100644 --- a/cinder/tests/unit/api/v3/test_volume_protection.py +++ b/cinder/tests/unit/api/v3/test_volume_protection.py @@ -98,6 +98,42 @@ class VolumeProtectionTests(test.TestCase): self.assertEqual(http_client.OK, response.status_int) self.assertEqual(response.json_body['volume']['id'], volume.id) + @mock.patch.object(volume_api.API, 'get_volume') + def test_admin_can_show_tenant_id_in_volume(self, mock_volume): + # Make sure administrators are authorized to show tenant_id + admin_context = self.admin_context + + volume = self._create_fake_volume(admin_context) + mock_volume.return_value = volume + path = '/v3/%(project_id)s/volumes/%(volume_id)s' % { + 'project_id': admin_context.project_id, 'volume_id': volume.id + } + + response = self._get_request_response(admin_context, path, 'GET') + + self.assertEqual(http_client.OK, response.status_int) + res_vol = response.json_body['volume'] + self.assertEqual(admin_context.project_id, + res_vol['os-vol-tenant-attr:tenant_id']) + + @mock.patch.object(volume_api.API, 'get_volume') + def test_owner_can_show_tenant_id_in_volume(self, mock_volume): + # Make sure owners are authorized to show tenant_id in volume + user_context = self.user_context + + volume = self._create_fake_volume(user_context) + mock_volume.return_value = volume + path = '/v3/%(project_id)s/volumes/%(volume_id)s' % { + 'project_id': user_context.project_id, 'volume_id': volume.id + } + + response = self._get_request_response(user_context, path, 'GET') + + self.assertEqual(http_client.OK, response.status_int) + res_vol = response.json_body['volume'] + self.assertEqual(user_context.project_id, + res_vol['os-vol-tenant-attr:tenant_id']) + @mock.patch.object(volume_api.API, 'get_volume') def test_owner_cannot_show_volumes_for_others(self, mock_volume): # Make sure volumes are only exposed to their owners @@ -150,6 +186,40 @@ class VolumeProtectionTests(test.TestCase): self.assertEqual(volume.id, res_vol['id']) + def test_admin_can_show_tenant_id_in_volume_detail(self): + # Make sure admins are authorized to show tenant_id in volume detail + admin_context = self.admin_context + + volume = self._create_fake_volume(admin_context) + path = '/v3/%(project_id)s/volumes/detail' % { + 'project_id': admin_context.project_id, 'volume_id': volume.id + } + + response = self._get_request_response(admin_context, path, 'GET') + + self.assertEqual(http_client.OK, response.status_int) + res_vol = response.json_body['volumes'][0] + # Make sure owners are authorized to show tenant_id + self.assertEqual(admin_context.project_id, + res_vol['os-vol-tenant-attr:tenant_id']) + + def test_owner_can_show_tenant_id_in_volume_detail(self): + # Make sure owners are authorized to show tenant_id in volume detail + user_context = self.user_context + + volume = self._create_fake_volume(user_context) + path = '/v3/%(project_id)s/volumes/detail' % { + 'project_id': user_context.project_id, 'volume_id': volume.id + } + + response = self._get_request_response(user_context, path, 'GET') + + self.assertEqual(http_client.OK, response.status_int) + res_vol = response.json_body['volumes'][0] + # Make sure owners are authorized to show tenant_id + self.assertEqual(user_context.project_id, + res_vol['os-vol-tenant-attr:tenant_id']) + @mock.patch.object(volume_api.API, 'get_volume') def test_admin_can_force_delete_volumes(self, mock_volume): # Make sure administrators are authorized to force delete volumes diff --git a/cinder/tests/unit/policy.json b/cinder/tests/unit/policy.json index 464199e7ce8..5e08a83267c 100644 --- a/cinder/tests/unit/policy.json +++ b/cinder/tests/unit/policy.json @@ -42,7 +42,6 @@ "volume_extension:volume_type_access": "", "volume_extension:extended_snapshot_attributes": "", "volume_extension:volume_image_metadata": "", - "volume_extension:volume_tenant_attribute": "rule:admin_api", "volume_extension:services:index": "", "volume_extension:services:update" : "rule:admin_api",