swift/test/unit/common/middleware/crypto/test_kmip_keymaster.py
Alistair Coles 1951dc7e9a Add keymaster to fetch root secret from KMIP service
Add a new middleware that can be used to fetch an encryption root
secret from a KMIP service. The middleware uses a PyKMIP client
to interact with a KMIP endpoint. The middleware is configured with
a unique identifier for the key to be fetched and options required
for the PyKMIP client.

Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Change-Id: Ib0943fb934b347060fc66c091673a33bcfac0a6d
2018-07-03 09:00:21 +01:00

192 lines
7.3 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2018 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import os
import unittest
from tempfile import mkdtemp
from textwrap import dedent
from shutil import rmtree
import sys
sys.modules['kmip'] = mock.Mock()
sys.modules['kmip.pie'] = mock.Mock()
sys.modules['kmip.pie.client'] = mock.Mock()
from swift.common.middleware.crypto.kmip_keymaster import KmipKeyMaster
class MockProxyKmipClient(object):
def __init__(self, secret):
self.secret = secret
self.uid = None
def get(self, uid):
self.uid = uid
return self.secret
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
pass
def create_secret(algorithm_name, length, value):
algorithm = mock.MagicMock()
algorithm.name = algorithm_name
secret = mock.MagicMock(cryptographic_algorithm=algorithm,
cryptographic_length=length,
value=value)
return secret
def create_mock_client(secret, calls):
def mock_client(*args, **kwargs):
client = MockProxyKmipClient(secret)
calls.append({'args': args, 'kwargs': kwargs, 'client': client})
return client
return mock_client
class TestKmipKeymaster(unittest.TestCase):
def setUp(self):
self.tempdir = mkdtemp()
def tearDown(self):
rmtree(self.tempdir)
def test_config_in_filter_section(self):
conf = {'__file__': '/etc/swift/proxy-server.conf',
'__name__': 'filter:kmip_keymaster',
'key_id': '1234'}
secret = create_secret('AES', 256, b'x' * 32)
calls = []
klass = 'swift.common.middleware.crypto.kmip_keymaster.ProxyKmipClient'
with mock.patch(klass, create_mock_client(secret, calls)):
km = KmipKeyMaster(None, conf)
self.assertEqual(secret.value, km.root_secret)
self.assertIsNone(km.keymaster_config_path)
self.assertEqual({'config_file': '/etc/swift/proxy-server.conf',
'config': 'filter:kmip_keymaster'},
calls[0]['kwargs'])
self.assertEqual('1234', calls[0]['client'].uid)
def test_config_in_separate_file(self):
km_conf = """
[kmip_keymaster]
key_id = 4321
"""
km_config_file = os.path.join(self.tempdir, 'km.conf')
with open(km_config_file, 'wb') as fd:
fd.write(dedent(km_conf))
conf = {'__file__': '/etc/swift/proxy-server.conf',
'__name__': 'filter:kmip_keymaster',
'keymaster_config_path': km_config_file}
secret = create_secret('AES', 256, b'x' * 32)
calls = []
klass = 'swift.common.middleware.crypto.kmip_keymaster.ProxyKmipClient'
with mock.patch(klass, create_mock_client(secret, calls)):
km = KmipKeyMaster(None, conf)
self.assertEqual(secret.value, km.root_secret)
self.assertEqual(km_config_file, km.keymaster_config_path)
self.assertEqual({'config_file': km_config_file,
'config': 'kmip_keymaster'},
calls[0]['kwargs'])
self.assertEqual('4321', calls[0]['client'].uid)
def test_proxy_server_conf_dir(self):
proxy_server_conf_dir = os.path.join(self.tempdir, 'proxy_server.d')
os.mkdir(proxy_server_conf_dir)
# KmipClient can't read conf from a dir, so check that is caught early
conf = {'__file__': proxy_server_conf_dir,
'__name__': 'filter:kmip_keymaster',
'key_id': '789'}
with self.assertRaises(ValueError) as cm:
KmipKeyMaster(None, conf)
self.assertIn('config cannot be read from conf dir', str(cm.exception))
# ...but a conf file in a conf dir could point back to itself for the
# KmipClient config
km_config_file = os.path.join(proxy_server_conf_dir, '40.conf')
km_conf = """
[filter:kmip_keymaster]
keymaster_config_file = %s
[kmip_keymaster]
key_id = 789
""" % km_config_file
with open(km_config_file, 'wb') as fd:
fd.write(dedent(km_conf))
conf = {'__file__': proxy_server_conf_dir,
'__name__': 'filter:kmip_keymaster',
'keymaster_config_path': km_config_file}
secret = create_secret('AES', 256, b'x' * 32)
calls = []
klass = 'swift.common.middleware.crypto.kmip_keymaster.ProxyKmipClient'
with mock.patch(klass, create_mock_client(secret, calls)):
km = KmipKeyMaster(None, conf)
self.assertEqual(secret.value, km.root_secret)
self.assertEqual(km_config_file, km.keymaster_config_path)
self.assertEqual({'config_file': km_config_file,
'config': 'kmip_keymaster'},
calls[0]['kwargs'])
self.assertEqual('789', calls[0]['client'].uid)
def test_bad_key_length(self):
conf = {'__file__': '/etc/swift/proxy-server.conf',
'__name__': 'filter:kmip_keymaster',
'key_id': '1234'}
secret = create_secret('AES', 128, b'x' * 16)
calls = []
klass = 'swift.common.middleware.crypto.kmip_keymaster.ProxyKmipClient'
with mock.patch(klass, create_mock_client(secret, calls)):
with self.assertRaises(ValueError) as cm:
KmipKeyMaster(None, conf)
self.assertIn('Expected an AES-256 key', str(cm.exception))
self.assertEqual({'config_file': '/etc/swift/proxy-server.conf',
'config': 'filter:kmip_keymaster'},
calls[0]['kwargs'])
self.assertEqual('1234', calls[0]['client'].uid)
def test_bad_key_algorithm(self):
conf = {'__file__': '/etc/swift/proxy-server.conf',
'__name__': 'filter:kmip_keymaster',
'key_id': '1234'}
secret = create_secret('notAES', 256, b'x' * 32)
calls = []
klass = 'swift.common.middleware.crypto.kmip_keymaster.ProxyKmipClient'
with mock.patch(klass, create_mock_client(secret, calls)):
with self.assertRaises(ValueError) as cm:
KmipKeyMaster(None, conf)
self.assertIn('Expected an AES-256 key', str(cm.exception))
self.assertEqual({'config_file': '/etc/swift/proxy-server.conf',
'config': 'filter:kmip_keymaster'},
calls[0]['kwargs'])
self.assertEqual('1234', calls[0]['client'].uid)
def test_missing_key_id(self):
conf = {'__file__': '/etc/swift/proxy-server.conf',
'__name__': 'filter:kmip_keymaster'}
with self.assertRaises(ValueError) as cm:
KmipKeyMaster(None, conf)
self.assertIn('key_id option is required', str(cm.exception))