Fernando Ferraz 39883f2b70 NFS driver: Fix fail creating volume with multiple snapshots
The NFS driver uses qcow2 images with backing files to represent
volume snapshots, but backing files are not allowed for qcow2 disk
images downloaded from glance.  The driver uses cinder.image_utils to
convert a qcow2 snapshot to a raw volume; this was not a problem
for the first snapshot, whose backing file is raw, and hence passed
the image format inspector, but the second snapshot has a qcow2
backing file, which the image_utils were rejecting as a security
risk.  Thus we now pass the qemu_img_info from the backing image as
an additional parameter to the image convert call, which indicates
that the file has already been screened and allows the conversion
to occur.

Co-authored-by: Fernando Ferraz <fesilva@redhat.com>
Co-authored-by: Brian Rosmaita <rosmaita.fossdev@gmail.com>

Closes-bug: #2074377
Change-Id: I49404e87eb0c77b4ed92918404f86c073fbfd713
(cherry picked from commit 9687fbac79b0af266dcefb89b8ecc5c2940f6c80)
(cherry picked from commit ca82907201bd79a0d3bb172b52da3689ba541396)
2025-04-23 16:37:57 +00:00

1819 lines
73 KiB
Python

# Copyright (c) 2012 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for the NFS driver module."""
import errno
import os
from unittest import mock
import castellan
import ddt
from oslo_utils import imageutils
from oslo_utils import units
from cinder import context
from cinder import exception
from cinder.image import image_utils
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.keymgr import fake as fake_keymgr
from cinder.tests.unit import test
from cinder.volume import configuration as conf
from cinder.volume.drivers import nfs
from cinder.volume.drivers import remotefs
from cinder.volume import volume_utils
class KeyObject(object):
    """Stub key object whose encoded form is a fixed byte string."""

    def get_encoded(self):
        """Return the UTF-8 bytes of the literal ``"asdf"``."""
        return "asdf".encode('utf-8')
class RemoteFsDriverTestCase(test.TestCase):
    """Tests for helpers implemented on ``remotefs.RemoteFSDriver``."""

    TEST_FILE_NAME = 'test.txt'
    TEST_EXPORT = 'nas-host1:/export'
    TEST_MNT_POINT = '/mnt/nas'

    def setUp(self):
        """Create a configured driver whose ``_execute`` is mocked."""
        super(RemoteFsDriverTestCase, self).setUp()
        self.configuration = conf.Configuration(None)
        self.configuration.append_config_values(nfs.nfs_opts)
        self.configuration.append_config_values(remotefs.nas_opts)
        self.override_config('nas_secure_file_permissions', 'false')
        self.override_config('nas_secure_file_operations', 'false')
        self.override_config('nfs_snapshot_support', True)
        self.override_config('max_over_subscription_ratio', 1.0)
        self.override_config('reserved_percentage', 5)
        # The driver is built exactly once, after the configuration is in
        # place; an earlier un-configured instance would just be discarded.
        self._driver = remotefs.RemoteFSDriver(
            configuration=self.configuration)
        mock_exc = mock.patch.object(self._driver, '_execute')
        self._execute = mock_exc.start()
        self.addCleanup(mock_exc.stop)

    def _stub_nas_mount(self):
        """Stub share mounting on the driver and return the mount mock.

        The returned mock is what the tests pass as the mount-point
        argument of ``_determine_nas_security_option_setting``.
        """
        drv = self._driver
        drv._mounted_shares = [self.TEST_EXPORT]
        drv._ensure_shares_mounted = mock.Mock()
        nas_mount = drv._get_mount_point_for_share = mock.Mock(
            return_value=self.TEST_MNT_POINT)
        return nas_mount

    def _assert_nas_option(self, setting, nas_mount, is_new_install,
                           expected):
        """Resolve one NAS security setting and compare with ``expected``."""
        nas_option = self._driver._determine_nas_security_option_setting(
            setting, nas_mount, is_new_install)
        self.assertEqual(expected, nas_option)

    def test_create_sparsed_file(self):
        self._driver._create_sparsed_file('/path', 1)
        self._execute.assert_called_once_with('truncate', '-s', '1G',
                                              '/path', run_as_root=True)

    def test_create_regular_file(self):
        self._driver._create_regular_file('/path', 1)
        self._execute.assert_called_once_with('dd', 'if=/dev/zero',
                                              'of=/path', 'bs=1M',
                                              'count=1024', run_as_root=True)

    def test_create_qcow2_file(self):
        file_size = 1
        self._driver._create_qcow2_file('/path', file_size)
        self._execute.assert_called_once_with('qemu-img', 'create', '-f',
                                              'qcow2', '-o',
                                              'preallocation=metadata',
                                              '/path', '%s' %
                                              str(file_size * units.Gi),
                                              run_as_root=True)

    def test_set_rw_permissions_for_all(self):
        self._driver._set_rw_permissions_for_all('/path')
        self._execute.assert_called_once_with('chmod', 'ugo+rw', '/path',
                                              run_as_root=True)

    @mock.patch.object(remotefs, 'LOG')
    def test_set_rw_permissions_with_secure_file_permissions(self, mock_log):
        self._driver._mounted_shares = [self.TEST_EXPORT]
        self.override_config('nas_secure_file_permissions', 'true')
        self._driver._set_rw_permissions(self.TEST_FILE_NAME)
        self.assertFalse(mock_log.warning.called)

    @mock.patch.object(remotefs, 'LOG')
    def test_set_rw_permissions_without_secure_file_permissions(self,
                                                                mock_log):
        self.override_config('nas_secure_file_permissions', 'false')
        self._driver._set_rw_permissions(self.TEST_FILE_NAME)
        self.assertTrue(mock_log.warning.called)
        warn_msg = "%(path)s is being set with open permissions: %(perm)s"
        mock_log.warning.assert_called_once_with(
            warn_msg, {'path': self.TEST_FILE_NAME, 'perm': 'ugo+rw'})

    @mock.patch('os.path.join')
    @mock.patch('os.path.isfile', return_value=False)
    def test_determine_nas_security_options_when_auto_and_new_install(
            self,
            mock_isfile,
            mock_join):
        """Test the setting of the NAS Security Option

        In this test case, we will create the marker file. No pre-existing
        Cinder volumes found during bootup.
        """
        nas_mount = self._stub_nas_mount()
        mock_join.return_value = (
            '%s/.cinderSecureEnvIndicator' % self.TEST_MNT_POINT)
        is_new_install = True

        # nas_secure_file_permissions = 'auto'
        self._assert_nas_option('auto', nas_mount, is_new_install, 'true')
        # nas_secure_file_operations = 'auto'
        self._assert_nas_option('auto', nas_mount, is_new_install, 'true')

    @mock.patch('os.path.join')
    @mock.patch('os.path.isfile')
    def test_determine_nas_security_options_when_auto_and_new_install_exists(
            self,
            isfile,
            join):
        """Test the setting of the NAS Security Option

        In this test case, the marker file already exists. Cinder volumes
        found during bootup.
        """
        nas_mount = self._stub_nas_mount()
        join.return_value = (
            '%s/.cinderSecureEnvIndicator' % self.TEST_MNT_POINT)
        isfile.return_value = True
        is_new_install = False

        # nas_secure_file_permissions = 'auto'
        self._assert_nas_option('auto', nas_mount, is_new_install, 'true')
        # nas_secure_file_operations = 'auto'
        self._assert_nas_option('auto', nas_mount, is_new_install, 'true')

    @mock.patch('os.path.join')
    @mock.patch('os.path.isfile')
    def test_determine_nas_security_options_when_auto_and_old_install(self,
                                                                      isfile,
                                                                      join):
        """Test the setting of the NAS Security Option

        In this test case, the marker file does not exist. There are also
        pre-existing Cinder volumes.
        """
        nas_mount = self._stub_nas_mount()
        join.return_value = (
            '%s/.cinderSecureEnvIndicator' % self.TEST_MNT_POINT)
        isfile.return_value = False
        is_new_install = False

        # nas_secure_file_permissions = 'auto'
        self._assert_nas_option('auto', nas_mount, is_new_install, 'false')
        # nas_secure_file_operations = 'auto'
        self._assert_nas_option('auto', nas_mount, is_new_install, 'false')

    def test_determine_nas_security_options_when_admin_set_true(self):
        """Test the setting of the NAS Security Option

        In this test case, the Admin set the flag to 'true'.
        """
        nas_mount = self._stub_nas_mount()
        is_new_install = False

        # nas_secure_file_permissions = 'true'
        self._assert_nas_option('true', nas_mount, is_new_install, 'true')
        # nas_secure_file_operations = 'true'
        self._assert_nas_option('true', nas_mount, is_new_install, 'true')

    def test_determine_nas_security_options_when_admin_set_false(self):
        """Test the setting of the NAS Security Option

        In this test case, the Admin set the flag to 'false'.
        """
        nas_mount = self._stub_nas_mount()
        is_new_install = False

        # nas_secure_file_permissions = 'false'
        self._assert_nas_option('false', nas_mount, is_new_install, 'false')
        # nas_secure_file_operations = 'false'
        self._assert_nas_option('false', nas_mount, is_new_install, 'false')

    @mock.patch.object(remotefs, 'LOG')
    def test_set_nas_security_options(self, mock_log):
        """Test setting of NAS Security options.

        The RemoteFS driver will force set options to false. The derived
        objects will provide an inherited interface to properly set options.
        """
        drv = self._driver
        is_new_install = False
        drv.set_nas_security_options(is_new_install)
        self.assertEqual('false', drv.configuration.nas_secure_file_operations)
        self.assertEqual('false',
                         drv.configuration.nas_secure_file_permissions)
        self.assertTrue(mock_log.warning.called)

    def test_secure_file_operations_enabled_true(self):
        """Test nas_secure_file_operations = 'true'

        Networked file system based drivers may support secure file
        operations. This test verifies the settings when secure.
        """
        self.override_config('nas_secure_file_operations', 'true')
        self.assertTrue(self._driver.secure_file_operations_enabled())

    def test_secure_file_operations_enabled_false(self):
        """Test nas_secure_file_operations = 'false'

        Networked file system based drivers may support secure file
        operations. This test verifies the settings when not secure.
        """
        self.override_config('nas_secure_file_operations', 'false')
        self.assertFalse(self._driver.secure_file_operations_enabled())
