
md5 is not an approved algorithm in FIPS mode, and trying to instantiate a hashlib.md5() will fail when the system is running in FIPS mode. md5 is allowed when it is used in a non-security context. There is a plan to add a keyword parameter (usedforsecurity) to hashlib.md5() to annotate whether or not the instance is being used in a security context. In the case where it is not, the instantiation of md5 will be allowed. See https://bugs.python.org/issue9216 for more details. Some downstream Python versions already support this parameter. To support these versions, a new encapsulation of md5() is added to swift/common/utils.py. This encapsulation is identical to the one being added to oslo.utils, but is recreated here to avoid adding a dependency. This patch replaces the instances of hashlib.md5() with this new encapsulation, adding an annotation indicating whether the usage is in a security context or not. While this patch seems large, it is really just the same change over and over again. Reviewers need to pay particular attention to whether the keyword parameter (usedforsecurity) is set correctly. Right now, none of the instances appear to be used in a security context. Now that all the instances have been converted, we can update the bandit run to look for these instances and ensure that new invocations do not creep in. With this latest patch, the functional and unit tests all pass on a FIPS-enabled system. Co-Authored-By: Pete Zaitcev. Change-Id: Ibb4917da4c083e1e094156d748708b87387f2d87
234 lines
11 KiB
Python
234 lines
11 KiB
Python
# Copyright (c) 2014 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import base64
|
|
import unittest
|
|
import mock
|
|
|
|
from io import BytesIO
|
|
|
|
from swift.common.swob import Request, HTTPAccepted
|
|
from swift.common.middleware.s3api.etree import fromstring, tostring, \
|
|
Element, SubElement, XMLNS_XSI
|
|
from swift.common.middleware.s3api.s3response import InvalidArgument
|
|
from swift.common.middleware.s3api.acl_utils import handle_acl_header
|
|
from swift.common.utils import md5
|
|
|
|
from test.unit.common.middleware.s3api import S3ApiTestCase
|
|
from test.unit.common.middleware.s3api.helpers import UnreadableInput
|
|
from test.unit.common.middleware.s3api.test_s3_acl import s3acl
|
|
|
|
|
|
class TestS3ApiAcl(S3ApiTestCase):
    """Unit tests for the s3api ``?acl`` sub-resource and ACL header handling.

    Requests are built with ``Request.blank`` and dispatched through
    ``self.call_s3api``, which is provided by ``S3ApiTestCase``.
    """

    def setUp(self):
        """Register the backend response that every ACL request relies on."""
        super(TestS3ApiAcl, self).setUp()
        # Every ACL API call must target an existing bucket.
        self.swift.register('PUT', '/v1/AUTH_test/bucket',
                            HTTPAccepted, {}, None)

    def _all_users_read_acl_xml(self):
        """Build the ACL document shared by the XML-body PUT tests.

        :returns: a serialized ``AccessControlPolicy`` whose single grant
                  gives ``READ`` to the global ``AllUsers`` group.
        """
        elem = Element('AccessControlPolicy')
        owner = SubElement(elem, 'Owner')
        SubElement(owner, 'ID').text = 'id'
        acl = SubElement(elem, 'AccessControlList')
        grant = SubElement(acl, 'Grant')
        grantee = SubElement(grant, 'Grantee', nsmap={'xsi': XMLNS_XSI})
        grantee.set('{%s}type' % XMLNS_XSI, 'Group')
        SubElement(grantee, 'URI').text = \
            'http://acs.amazonaws.com/groups/global/AllUsers'
        SubElement(grant, 'Permission').text = 'READ'
        return tostring(elem)

    def _check_acl(self, owner, body):
        """Assert that ``body`` grants FULL_CONTROL to ``owner``.

        :param owner: expected text of the grantee ``ID`` element
        :param body: response body holding an ``AccessControlPolicy``
        """
        elem = fromstring(body, 'AccessControlPolicy')
        permission = elem.find('./AccessControlList/Grant/Permission').text
        self.assertEqual(permission, 'FULL_CONTROL')
        name = elem.find('./AccessControlList/Grant/Grantee/ID').text
        self.assertEqual(name, owner)

    def test_bucket_acl_GET(self):
        """GET on a bucket's ``?acl`` reports the requester as the owner."""
        req = Request.blank('/bucket?acl',
                            environ={'REQUEST_METHOD': 'GET'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Date': self.get_date_header()})
        status, headers, body = self.call_s3api(req)
        self._check_acl('test:tester', body)

    def test_bucket_acl_PUT(self):
        """PUT with an XML ACL body succeeds, with or without a length."""
        xml = self._all_users_read_acl_xml()

        # Ordinary request whose body is passed directly to Request.blank.
        req = Request.blank('/bucket?acl',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Date': self.get_date_header()},
                            body=xml)
        status, headers, body = self.call_s3api(req)
        self.assertEqual(status.split()[0], '200')

        # Same payload sent chunked: no content length is available, so
        # the body has to be read from wsgi.input.
        req = Request.blank('/bucket?acl',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'wsgi.input': BytesIO(xml)},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Date': self.get_date_header(),
                                     'Transfer-Encoding': 'chunked'})
        self.assertIsNone(req.content_length)
        self.assertIsNone(req.message_length())
        status, headers, body = self.call_s3api(req)
        self.assertEqual(status.split()[0], '200')

    def test_bucket_canned_acl_PUT(self):
        """PUT with only a canned ``X-AMZ-ACL`` header succeeds."""
        req = Request.blank('/bucket?acl',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Date': self.get_date_header(),
                                     'X-AMZ-ACL': 'public-read'})
        status, headers, body = self.call_s3api(req)
        self.assertEqual(status.split()[0], '200')

    @s3acl(s3acl_only=True)
    def test_bucket_canned_acl_PUT_with_s3acl(self):
        """With s3acl on, a canned ACL PUT never calls handle_acl_header."""
        req = Request.blank('/bucket?acl',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Date': self.get_date_header(),
                                     'X-AMZ-ACL': 'public-read'})
        # NOTE(review): the extent of this ``with`` block was reconstructed
        # from flattened source; only the call needs the patch active.
        with mock.patch('swift.common.middleware.s3api.s3request.'
                        'handle_acl_header') as mock_handler:
            status, headers, body = self.call_s3api(req)
            self.assertEqual(status.split()[0], '200')
            self.assertEqual(mock_handler.call_count, 0)

    def test_bucket_fails_with_both_acl_header_and_xml_PUT(self):
        """Sending a canned ACL header and an XML body together fails."""
        xml = self._all_users_read_acl_xml()
        req = Request.blank('/bucket?acl',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Date': self.get_date_header(),
                                     'X-AMZ-ACL': 'public-read'},
                            body=xml)
        status, headers, body = self.call_s3api(req)
        self.assertEqual(self._get_error_code(body),
                         'UnexpectedContent')

    def _test_put_no_body(self, use_content_length=False,
                          use_transfer_encoding=False, string_to_md5=b''):
        """PUT ``?acl`` with no ACL header and an empty body; expect a 400.

        :param use_content_length: keep the ``CONTENT_LENGTH`` that
                                   ``Request.blank`` set for the empty body
        :param use_transfer_encoding: mark the request as chunked
        :param string_to_md5: bytes hashed to produce ``Content-MD5``
        """
        # md5 only fills in the Content-MD5 header here, hence
        # usedforsecurity=False.
        content_md5 = base64.b64encode(
            md5(string_to_md5, usedforsecurity=False).digest()).strip()
        # NOTE(review): presumably UnreadableInput fails the test if the
        # middleware reads wsgi.input -- confirm in the helpers module. The
        # extent of this ``with`` block was reconstructed from flattened
        # source; the request is dispatched while the context is active.
        with UnreadableInput(self) as fake_input:
            req = Request.blank(
                '/bucket?acl',
                environ={
                    'REQUEST_METHOD': 'PUT',
                    'wsgi.input': fake_input},
                headers={
                    'Authorization': 'AWS test:tester:hmac',
                    'Date': self.get_date_header(),
                    'Content-MD5': content_md5},
                body='')
            if not use_content_length:
                req.environ.pop('CONTENT_LENGTH')
            if use_transfer_encoding:
                req.environ['HTTP_TRANSFER_ENCODING'] = 'chunked'
            status, headers, body = self.call_s3api(req)
        self.assertEqual(status, '400 Bad Request')
        self.assertEqual(self._get_error_code(body), 'MissingSecurityHeader')
        self.assertEqual(self._get_error_message(body),
                         'Your request was missing a required header.')
        self.assertIn(b'<MissingHeaderName>x-amz-acl</MissingHeaderName>',
                      body)

    @s3acl
    def test_bucket_fails_with_neither_acl_header_nor_xml_PUT(self):
        """A bodiless, header-less ACL PUT fails in every framing variant."""
        self._test_put_no_body()
        self._test_put_no_body(string_to_md5=b'test')
        self._test_put_no_body(use_content_length=True)
        self._test_put_no_body(use_content_length=True, string_to_md5=b'test')
        self._test_put_no_body(use_transfer_encoding=True)
        self._test_put_no_body(use_transfer_encoding=True, string_to_md5=b'zz')

    def test_object_acl_GET(self):
        """GET on an object's ``?acl`` reports the requester as the owner."""
        req = Request.blank('/bucket/object?acl',
                            environ={'REQUEST_METHOD': 'GET'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Date': self.get_date_header()})
        status, headers, body = self.call_s3api(req)
        self._check_acl('test:tester', body)

    def test_invalid_xml(self):
        """A body that is not valid ACL XML yields MalformedACLError."""
        req = Request.blank('/bucket?acl',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'Authorization': 'AWS test:tester:hmac',
                                     'Date': self.get_date_header()},
                            body='invalid')
        status, headers, body = self.call_s3api(req)
        self.assertEqual(self._get_error_code(body), 'MalformedACLError')

    def test_handle_acl_header(self):
        """handle_acl_header turns canned ACLs into container ACL headers."""
        def check_generated_acl_header(acl, targets):
            # ``targets`` is a list of (header name, expected value) pairs.
            req = Request.blank('/bucket',
                                headers={'X-Amz-Acl': acl})
            handle_acl_header(req)
            for header, expected in targets:
                self.assertIn(header, req.headers)
                self.assertEqual(req.headers[header], expected)

        check_generated_acl_header('public-read',
                                   [('X-Container-Read', '.r:*,.rlistings')])
        check_generated_acl_header('public-read-write',
                                   [('X-Container-Read', '.r:*,.rlistings'),
                                    ('X-Container-Write', '.r:*')])
        check_generated_acl_header('private',
                                   [('X-Container-Read', '.'),
                                    ('X-Container-Write', '.')])

    @s3acl(s3acl_only=True)
    def test_handle_acl_header_with_s3acl(self):
        """A fresh request keeps X-Amz-Acl and has no container ACL headers."""
        def check_generated_acl_header(acl, targets):
            # ``targets`` is a list of header names that must be absent.
            req = Request.blank('/bucket',
                                headers={'X-Amz-Acl': acl})
            for target in targets:
                self.assertNotIn(target, req.headers)
            self.assertIn('HTTP_X_AMZ_ACL', req.environ)
            # TODO: add translation and assertion for s3acl

        check_generated_acl_header('public-read',
                                   ['X-Container-Read'])
        check_generated_acl_header('public-read-write',
                                   ['X-Container-Read', 'X-Container-Write'])
        check_generated_acl_header('private',
                                   ['X-Container-Read', 'X-Container-Write'])

    def test_handle_acl_with_invalid_header_string(self):
        """An unknown canned ACL raises InvalidArgument naming the header."""
        req = Request.blank('/bucket', headers={'X-Amz-Acl': 'invalid'})
        with self.assertRaises(InvalidArgument) as cm:
            handle_acl_header(req)
        self.assertIn('argument_name', cm.exception.info)
        self.assertEqual(cm.exception.info['argument_name'], 'x-amz-acl')
        self.assertIn('argument_value', cm.exception.info)
        self.assertEqual(cm.exception.info['argument_value'], 'invalid')
|
|
|
|
|
|
# Run this module's tests when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
|