# NFS configuration scenarios
#
# Each mapping holds option-name -> value pairs; the ddt-parametrized tests
# in this module receive one mapping per run.
NFS_CONFIG1 = {
    'max_over_subscription_ratio': 1.0,
    'reserved_percentage': 0,
    'nfs_sparsed_volumes': True,
    'nfs_qcow2_volumes': False,
    'nas_secure_file_permissions': 'false',
    'nas_secure_file_operations': 'false',
}

NFS_CONFIG2 = {
    'max_over_subscription_ratio': 10.0,
    'reserved_percentage': 5,
    'nfs_sparsed_volumes': False,
    'nfs_qcow2_volumes': True,
    'nas_secure_file_permissions': 'true',
    'nas_secure_file_operations': 'true',
}

NFS_CONFIG3 = {
    'max_over_subscription_ratio': 15.0,
    'reserved_percentage': 10,
    'nfs_sparsed_volumes': False,
    'nfs_qcow2_volumes': False,
    'nas_secure_file_permissions': 'auto',
    'nas_secure_file_operations': 'auto',
}

NFS_CONFIG4 = {
    'max_over_subscription_ratio': 20.0,
    'reserved_percentage': 60,
    'nfs_sparsed_volumes': True,
    'nfs_qcow2_volumes': True,
    'nas_secure_file_permissions': 'false',
    'nas_secure_file_operations': 'true',
}
# JSON text templates, presumably mimicking `qemu-img info` JSON output
# (going by the constant names). They are %-format templates taking the
# mapping keys "volid", "size_b" and, for OUT3-OUT5, "snapid".

# Raw image with no backing file.
QEMU_IMG_INFO_OUT1 = """{
"filename": "%(volid)s",
"format": "raw",
"virtual-size": %(size_b)s,
"actual-size": 173000
}"""
# qcow2 image with no backing file.
QEMU_IMG_INFO_OUT2 = """{
"filename": "%(volid)s",
"format": "qcow2",
"virtual-size": %(size_b)s,
"actual-size": 196000,
"cluster-size": 65536,
"format-specific": {
"compat": "1.1",
"lazy-refcounts": false,
"refcount-bits": 16,
"corrupt": false
}
}"""
# qcow2 snapshot file whose backing file is reported as qcow2.
QEMU_IMG_INFO_OUT3 = """{
"filename": "volume-%(volid)s.%(snapid)s",
"format": "qcow2",
"virtual-size": %(size_b)s,
"actual-size": 196000,
"cluster-size": 65536,
"backing-filename": "volume-%(volid)s",
"backing-filename-format": "qcow2",
"format-specific": {
"compat": "1.1",
"lazy-refcounts": false,
"refcount-bits": 16,
"corrupt": false
}
}"""
# Snapshot file reported as raw, with a raw backing file.
QEMU_IMG_INFO_OUT4 = """{
"filename": "volume-%(volid)s.%(snapid)s",
"format": "raw",
"virtual-size": %(size_b)s,
"actual-size": 196000,
"cluster-size": 65536,
"backing-filename": "volume-%(volid)s",
"backing-filename-format": "raw"
}"""
# Encrypted qcow2 snapshot file (format-specific type "luks") with a raw
# backing file.
QEMU_IMG_INFO_OUT5 = """{
"filename": "volume-%(volid)s.%(snapid)s",
"format": "qcow2",
"virtual-size": %(size_b)s,
"actual-size": 196000,
"encrypted": true,
"cluster-size": 65536,
"backing-filename": "volume-%(volid)s",
"backing-filename-format": "raw",
"format-specific": {
"type": "luks",
"data": {
"ivgen-alg": "plain64",
"hash-alg": "sha256",
"cipher-alg": "aes-256",
"uuid": "386f8626-33f0-4683-a517-78ddfe385e33",
"cipher-mode": "xts",
"slots": [
{
"active": true,
"iters": 1892498,
"key offset": 4096,
"stripes": 4000
},
{
"active": false,
"key offset": 262144
},
{
"active": false,
"key offset": 520192
},
{
"active": false,
"key offset": 778240
},
{
"active": false,
"key offset": 1036288
},
{
"active": false,
"key offset": 1294336
},
{
"active": false,
"key offset": 1552384
},
{
"active": false,
"key offset": 1810432
}
],
"payload-offset": 2068480,
"master-key-iters": 459347
},
"corrupt": false
}
}"""
@ddt.ddt
class NfsDriverTestCase(test.TestCase):
"""Test case for NFS driver."""
# Host and share path that together form TEST_NFS_EXPORT1 ("host:path").
TEST_NFS_HOST = 'nfs-host1'
TEST_NFS_SHARE_PATH = '/export'
TEST_NFS_EXPORT1 = '%s:%s' % (TEST_NFS_HOST, TEST_NFS_SHARE_PATH)
# Second export and the option text that follows it in the shares config.
TEST_NFS_EXPORT2 = 'nfs-host2:/export'
TEST_NFS_EXPORT2_OPTIONS = '-o intr'
# Volume size, in GB, used when creating fake volumes.
TEST_SIZE_IN_GB = 1
TEST_MNT_POINT = '/mnt/nfs'
# Mount base containing a doubled slash.
TEST_MNT_POINT_BASE_EXTRA_SLASH = '/opt/stack/data/cinder//mnt'
TEST_MNT_POINT_BASE = '/mnt/test'
TEST_LOCAL_PATH = '/mnt/nfs/volume-123'
TEST_FILE_NAME = 'test.txt'
TEST_SHARES_CONFIG_FILE = '/etc/cinder/test-shares.conf'
# Export and mount point whose values contain spaces.
TEST_NFS_EXPORT_SPACES = 'nfs-host3:/export this'
TEST_MNT_POINT_SPACES = '/ 0 0 0 /foo'
def setUp(self):
    """Register NFS/NAS options and pin the defaults the tests rely on."""
    super(NfsDriverTestCase, self).setUp()
    self.configuration = conf.Configuration(None)
    self.configuration.append_config_values(nfs.nfs_opts)
    self.configuration.append_config_values(remotefs.nas_opts)
    # Every option is overridden exactly once. 'reserved_percentage' used
    # to be set twice (5, then 5.0); only the later value took effect, so
    # that is the one kept here.
    overrides = (
        ('max_over_subscription_ratio', 1.0),
        ('nfs_shares_config', None),
        ('nfs_sparsed_volumes', True),
        ('reserved_percentage', 5.0),
        ('nfs_mount_point_base', self.TEST_MNT_POINT_BASE),
        ('nfs_mount_options', None),
        ('nfs_mount_attempts', 3),
        ('nfs_qcow2_volumes', False),
        ('nas_secure_file_permissions', 'false'),
        ('nas_secure_file_operations', 'false'),
        ('nas_host', None),
        ('nas_share_path', None),
        ('nas_mount_options', None),
        ('volume_dd_blocksize', '1M'),
    )
    for option_name, option_value in overrides:
        self.override_config(option_name, option_value)
    self.mock_object(volume_utils, 'get_max_over_subscription_ratio',
                     return_value=1)
    self.context = context.get_admin_context()
def _set_driver(self, extra_confs=None):
    """Create ``self._driver`` with an empty share map and mocked _execute.

    :param extra_confs: optional mapping of option name to value; each pair
        is set as an attribute on ``self.configuration`` before the driver
        is instantiated.
    """
    # Override the default configs
    for option_name, option_value in (extra_confs or {}).items():
        setattr(self.configuration, option_name, option_value)

    driver = nfs.NfsDriver(configuration=self.configuration)
    driver.shares = {}
    self._driver = driver
    self.mock_object(driver, '_execute')
@ddt.data(NFS_CONFIG1, NFS_CONFIG2, NFS_CONFIG3, NFS_CONFIG4)
def test_local_path(self, nfs_config):
    """local_path common use case, run once per NFS config scenario."""
    self.override_config('nfs_mount_point_base', self.TEST_MNT_POINT_BASE)
    self._set_driver(extra_confs=nfs_config)
    volume = fake_volume.fake_volume_obj(
        self.context, provider_location=self.TEST_NFS_EXPORT1)

    expected_path = (
        '/mnt/test/2f4f60214cf43c595666dd815f0360a4/%s' % volume.name)
    self.assertEqual(expected_path, self._driver.local_path(volume))
@ddt.data(NFS_CONFIG1, NFS_CONFIG2, NFS_CONFIG3, NFS_CONFIG4)
def test_copy_image_to_volume(self, nfs_config):
    """copy_image_to_volume fetches the image, then resizes it."""
    # NOTE(review): nfs_config is supplied by ddt but never applied; the
    # driver below is always built from the default configuration, so all
    # four runs exercise the same path -- confirm whether that is intended.
    resize_mock = self.mock_object(image_utils, 'resize_image')
    fetch_mock = self.mock_object(image_utils, 'fetch_to_raw')
    self._set_driver()
    driver = self._driver

    volume = fake_volume.fake_volume_obj(self.context,
                                         size=self.TEST_SIZE_IN_GB)
    image_path = 'volume-%s' % volume.id
    self.mock_object(driver, 'local_path', return_value=image_path)

    # qemu_img_info reports a virtual size equal to the volume size.
    img_info = mock.Mock()
    img_info.virtual_size = 1 * units.Gi
    self.mock_object(image_utils, 'qemu_img_info', return_value=img_info)

    driver.copy_image_to_volume(None, volume, None, None)

    fetch_mock.assert_called_once_with(
        None, None, None, image_path, mock.ANY, run_as_root=True,
        size=self.TEST_SIZE_IN_GB, disable_sparse=False)
    resize_mock.assert_called_once_with(
        image_path, self.TEST_SIZE_IN_GB, run_as_root=True)
def test_get_mount_point_for_share(self):
    """_get_mount_point_for_share should calculate correct value."""
    self._set_driver()
    self.override_config('nfs_mount_point_base', self.TEST_MNT_POINT_BASE)

    mount_point = self._driver._get_mount_point_for_share(
        self.TEST_NFS_EXPORT1)

    self.assertEqual('/mnt/test/2f4f60214cf43c595666dd815f0360a4',
                     mount_point)
def test_get_mount_point_for_share_given_extra_slash_in_state_path(self):
    """_get_mount_point_for_share should calculate correct value."""
    # The configured base carries a doubled slash ...
    self.override_config('nfs_mount_point_base',
                         self.TEST_MNT_POINT_BASE_EXTRA_SLASH)

    # ... which the driver is expected to have removed from its base path.
    driver = nfs.NfsDriver(configuration=self.configuration)
    self.assertEqual('/opt/stack/data/cinder/mnt', driver.base)

    expected_mount = (
        '/opt/stack/data/cinder/mnt/2f4f60214cf43c595666dd815f0360a4')
    self.assertEqual(
        expected_mount,
        driver._get_mount_point_for_share(self.TEST_NFS_EXPORT1))
def test_get_capacity_info(self):
    """_get_capacity_info should calculate correct value."""
    self._set_driver()
    driver = self._driver

    total_size = 2620544
    available = 2129984
    used = 490560
    # Canned command outputs handed back by the mocked _execute.
    stat_stdout = '1 %d %d' % (total_size, available)
    du_stdout = '%d /mnt' % used

    expected_calls = [
        mock.call('stat', '-f', '-c', '%S %b %a',
                  self.TEST_MNT_POINT, run_as_root=True),
        mock.call('du', '-sb', '--apparent-size',
                  '--exclude', '*snapshot*',
                  self.TEST_MNT_POINT, run_as_root=True),
    ]

    with mock.patch.object(
            driver, '_get_mount_point_for_share') as mock_get_mount:
        mock_get_mount.return_value = self.TEST_MNT_POINT
        driver._execute.side_effect = [(stat_stdout, None),
                                       (du_stdout, None)]

        capacity = driver._get_capacity_info(self.TEST_NFS_EXPORT1)

        self.assertEqual((total_size, available, used), capacity)
        mock_get_mount.assert_called_once_with(self.TEST_NFS_EXPORT1)
        driver._execute.assert_has_calls(expected_calls)
def test_get_capacity_info_for_share_and_mount_point_with_spaces(self):
    """_get_capacity_info should calculate correct value."""
    self._set_driver()
    driver = self._driver

    total_size = 2620544
    available = 2129984
    used = 490560
    # Canned command outputs handed back by the mocked _execute.
    stat_stdout = '1 %d %d' % (total_size, available)
    du_stdout = '%d /mnt' % used

    expected_calls = [
        mock.call('stat', '-f', '-c', '%S %b %a',
                  self.TEST_MNT_POINT_SPACES, run_as_root=True),
        mock.call('du', '-sb', '--apparent-size',
                  '--exclude', '*snapshot*',
                  self.TEST_MNT_POINT_SPACES, run_as_root=True),
    ]

    with mock.patch.object(
            driver, '_get_mount_point_for_share') as mock_get_mount:
        mock_get_mount.return_value = self.TEST_MNT_POINT_SPACES
        driver._execute.side_effect = [(stat_stdout, None),
                                       (du_stdout, None)]

        capacity = driver._get_capacity_info(self.TEST_NFS_EXPORT_SPACES)

        self.assertEqual((total_size, available, used), capacity)
        mock_get_mount.assert_called_once_with(
            self.TEST_NFS_EXPORT_SPACES)
        driver._execute.assert_has_calls(expected_calls)
def test_load_shares_config(self):
    """Only the two well-formed, uncommented exports end up in shares."""
    self._set_driver()
    driver = self._driver
    driver.configuration.nfs_shares_config = self.TEST_SHARES_CONFIG_FILE

    # A plain export, a '#'-prefixed line, an empty line, an export
    # followed by options, and a line that is not in host:/path form.
    config_lines = [
        self.TEST_NFS_EXPORT1,
        '#' + self.TEST_NFS_EXPORT2,
        '',
        self.TEST_NFS_EXPORT2 + ' ' + self.TEST_NFS_EXPORT2_OPTIONS,
        'broken:share_format',
    ]

    with mock.patch.object(
            driver, '_read_config_file') as mock_read_config:
        mock_read_config.return_value = config_lines
        driver._load_shares_config(driver.configuration.nfs_shares_config)

        mock_read_config.assert_called_once_with(
            self.TEST_SHARES_CONFIG_FILE)
        self.assertIn(self.TEST_NFS_EXPORT1, driver.shares)
        self.assertIn(self.TEST_NFS_EXPORT2, driver.shares)
        self.assertEqual(2, len(driver.shares))
        self.assertEqual(self.TEST_NFS_EXPORT2_OPTIONS,
                         driver.shares[self.TEST_NFS_EXPORT2])
def test_load_shares_config_nas_opts(self):
    """With nas_host/nas_share_path set, exactly that export is loaded."""
    self._set_driver()
    driver = self._driver
    cfg = driver.configuration
    cfg.nas_host = self.TEST_NFS_HOST
    cfg.nas_share_path = self.TEST_NFS_SHARE_PATH
    cfg.nfs_shares_config = self.TEST_SHARES_CONFIG_FILE

    driver._load_shares_config(cfg.nfs_shares_config)

    self.assertIn(self.TEST_NFS_EXPORT1, driver.shares)
    self.assertEqual(1, len(driver.shares))
def test_ensure_shares_mounted_should_save_mounting_successfully(self):
    """_ensure_shares_mounted should save share if mounted with success."""
    self._set_driver()
    driver = self._driver
    config_lines = [self.TEST_NFS_EXPORT1]
    driver.configuration.nfs_shares_config = self.TEST_SHARES_CONFIG_FILE

    read_patch = mock.patch.object(driver, '_read_config_file')
    ensure_patch = mock.patch.object(driver, '_ensure_share_mounted')
    with read_patch as mock_read_config, ensure_patch as mock_ensure:
        mock_read_config.return_value = config_lines
        # NOTE(review): this invokes the patched mock directly, so the
        # assertion below is satisfied by the test itself.
        driver._ensure_share_mounted(self.TEST_NFS_EXPORT1)
        mock_ensure.assert_called_once_with(self.TEST_NFS_EXPORT1)
@mock.patch.object(remotefs, 'LOG')
def test_ensure_shares_mounted_should_not_save_mounting_with_error(self,
                                                                   LOG):
    """_ensure_shares_mounted should not save share if failed to mount."""
    self._set_driver()
    driver = self._driver
    config_lines = [self.TEST_NFS_EXPORT1]
    driver.configuration.nfs_shares_config = self.TEST_SHARES_CONFIG_FILE

    read_patch = mock.patch.object(driver, '_read_config_file')
    ensure_patch = mock.patch.object(driver, '_ensure_share_mounted')
    with read_patch as mock_read_config, ensure_patch as mock_ensure:
        mock_read_config.return_value = config_lines
        # NOTE(review): this invokes the patched mock directly, so the
        # call assertion below is satisfied by the test itself.
        driver._ensure_share_mounted()
        self.assertEqual(0, len(driver._mounted_shares))
        mock_ensure.assert_called_once_with()
def test_find_share_should_throw_error_if_there_is_no_mounted_share(self):
    """_find_share should throw error if there is no mounted shares."""
    self._set_driver()
    driver = self._driver
    driver._mounted_shares = []
    volume = self._simple_volume()

    self.assertRaises(exception.NfsNoSharesMounted,
                      driver._find_share, volume)
def test_find_share(self):
    """_find_share simple use case."""
    self._set_driver()
    driver = self._driver
    exports = [self.TEST_NFS_EXPORT1, self.TEST_NFS_EXPORT2]
    driver._mounted_shares = exports
    volume = fake_volume.fake_volume_obj(self.context,
                                         size=self.TEST_SIZE_IN_GB)

    # One capacity tuple per export, in the order the exports are listed.
    capacities = [
        (5 * units.Gi, 2 * units.Gi, 2 * units.Gi),
        (10 * units.Gi, 3 * units.Gi, 1 * units.Gi),
    ]

    with mock.patch.object(
            driver, '_get_capacity_info') as mock_get_capacity_info:
        mock_get_capacity_info.side_effect = capacities

        self.assertEqual(self.TEST_NFS_EXPORT2,
                         driver._find_share(volume))

        mock_get_capacity_info.assert_has_calls(
            [mock.call(export) for export in exports])
        self.assertEqual(2, mock_get_capacity_info.call_count)
def test_find_share_should_throw_error_if_there_is_not_enough_space(self):
    """_find_share should throw error if there is no share to host vol."""
    self._set_driver()
    driver = self._driver
    exports = [self.TEST_NFS_EXPORT1, self.TEST_NFS_EXPORT2]
    driver._mounted_shares = exports
    volume = self._simple_volume()

    # Both exports report zero available space.
    capacities = [
        (5 * units.Gi, 0, 5 * units.Gi),
        (10 * units.Gi, 0, 10 * units.Gi),
    ]

    with mock.patch.object(
            driver, '_get_capacity_info') as mock_get_capacity_info:
        mock_get_capacity_info.side_effect = capacities

        self.assertRaises(exception.NfsNoSuitableShareFound,
                          driver._find_share, volume)

        mock_get_capacity_info.assert_has_calls(
            [mock.call(export) for export in exports])
        self.assertEqual(2, mock_get_capacity_info.call_count)
def _simple_volume(self, size=10):
    """Return a fake volume located on TEST_NFS_EXPORT1.

    :param size: value passed as the fake volume's ``size``.
    """
    volume_fields = {
        'display_name': 'volume_name',
        'provider_location': self.TEST_NFS_EXPORT1,
        'size': size,
    }
    return fake_volume.fake_volume_obj(self.context, **volume_fields)
def _simple_encrypted_volume(self, size=10):
    """Return a fake volume on TEST_NFS_EXPORT1 with an encryption key id.

    :param size: value passed as the fake volume's ``size``.
    """
    return fake_volume.fake_volume_obj(
        self.context,
        provider_location=self.TEST_NFS_EXPORT1,
        name=u'volume-0000000a',
        id='55555555-222f-4b32-b585-9991b3bf0a99',
        size=size,
        encryption_key_id=fake.ENCRYPTION_KEY_ID)
def test_get_provisioned_capacity(self):
    """_get_provisioned_capacity returns 0.14 for the canned output."""
    self._set_driver()
    driver = self._driver
    execute_mock = self.mock_object(driver, '_execute')
    # Canned (stdout, stderr) pair returned for every command.
    execute_mock.return_value = ("148418423\t/dir", "")

    with mock.patch.object(driver, 'shares') as mock_shares:
        mock_shares.keys.return_value = {'192.0.2.1:/srv/nfs1'}
        mock_shares.return_value = {'192.0.2.1:/srv/nfs1', ''}
        provisioned = driver._get_provisioned_capacity()

        self.assertEqual(provisioned, 0.14)
@mock.patch('cinder.objects.volume.Volume.save')
def test_create_sparsed_volume(self, mock_save):
    """With nfs_sparsed_volumes on, a sparse file is created."""
    self._set_driver()
    driver = self._driver
    volume = self._simple_volume()
    self.override_config('nfs_sparsed_volumes', True)

    sparse_patch = mock.patch.object(driver, '_create_sparsed_file')
    perms_patch = mock.patch.object(driver, '_set_rw_permissions')
    with sparse_patch as mock_create_sparsed_file, \
            perms_patch as mock_set_rw_permissions:
        driver._do_create_volume(volume)

        mock_create_sparsed_file.assert_called_once_with(mock.ANY,
                                                         mock.ANY)
        mock_set_rw_permissions.assert_called_once_with(mock.ANY)
@mock.patch('cinder.objects.volume.Volume.save')
def test_create_nonsparsed_volume(self, mock_save):
    """With nfs_sparsed_volumes off, a regular file is created."""
    self._set_driver()
    driver = self._driver
    self.override_config('nfs_sparsed_volumes', False)
    volume = self._simple_volume()

    regular_patch = mock.patch.object(driver, '_create_regular_file')
    perms_patch = mock.patch.object(driver, '_set_rw_permissions')
    with regular_patch as mock_create_regular_file, \
            perms_patch as mock_set_rw_permissions:
        driver._do_create_volume(volume)

        mock_create_regular_file.assert_called_once_with(mock.ANY,
                                                         mock.ANY)
        mock_set_rw_permissions.assert_called_once_with(mock.ANY)
@mock.patch.object(nfs, 'LOG')
def test_create_volume_should_ensure_nfs_mounted(self, mock_log):
    """create_volume ensures shares provided in config are mounted."""
    self._set_driver()
    driver = self._driver
    driver._find_share = mock.Mock(return_value=self.TEST_NFS_EXPORT1)
    driver._do_create_volume = mock.Mock()

    with mock.patch.object(
            driver, '_ensure_share_mounted') as mock_ensure_share:
        # NOTE(review): the patched mock is invoked here by the test
        # itself; the final assertion only holds because create_volume
        # adds no further calls to it.
        driver._ensure_share_mounted()
        volume = fake_volume.fake_volume_obj(self.context,
                                             size=self.TEST_SIZE_IN_GB)
        driver.create_volume(volume)

        mock_ensure_share.assert_called_once_with()
@mock.patch.object(nfs, 'LOG')
def test_create_volume_should_return_provider_location(self, mock_log):
    """create_volume should return provider_location with found share."""
    self._set_driver()
    driver = self._driver
    driver._ensure_shares_mounted = mock.Mock()
    driver._do_create_volume = mock.Mock()
    volume = fake_volume.fake_volume_obj(self.context,
                                         size=self.TEST_SIZE_IN_GB)

    with mock.patch.object(driver, '_find_share') as mock_find_share:
        mock_find_share.return_value = self.TEST_NFS_EXPORT1

        model_update = driver.create_volume(volume)

        self.assertEqual(self.TEST_NFS_EXPORT1,
                         model_update['provider_location'])
        mock_find_share.assert_called_once_with(volume)
def test_delete_volume(self):
    """delete_volume simple test case."""
    self._set_driver()
    driver = self._driver
    driver._ensure_share_mounted = mock.Mock()
    volume = fake_volume.fake_volume_obj(
        self.context,
        display_name='volume-123',
        provider_location=self.TEST_NFS_EXPORT1)

    with mock.patch.object(driver, 'local_path') as mock_local_path:
        mock_local_path.return_value = self.TEST_LOCAL_PATH

        driver.delete_volume(volume)

        mock_local_path.assert_called_with(volume)
        # Exactly one command is run through the mocked _execute.
        driver._execute.assert_called_once()
def test_delete_should_ensure_share_mounted(self):
    """delete_volume should ensure that corresponding share is mounted."""
    self._set_driver()
    drv = self._driver
    volume = fake_volume.fake_volume_obj(
        self.context,
        display_name='volume-123',
        provider_location=self.TEST_NFS_EXPORT1)

    with mock.patch.object(drv, '_ensure_share_mounted') as mock_ensure:
        drv.delete_volume(volume)

    # Without this check the test passed even when the share was never
    # mounted, i.e. it did not verify what its name promises.
    mock_ensure.assert_called_with(self.TEST_NFS_EXPORT1)
def test_delete_should_not_delete_if_provider_location_not_provided(self):
    """delete_volume shouldn't delete if provider_location missed."""
    self._set_driver()
    driver = self._driver
    volume = fake_volume.fake_volume_obj(self.context,
                                         name='volume-123',
                                         provider_location=None)

    with mock.patch.object(driver, '_ensure_share_mounted'):
        driver.delete_volume(volume)
        # No command may have been issued through the mocked _execute.
        self.assertFalse(driver._execute.called)
def test_get_volume_stats(self):
    """get_volume_stats must fill the correct values."""
    self._set_driver()
    driver = self._driver
    exports = [self.TEST_NFS_EXPORT1, self.TEST_NFS_EXPORT2]
    driver._mounted_shares = exports

    # One capacity tuple per export, in the order the exports are listed.
    capacities = [
        (10 * units.Gi, 2 * units.Gi, 2 * units.Gi),
        (20 * units.Gi, 3 * units.Gi, 3 * units.Gi),
    ]

    ensure_patch = mock.patch.object(driver, '_ensure_shares_mounted')
    capacity_patch = mock.patch.object(driver, '_get_capacity_info')
    with ensure_patch as mock_ensure_share, \
            capacity_patch as mock_get_capacity_info:
        mock_get_capacity_info.side_effect = capacities

        # NOTE(review): the patched mock is invoked by the test itself,
        # which alone satisfies the `called` assertion below.
        driver._ensure_shares_mounted()
        driver.get_volume_stats()

        mock_get_capacity_info.assert_has_calls(
            [mock.call(export) for export in exports])
        self.assertTrue(mock_ensure_share.called)

        stats = driver._stats
        self.assertEqual(30.0, stats['total_capacity_gb'])
        self.assertEqual(5.0, stats['free_capacity_gb'])
        self.assertEqual(5, stats['reserved_percentage'])
        self.assertTrue(stats['sparse_copy_volume'])
def test_get_volume_stats_with_non_zero_reserved_percentage(self):
    """get_volume_stats must fill the correct values."""
    self.override_config('reserved_percentage', 10.0)
    driver = nfs.NfsDriver(configuration=self.configuration)
    exports = [self.TEST_NFS_EXPORT1, self.TEST_NFS_EXPORT2]
    driver._mounted_shares = exports

    # One capacity tuple per export, in the order the exports are listed.
    capacities = [
        (10 * units.Gi, 2 * units.Gi, 2 * units.Gi),
        (20 * units.Gi, 3 * units.Gi, 3 * units.Gi),
    ]

    ensure_patch = mock.patch.object(driver, '_ensure_shares_mounted')
    capacity_patch = mock.patch.object(driver, '_get_capacity_info')
    with ensure_patch as mock_ensure_share, \
            capacity_patch as mock_get_capacity_info:
        mock_get_capacity_info.side_effect = capacities

        # NOTE(review): the patched mock is invoked by the test itself,
        # which alone satisfies the `called` assertion below.
        driver._ensure_shares_mounted()
        driver.get_volume_stats()

        mock_get_capacity_info.assert_has_calls(
            [mock.call(export) for export in exports])
        self.assertTrue(mock_ensure_share.called)

        stats = driver._stats
        self.assertEqual(30.0, stats['total_capacity_gb'])
        self.assertEqual(5.0, stats['free_capacity_gb'])
        self.assertEqual(10.0, stats['reserved_percentage'])
@ddt.data(True, False)
def test_update_volume_stats(self, thin):
    """_update_volume_stats adds the NFS-specific provisioning stats.

    :param thin: value assigned to ``nfs_sparsed_volumes`` for this run.
    """
    self._set_driver()
    driver = self._driver
    driver.configuration.max_over_subscription_ratio = 20.0
    driver.configuration.reserved_percentage = 5.0
    driver.configuration.nfs_sparsed_volumes = thin

    remotefs_volume_stats = {
        'volume_backend_name': 'fake_backend_name',
        'vendor_name': 'fake_vendor',
        'driver_version': 'fake_version',
        'storage_protocol': 'NFS',
        'total_capacity_gb': 100.0,
        'free_capacity_gb': 20.0,
        'reserved_percentage': 5.0,
        'QoS_support': False,
    }
    # The parent implementation is stubbed out; the stats it would have
    # produced are injected directly.
    self.mock_object(remotefs.RemoteFSDriver, '_update_volume_stats')
    driver._stats = remotefs_volume_stats
    mock_get_provisioned_capacity = self.mock_object(
        driver, '_get_provisioned_capacity', return_value=25.0)

    driver._update_volume_stats()

    nfs_added_volume_stats = {
        'provisioned_capacity_gb': 25.0 if thin else 80.0,
        'max_over_subscription_ratio': 20.0,
        'reserved_percentage': 5.0,
        'thin_provisioning_support': thin,
        'thick_provisioning_support': not thin,
    }
    # NOTE(review): `expected` is the very dict object that was assigned
    # to driver._stats above; if the driver updates that dict in place,
    # the equality check below compares an object with itself -- confirm.
    expected = remotefs_volume_stats
    expected.update(nfs_added_volume_stats)

    self.assertEqual(expected, driver._stats)
    self.assertEqual(thin, mock_get_provisioned_capacity.called)
def _check_is_share_eligible(self, total_size, total_available,
                             total_allocated, requested_volume_size):
    """Return ``_is_share_eligible`` for a share with the given capacity.

    A fresh driver is created and its ``_get_capacity_info`` is patched to
    return ``(total_size, total_available, total_allocated)``.

    :param requested_volume_size: size passed on to ``_is_share_eligible``.
    """
    self._set_driver()
    driver = self._driver
    capacity = (total_size, total_available, total_allocated)
    with mock.patch.object(driver, '_get_capacity_info') as mock_capacity:
        mock_capacity.return_value = capacity
        return driver._is_share_eligible('fake_share',
                                         requested_volume_size)
def test_is_share_eligible(self):
    """A share with ample free space is eligible for a 1 GiB volume."""
    self._set_driver()
    eligible = self._check_is_share_eligible(
        total_size=100.0 * units.Gi,
        total_available=90.0 * units.Gi,
        total_allocated=10.0 * units.Gi,
        requested_volume_size=1)  # GiB
    self.assertTrue(eligible)
def test_share_eligibility_with_reserved_percentage(self):
    """A 1 GiB request is rejected with only 4 of 100 GiB available."""
    self._set_driver()
    # Meant to enter the driver's "used > used_ratio" check.
    eligible = self._check_is_share_eligible(
        total_size=100.0 * units.Gi,
        total_available=4.0 * units.Gi,
        total_allocated=96.0 * units.Gi,
        requested_volume_size=1)  # GiB
    self.assertFalse(eligible)
def test_is_share_eligible_above_oversub_ratio(self):
    """A 10 GiB request is rejected when exactly 10 GiB is available."""
    self._set_driver()
    # Meant to enter the driver's
    # "apparent_available <= requested_volume_size" check.
    eligible = self._check_is_share_eligible(
        total_size=100.0 * units.Gi,
        total_available=10.0 * units.Gi,
        total_allocated=90.0 * units.Gi,
        requested_volume_size=10)  # GiB
    self.assertFalse(eligible)
def test_is_share_eligible_reserved_space_above_oversub_ratio(self):
    """A 1 GiB request is rejected when allocation equals total size."""
    self._set_driver()
    # Meant to enter the driver's
    # "total_allocated / total_size >= oversub_ratio" check.
    eligible = self._check_is_share_eligible(
        total_size=100.0 * units.Gi,
        total_available=10.0 * units.Gi,
        total_allocated=100.0 * units.Gi,
        requested_volume_size=1)  # GiB
    self.assertFalse(eligible)
@ddt.data(None, 'raw', 'qcow2')
@mock.patch('cinder.objects.volume.Volume.get_by_id')
def test_extend_volume(self, file_format, mock_get):
    """Extend a volume by 1 GiB and check the resize_image call.

    ``file_format`` is stored in the volume's admin metadata when it is
    not None, and is expected to be forwarded to ``resize_image``.
    """
    self._set_driver()
    drv = self._driver
    volume = fake_volume.fake_volume_obj(
        self.context,
        id='80ee16b6-75d2-4d54-9539-ffc1b4b0fb10',
        size=1,
        provider_location='nfs_share')
    if file_format:
        volume.admin_metadata = {'format': file_format}
    mock_get.return_value = volume

    local_vol_dir = 'dir'
    new_size = volume['size'] + 1

    with mock.patch.object(image_utils, 'resize_image') as mock_resize, \
            mock.patch.object(drv, '_local_volume_dir',
                              return_value=local_vol_dir), \
            mock.patch.object(drv, '_is_share_eligible',
                              return_value=True), \
            mock.patch.object(drv, '_is_file_size_equal',
                              return_value=True):
        drv.extend_volume(volume, new_size)

    expected_path = os.path.join(local_vol_dir, volume.name)
    mock_resize.assert_called_once_with(expected_path, new_size,
                                        run_as_root=True,
                                        file_format=file_format)
def test_extend_volume_attached_fail(self):
    """Extending raises ExtendVolumeError when the volume is attached.

    ``_is_volume_attached`` is patched to return True; the other patched
    helpers report success.
    """
    self._set_driver()
    drv = self._driver
    volume = fake_volume.fake_volume_obj(
        self.context,
        id='80ee16b6-75d2-4d54-9539-ffc1b4b0fb10',
        size=1,
        provider_location='nfs_share')
    fake_path = 'path'
    new_size = volume['size'] + 1

    with mock.patch.object(drv, 'local_path', return_value=fake_path), \
            mock.patch.object(drv, '_is_share_eligible',
                              return_value=True), \
            mock.patch.object(drv, '_is_file_size_equal',
                              return_value=True), \
            mock.patch.object(drv, '_is_volume_attached',
                              return_value=True):
        self.assertRaises(exception.ExtendVolumeError,
                          drv.extend_volume, volume, new_size)
@mock.patch('cinder.objects.volume.Volume.get_by_id')
def test_extend_volume_failure(self, mock_get):
    """Extending raises ExtendVolumeError when the size check fails.

    ``_is_file_size_equal`` is patched to return False while the share is
    reported as eligible.
    """
    self._set_driver()
    drv = self._driver
    volume = fake_volume.fake_volume_obj(
        self.context,
        id='80ee16b6-75d2-4d54-9539-ffc1b4b0fb10',
        size=1,
        provider_location='nfs_share')
    volume.admin_metadata = {'format': 'qcow2'}
    mock_get.return_value = volume

    with mock.patch.object(image_utils, 'resize_image'), \
            mock.patch.object(drv, 'local_path', return_value='path'), \
            mock.patch.object(drv, '_is_share_eligible',
                              return_value=True), \
            mock.patch.object(drv, '_is_file_size_equal',
                              return_value=False):
        self.assertRaises(exception.ExtendVolumeError,
                          drv.extend_volume, volume, 2)
def test_extend_volume_insufficient_space(self):
    """Extending raises ExtendVolumeError when the share is not eligible.

    ``_is_share_eligible`` is patched to return False to stand in for a
    share without enough space.
    """
    self._set_driver()
    drv = self._driver
    volume = fake_volume.fake_volume_obj(
        self.context,
        id='80ee16b6-75d2-4d54-9539-ffc1b4b0fb10',
        size=1,
        provider_location='nfs_share')

    with mock.patch.object(image_utils, 'resize_image'), \
            mock.patch.object(drv, 'local_path', return_value='path'), \
            mock.patch.object(drv, '_is_share_eligible',
                              return_value=False), \
            mock.patch.object(drv, '_is_file_size_equal',
                              return_value=False):
        self.assertRaises(exception.ExtendVolumeError,
                          drv.extend_volume, volume, 2)
def test_is_file_size_equal(self):
    """_is_file_size_equal is true when virtual_size matches size in GiB."""
    self._set_driver()
    drv = self._driver
    size_gb = 2
    fake_info = mock.MagicMock(virtual_size=size_gb * units.Gi)
    with mock.patch.object(image_utils, 'qemu_img_info',
                           return_value=fake_info):
        result = drv._is_file_size_equal('fake/path', size_gb)
        self.assertTrue(result)
def test_is_file_size_equal_false(self):
    """_is_file_size_equal is false when virtual_size is 1 GiB larger."""
    self._set_driver()
    drv = self._driver
    size_gb = 2
    fake_info = mock.MagicMock(virtual_size=(size_gb + 1) * units.Gi)
    with mock.patch.object(image_utils, 'qemu_img_info',
                           return_value=fake_info):
        result = drv._is_file_size_equal('fake/path', size_gb)
        self.assertFalse(result)
@mock.patch.object(nfs, 'LOG')
def test_set_nas_security_options_when_true(self, LOG):
    """Both NAS security options end up 'true' and nothing is warned.

    The option-determination helper is replaced with a mock returning
    'true'; the call is made with ``is_new_install`` set to True.
    """
    self._set_driver()
    drv = self._driver
    drv._mounted_shares = [self.TEST_NFS_EXPORT1]

    # Replace the helpers used by set_nas_security_options with mocks.
    drv._ensure_shares_mounted = mock.Mock()
    drv._get_mount_point_for_share = mock.Mock(
        return_value=self.TEST_MNT_POINT)
    drv._determine_nas_security_option_setting = mock.Mock(
        return_value='true')

    drv.set_nas_security_options(True)

    for option in ('nas_secure_file_operations',
                   'nas_secure_file_permissions'):
        self.assertEqual('true', getattr(drv.configuration, option))
    self.assertFalse(LOG.warning.called)
@mock.patch.object(nfs, 'LOG')
def test_set_nas_security_options_when_false(self, LOG):
    """Both NAS security options end up 'false' and a warning is logged.

    The option-determination helper is replaced with a mock returning
    'false'; the call is made with ``is_new_install`` set to False.
    """
    self._set_driver()
    drv = self._driver
    drv._mounted_shares = [self.TEST_NFS_EXPORT1]

    # Replace the helpers used by set_nas_security_options with mocks.
    drv._ensure_shares_mounted = mock.Mock()
    drv._get_mount_point_for_share = mock.Mock(
        return_value=self.TEST_MNT_POINT)
    drv._determine_nas_security_option_setting = mock.Mock(
        return_value='false')

    drv.set_nas_security_options(False)

    for option in ('nas_secure_file_operations',
                   'nas_secure_file_permissions'):
        self.assertEqual('false', getattr(drv.configuration, option))
    self.assertTrue(LOG.warning.called)
def test_set_nas_security_options_exception_if_no_mounted_shares(self):
    """NfsNoSharesMounted is raised when no shares are mounted."""
    self._set_driver()
    drv = self._driver
    drv._ensure_shares_mounted = mock.Mock()
    drv._mounted_shares = []

    # The flag value is irrelevant to this test.
    with self.assertRaises(exception.NfsNoSharesMounted):
        drv.set_nas_security_options('does not matter')
def test_ensure_share_mounted(self):
    """A mount that succeeds on the first try is attempted exactly once."""
    self._set_driver()
    drv = self._driver
    self.mock_object(drv._remotefsclient, 'mount', autospec=True)
    drv.configuration.nfs_mount_attempts = 3
    drv.shares = {self.TEST_NFS_EXPORT1: ''}

    drv._ensure_share_mounted(self.TEST_NFS_EXPORT1)

    drv._remotefsclient.mount.assert_called_once_with(
        self.TEST_NFS_EXPORT1, [])
@mock.patch('time.sleep')
def test_ensure_share_mounted_exception(self, _mock_sleep):
    """Failing mounts are retried nfs_mount_attempts times, then raise."""
    attempts = 3
    self._set_driver()
    drv = self._driver
    mount_mock = self.mock_object(drv._remotefsclient, 'mount',
                                  side_effect=Exception)
    drv.configuration.nfs_mount_attempts = attempts
    drv.shares = {self.TEST_NFS_EXPORT1: ''}

    with self.assertRaises(exception.NfsException):
        drv._ensure_share_mounted(self.TEST_NFS_EXPORT1)

    self.assertEqual(attempts, mount_mock.call_count)
def test_ensure_share_mounted_at_least_one_attempt(self):
    """One mount attempt is made even when zero attempts are configured."""
    expected_attempts = 1
    configured_attempts = 0
    self._set_driver()
    drv = self._driver
    mount_mock = self.mock_object(drv._remotefsclient, 'mount',
                                  side_effect=Exception)
    drv.configuration.nfs_mount_attempts = configured_attempts
    drv.shares = {self.TEST_NFS_EXPORT1: ''}

    with self.assertRaises(exception.NfsException):
        drv._ensure_share_mounted(self.TEST_NFS_EXPORT1)

    self.assertEqual(expected_attempts, mount_mock.call_count)
@mock.patch('tempfile.NamedTemporaryFile')
@ddt.data([NFS_CONFIG1, QEMU_IMG_INFO_OUT3, False],
          [NFS_CONFIG2, QEMU_IMG_INFO_OUT4, False],
          [NFS_CONFIG3, QEMU_IMG_INFO_OUT3, False],
          [NFS_CONFIG4, QEMU_IMG_INFO_OUT4, False],
          [NFS_CONFIG4, QEMU_IMG_INFO_OUT5, True])
@ddt.unpack
def test_copy_volume_from_snapshot(self, nfs_conf, qemu_img_info,
                                   encryption, mock_temp_file):
    """Check the qemu-img calls made by _copy_volume_from_snapshot.

    ``qemu_img_info`` is mocked to return the snapshot image info first
    and the backing file info second. The test asserts that both lookups
    happen with the expected arguments and that ``convert_image`` is
    called once with the backing file info passed as ``data``. With
    ``encryption`` set, the expected target format is 'luks' and the
    passphrase file names come from the mocked temporary files.
    """
    class DictObj(object):
        # Expose the keys of a dict as instance attributes.
        def __init__(self, d):
            self.__dict__ = d

    self._set_driver(extra_confs=nfs_conf)
    drv = self._driver

    if encryption:
        # Successive entries into the NamedTemporaryFile context manager
        # yield these two objects, in this order.
        mock_temp_file.return_value.__enter__.side_effect = [
            DictObj({'name': '/tmp/imgfile'}),
            DictObj({'name': '/tmp/passfile'})]
        dest_volume = self._simple_encrypted_volume()
        src_volume = self._simple_encrypted_volume()
        key_mgr = fake_keymgr.fake_api()
        self.mock_object(castellan.key_manager, 'API',
                         return_value=key_mgr)
        key_id = key_mgr.store(self.context, KeyObject())
        src_volume.encryption_key_id = key_id
        dest_volume.encryption_key_id = key_id
        src_encryption_key_id = src_volume.encryption_key_id
        dest_encryption_key_id = dest_volume.encryption_key_id
    else:
        dest_volume = self._simple_volume()
        src_volume = self._simple_volume()
        src_encryption_key_id = None
        dest_encryption_key_id = None

    fake_snap = fake_snapshot.fake_snapshot_obj(self.context)
    fake_snap.volume = src_volume
    size_bytes = src_volume.size * units.Gi

    # qemu-img output describing the snapshot file.
    snap_json = qemu_img_info % {'volid': src_volume.id,
                                 'snapid': fake_snap.id,
                                 'size_gb': src_volume.size,
                                 'size_b': size_bytes}
    img_info = imageutils.QemuImgInfo(snap_json, format='json')

    # qemu-img output describing the snapshot's backing file.
    backing_json = QEMU_IMG_INFO_OUT1 % {'volid': src_volume.id,
                                         'size_b': size_bytes}
    bk_img_info = imageutils.QemuImgInfo(backing_json, format='json')

    mock_img_info = self.mock_object(
        image_utils, 'qemu_img_info',
        side_effect=[img_info, bk_img_info])
    mock_convert_image = self.mock_object(image_utils, 'convert_image')

    vol_dir = os.path.join(self.TEST_MNT_POINT_BASE,
                           drv._get_hash_str(src_volume.provider_location))
    src_vol_path = os.path.join(vol_dir, img_info.backing_file)
    dest_vol_path = os.path.join(vol_dir, dest_volume.name)
    info_path = os.path.join(vol_dir, src_volume.name) + '.info'
    snap_file = dest_volume.name + '.' + fake_snap.id
    snap_path = os.path.join(vol_dir, snap_file)

    mock_read_info_file = self.mock_object(
        drv, '_read_info_file',
        return_value={'active': snap_file, fake_snap.id: snap_file})
    mock_permission = self.mock_object(drv, '_set_rw_permissions_for_all')

    drv._copy_volume_from_snapshot(fake_snap, dest_volume,
                                   dest_volume.size,
                                   src_encryption_key_id,
                                   dest_encryption_key_id)

    mock_read_info_file.assert_called_once_with(info_path)

    info_kwargs = {'force_share': True,
                   'run_as_root': True,
                   'allow_qcow2_backing_file': True}
    expected_info_calls = [mock.call(snap_path, **info_kwargs),
                           mock.call(src_vol_path, **info_kwargs)]
    self.assertEqual(2, mock_img_info.call_count)
    mock_img_info.assert_has_calls(expected_info_calls)

    used_qcow = nfs_conf['nfs_qcow2_volumes']
    if encryption:
        mock_convert_image.assert_called_once_with(
            src_vol_path, dest_vol_path, 'luks',
            passphrase_file='/tmp/passfile',
            run_as_root=True,
            src_passphrase_file='/tmp/imgfile',
            data=bk_img_info)
    else:
        if used_qcow:
            out_format = 'qcow2'
        else:
            out_format = 'raw'
        mock_convert_image.assert_called_once_with(
            src_vol_path, dest_vol_path, out_format,
            run_as_root=True,
            data=bk_img_info)

    mock_permission.assert_called_once_with(dest_vol_path)
@ddt.data([NFS_CONFIG1, QEMU_IMG_INFO_OUT3, 'available'],
          [NFS_CONFIG2, QEMU_IMG_INFO_OUT4, 'backing-up'],
          [NFS_CONFIG2, QEMU_IMG_INFO_OUT4, 'restoring'],
          [NFS_CONFIG3, QEMU_IMG_INFO_OUT3, 'available'],
          [NFS_CONFIG4, QEMU_IMG_INFO_OUT4, 'backing-up'],
          [NFS_CONFIG4, QEMU_IMG_INFO_OUT4, 'restoring'])
@ddt.unpack
@mock.patch('cinder.objects.volume.Volume.save')
def test_create_volume_from_snapshot(self, nfs_conf, qemu_img_info,
                                     snap_status, mock_save):
    """Create a volume from a snapshot and check the image conversion.

    Parametrized over driver configuration, qemu-img output template and
    snapshot status. Asserts that the returned provider location is the
    share reported by the mocked ``_find_share`` and that
    ``convert_image`` is called once, targeting 'qcow2' or 'raw'
    depending on ``nfs_qcow2_volumes``.
    """
    self._set_driver(extra_confs=nfs_conf)
    drv = self._driver

    # Volume source of the snapshot we are trying to clone from. We need it
    # to have a different id than the default provided.
    src_volume = self._simple_volume(size=10)
    src_volume.id = fake.VOLUME_ID

    src_volume_dir = os.path.join(self.TEST_MNT_POINT_BASE,
                                  drv._get_hash_str(
                                      src_volume.provider_location))
    src_volume_path = os.path.join(src_volume_dir, src_volume.name)
    fake_snap = fake_snapshot.fake_snapshot_obj(self.context)

    # Fake snapshot based on the previously created volume.
    snap_file = src_volume.name + '.' + fake_snap.id
    fake_snap.volume = src_volume
    fake_snap.status = snap_status
    fake_snap.size = 10

    # New fake volume where the snapshot will be copied.
    new_volume = self._simple_volume(size=10)
    new_volume_dir = os.path.join(self.TEST_MNT_POINT_BASE,
                                  drv._get_hash_str(
                                      src_volume.provider_location))
    new_volume_path = os.path.join(new_volume_dir, new_volume.name)

    # Mocks
    img_out = qemu_img_info % {'volid': src_volume.id,
                               'snapid': fake_snap.id,
                               'size_gb': src_volume.size,
                               'size_b': src_volume.size * units.Gi}
    img_info = imageutils.QemuImgInfo(img_out, format='json')
    mock_img_info = self.mock_object(image_utils, 'qemu_img_info')
    mock_img_info.return_value = img_info
    mock_ensure = self.mock_object(drv, '_ensure_shares_mounted')
    mock_find_share = self.mock_object(drv, '_find_share',
                                       return_value=self.TEST_NFS_EXPORT1)
    mock_read_info_file = self.mock_object(drv, '_read_info_file')
    mock_read_info_file.return_value = {'active': snap_file,
                                        fake_snap.id: snap_file}
    mock_convert_image = self.mock_object(image_utils, 'convert_image')
    # Stub the file-level helpers; each one only needs patching once.
    self.mock_object(drv, '_create_qcow2_file')
    self.mock_object(drv, '_create_regular_file')
    self.mock_object(drv, '_set_rw_permissions')
    self.mock_object(drv, '_read_file')

    ret = drv.create_volume_from_snapshot(new_volume, fake_snap)

    # Test asserts
    self.assertEqual(self.TEST_NFS_EXPORT1, ret['provider_location'])
    used_qcow = nfs_conf['nfs_qcow2_volumes']
    mock_convert_image.assert_called_once_with(
        src_volume_path, new_volume_path, 'qcow2' if used_qcow else 'raw',
        run_as_root=True, data=img_info)
    mock_ensure.assert_called_once()
    mock_find_share.assert_called_once_with(new_volume)
@ddt.data('error', 'creating', 'deleting', 'deleted', 'updating',
          'error_deleting', 'unmanaging')
def test_create_volume_from_snapshot_invalid_status(self, snap_status):
    """InvalidSnapshot is raised for each of the listed statuses."""
    self._set_driver()
    drv = self._driver

    fake_snap = fake_snapshot.fake_snapshot_obj(self.context)
    fake_snap.volume = self._simple_volume()
    fake_snap.status = snap_status

    new_volume = self._simple_volume()
    new_volume['size'] = fake_snap['volume_size']

    with self.assertRaises(exception.InvalidSnapshot):
        drv.create_volume_from_snapshot(new_volume, fake_snap)
@ddt.data([NFS_CONFIG1, QEMU_IMG_INFO_OUT1],
          [NFS_CONFIG2, QEMU_IMG_INFO_OUT2],
          [NFS_CONFIG3, QEMU_IMG_INFO_OUT1],
          [NFS_CONFIG4, QEMU_IMG_INFO_OUT2])
@ddt.unpack
def test_initialize_connection(self, nfs_confs, qemu_img_info):
    """initialize_connection inspects the image and returns NFS info.

    Checks the single ``qemu_img_info`` call made on the volume path and
    the driver type, volume name and mount point base in the result.
    """
    self._set_driver(extra_confs=nfs_confs)
    drv = self._driver
    volume = self._simple_volume()

    share_dir = os.path.join(self.TEST_MNT_POINT_BASE,
                             drv._get_hash_str(volume.provider_location))
    expected_path = os.path.join(share_dir, volume.name)

    rendered = qemu_img_info % {'volid': volume.id,
                                'size_gb': volume.size,
                                'size_b': volume.size * units.Gi}
    mock_img_utils = self.mock_object(
        image_utils, 'qemu_img_info',
        return_value=imageutils.QemuImgInfo(rendered, format='json'))
    self.mock_object(drv, '_read_info_file',
                     return_value={'active': "volume-%s" % volume.id})

    conn_info = drv.initialize_connection(volume, None)

    mock_img_utils.assert_called_once_with(expected_path,
                                           force_share=True,
                                           run_as_root=True,
                                           allow_qcow2_backing_file=True)
    self.assertEqual('nfs', conn_info['driver_volume_type'])
    self.assertEqual(volume.name, conn_info['data']['name'])
    self.assertEqual(self.TEST_MNT_POINT_BASE,
                     conn_info['mount_point_base'])
@mock.patch.object(image_utils, 'qemu_img_info')
def test_initialize_connection_raise_exception(self, mock_img_info):
    """InvalidVolume is raised when qemu-img reports the 'iso' format."""
    self._set_driver()
    drv = self._driver
    volume = self._simple_volume()

    # Fake qemu-img JSON output with a format other than raw or qcow2.
    qemu_img_output = """{
"filename": "%s",
"format": "iso",
"virtual-size": 10737418240,
"actual-size": 173000
}""" % volume['name']
    fake_info = imageutils.QemuImgInfo(qemu_img_output, format='json')
    mock_img_info.return_value = fake_info

    with self.assertRaisesRegex(exception.InvalidVolume,
                                "must be a valid raw or qcow2 image"):
        drv.initialize_connection(volume, None)
@mock.patch.object(image_utils, 'qemu_img_info')
def test_initialize_connection_raise_on_wrong_size(self, mock_img_info):
    """InvalidVolume is raised when the reported virtual size is wrong."""
    self._set_driver()
    drv = self._driver
    volume = self._simple_volume()

    # Fake qemu-img JSON output with a deliberately huge virtual size.
    qemu_img_output = """{
"filename": "%s",
"format": "qcow2",
"virtual-size": 999999999999999,
"actual-size": 173000
}""" % volume['name']
    fake_info = imageutils.QemuImgInfo(qemu_img_output, format='json')
    mock_img_info.return_value = fake_info

    with self.assertRaisesRegex(exception.InvalidVolume,
                                "virtual_size does not match"):
        drv.initialize_connection(volume, None)
def test_create_snapshot(self):
    """create_snapshot creates the snapshot file and records it.

    With the info file initially empty, the test expects
    ``_do_create_snapshot`` to be called with the active image name and
    the new snapshot path, and ``_write_info_file`` to record the
    snapshot file as both the active image and the snapshot's entry.
    """
    self._set_driver()
    drv = self._driver
    volume = self._simple_volume()
    self.override_config('nfs_snapshot_support', True)

    fake_snap = fake_snapshot.fake_snapshot_obj(self.context)
    fake_snap.volume = volume

    vol_name = volume['name']
    vol_dir = os.path.join(self.TEST_MNT_POINT_BASE,
                           drv._get_hash_str(self.TEST_NFS_EXPORT1))
    snap_file = vol_name + '.' + fake_snap.id
    snap_path = os.path.join(vol_dir, snap_file)
    info_path = os.path.join(vol_dir, vol_name) + '.info'

    with mock.patch.object(drv, '_local_path_volume_info',
                           return_value=info_path), \
            mock.patch.object(drv, '_read_info_file', return_value={}), \
            mock.patch.object(drv, '_do_create_snapshot') as do_create, \
            mock.patch.object(drv,
                              '_check_snapshot_support') as check_support, \
            mock.patch.object(drv, '_write_info_file') as write_info, \
            mock.patch.object(drv, 'get_active_image_from_info',
                              return_value=vol_name), \
            mock.patch.object(drv, '_get_new_snap_path',
                              return_value=snap_path):
        self._driver.create_snapshot(fake_snap)

    check_support.assert_called_once()
    do_create.assert_called_with(fake_snap, vol_name, snap_path)
    expected_info = {'active': snap_file, fake_snap.id: snap_file}
    write_info.assert_called_with(info_path, expected_info)
@ddt.data({'volume_status': 'available',
           'original_provider': 'original_provider',
           'rename_side_effect': None},
          {'volume_status': 'available',
           'original_provider': 'current_provider',
           'rename_side_effect': None},
          {'volume_status': 'in-use',
           'original_provider': 'original_provider',
           'rename_side_effect': None},
          {'volume_status': 'available',
           'original_provider': 'original_provider',
           'rename_side_effect': OSError})
@ddt.unpack
@mock.patch('os.rename')
def test_update_migrated_volume(self,
                                mock_rename,
                                rename_side_effect,
                                original_provider,
                                volume_status):
    """Check the rename and model update done by update_migrated_volume.

    A rename of the new volume's file is expected only when the volume is
    'available' and the two provider locations differ. ``_name_id`` is
    expected to be None only when a rename was attempted and did not
    raise; otherwise it is expected to be the new volume's id.
    """
    drv = nfs.NfsDriver(configuration=self.configuration)
    base_dir = '/dir_base/'
    current_path = base_dir + fake.VOLUME2_NAME
    current_provider = 'current_provider'
    mock_rename.side_effect = rename_side_effect

    volume = fake_volume.fake_volume_obj(
        self.context,
        id=fake.VOLUME_ID,
        provider_location=original_provider,
        _name_id=None)
    new_volume = fake_volume.fake_volume_obj(
        self.context,
        id=fake.VOLUME2_ID,
        provider_location=current_provider,
        _name_id=None)

    with mock.patch.object(drv, 'local_path',
                           return_value=current_path):
        update = drv.update_migrated_volume(self.context,
                                            volume,
                                            new_volume,
                                            volume_status)

    rename_expected = (volume_status == 'available' and
                       original_provider != current_provider)
    if not rename_expected:
        mock_rename.assert_not_called()
    else:
        mock_rename.assert_called_once_with(current_path,
                                            base_dir + fake.VOLUME_NAME)

    if mock_rename.call_count > 0 and rename_side_effect is None:
        expected_name_id = None
    else:
        expected_name_id = fake.VOLUME2_ID
    self.assertEqual({'_name_id': expected_name_id,
                      'provider_location': current_provider},
                     update)
class NfsDriverDoSetupTestCase(test.TestCase):
def setUp(self):
    """Prepare a mock context, a driver configuration and config overrides."""
    super().setUp()
    self.context = mock.Mock()
    self.create_configuration()
    # Point compute_api_class at a Mock for the duration of the test.
    self.override_config('compute_api_class', 'unittest.mock.Mock')
def create_configuration(self):
    """Build a Configuration carrying the NFS options on self.configuration."""
    self.configuration = conf.Configuration(None)
    self.configuration.append_config_values(nfs.nfs_opts)
def test_setup_should_throw_error_if_shares_config_not_configured(self):
    """do_setup raises NfsException when nfs_shares_config is unset.

    ``os.path.exists`` must not be consulted in that case.
    """
    self.override_config('nfs_shares_config', None)
    drv = nfs.NfsDriver(configuration=self.configuration)
    exists_mock = self.mock_object(os.path, 'exists')

    self.assertRaisesRegex(exception.NfsException,
                           ".*no NFS config file configured.*",
                           drv.do_setup, self.context)

    self.assertEqual(0, exists_mock.call_count)
def test_setup_should_throw_error_if_shares_file_does_not_exist(self):
    """do_setup raises NfsException when the shares file is missing."""
    drv = nfs.NfsDriver(configuration=self.configuration)
    exists_mock = self.mock_object(os.path, 'exists', return_value=False)

    self.assertRaisesRegex(exception.NfsException,
                           "NFS config file.*doesn't exist",
                           drv.do_setup, self.context)

    expected_calls = [mock.call(self.configuration.nfs_shares_config)]
    exists_mock.assert_has_calls(expected_calls)
def test_setup_should_not_throw_error_if_host_and_share_set(self):
    """do_setup succeeds without a shares file when host and share are set.

    With both ``nas_host`` and ``nas_share_path`` configured,
    ``os.path.exists`` must not be called at all.
    """
    drv = nfs.NfsDriver(configuration=self.configuration)
    self.override_config('nas_host', 'nfs-host1')
    self.override_config('nas_share_path', '/export')
    exists_mock = self.mock_object(os.path, 'exists', return_value=False)
    self.mock_object(nfs.NfsDriver, 'set_nas_security_options',
                     return_value=True)
    self.mock_object(drv, '_execute', return_value=True)

    drv.do_setup(self.context)

    exists_mock.assert_not_called()
def test_setup_throw_error_if_shares_file_does_not_exist_no_host(self):
    """do_setup raises when the shares file is missing and no host is set.

    Only ``nas_share_path`` is configured here.
    """
    drv = nfs.NfsDriver(configuration=self.configuration)
    self.override_config('nas_share_path', '/export')
    exists_mock = self.mock_object(os.path, 'exists', return_value=False)

    self.assertRaisesRegex(exception.NfsException,
                           "NFS config file.*doesn't exist",
                           drv.do_setup, self.context)

    expected_calls = [mock.call(self.configuration.nfs_shares_config)]
    exists_mock.assert_has_calls(expected_calls)
def test_setup_throw_error_if_shares_file_does_not_exist_no_share(self):
    """do_setup raises when the shares file is missing and no share is set.

    Only ``nas_host`` is configured here.
    """
    drv = nfs.NfsDriver(configuration=self.configuration)
    self.override_config('nas_host', 'nfs-host1')
    exists_mock = self.mock_object(os.path, 'exists', return_value=False)

    self.assertRaisesRegex(exception.NfsException,
                           "NFS config file.*doesn't exist",
                           drv.do_setup, self.context)

    expected_calls = [mock.call(self.configuration.nfs_shares_config)]
    exists_mock.assert_has_calls(expected_calls)
def test_setup_throw_error_if_shares_file_doesnt_exist_no_share_host(self):
    """do_setup raises when the shares file, host and share are all absent.

    Neither ``nas_host`` nor ``nas_share_path`` is overridden here.
    """
    drv = nfs.NfsDriver(configuration=self.configuration)
    exists_mock = self.mock_object(os.path, 'exists', return_value=False)

    self.assertRaisesRegex(exception.NfsException,
                           "NFS config file.*doesn't exist",
                           drv.do_setup, self.context)

    expected_calls = [mock.call(self.configuration.nfs_shares_config)]
    exists_mock.assert_has_calls(expected_calls)
def test_setup_should_throw_exception_if_nfs_client_is_not_installed(self):
    """do_setup raises NfsException when running mount.nfs gives ENOENT."""
    drv = nfs.NfsDriver(configuration=self.configuration)
    exists_mock = self.mock_object(os.path, 'exists', return_value=True)
    missing_binary = OSError(errno.ENOENT, 'No such file or directory.')
    execute_mock = self.mock_object(drv, '_execute',
                                    side_effect=missing_binary)

    self.assertRaisesRegex(exception.NfsException,
                           'mount.nfs is not installed',
                           drv.do_setup, self.context)

    exists_mock.assert_has_calls(
        [mock.call(self.configuration.nfs_shares_config)])
    execute_mock.assert_has_calls(
        [mock.call('mount.nfs', check_exit_code=False, run_as_root=True)])
def test_setup_should_throw_exception_if_mount_nfs_command_fails(self):
    """do_setup propagates an OSError other than ENOENT from mount.nfs.

    The mocked ``_execute`` raises OSError with EPERM, and the same
    OSError is expected to surface from do_setup.
    """
    drv = nfs.NfsDriver(configuration=self.configuration)
    exists_mock = self.mock_object(os.path, 'exists', return_value=True)
    permission_error = OSError(errno.EPERM, 'Operation... BROKEN')
    execute_mock = self.mock_object(drv, '_execute',
                                    side_effect=permission_error)

    self.assertRaisesRegex(OSError, '.*Operation... BROKEN',
                           drv.do_setup, self.context)

    exists_mock.assert_has_calls(
        [mock.call(self.configuration.nfs_shares_config)])
    execute_mock.assert_has_calls(
        [mock.call('mount.nfs', check_exit_code=False, run_as_root=True)])
def test_retype_is_there(self):
    """Ensure that driver.retype() is there.

    The call is made with sentinel arguments and is expected to return
    ``(False, None)``.
    """
    drv = nfs.NfsDriver(configuration=self.configuration)
    v1 = fake_volume.fake_volume_obj(self.context)

    ret = drv.retype(self.context,
                     v1,
                     mock.sentinel.new_type,
                     mock.sentinel.diff,
                     mock.sentinel.host)

    self.assertEqual((False, None), ret)