Merge "Refactor Windows drivers using os-win"

This commit is contained in:
Jenkins 2016-01-21 01:07:58 +00:00 committed by Gerrit Code Review
commit 7e040192d4
10 changed files with 557 additions and 2162 deletions

View File

@ -13,9 +13,9 @@
# under the License.
import os
import sys
import mock
from oslo_utils import units
from cinder import exception
from cinder.image import image_utils
@ -45,27 +45,20 @@ class WindowsSmbFsTestCase(test.TestCase):
_FAKE_VOLUME_PATH = os.path.join(_FAKE_MNT_POINT,
_FAKE_VOLUME_NAME + '.vhdx')
def setUp(self):
@mock.patch.object(smbfs, 'utilsfactory')
@mock.patch.object(smbfs, 'remotefs')
def setUp(self, mock_remotefs, mock_utilsfactory):
super(WindowsSmbFsTestCase, self).setUp()
self._mock_wmi = mock.MagicMock()
mock.patch('sys.platform', 'win32').start()
mock.patch.dict(sys.modules, ctypes=mock.DEFAULT).start()
mock.patch('six.moves.builtins.wmi', create=True).start()
self.addCleanup(mock.patch.stopall)
self._smbfs_driver = smbfs.WindowsSmbfsDriver(
configuration=mock.Mock())
self._smbfs_driver._remotefsclient = mock.Mock()
self._smbfs_driver._delete = mock.Mock()
self._smbfs_driver.local_path = mock.Mock(
return_value=self._FAKE_VOLUME_PATH)
self._smbfs_driver.vhdutils = mock.Mock()
def _test_create_volume(self, volume_exists=False, volume_format='vhdx'):
self._smbfs_driver.create_dynamic_vhd = mock.MagicMock()
fake_create = self._smbfs_driver.vhdutils.create_dynamic_vhd
fake_create = self._smbfs_driver._vhdutils.create_dynamic_vhd
self._smbfs_driver.get_volume_format = mock.Mock(
return_value=volume_format)
@ -90,8 +83,8 @@ class WindowsSmbFsTestCase(test.TestCase):
self._test_create_volume(volume_format="qcow")
def test_get_capacity_info(self):
self._smbfs_driver._remotefsclient.get_capacity_info = mock.Mock(
return_value=(self._FAKE_TOTAL_SIZE, self._FAKE_TOTAL_AVAILABLE))
self._smbfs_driver._smbutils.get_share_capacity_info.return_value = (
self._FAKE_TOTAL_SIZE, self._FAKE_TOTAL_AVAILABLE)
self._smbfs_driver._get_total_allocated = mock.Mock(
return_value=self._FAKE_TOTAL_ALLOCATED)
@ -102,7 +95,7 @@ class WindowsSmbFsTestCase(test.TestCase):
self.assertEqual(expected_ret_val, ret_val)
def _test_get_img_info(self, backing_file=None):
self._smbfs_driver.vhdutils.get_vhd_parent_path.return_value = (
self._smbfs_driver._vhdutils.get_vhd_parent_path.return_value = (
backing_file)
image_info = self._smbfs_driver._qemu_img_info(self._FAKE_VOLUME_PATH)
@ -118,13 +111,13 @@ class WindowsSmbFsTestCase(test.TestCase):
self._test_get_img_info(self._FAKE_VOLUME_PATH)
def test_create_snapshot(self):
self._smbfs_driver.vhdutils.create_differencing_vhd = (
self._smbfs_driver._vhdutils.create_differencing_vhd = (
mock.Mock())
self._smbfs_driver._local_volume_dir = mock.Mock(
return_value=self._FAKE_MNT_POINT)
fake_create_diff = (
self._smbfs_driver.vhdutils.create_differencing_vhd)
self._smbfs_driver._vhdutils.create_differencing_vhd)
self._smbfs_driver._do_create_snapshot(
self._FAKE_SNAPSHOT,
@ -158,7 +151,7 @@ class WindowsSmbFsTestCase(test.TestCase):
return_value=self._FAKE_MNT_POINT)
drv.get_volume_format = mock.Mock(
return_value=volume_format)
drv.vhdutils.get_vhd_parent_path.return_value = (
drv._vhdutils.get_vhd_parent_path.return_value = (
fake_parent_path)
with mock.patch.object(image_utils, 'upload_volume') as (
@ -183,7 +176,7 @@ class WindowsSmbFsTestCase(test.TestCase):
fake_active_image)
upload_path = fake_temp_image_path
drv.vhdutils.convert_vhd.assert_called_once_with(
drv._vhdutils.convert_vhd.assert_called_once_with(
fake_active_image_path,
fake_temp_image_path)
drv._delete.assert_called_once_with(
@ -213,7 +206,6 @@ class WindowsSmbFsTestCase(test.TestCase):
return_value=self._FAKE_VOLUME_PATH)
drv.configuration = mock.MagicMock()
drv.configuration.volume_dd_blocksize = mock.sentinel.block_size
drv._extend_vhd_if_needed = mock.Mock()
with mock.patch.object(image_utils,
'fetch_to_volume_format') as fake_fetch:
@ -227,8 +219,9 @@ class WindowsSmbFsTestCase(test.TestCase):
mock.sentinel.image_id,
self._FAKE_VOLUME_PATH, mock.sentinel.volume_format,
mock.sentinel.block_size)
drv._extend_vhd_if_needed.assert_called_once_with(
self._FAKE_VOLUME_PATH, self._FAKE_VOLUME['size'])
drv._vhdutils.resize_vhd.assert_called_once_with(
self._FAKE_VOLUME_PATH,
self._FAKE_VOLUME['size'] * units.Gi)
def test_copy_volume_from_snapshot(self):
drv = self._smbfs_driver
@ -247,60 +240,23 @@ class WindowsSmbFsTestCase(test.TestCase):
return_value=fake_img_info)
drv.local_path = mock.Mock(
return_value=mock.sentinel.new_volume_path)
drv._extend_vhd_if_needed = mock.Mock()
drv._copy_volume_from_snapshot(
self._FAKE_SNAPSHOT, self._FAKE_VOLUME,
self._FAKE_VOLUME['size'])
drv._delete.assert_called_once_with(mock.sentinel.new_volume_path)
drv.vhdutils.convert_vhd.assert_called_once_with(
drv._vhdutils.convert_vhd.assert_called_once_with(
self._FAKE_VOLUME_PATH,
mock.sentinel.new_volume_path)
drv._extend_vhd_if_needed.assert_called_once_with(
mock.sentinel.new_volume_path, self._FAKE_VOLUME['size'])
drv._vhdutils.resize_vhd.assert_called_once_with(
mock.sentinel.new_volume_path,
self._FAKE_VOLUME['size'] * units.Gi)
def test_rebase_img(self):
self._smbfs_driver._rebase_img(
drv = self._smbfs_driver
drv._rebase_img(
self._FAKE_SNAPSHOT_PATH,
self._FAKE_VOLUME_NAME + '.vhdx', 'vhdx')
self._smbfs_driver.vhdutils.reconnect_parent.assert_called_once_with(
drv._vhdutils.reconnect_parent_vhd.assert_called_once_with(
self._FAKE_SNAPSHOT_PATH, self._FAKE_VOLUME_PATH)
def _test_extend_vhd_if_needed(self, virtual_size_gb, requested_size_gb):
drv = self._smbfs_driver
virtual_size_bytes = virtual_size_gb << 30
requested_size_bytes = requested_size_gb << 30
virtual_size_dict = {'VirtualSize': virtual_size_bytes}
drv.vhdutils.get_vhd_size = mock.Mock(return_value=virtual_size_dict)
if virtual_size_gb > requested_size_gb:
self.assertRaises(exception.VolumeBackendAPIException,
drv._extend_vhd_if_needed,
mock.sentinel.vhd_path,
requested_size_gb)
else:
drv._extend_vhd_if_needed(mock.sentinel.vhd_path,
requested_size_gb)
if virtual_size_gb < requested_size_gb:
drv.vhdutils.resize_vhd.assert_called_once_with(
mock.sentinel.vhd_path, requested_size_bytes)
else:
self.assertFalse(drv.vhdutils.resize_vhd.called)
drv.vhdutils.get_vhd_size.assert_called_once_with(
mock.sentinel.vhd_path)
def test_extend_vhd_if_needed_bigger_size(self):
self._test_extend_vhd_if_needed(virtual_size_gb=1,
requested_size_gb=2)
def test_extend_vhd_if_needed_equal_size(self):
self._test_extend_vhd_if_needed(virtual_size_gb=1,
requested_size_gb=1)
def test_extend_vhd_if_needed_smaller_size(self):
self._test_extend_vhd_if_needed(virtual_size_gb=2,
requested_size_gb=1)

View File

@ -1,382 +0,0 @@
# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cinder import exception
from cinder import test
from cinder.volume.drivers.windows import constants
from cinder.volume.drivers.windows import vhdutils
class VHDUtilsTestCase(test.TestCase):
_FAKE_FORMAT = 2
_FAKE_TYPE = constants.VHD_TYPE_DYNAMIC
_FAKE_JOB_PATH = 'fake_job_path'
_FAKE_VHD_PATH = r'C:\fake\vhd.vhd'
_FAKE_DEST_PATH = r'C:\fake\destination.vhdx'
_FAKE_FILE_HANDLE = 'fake_file_handle'
_FAKE_RET_VAL = 0
_FAKE_VHD_SIZE = 1024
def setUp(self):
super(VHDUtilsTestCase, self).setUp()
self._setup_mocks()
self._vhdutils = vhdutils.VHDUtils()
self._vhdutils._msft_vendor_id = 'fake_vendor_id'
self.addCleanup(mock.patch.stopall)
def _setup_mocks(self):
fake_ctypes = mock.Mock()
# Use this in order to make assertions on the variables parsed by
# references.
fake_ctypes.byref = lambda x: x
fake_ctypes.c_wchar_p = lambda x: x
fake_ctypes.c_ulong = lambda x: x
mock.patch.multiple(
'cinder.volume.drivers.windows.vhdutils',
ctypes=fake_ctypes, kernel32=mock.DEFAULT,
wintypes=mock.DEFAULT, virtdisk=mock.DEFAULT,
Win32_GUID=mock.DEFAULT,
Win32_RESIZE_VIRTUAL_DISK_PARAMETERS=mock.DEFAULT,
Win32_CREATE_VIRTUAL_DISK_PARAMETERS=mock.DEFAULT,
Win32_VIRTUAL_STORAGE_TYPE=mock.DEFAULT,
Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V1=mock.DEFAULT,
Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V2=mock.DEFAULT,
Win32_MERGE_VIRTUAL_DISK_PARAMETERS=mock.DEFAULT,
Win32_GET_VIRTUAL_DISK_INFO_PARAMETERS=mock.DEFAULT,
Win32_SET_VIRTUAL_DISK_INFO_PARAMETERS=mock.DEFAULT,
create=True).start()
def _test_create_vhd(self, src_path=None, max_internal_size=0,
parent_path=None, create_failed=False):
self._vhdutils._get_device_id_by_path = mock.Mock(
side_effect=(vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHD,
vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHDX))
self._vhdutils._close = mock.Mock()
fake_params = (
vhdutils.Win32_CREATE_VIRTUAL_DISK_PARAMETERS.return_value)
fake_vst = mock.Mock()
fake_source_vst = mock.Mock()
vhdutils.Win32_VIRTUAL_STORAGE_TYPE.side_effect = [
fake_vst, None, fake_source_vst]
vhdutils.virtdisk.CreateVirtualDisk.return_value = int(
create_failed)
if create_failed:
self.assertRaises(exception.VolumeBackendAPIException,
self._vhdutils._create_vhd,
self._FAKE_DEST_PATH,
constants.VHD_TYPE_DYNAMIC,
src_path=src_path,
max_internal_size=max_internal_size,
parent_path=parent_path)
else:
self._vhdutils._create_vhd(self._FAKE_DEST_PATH,
constants.VHD_TYPE_DYNAMIC,
src_path=src_path,
max_internal_size=max_internal_size,
parent_path=parent_path)
self.assertEqual(vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHD,
fake_vst.DeviceId)
self.assertEqual(parent_path, fake_params.ParentPath)
self.assertEqual(max_internal_size, fake_params.MaximumSize)
if src_path:
self.assertEqual(vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHDX,
fake_source_vst.DeviceId)
self.assertEqual(src_path, fake_params.SourcePath)
vhdutils.virtdisk.CreateVirtualDisk.assert_called_with(
vhdutils.ctypes.byref(fake_vst),
vhdutils.ctypes.c_wchar_p(self._FAKE_DEST_PATH),
vhdutils.VIRTUAL_DISK_ACCESS_NONE, None,
vhdutils.CREATE_VIRTUAL_DISK_FLAG_NONE, 0,
vhdutils.ctypes.byref(fake_params), None,
vhdutils.ctypes.byref(vhdutils.wintypes.HANDLE()))
self.assertTrue(self._vhdutils._close.called)
def test_create_vhd_exception(self):
self._test_create_vhd(create_failed=True)
def test_create_dynamic_vhd(self):
self._test_create_vhd(max_internal_size=1 << 30)
def test_create_differencing_vhd(self):
self._test_create_vhd(parent_path=self._FAKE_VHD_PATH)
def test_convert_vhd(self):
self._test_create_vhd(src_path=self._FAKE_VHD_PATH)
def _test_open(self, open_failed=False):
fake_device_id = vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHD
vhdutils.virtdisk.OpenVirtualDisk.return_value = int(open_failed)
self._vhdutils._get_device_id_by_path = mock.Mock(
return_value=fake_device_id)
fake_vst = vhdutils.Win32_VIRTUAL_STORAGE_TYPE.return_value
fake_params = 'fake_params'
fake_access_mask = vhdutils.VIRTUAL_DISK_ACCESS_NONE
fake_open_flag = vhdutils.OPEN_VIRTUAL_DISK_FLAG_NO_PARENTS
if open_failed:
self.assertRaises(exception.VolumeBackendAPIException,
self._vhdutils._open,
self._FAKE_VHD_PATH)
else:
self._vhdutils._open(self._FAKE_VHD_PATH,
open_flag=fake_open_flag,
open_access_mask=fake_access_mask,
open_params=fake_params)
vhdutils.virtdisk.OpenVirtualDisk.assert_called_with(
vhdutils.ctypes.byref(fake_vst),
vhdutils.ctypes.c_wchar_p(self._FAKE_VHD_PATH),
fake_access_mask, fake_open_flag, fake_params,
vhdutils.ctypes.byref(vhdutils.wintypes.HANDLE()))
self.assertEqual(fake_device_id, fake_vst.DeviceId)
def test_open_success(self):
self._test_open()
def test_open_failed(self):
self._test_open(open_failed=True)
def _test_get_device_id_by_path(self,
get_device_failed=False):
if get_device_failed:
self.assertRaises(exception.VolumeBackendAPIException,
self._vhdutils._get_device_id_by_path,
self._FAKE_VHD_PATH[:-4])
else:
ret_val = self._vhdutils._get_device_id_by_path(
self._FAKE_VHD_PATH)
self.assertEqual(
ret_val,
vhdutils.VIRTUAL_STORAGE_TYPE_DEVICE_VHD)
def test_get_device_id_by_path_success(self):
self._test_get_device_id_by_path()
def test_get_device_id_by_path_failed(self):
self._test_get_device_id_by_path(get_device_failed=True)
def _test_resize_vhd(self, resize_failed=False):
fake_params = (
vhdutils.Win32_RESIZE_VIRTUAL_DISK_PARAMETERS.return_value)
self._vhdutils._open = mock.Mock(
return_value=self._FAKE_FILE_HANDLE)
self._vhdutils._close = mock.Mock()
vhdutils.virtdisk.ResizeVirtualDisk.return_value = int(
resize_failed)
if resize_failed:
self.assertRaises(exception.VolumeBackendAPIException,
self._vhdutils.resize_vhd,
self._FAKE_VHD_PATH,
self._FAKE_VHD_SIZE)
else:
self._vhdutils.resize_vhd(self._FAKE_VHD_PATH,
self._FAKE_VHD_SIZE)
vhdutils.virtdisk.ResizeVirtualDisk.assert_called_with(
self._FAKE_FILE_HANDLE,
vhdutils.RESIZE_VIRTUAL_DISK_FLAG_NONE,
vhdutils.ctypes.byref(fake_params),
None)
self.assertTrue(self._vhdutils._close.called)
def test_resize_vhd_success(self):
self._test_resize_vhd()
def test_resize_vhd_failed(self):
self._test_resize_vhd(resize_failed=True)
def _test_merge_vhd(self, merge_failed=False):
self._vhdutils._open = mock.Mock(
return_value=self._FAKE_FILE_HANDLE)
self._vhdutils._close = mock.Mock()
fake_open_params = vhdutils.Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V1()
fake_params = vhdutils.Win32_MERGE_VIRTUAL_DISK_PARAMETERS()
vhdutils.virtdisk.MergeVirtualDisk.return_value = int(
merge_failed)
vhdutils.Win32_RESIZE_VIRTUAL_DISK_PARAMETERS.return_value = (
fake_params)
if merge_failed:
self.assertRaises(exception.VolumeBackendAPIException,
self._vhdutils.merge_vhd,
self._FAKE_VHD_PATH)
else:
self._vhdutils.merge_vhd(self._FAKE_VHD_PATH)
self._vhdutils._open.assert_called_once_with(
self._FAKE_VHD_PATH,
open_params=vhdutils.ctypes.byref(fake_open_params))
self.assertEqual(vhdutils.OPEN_VIRTUAL_DISK_VERSION_1,
fake_open_params.Version)
self.assertEqual(2, fake_open_params.RWDepth)
vhdutils.virtdisk.MergeVirtualDisk.assert_called_with(
self._FAKE_FILE_HANDLE,
vhdutils.MERGE_VIRTUAL_DISK_FLAG_NONE,
vhdutils.ctypes.byref(fake_params),
None)
def test_merge_vhd_success(self):
self._test_merge_vhd()
def test_merge_vhd_failed(self):
self._test_merge_vhd(merge_failed=True)
def _test_get_vhd_info_member(self, get_vhd_info_failed=False):
fake_params = vhdutils.Win32_GET_VIRTUAL_DISK_INFO_PARAMETERS()
fake_info_size = vhdutils.ctypes.sizeof(fake_params)
vhdutils.virtdisk.GetVirtualDiskInformation.return_value = (
get_vhd_info_failed)
self._vhdutils._close = mock.Mock()
if get_vhd_info_failed:
self.assertRaises(exception.VolumeBackendAPIException,
self._vhdutils._get_vhd_info_member,
self._FAKE_VHD_PATH,
vhdutils.GET_VIRTUAL_DISK_INFO_SIZE)
else:
self._vhdutils._get_vhd_info_member(
self._FAKE_VHD_PATH,
vhdutils.GET_VIRTUAL_DISK_INFO_SIZE)
vhdutils.virtdisk.GetVirtualDiskInformation.assert_called_with(
self._FAKE_VHD_PATH,
vhdutils.ctypes.byref(
vhdutils.ctypes.c_ulong(fake_info_size)),
vhdutils.ctypes.byref(fake_params), 0)
def test_get_vhd_info_member_success(self):
self._test_get_vhd_info_member()
def test_get_vhd_info_member_failed(self):
self._test_get_vhd_info_member(get_vhd_info_failed=True)
def test_get_vhd_info(self):
fake_vhd_info = {'VirtualSize': self._FAKE_VHD_SIZE}
fake_info_member = vhdutils.GET_VIRTUAL_DISK_INFO_SIZE
self._vhdutils._open = mock.Mock(
return_value=self._FAKE_FILE_HANDLE)
self._vhdutils._close = mock.Mock()
self._vhdutils._get_vhd_info_member = mock.Mock(
return_value=fake_vhd_info)
ret_val = self._vhdutils.get_vhd_info(self._FAKE_VHD_PATH,
[fake_info_member])
self.assertEqual(fake_vhd_info, ret_val)
self._vhdutils._open.assert_called_once_with(
self._FAKE_VHD_PATH,
open_access_mask=vhdutils.VIRTUAL_DISK_ACCESS_GET_INFO)
self._vhdutils._get_vhd_info_member.assert_called_with(
self._FAKE_FILE_HANDLE, fake_info_member)
self._vhdutils._close.assert_called_once_with(self._FAKE_FILE_HANDLE)
def test_parse_vhd_info(self):
fake_physical_size = self._FAKE_VHD_SIZE + 1
fake_info_member = vhdutils.GET_VIRTUAL_DISK_INFO_SIZE
fake_info = mock.Mock()
fake_info.VhdInfo.Size._fields_ = [
("VirtualSize", vhdutils.wintypes.ULARGE_INTEGER),
("PhysicalSize", vhdutils.wintypes.ULARGE_INTEGER)]
fake_info.VhdInfo.Size.VirtualSize = self._FAKE_VHD_SIZE
fake_info.VhdInfo.Size.PhysicalSize = fake_physical_size
ret_val = self._vhdutils._parse_vhd_info(fake_info, fake_info_member)
expected = {'VirtualSize': self._FAKE_VHD_SIZE,
'PhysicalSize': fake_physical_size}
self.assertEqual(expected, ret_val)
def _test_reconnect_parent(self, reconnect_failed=False):
fake_params = vhdutils.Win32_SET_VIRTUAL_DISK_INFO_PARAMETERS()
fake_open_params = vhdutils.Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V2()
self._vhdutils._open = mock.Mock(return_value=self._FAKE_FILE_HANDLE)
self._vhdutils._close = mock.Mock()
vhdutils.virtdisk.SetVirtualDiskInformation.return_value = int(
reconnect_failed)
if reconnect_failed:
self.assertRaises(exception.VolumeBackendAPIException,
self._vhdutils.reconnect_parent,
self._FAKE_VHD_PATH, self._FAKE_DEST_PATH)
else:
self._vhdutils.reconnect_parent(self._FAKE_VHD_PATH,
self._FAKE_DEST_PATH)
self._vhdutils._open.assert_called_once_with(
self._FAKE_VHD_PATH,
open_flag=vhdutils.OPEN_VIRTUAL_DISK_FLAG_NO_PARENTS,
open_access_mask=vhdutils.VIRTUAL_DISK_ACCESS_NONE,
open_params=vhdutils.ctypes.byref(fake_open_params))
self.assertEqual(vhdutils.OPEN_VIRTUAL_DISK_VERSION_2,
fake_open_params.Version)
self.assertFalse(fake_open_params.GetInfoOnly)
vhdutils.virtdisk.SetVirtualDiskInformation.assert_called_once_with(
self._FAKE_FILE_HANDLE, vhdutils.ctypes.byref(fake_params))
self.assertEqual(self._FAKE_DEST_PATH, fake_params.ParentFilePath)
def test_reconnect_parent_success(self):
self._test_reconnect_parent()
def test_reconnect_parent_failed(self):
self._test_reconnect_parent(reconnect_failed=True)
@mock.patch('sys.exc_info')
@mock.patch.object(vhdutils, 'LOG')
def test_run_and_check_output_fails_exc_info_set(self, mock_log,
mock_exc_info):
# we can't use assertRaises because we're mocking sys.exc_info and
# that messes up how assertRaises handles the exception
try:
self._vhdutils._run_and_check_output(lambda *args, **kwargs: 1)
self.fail('Expected _run_and_check_output to fail.')
except exception.VolumeBackendAPIException:
pass
mock_log.error.assert_called_once_with(mock.ANY, exc_info=True)
@mock.patch('sys.exc_info', return_value=None)
@mock.patch.object(vhdutils, 'LOG')
def test_run_and_check_output_fails_exc_info_not_set(self, mock_log,
mock_exc_info):
# we can't use assertRaises because we're mocking sys.exc_info and
# that messes up how assertRaises handles the exception
try:
self._vhdutils._run_and_check_output(lambda *args, **kwargs: 1)
self.fail('Expected _run_and_check_output to fail.')
except exception.VolumeBackendAPIException:
pass
mock_log.error.assert_called_once_with(mock.ANY, exc_info=False)

View File

@ -1,4 +1,5 @@
# Copyright 2012 Pedro Navarro Perez
# Copyright 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -17,389 +18,409 @@
Unit tests for Windows Server 2012 OpenStack Cinder volume driver
"""
import mock
import os
import shutil
import tempfile
from mox3 import mox
from oslo_config import cfg
from oslo_utils import fileutils
from oslo_utils import units
from cinder.image import image_utils
from cinder import test
from cinder.tests.unit.windows import db_fakes
from cinder.volume import configuration as conf
from cinder.volume.drivers.windows import constants
from cinder.volume.drivers.windows import vhdutils
from cinder.volume.drivers.windows import windows
from cinder.volume.drivers.windows import windows_utils
CONF = cfg.CONF
class TestWindowsDriver(test.TestCase):
def __init__(self, method):
super(TestWindowsDriver, self).__init__(method)
def setUp(self):
self.lun_path_tempdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.lun_path_tempdir)
@mock.patch.object(windows, 'utilsfactory')
def setUp(self, mock_utilsfactory):
super(TestWindowsDriver, self).setUp()
self.flags(
windows_iscsi_lun_path=self.lun_path_tempdir,
)
self._setup_stubs()
configuration = conf.Configuration(None)
configuration.append_config_values(windows.windows_opts)
self.flags(windows_iscsi_lun_path=mock.sentinel.iscsi_lun_path)
self.flags(image_conversion_dir=mock.sentinel.image_conversion_dir)
self._driver = windows.WindowsDriver(configuration=configuration)
self._driver.do_setup({})
def _setup_stubs(self):
@mock.patch.object(fileutils, 'ensure_tree')
def test_do_setup(self, mock_ensure_tree):
self._driver.do_setup(mock.sentinel.context)
def fake_wutils__init__(self):
pass
mock_ensure_tree.assert_has_calls(
[mock.call(mock.sentinel.iscsi_lun_path),
mock.call(mock.sentinel.image_conversion_dir)])
windows_utils.WindowsUtils.__init__ = fake_wutils__init__
def test_check_for_setup_error(self):
self._driver.check_for_setup_error()
def fake_local_path(self, volume):
return os.path.join(CONF.windows_iscsi_lun_path,
str(volume['name']) + ".vhd")
self._driver._tgt_utils.get_portal_locations.assert_called_once_with(
available_only=True, fail_if_none_found=True)
def test_check_for_setup_errors(self):
drv = self._driver
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'check_for_setup_error')
windows_utils.WindowsUtils.check_for_setup_error()
@mock.patch.object(windows.WindowsDriver, '_get_target_name')
def test_get_host_information(self, mock_get_target_name):
tgt_utils = self._driver._tgt_utils
self.mox.ReplayAll()
fake_auth_meth = 'CHAP'
fake_chap_username = 'fake_chap_username'
fake_chap_password = 'fake_chap_password'
fake_host_info = {'fake_prop': 'fake_value'}
fake_volume = db_fakes.get_fake_volume_info()
fake_volume['provider_auth'] = "%s %s %s" % (fake_auth_meth,
fake_chap_username,
fake_chap_password)
drv.check_for_setup_error()
mock_get_target_name.return_value = mock.sentinel.target_name
tgt_utils.get_portal_locations.return_value = [
mock.sentinel.portal_location]
tgt_utils.get_target_information.return_value = fake_host_info
def test_create_volume(self):
drv = self._driver
vol = db_fakes.get_fake_volume_info()
expected_host_info = dict(fake_host_info,
auth_method=fake_auth_meth,
auth_username=fake_chap_username,
auth_password=fake_chap_password,
target_discovered=False,
target_portal=mock.sentinel.portal_location,
target_lun=0,
volume_id=fake_volume['id'])
self.stubs.Set(drv, 'local_path', self.fake_local_path)
host_info = self._driver._get_host_information(fake_volume)
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'create_volume')
self.assertEqual(expected_host_info, host_info)
windows_utils.WindowsUtils.create_volume(self.fake_local_path(vol),
vol['name'], vol['size'])
mock_get_target_name.assert_called_once_with(fake_volume)
tgt_utils.get_portal_locations.assert_called_once_with()
tgt_utils.get_target_information.assert_called_once_with(
mock.sentinel.target_name)
self.mox.ReplayAll()
@mock.patch.object(windows.WindowsDriver, '_get_host_information')
def test_initialize_connection(self, mock_get_host_info):
tgt_utils = self._driver._tgt_utils
drv.create_volume(vol)
fake_volume = db_fakes.get_fake_volume_info()
fake_initiator = db_fakes.get_fake_connector_info()
fake_host_info = {'fake_host_prop': 'fake_value'}
def test_delete_volume(self):
"""delete_volume simple test case."""
drv = self._driver
mock_get_host_info.return_value = fake_host_info
vol = db_fakes.get_fake_volume_info()
expected_conn_info = {'driver_volume_type': 'iscsi',
'data': fake_host_info}
conn_info = self._driver.initialize_connection(fake_volume,
fake_initiator)
self.mox.StubOutWithMock(drv, 'local_path')
drv.local_path(vol).AndReturn(self.fake_local_path(vol))
self.assertEqual(expected_conn_info, conn_info)
mock_associate = tgt_utils.associate_initiator_with_iscsi_target
mock_associate.assert_called_once_with(
fake_initiator['initiator'],
fake_volume['provider_location'])
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'delete_volume')
windows_utils.WindowsUtils.delete_volume(vol['name'],
self.fake_local_path(vol))
self.mox.ReplayAll()
def test_terminate_connection(self):
fake_volume = db_fakes.get_fake_volume_info()
fake_initiator = db_fakes.get_fake_connector_info()
drv.delete_volume(vol)
self._driver.terminate_connection(fake_volume, fake_initiator)
self._driver._tgt_utils.deassociate_initiator.assert_called_once_with(
fake_initiator['initiator'], fake_volume['provider_location'])
@mock.patch.object(windows.WindowsDriver, 'local_path')
def test_create_volume(self, mock_local_path):
fake_volume = db_fakes.get_fake_volume_info()
self._driver.create_volume(fake_volume)
mock_local_path.assert_called_once_with(fake_volume)
self._driver._tgt_utils.create_wt_disk.assert_called_once_with(
mock_local_path.return_value,
fake_volume['name'],
size_mb=fake_volume['size'] * 1024)
def test_local_path(self):
fake_volume = db_fakes.get_fake_volume_info()
fake_lun_path = 'fake_lun_path'
self.flags(windows_iscsi_lun_path=fake_lun_path)
disk_format = 'vhd'
mock_get_fmt = self._driver._tgt_utils.get_supported_disk_format
mock_get_fmt.return_value = disk_format
disk_path = self._driver.local_path(fake_volume)
expected_fname = "%s.%s" % (fake_volume['name'], disk_format)
expected_disk_path = os.path.join(fake_lun_path,
expected_fname)
self.assertEqual(expected_disk_path, disk_path)
mock_get_fmt.assert_called_once_with()
@mock.patch.object(windows.WindowsDriver, 'local_path')
@mock.patch.object(fileutils, 'delete_if_exists')
def test_delete_volume(self, mock_delete_if_exists, mock_local_path):
fake_volume = db_fakes.get_fake_volume_info()
self._driver.delete_volume(fake_volume)
mock_local_path.assert_called_once_with(fake_volume)
self._driver._tgt_utils.remove_wt_disk.assert_called_once_with(
fake_volume['name'])
mock_delete_if_exists.assert_called_once_with(
mock_local_path.return_value)
def test_create_snapshot(self):
drv = self._driver
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'create_snapshot')
volume = db_fakes.get_fake_volume_info()
snapshot = db_fakes.get_fake_snapshot_info()
fake_snapshot = db_fakes.get_fake_snapshot_info()
self.stubs.Set(drv, 'local_path', self.fake_local_path(snapshot))
self._driver.create_snapshot(fake_snapshot)
windows_utils.WindowsUtils.create_snapshot(volume['name'],
snapshot['name'])
self._driver._tgt_utils.create_snapshot.assert_called_once_with(
fake_snapshot['volume_name'], fake_snapshot['name'])
self.mox.ReplayAll()
@mock.patch.object(windows.WindowsDriver, 'local_path')
def test_create_volume_from_snapshot(self, mock_local_path):
fake_volume = db_fakes.get_fake_volume_info()
fake_snapshot = db_fakes.get_fake_snapshot_info()
drv.create_snapshot(snapshot)
self._driver.create_volume_from_snapshot(fake_volume, fake_snapshot)
def test_create_volume_from_snapshot(self):
drv = self._driver
snapshot = db_fakes.get_fake_snapshot_info()
volume = db_fakes.get_fake_volume_info()
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'create_volume_from_snapshot')
windows_utils.WindowsUtils.\
create_volume_from_snapshot(volume, snapshot['name'])
self.mox.ReplayAll()
drv.create_volume_from_snapshot(volume, snapshot)
self._driver._tgt_utils.export_snapshot.assert_called_once_with(
fake_snapshot['name'],
mock_local_path.return_value)
self._driver._tgt_utils.import_wt_disk.assert_called_once_with(
mock_local_path.return_value,
fake_volume['name'])
def test_delete_snapshot(self):
drv = self._driver
fake_snapshot = db_fakes.get_fake_snapshot_info()
snapshot = db_fakes.get_fake_snapshot_info()
self._driver.delete_snapshot(fake_snapshot)
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'delete_snapshot')
windows_utils.WindowsUtils.delete_snapshot(snapshot['name'])
self._driver._tgt_utils.delete_snapshot.assert_called_once_with(
fake_snapshot['name'])
self.mox.ReplayAll()
def test_get_target_name(self):
fake_volume = db_fakes.get_fake_volume_info()
expected_target_name = "%s%s" % (
self._driver.configuration.iscsi_target_prefix,
fake_volume['name'])
drv.delete_snapshot(snapshot)
target_name = self._driver._get_target_name(fake_volume)
self.assertEqual(expected_target_name, target_name)
def _test_create_export(self, chap_enabled=False):
drv = self._driver
volume = db_fakes.get_fake_volume_info()
initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
@mock.patch.object(windows.WindowsDriver, '_get_target_name')
@mock.patch.object(windows.utils, 'generate_username')
@mock.patch.object(windows.utils, 'generate_password')
def test_create_export(self, mock_generate_password,
mock_generate_username,
mock_get_target_name):
tgt_utils = self._driver._tgt_utils
fake_volume = db_fakes.get_fake_volume_info()
self._driver.configuration.chap_username = None
self._driver.configuration.chap_password = None
self._driver.configuration.use_chap_auth = True
fake_chap_username = 'fake_chap_username'
fake_chap_password = 'fake_chap_password'
self.flags(use_chap_auth=chap_enabled)
self.flags(chap_username=fake_chap_username)
self.flags(chap_password=fake_chap_password)
mock_get_target_name.return_value = mock.sentinel.target_name
mock_generate_username.return_value = fake_chap_username
mock_generate_password.return_value = fake_chap_password
tgt_utils.iscsi_target_exists.return_value = False
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'add_disk_to_target')
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'create_iscsi_target')
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'set_chap_credentials')
self.mox.StubOutWithMock(self._driver,
'remove_export')
vol_updates = self._driver.create_export(mock.sentinel.context,
fake_volume,
mock.sentinel.connector)
self._driver.remove_export(mox.IgnoreArg(), mox.IgnoreArg())
windows_utils.WindowsUtils.create_iscsi_target(initiator_name)
mock_get_target_name.assert_called_once_with(fake_volume)
tgt_utils.iscsi_target_exists.assert_called_once_with(
mock.sentinel.target_name)
tgt_utils.set_chap_credentials.assert_called_once_with(
mock.sentinel.target_name,
fake_chap_username,
fake_chap_password)
tgt_utils.add_disk_to_target.assert_called_once_with(
fake_volume['name'], mock.sentinel.target_name)
if chap_enabled:
windows_utils.WindowsUtils.set_chap_credentials(
mox.IgnoreArg(),
fake_chap_username,
fake_chap_password)
expected_provider_auth = ' '.join(('CHAP',
fake_chap_username,
fake_chap_password))
expected_vol_updates = dict(
provider_location=mock.sentinel.target_name,
provider_auth=expected_provider_auth)
self.assertEqual(expected_vol_updates, vol_updates)
windows_utils.WindowsUtils.add_disk_to_target(volume['name'],
initiator_name)
@mock.patch.object(windows.WindowsDriver, '_get_target_name')
def test_remove_export(self, mock_get_target_name):
fake_volume = db_fakes.get_fake_volume_info()
self.mox.ReplayAll()
self._driver.remove_export(mock.sentinel.context, fake_volume)
export_info = drv.create_export(None, volume, {})
mock_get_target_name.assert_called_once_with(fake_volume)
self._driver._tgt_utils.delete_iscsi_target.assert_called_once_with(
mock_get_target_name.return_value)
self.assertEqual(initiator_name, export_info['provider_location'])
if chap_enabled:
expected_provider_auth = ' '.join(('CHAP',
fake_chap_username,
fake_chap_password))
self.assertEqual(expected_provider_auth,
export_info['provider_auth'])
@mock.patch.object(windows.WindowsDriver, 'local_path')
@mock.patch.object(image_utils, 'temporary_file')
@mock.patch.object(image_utils, 'fetch_to_vhd')
@mock.patch('os.unlink')
def test_copy_image_to_volume(self, mock_unlink, mock_fetch_to_vhd,
mock_tmp_file, mock_local_path):
tgt_utils = self._driver._tgt_utils
fake_volume = db_fakes.get_fake_volume_info()
def test_create_export_chap_disabled(self):
self._test_create_export()
mock_tmp_file.return_value.__enter__.return_value = (
mock.sentinel.tmp_vhd_path)
mock_local_path.return_value = mock.sentinel.vol_vhd_path
def test_create_export_chap_enabled(self):
self._test_create_export(chap_enabled=True)
self._driver.copy_image_to_volume(mock.sentinel.context,
fake_volume,
mock.sentinel.image_service,
mock.sentinel.image_id)
def test_initialize_connection(self):
drv = self._driver
mock_local_path.assert_called_once_with(fake_volume)
mock_tmp_file.assert_called_once_with(suffix='.vhd')
image_utils.fetch_to_vhd.assert_called_once_with(
mock.sentinel.context, mock.sentinel.image_service,
mock.sentinel.image_id, mock.sentinel.tmp_vhd_path,
self._driver.configuration.volume_dd_blocksize)
volume = db_fakes.get_fake_volume_info()
initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
mock_unlink.assert_called_once_with(mock.sentinel.vol_vhd_path)
self._driver._vhdutils.convert_vhd.assert_called_once_with(
mock.sentinel.tmp_vhd_path,
mock.sentinel.vol_vhd_path,
tgt_utils.get_supported_vhd_type.return_value)
self._driver._vhdutils.resize_vhd.assert_called_once_with(
mock.sentinel.vol_vhd_path,
fake_volume['size'] * units.Gi,
is_file_max_size=False)
connector = db_fakes.get_fake_connector_info()
tgt_utils.change_wt_disk_status.assert_has_calls(
[mock.call(fake_volume['name'], enabled=False),
mock.call(fake_volume['name'], enabled=True)])
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'associate_initiator_with_iscsi_target')
windows_utils.WindowsUtils.associate_initiator_with_iscsi_target(
volume['provider_location'], initiator_name, )
@mock.patch.object(windows.uuidutils, 'generate_uuid')
def test_temporary_snapshot(self, mock_generate_uuid):
tgt_utils = self._driver._tgt_utils
mock_generate_uuid.return_value = mock.sentinel.snap_uuid
expected_snap_name = '%s-tmp-snapshot-%s' % (
mock.sentinel.volume_name, mock.sentinel.snap_uuid)
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'get_host_information')
windows_utils.WindowsUtils.get_host_information(
volume, volume['provider_location'])
with self._driver._temporary_snapshot(
mock.sentinel.volume_name) as snap_name:
self.assertEqual(expected_snap_name, snap_name)
tgt_utils.create_snapshot.assert_called_once_with(
mock.sentinel.volume_name, expected_snap_name)
self.mox.ReplayAll()
tgt_utils.delete_snapshot.assert_called_once_with(
expected_snap_name)
drv.initialize_connection(volume, connector)
@mock.patch.object(windows.WindowsDriver, '_temporary_snapshot')
@mock.patch.object(image_utils, 'upload_volume')
@mock.patch.object(fileutils, 'delete_if_exists')
def test_copy_volume_to_image(self, mock_delete_if_exists,
mock_upload_volume,
mock_tmp_snap):
tgt_utils = self._driver._tgt_utils
def test_terminate_connection(self):
drv = self._driver
disk_format = 'vhd'
fake_image_meta = db_fakes.get_fake_image_meta()
fake_volume = db_fakes.get_fake_volume_info()
fake_img_conv_dir = 'fake_img_conv_dir'
self._driver.configuration.image_conversion_dir = fake_img_conv_dir
volume = db_fakes.get_fake_volume_info()
initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])
connector = db_fakes.get_fake_connector_info()
tgt_utils.get_supported_disk_format.return_value = disk_format
mock_tmp_snap.return_value.__enter__.return_value = (
mock.sentinel.tmp_snap_name)
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'delete_iscsi_target')
windows_utils.WindowsUtils.delete_iscsi_target(
initiator_name, volume['provider_location'])
expected_tmp_vhd_path = os.path.join(
fake_img_conv_dir,
fake_image_meta['id'] + '.' + disk_format)
self.mox.ReplayAll()
self._driver.copy_volume_to_image(
mock.sentinel.context, fake_volume,
mock.sentinel.image_service,
fake_image_meta)
drv.terminate_connection(volume, connector)
mock_tmp_snap.assert_called_once_with(fake_volume['name'])
tgt_utils.export_snapshot.assert_called_once_with(
mock.sentinel.tmp_snap_name,
expected_tmp_vhd_path)
mock_upload_volume.assert_called_once_with(
mock.sentinel.context, mock.sentinel.image_service,
fake_image_meta, expected_tmp_vhd_path, 'vhd')
mock_delete_if_exists.assert_called_once_with(
expected_tmp_vhd_path)
def test_remove_export(self):
drv = self._driver
@mock.patch.object(windows.WindowsDriver, '_temporary_snapshot')
@mock.patch.object(windows.WindowsDriver, 'local_path')
def test_create_cloned_volume(self, mock_local_path,
mock_tmp_snap):
tgt_utils = self._driver._tgt_utils
volume = db_fakes.get_fake_volume_info()
fake_volume = db_fakes.get_fake_volume_info()
fake_src_volume = db_fakes.get_fake_volume_info_cloned()
target_name = volume['provider_location']
mock_tmp_snap.return_value.__enter__.return_value = (
mock.sentinel.tmp_snap_name)
mock_local_path.return_value = mock.sentinel.vol_vhd_path
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'remove_iscsi_target')
windows_utils.WindowsUtils.remove_iscsi_target(target_name)
self._driver.create_cloned_volume(fake_volume, fake_src_volume)
self.mox.ReplayAll()
mock_tmp_snap.assert_called_once_with(fake_src_volume['name'])
tgt_utils.export_snapshot.assert_called_once_with(
mock.sentinel.tmp_snap_name,
mock.sentinel.vol_vhd_path)
self._driver._vhdutils.resize_vhd.assert_called_once_with(
mock.sentinel.vol_vhd_path, fake_volume['size'] * units.Gi,
is_file_max_size=False)
tgt_utils.import_wt_disk.assert_called_once_with(
mock.sentinel.vol_vhd_path, fake_volume['name'])
drv.remove_export(None, volume)
@mock.patch('os.path.splitdrive')
def test_get_capacity_info(self, mock_splitdrive):
mock_splitdrive.return_value = (mock.sentinel.drive,
mock.sentinel.path_tail)
fake_size_gb = 2
fake_free_space_gb = 1
self._driver._hostutils.get_volume_info.return_value = (
fake_size_gb * units.Gi,
fake_free_space_gb * units.Gi)
def test_copy_image_to_volume(self):
"""resize_image common case usage."""
drv = self._driver
total_gb, free_gb = self._driver._get_capacity_info()
volume = db_fakes.get_fake_volume_info()
self.assertEqual(fake_size_gb, total_gb)
self.assertEqual(fake_free_space_gb, free_gb)
fake_get_supported_type = lambda x: constants.VHD_TYPE_FIXED
self.stubs.Set(drv, 'local_path', self.fake_local_path)
self.stubs.Set(windows_utils.WindowsUtils, 'get_supported_vhd_type',
fake_get_supported_type)
self._driver._hostutils.get_volume_info.assert_called_once_with(
mock.sentinel.drive)
mock_splitdrive.assert_called_once_with(
mock.sentinel.iscsi_lun_path)
self.mox.StubOutWithMock(os, 'unlink')
self.mox.StubOutWithMock(image_utils, 'create_temporary_file')
self.mox.StubOutWithMock(image_utils, 'fetch_to_vhd')
self.mox.StubOutWithMock(vhdutils.VHDUtils, 'convert_vhd')
self.mox.StubOutWithMock(vhdutils.VHDUtils, 'resize_vhd')
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'change_disk_status')
@mock.patch.object(windows.WindowsDriver, '_get_capacity_info')
def test_update_volume_stats(self, mock_get_capacity_info):
mock_get_capacity_info.return_value = (
mock.sentinel.size_gb,
mock.sentinel.free_space_gb)
fake_temp_path = r'C:\fake\temp\file'
if (CONF.image_conversion_dir and not
os.path.exists(CONF.image_conversion_dir)):
os.makedirs(CONF.image_conversion_dir)
image_utils.create_temporary_file(suffix='.vhd').AndReturn(
fake_temp_path)
self.flags(volume_backend_name=mock.sentinel.backend_name)
self.flags(reserved_percentage=mock.sentinel.reserved_percentage)
fake_volume_path = self.fake_local_path(volume)
expected_volume_stats = dict(
volume_backend_name=mock.sentinel.backend_name,
vendor_name='Microsoft',
driver_version=self._driver.VERSION,
storage_protocol='iSCSI',
total_capacity_gb=mock.sentinel.size_gb,
free_capacity_gb=mock.sentinel.free_space_gb,
reserved_percentage=mock.sentinel.reserved_percentage,
QoS_support=False)
image_utils.fetch_to_vhd(None, None, None,
fake_temp_path,
mox.IgnoreArg())
windows_utils.WindowsUtils.change_disk_status(volume['name'],
mox.IsA(bool))
vhdutils.VHDUtils.convert_vhd(fake_temp_path,
fake_volume_path,
constants.VHD_TYPE_FIXED)
vhdutils.VHDUtils.resize_vhd(fake_volume_path,
volume['size'] << 30)
windows_utils.WindowsUtils.change_disk_status(volume['name'],
mox.IsA(bool))
os.unlink(mox.IsA(str))
self.mox.ReplayAll()
drv.copy_image_to_volume(None, volume, None, None)
def _test_copy_volume_to_image(self, supported_format):
drv = self._driver
vol = db_fakes.get_fake_volume_info()
image_meta = db_fakes.get_fake_image_meta()
fake_get_supported_format = lambda x: supported_format
self.stubs.Set(os.path, 'exists', lambda x: False)
self.stubs.Set(drv, 'local_path', self.fake_local_path)
self.stubs.Set(windows_utils.WindowsUtils, 'get_supported_format',
fake_get_supported_format)
self.mox.StubOutWithMock(fileutils, 'ensure_tree')
self.mox.StubOutWithMock(fileutils, 'delete_if_exists')
self.mox.StubOutWithMock(image_utils, 'upload_volume')
self.mox.StubOutWithMock(windows_utils.WindowsUtils, 'copy_vhd_disk')
self.mox.StubOutWithMock(vhdutils.VHDUtils, 'convert_vhd')
fileutils.ensure_tree(CONF.image_conversion_dir)
temp_vhd_path = os.path.join(CONF.image_conversion_dir,
str(image_meta['id']) + "." +
supported_format)
upload_image = temp_vhd_path
windows_utils.WindowsUtils.copy_vhd_disk(self.fake_local_path(vol),
temp_vhd_path)
if supported_format == 'vhdx':
upload_image = upload_image[:-1]
vhdutils.VHDUtils.convert_vhd(temp_vhd_path, upload_image,
constants.VHD_TYPE_DYNAMIC)
image_utils.upload_volume(None, None, image_meta, upload_image, 'vhd')
fileutils.delete_if_exists(temp_vhd_path)
fileutils.delete_if_exists(upload_image)
self.mox.ReplayAll()
drv.copy_volume_to_image(None, vol, None, image_meta)
def test_copy_volume_to_image_using_vhd(self):
self._test_copy_volume_to_image('vhd')
def test_copy_volume_to_image_using_vhdx(self):
self._test_copy_volume_to_image('vhdx')
def test_create_cloned_volume(self):
drv = self._driver
volume = db_fakes.get_fake_volume_info()
volume_cloned = db_fakes.get_fake_volume_info_cloned()
new_vhd_path = self.fake_local_path(volume)
src_vhd_path = self.fake_local_path(volume_cloned)
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'copy_vhd_disk')
self.mox.StubOutWithMock(windows_utils.WindowsUtils,
'import_wt_disk')
self.mox.StubOutWithMock(vhdutils.VHDUtils,
'resize_vhd')
self.stubs.Set(drv.utils,
'is_resize_needed',
lambda vhd_path, new_size, old_size: True)
self.stubs.Set(drv, 'local_path', self.fake_local_path)
windows_utils.WindowsUtils.copy_vhd_disk(src_vhd_path,
new_vhd_path)
drv.utils.is_resize_needed(new_vhd_path,
volume['size'],
volume_cloned['size'])
vhdutils.VHDUtils.resize_vhd(new_vhd_path, volume['size'] << 30)
windows_utils.WindowsUtils.import_wt_disk(new_vhd_path,
volume['name'])
self.mox.ReplayAll()
drv.create_cloned_volume(volume, volume_cloned)
self._driver._update_volume_stats()
self.assertEqual(expected_volume_stats,
self._driver._stats)
def test_extend_volume(self):
drv = self._driver
fake_volume = db_fakes.get_fake_volume_info()
new_size_gb = 2
expected_additional_sz_mb = 1024
volume = db_fakes.get_fake_volume_info()
self._driver.extend_volume(fake_volume, new_size_gb)
TEST_VOLUME_ADDITIONAL_SIZE_MB = 1024
TEST_VOLUME_ADDITIONAL_SIZE_GB = 1
self.mox.StubOutWithMock(windows_utils.WindowsUtils, 'extend')
windows_utils.WindowsUtils.extend(volume['name'],
TEST_VOLUME_ADDITIONAL_SIZE_MB)
new_size = volume['size'] + TEST_VOLUME_ADDITIONAL_SIZE_GB
self.mox.ReplayAll()
drv.extend_volume(volume, new_size)
self._driver._tgt_utils.extend_wt_disk.assert_called_once_with(
fake_volume['name'], expected_additional_sz_mb)

View File

@ -12,9 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import os
import mock
from cinder import exception
@ -23,142 +20,57 @@ from cinder.volume.drivers.windows import remotefs
class WindowsRemoteFsTestCase(test.TestCase):
_FAKE_SHARE = '//1.2.3.4/share1'
_FAKE_MNT_BASE = 'C:\OpenStack\mnt'
_FAKE_HASH = 'db0bf952c1734092b83e8990bd321131'
_FAKE_MNT_POINT = os.path.join(_FAKE_MNT_BASE, _FAKE_HASH)
_FAKE_SHARE_OPTS = '-o username=Administrator,password=12345'
_FAKE_OPTIONS_DICT = {'username': 'Administrator',
'password': '12345'}
def setUp(self):
super(WindowsRemoteFsTestCase, self).setUp()
remotefs.ctypes.windll = mock.MagicMock()
remotefs.WindowsRemoteFsClient.__init__ = mock.Mock(return_value=None)
with mock.patch.object(remotefs.WindowsRemoteFsClient,
'__init__', lambda x: None):
self._remotefs = remotefs.WindowsRemoteFsClient()
self._remotefs = remotefs.WindowsRemoteFsClient(
'cifs', root_helper=None,
smbfs_mount_point_base=self._FAKE_MNT_BASE)
self._remotefs._mount_base = self._FAKE_MNT_BASE
self._remotefs.smb_conn = mock.MagicMock()
self._remotefs.conn_cimv2 = mock.MagicMock()
self._remotefs._mount_base = mock.sentinel.mnt_base
self._remotefs._smbutils = mock.Mock()
self._remotefs._pathutils = mock.Mock()
def _test_mount_share(self, mount_point_exists=True, is_symlink=True,
mount_base_exists=True):
fake_exists = mock.Mock(return_value=mount_point_exists)
fake_isdir = mock.Mock(return_value=mount_base_exists)
fake_makedirs = mock.Mock()
with mock.patch.multiple('os.path', exists=fake_exists,
isdir=fake_isdir):
with mock.patch('os.makedirs', fake_makedirs):
self._remotefs.is_symlink = mock.Mock(
return_value=is_symlink)
self._remotefs.create_sym_link = mock.MagicMock()
self._remotefs._mount = mock.MagicMock()
fake_norm_path = os.path.abspath(self._FAKE_SHARE)
@mock.patch('os.path.isdir')
@mock.patch('os.makedirs')
@mock.patch('os.path.exists')
@mock.patch('os.path.abspath')
@mock.patch.object(remotefs.WindowsRemoteFsClient, 'get_mount_point')
def _test_mount_share(self, mock_get_mnt_point, mock_abspath,
mock_path_exists, mock_makedirs, mock_isdir,
mnt_point_exists=False, is_mnt_point_slink=True):
mount_options = dict(username=mock.sentinel.username,
password=mock.sentinel.password)
mock_isdir.return_value = False
mock_get_mnt_point.return_value = mock.sentinel.mnt_point
mock_abspath.return_value = mock.sentinel.norm_export_path
mock_path_exists.return_value = mnt_point_exists
if mount_point_exists:
if not is_symlink:
self.assertRaises(exception.SmbfsException,
self._remotefs.mount,
self._FAKE_MNT_POINT,
self._FAKE_OPTIONS_DICT)
else:
self._remotefs.mount(self._FAKE_SHARE,
self._FAKE_OPTIONS_DICT)
if not mount_base_exists:
fake_makedirs.assert_called_once_with(
self._FAKE_MNT_BASE)
self._remotefs._mount.assert_called_once_with(
fake_norm_path, self._FAKE_OPTIONS_DICT)
self._remotefs.create_sym_link.assert_called_once_with(
self._FAKE_MNT_POINT, fake_norm_path)
self._remotefs._pathutils.is_symlink.return_value = is_mnt_point_slink
self._remotefs._smbutils.check_smb_mapping.return_value = False
def test_mount_linked_share(self):
# The mountpoint contains a symlink targeting the share path
self._test_mount_share(True)
if mnt_point_exists and not is_mnt_point_slink:
self.assertRaises(exception.SmbfsException,
self._remotefs.mount,
mock.sentinel.export_path,
mount_options)
else:
self._remotefs.mount(mock.sentinel.export_path, mount_options)
def test_mount_unlinked_share(self):
self._test_mount_share(False)
mock_makedirs.assert_called_once_with(mock.sentinel.mnt_base)
mock_get_mnt_point.assert_called_once_with(mock.sentinel.export_path)
self._remotefs._smbutils.check_smb_mapping.assert_called_once_with(
mock.sentinel.norm_export_path, remove_unavailable_mapping=True)
self._remotefs._smbutils.mount_smb_share.assert_called_once_with(
mock.sentinel.norm_export_path, **mount_options)
def test_mount_point_exception(self):
# The mountpoint already exists but it is not a symlink
self._test_mount_share(True, False)
if not mnt_point_exists:
self._remotefs._pathutils.create_sym_link.assert_called_once_with(
mock.sentinel.mnt_point, mock.sentinel.norm_export_path)
def test_mount_base_missing(self):
# The mount point base dir does not exist
self._test_mount_share(mount_base_exists=False)
def test_mount_share(self):
self._test_mount_share()
def _test_check_symlink(self, is_symlink=True, python_version=(2, 7),
is_dir=True):
fake_isdir = mock.Mock(return_value=is_dir)
fake_islink = mock.Mock(return_value=is_symlink)
with mock.patch('sys.version_info', python_version):
with mock.patch.multiple('os.path', isdir=fake_isdir,
islink=fake_islink):
if is_symlink:
ret_value = 0x400
else:
ret_value = 0x80
fake_get_attributes = mock.Mock(return_value=ret_value)
ctypes.windll.kernel32.GetFileAttributesW = fake_get_attributes
ret_value = self._remotefs.is_symlink(self._FAKE_MNT_POINT)
if python_version >= (3, 2):
fake_islink.assert_called_once_with(self._FAKE_MNT_POINT)
else:
fake_get_attributes.assert_called_once_with(
self._FAKE_MNT_POINT)
self.assertEqual(ret_value, is_symlink)
def test_is_symlink(self):
self._test_check_symlink()
def test_is_not_symlink(self):
self._test_check_symlink(False)
def test_check_symlink_python_gt_3_2(self):
self._test_check_symlink(python_version=(3, 3))
def test_create_sym_link_exception(self):
ctypes.windll.kernel32.CreateSymbolicLinkW.return_value = 0
self.assertRaises(exception.VolumeBackendAPIException,
self._remotefs.create_sym_link,
self._FAKE_MNT_POINT, self._FAKE_SHARE)
def _test_check_smb_mapping(self, existing_mappings=False,
share_available=False):
with mock.patch('os.path.exists', lambda x: share_available):
fake_mapping = mock.MagicMock()
if existing_mappings:
fake_mappings = [fake_mapping]
else:
fake_mappings = []
self._remotefs.smb_conn.query.return_value = fake_mappings
ret_val = self._remotefs.check_smb_mapping(self._FAKE_SHARE)
if existing_mappings:
if share_available:
self.assertTrue(ret_val)
else:
fake_mapping.Remove.assert_called_once_with(True, True)
else:
self.assertFalse(ret_val)
def test_check_mapping(self):
self._test_check_smb_mapping()
def test_remake_unavailable_mapping(self):
self._test_check_smb_mapping(True, False)
def test_available_mapping(self):
self._test_check_smb_mapping(True, True)
def test_mount_smb(self):
fake_create = self._remotefs.smb_conn.Msft_SmbMapping.Create
self._remotefs._mount(self._FAKE_SHARE, {})
fake_create.assert_called_once_with(UserName=None,
Password=None,
RemotePath=self._FAKE_SHARE)
def test_mount_share_existing_mnt_point_not_symlink(self):
self._test_mount_share(mnt_point_exists=True,
is_mnt_point_slink=False)

View File

@ -1,147 +0,0 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cinder import exception
from cinder import test
from cinder.volume.drivers.windows import windows_utils
class WindowsUtilsTestCase(test.TestCase):
def setUp(self):
super(WindowsUtilsTestCase, self).setUp()
windows_utils.WindowsUtils.__init__ = lambda x: None
self.wutils = windows_utils.WindowsUtils()
self.wutils._conn_wmi = mock.Mock()
self.wutils._conn_cimv2 = mock.MagicMock()
@mock.patch.object(windows_utils.WindowsUtils, 'get_windows_version')
def test_check_min_windows_version(self, mock_get_win_version):
required_win_version = [6, 4]
actual_win_version = '6.3.0'
mock_get_win_version.return_value = actual_win_version
self.assertFalse(self.wutils.check_min_windows_version(
*required_win_version))
def _test_copy_vhd_disk(self, source_exists=True, copy_failed=False):
fake_data_file_object = mock.MagicMock()
fake_data_file_object.Copy.return_value = [int(copy_failed)]
fake_vhd_list = [fake_data_file_object] if source_exists else []
mock_query = mock.Mock(return_value=fake_vhd_list)
self.wutils._conn_cimv2.query = mock_query
if not source_exists or copy_failed:
self.assertRaises(exception.VolumeBackendAPIException,
self.wutils.copy_vhd_disk,
mock.sentinel.src,
mock.sentinel.dest)
else:
self.wutils.copy_vhd_disk(mock.sentinel.src, mock.sentinel.dest)
expected_query = (
"Select * from CIM_DataFile where Name = '%s'" %
mock.sentinel.src)
mock_query.assert_called_once_with(expected_query)
fake_data_file_object.Copy.assert_called_with(
mock.sentinel.dest)
def test_copy_vhd_disk(self):
self._test_copy_vhd_disk()
def test_copy_vhd_disk_invalid_source(self):
self._test_copy_vhd_disk(source_exists=False)
def test_copy_vhd_disk_copy_failed(self):
self._test_copy_vhd_disk(copy_failed=True)
@mock.patch.object(windows_utils, 'wmi', create=True)
def test_import_wt_disk_exception(self, mock_wmi):
mock_wmi.x_wmi = Exception
mock_import_disk = self.wutils._conn_wmi.WT_Disk.ImportWTDisk
mock_import_disk.side_effect = mock_wmi.x_wmi
self.assertRaises(exception.VolumeBackendAPIException,
self.wutils.import_wt_disk,
mock.sentinel.vhd_path,
mock.sentinel.vol_name)
mock_import_disk.assert_called_once_with(
DevicePath=mock.sentinel.vhd_path,
Description=mock.sentinel.vol_name)
def test_check_if_resize_is_needed_bigger_requested_size(self):
ret_val = self.wutils.is_resize_needed(
mock.sentinel.vhd_path, 1, 0)
self.assertTrue(ret_val)
def test_check_if_resize_is_needed_equal_requested_size(self):
ret_val = self.wutils.is_resize_needed(
mock.sentinel.vhd_path, 1, 1)
self.assertFalse(ret_val)
def test_check_if_resize_is_needed_smaller_requested_size(self):
self.assertRaises(
exception.VolumeBackendAPIException,
self.wutils.is_resize_needed,
mock.sentinel.vhd_path, 1, 2)
@mock.patch.object(windows_utils.WindowsUtils, '_wmi_obj_set_attr')
@mock.patch.object(windows_utils, 'wmi', create=True)
def test_set_chap_credentials(self, mock_wmi, mock_set_attr):
mock_wt_host = mock.Mock()
mock_wt_host_class = self.wutils._conn_wmi.WT_Host
mock_wt_host_class.return_value = [mock_wt_host]
self.wutils.set_chap_credentials(mock.sentinel.target_name,
mock.sentinel.chap_username,
mock.sentinel.chap_password)
mock_wt_host_class.assert_called_once_with(
HostName=mock.sentinel.target_name)
mock_set_attr.assert_has_calls([
mock.call(mock_wt_host, 'EnableCHAP', True),
mock.call(mock_wt_host, 'CHAPUserName',
mock.sentinel.chap_username),
mock.call(mock_wt_host, 'CHAPSecret',
mock.sentinel.chap_password)])
mock_wt_host.put.assert_called_once_with()
@mock.patch.object(windows_utils.WindowsUtils, '_wmi_obj_set_attr')
@mock.patch.object(windows_utils, 'wmi', create=True)
def test_set_chap_credentials_exc(self, mock_wmi, mock_set_attr):
mock_wmi.x_wmi = Exception
mock_set_attr.side_effect = mock_wmi.x_wmi
self.assertRaises(exception.VolumeBackendAPIException,
self.wutils.set_chap_credentials,
mock.sentinel.target_name,
mock.sentinel.chap_username,
mock.sentinel.chap_password)
def test_set_wmi_obj_attr(self):
wmi_obj = mock.Mock()
wmi_property_method = wmi_obj.wmi_property
wmi_property = wmi_obj.wmi_property.return_value
self.wutils._wmi_obj_set_attr(wmi_obj,
mock.sentinel.key,
mock.sentinel.value)
wmi_property_method.assert_called_once_with(mock.sentinel.key)
wmi_property.set.assert_called_once_with(mock.sentinel.value)

View File

@ -12,131 +12,44 @@
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import os
import sys
if sys.platform == 'win32':
import wmi
from os_brick.remotefs import remotefs
from oslo_log import log as logging
import six
from os_win import utilsfactory
from cinder import exception
from cinder.i18n import _, _LE, _LI
LOG = logging.getLogger(__name__)
from cinder.i18n import _
class WindowsRemoteFsClient(remotefs.RemoteFsClient):
_FILE_ATTRIBUTE_REPARSE_POINT = 0x0400
def __init__(self, *args, **kwargs):
super(WindowsRemoteFsClient, self).__init__(*args, **kwargs)
self.smb_conn = wmi.WMI(moniker='root/Microsoft/Windows/SMB')
self.conn_cimv2 = wmi.WMI(moniker='root/cimv2')
self._smbutils = utilsfactory.get_smbutils()
self._pathutils = utilsfactory.get_pathutils()
def mount(self, export_path, mnt_options=None):
if not os.path.isdir(self._mount_base):
os.makedirs(self._mount_base)
export_hash = self._get_hash_str(export_path)
mnt_point = self.get_mount_point(export_path)
norm_path = os.path.abspath(export_path)
mnt_options = mnt_options or {}
if not self.check_smb_mapping(norm_path):
self._mount(norm_path, mnt_options)
username = (mnt_options.get('username') or
mnt_options.get('user'))
password = (mnt_options.get('password') or
mnt_options.get('pass'))
link_path = os.path.join(self._mount_base, export_hash)
if os.path.exists(link_path):
if not self.is_symlink(link_path):
if not self._smbutils.check_smb_mapping(
norm_path,
remove_unavailable_mapping=True):
self._smbutils.mount_smb_share(norm_path,
username=username,
password=password)
if os.path.exists(mnt_point):
if not self._pathutils.is_symlink(mnt_point):
raise exception.SmbfsException(_("Link path already exists "
"and its not a symlink"))
else:
self.create_sym_link(link_path, norm_path)
def is_symlink(self, path):
if sys.version_info >= (3, 2):
return os.path.islink(path)
file_attr = ctypes.windll.kernel32.GetFileAttributesW(
six.text_type(path))
return bool(os.path.isdir(path) and (
file_attr & self._FILE_ATTRIBUTE_REPARSE_POINT))
def create_sym_link(self, link, target, target_is_dir=True):
"""If target_is_dir is True, a junction will be created.
NOTE: Juctions only work on same filesystem.
"""
symlink = ctypes.windll.kernel32.CreateSymbolicLinkW
symlink.argtypes = (
ctypes.c_wchar_p,
ctypes.c_wchar_p,
ctypes.c_ulong,
)
symlink.restype = ctypes.c_ubyte
retcode = symlink(link, target, target_is_dir)
if retcode == 0:
err_msg = (_("Could not create symbolic link. "
"Link: %(link)s Target %(target)s")
% {'link': link, 'target': target})
raise exception.VolumeBackendAPIException(err_msg)
def check_smb_mapping(self, smbfs_share):
mappings = self.smb_conn.query("SELECT * FROM "
"MSFT_SmbMapping "
"WHERE RemotePath='%s'" %
smbfs_share)
if len(mappings) > 0:
if os.path.exists(smbfs_share):
LOG.debug('Share already mounted: %s', smbfs_share)
return True
else:
LOG.debug('Share exists but is unavailable: %s ', smbfs_share)
for mapping in mappings:
# Due to a bug in the WMI module, getting the output of
# methods returning None will raise an AttributeError
try:
mapping.Remove(True, True)
except AttributeError:
pass
return False
def _mount(self, smbfs_share, options):
smb_opts = {'RemotePath': smbfs_share}
smb_opts['UserName'] = (options.get('username') or
options.get('user'))
smb_opts['Password'] = (options.get('password') or
options.get('pass'))
try:
LOG.info(_LI('Mounting share: %s'), smbfs_share)
self.smb_conn.Msft_SmbMapping.Create(**smb_opts)
except wmi.x_wmi as exc:
err_msg = (_(
'Unable to mount SMBFS share: %(smbfs_share)s '
'WMI exception: %(wmi_exc)s'
'Options: %(options)s') % {'smbfs_share': smbfs_share,
'options': smb_opts,
'wmi_exc': six.text_type(exc)})
raise exception.VolumeBackendAPIException(data=err_msg)
def get_capacity_info(self, smbfs_share):
norm_path = os.path.abspath(smbfs_share)
kernel32 = ctypes.windll.kernel32
free_bytes = ctypes.c_ulonglong(0)
total_bytes = ctypes.c_ulonglong(0)
retcode = kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(norm_path),
None,
ctypes.pointer(total_bytes),
ctypes.pointer(free_bytes))
if retcode == 0:
LOG.error(_LE("Could not get share %s capacity info."),
smbfs_share)
return 0, 0
return total_bytes.value, free_bytes.value
self._pathutils.create_sym_link(mnt_point, norm_path)

View File

@ -17,6 +17,7 @@
import os
import sys
from os_win import utilsfactory
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import fileutils
@ -28,8 +29,6 @@ from cinder.image import image_utils
from cinder.volume.drivers import remotefs as remotefs_drv
from cinder.volume.drivers import smbfs
from cinder.volume.drivers.windows import remotefs
from cinder.volume.drivers.windows import vhdutils
from cinder.volume.drivers.windows import windows_utils
VERSION = '1.1.0'
@ -58,8 +57,10 @@ class WindowsSmbfsDriver(smbfs.SmbfsDriver):
self._remotefsclient = remotefs.WindowsRemoteFsClient(
'cifs', root_helper=None, smbfs_mount_point_base=self.base,
smbfs_mount_options=opts)
self.vhdutils = vhdutils.VHDUtils()
self._windows_utils = windows_utils.WindowsUtils()
self._vhdutils = utilsfactory.get_vhdutils()
self._pathutils = utilsfactory.get_pathutils()
self._smbutils = utilsfactory.get_smbutils()
def do_setup(self, context):
self._check_os_platform()
@ -85,7 +86,7 @@ class WindowsSmbfsDriver(smbfs.SmbfsDriver):
err_msg = _("Unsupported volume format: %s ") % volume_format
raise exception.InvalidVolume(err_msg)
self.vhdutils.create_dynamic_vhd(volume_path, volume_size_bytes)
self._vhdutils.create_dynamic_vhd(volume_path, volume_size_bytes)
def _ensure_share_mounted(self, smbfs_share):
mnt_options = {}
@ -102,7 +103,7 @@ class WindowsSmbfsDriver(smbfs.SmbfsDriver):
:param smbfs_share: example //172.18.194.100/var/smbfs
"""
total_size, total_available = self._remotefsclient.get_capacity_info(
total_size, total_available = self._smbutils.get_share_capacity_info(
smbfs_share)
total_allocated = self._get_total_allocated(smbfs_share)
return_value = [total_size, total_available, total_allocated]
@ -113,21 +114,20 @@ class WindowsSmbfsDriver(smbfs.SmbfsDriver):
return [float(x) for x in return_value]
def _img_commit(self, snapshot_path):
self.vhdutils.merge_vhd(snapshot_path)
self._delete(snapshot_path)
self._vhdutils.merge_vhd(snapshot_path)
def _rebase_img(self, image, backing_file, volume_format):
# Relative path names are not supported in this case.
image_dir = os.path.dirname(image)
backing_file_path = os.path.join(image_dir, backing_file)
self.vhdutils.reconnect_parent(image, backing_file_path)
self._vhdutils.reconnect_parent_vhd(image, backing_file_path)
def _qemu_img_info(self, path, volume_name=None):
# This code expects to deal only with relative filenames.
# As this method is needed by the upper class and qemu-img does
# not fully support vhdx images, for the moment we'll use Win32 API
# for retrieving image information.
parent_path = self.vhdutils.get_vhd_parent_path(path)
parent_path = self._vhdutils.get_vhd_parent_path(path)
file_format = os.path.splitext(path)[1][1:].lower()
if parent_path:
@ -148,11 +148,11 @@ class WindowsSmbfsDriver(smbfs.SmbfsDriver):
backing_file_full_path = os.path.join(
self._local_volume_dir(snapshot['volume']),
backing_file)
self.vhdutils.create_differencing_vhd(new_snap_path,
backing_file_full_path)
self._vhdutils.create_differencing_vhd(new_snap_path,
backing_file_full_path)
def _do_extend_volume(self, volume_path, size_gb, volume_name=None):
self.vhdutils.resize_vhd(volume_path, size_gb * units.Gi)
self._vhdutils.resize_vhd(volume_path, size_gb * units.Gi)
@remotefs_drv.locked_volume_id_operation
def copy_volume_to_image(self, context, volume, image_service, image_meta):
@ -163,7 +163,7 @@ class WindowsSmbfsDriver(smbfs.SmbfsDriver):
active_file = self.get_active_image_from_info(volume)
active_file_path = os.path.join(self._local_volume_dir(volume),
active_file)
backing_file = self.vhdutils.get_vhd_parent_path(active_file_path)
backing_file = self._vhdutils.get_vhd_parent_path(active_file_path)
root_file_fmt = self.get_volume_format(volume)
temp_path = None
@ -177,7 +177,7 @@ class WindowsSmbfsDriver(smbfs.SmbfsDriver):
temp_path = os.path.join(self._local_volume_dir(volume),
temp_file_name)
self.vhdutils.convert_vhd(active_file_path, temp_path)
self._vhdutils.convert_vhd(active_file_path, temp_path)
upload_path = temp_path
else:
upload_path = active_file_path
@ -202,7 +202,8 @@ class WindowsSmbfsDriver(smbfs.SmbfsDriver):
volume_path, volume_format,
self.configuration.volume_dd_blocksize)
self._extend_vhd_if_needed(self.local_path(volume), volume['size'])
self._vhdutils.resize_vhd(self.local_path(volume),
volume['size'] * units.Gi)
def _copy_volume_from_snapshot(self, snapshot, volume, volume_size):
"""Copy data from snapshot to destination volume."""
@ -227,16 +228,6 @@ class WindowsSmbfsDriver(smbfs.SmbfsDriver):
volume_path = self.local_path(volume)
self._delete(volume_path)
self.vhdutils.convert_vhd(snapshot_path,
volume_path)
self._extend_vhd_if_needed(volume_path, volume_size)
def _extend_vhd_if_needed(self, vhd_path, new_size_gb):
old_size_bytes = self.vhdutils.get_vhd_size(vhd_path)['VirtualSize']
new_size_bytes = new_size_gb * units.Gi
# This also ensures we're not attempting to shrink the image.
is_resize_needed = self._windows_utils.is_resize_needed(
vhd_path, new_size_bytes, old_size_bytes)
if is_resize_needed:
self.vhdutils.resize_vhd(vhd_path, new_size_bytes)
self._vhdutils.convert_vhd(snapshot_path,
volume_path)
self._vhdutils.resize_vhd(volume_path, volume_size * units.Gi)

View File

@ -1,458 +0,0 @@
# Copyright 2014 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Utility class for VHD related operations.
Official VHD format specs can be retrieved at:
http://technet.microsoft.com/en-us/library/bb676673.aspx
See "Download the Specifications Without Registering"
Official VHDX format specs can be retrieved at:
http://www.microsoft.com/en-us/download/details.aspx?id=34750
VHD related Win32 API reference:
http://msdn.microsoft.com/en-us/library/windows/desktop/dd323700.aspx
"""
import ctypes
import os
import sys
if os.name == 'nt':
from ctypes import wintypes
kernel32 = ctypes.windll.kernel32
virtdisk = ctypes.windll.virtdisk
from oslo_log import log as logging
from cinder import exception
from cinder.i18n import _
from cinder.volume.drivers.windows import constants
LOG = logging.getLogger(__name__)
if os.name == 'nt':
class Win32_GUID(ctypes.Structure):
_fields_ = [("Data1", wintypes.DWORD),
("Data2", wintypes.WORD),
("Data3", wintypes.WORD),
("Data4", wintypes.BYTE * 8)]
class Win32_VIRTUAL_STORAGE_TYPE(ctypes.Structure):
_fields_ = [
('DeviceId', wintypes.ULONG),
('VendorId', Win32_GUID)
]
class Win32_RESIZE_VIRTUAL_DISK_PARAMETERS(ctypes.Structure):
_fields_ = [
('Version', wintypes.DWORD),
('NewSize', ctypes.c_ulonglong)
]
class Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V1(ctypes.Structure):
_fields_ = [
('Version', wintypes.DWORD),
('RWDepth', ctypes.c_ulong),
]
class Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V2(ctypes.Structure):
_fields_ = [
('Version', wintypes.DWORD),
('GetInfoOnly', wintypes.BOOL),
('ReadOnly', wintypes.BOOL),
('ResiliencyGuid', Win32_GUID)
]
class Win32_MERGE_VIRTUAL_DISK_PARAMETERS(ctypes.Structure):
_fields_ = [
('Version', wintypes.DWORD),
('MergeDepth', ctypes.c_ulong)
]
class Win32_CREATE_VIRTUAL_DISK_PARAMETERS(ctypes.Structure):
_fields_ = [
('Version', wintypes.DWORD),
('UniqueId', Win32_GUID),
('MaximumSize', ctypes.c_ulonglong),
('BlockSizeInBytes', wintypes.ULONG),
('SectorSizeInBytes', wintypes.ULONG),
('PhysicalSectorSizeInBytes', wintypes.ULONG),
('ParentPath', wintypes.LPCWSTR),
('SourcePath', wintypes.LPCWSTR),
('OpenFlags', wintypes.DWORD),
('ParentVirtualStorageType', Win32_VIRTUAL_STORAGE_TYPE),
('SourceVirtualStorageType', Win32_VIRTUAL_STORAGE_TYPE),
('ResiliencyGuid', Win32_GUID)
]
class Win32_SIZE(ctypes.Structure):
_fields_ = [("VirtualSize", wintypes.ULARGE_INTEGER),
("PhysicalSize", wintypes.ULARGE_INTEGER),
("BlockSize", wintypes.ULONG),
("SectorSize", wintypes.ULONG)]
class Win32_PARENT_LOCATION(ctypes.Structure):
_fields_ = [('ParentResolved', wintypes.BOOL),
('ParentLocationBuffer', wintypes.WCHAR * 512)]
class Win32_PHYSICAL_DISK(ctypes.Structure):
_fields_ = [("LogicalSectorSize", wintypes.ULONG),
("PhysicalSectorSize", wintypes.ULONG),
("IsRemote", wintypes.BOOL)]
class Win32_VHD_INFO(ctypes.Union):
_fields_ = [("Size", Win32_SIZE),
("Identifier", Win32_GUID),
("ParentLocation", Win32_PARENT_LOCATION),
("ParentIdentifier", Win32_GUID),
("ParentTimestamp", wintypes.ULONG),
("VirtualStorageType", Win32_VIRTUAL_STORAGE_TYPE),
("ProviderSubtype", wintypes.ULONG),
("Is4kAligned", wintypes.BOOL),
("PhysicalDisk", Win32_PHYSICAL_DISK),
("VhdPhysicalSectorSize", wintypes.ULONG),
("SmallestSafeVirtualSize",
wintypes.ULARGE_INTEGER),
("FragmentationPercentage", wintypes.ULONG)]
class Win32_GET_VIRTUAL_DISK_INFO_PARAMETERS(ctypes.Structure):
_fields_ = [("VERSION", wintypes.UINT),
("VhdInfo", Win32_VHD_INFO)]
class Win32_SET_VIRTUAL_DISK_INFO_PARAMETERS(ctypes.Structure):
_fields_ = [
('Version', wintypes.DWORD),
('ParentFilePath', wintypes.LPCWSTR)
]
VIRTUAL_STORAGE_TYPE_DEVICE_ISO = 1
VIRTUAL_STORAGE_TYPE_DEVICE_VHD = 2
VIRTUAL_STORAGE_TYPE_DEVICE_VHDX = 3
VIRTUAL_DISK_ACCESS_NONE = 0
VIRTUAL_DISK_ACCESS_ALL = 0x003f0000
VIRTUAL_DISK_ACCESS_CREATE = 0x00100000
VIRTUAL_DISK_ACCESS_GET_INFO = 0x80000
OPEN_VIRTUAL_DISK_FLAG_NONE = 0
OPEN_VIRTUAL_DISK_FLAG_NO_PARENTS = 1
OPEN_VIRTUAL_DISK_VERSION_1 = 1
OPEN_VIRTUAL_DISK_VERSION_2 = 2
RESIZE_VIRTUAL_DISK_FLAG_NONE = 0
RESIZE_VIRTUAL_DISK_VERSION_1 = 1
CREATE_VIRTUAL_DISK_VERSION_2 = 2
CREATE_VHD_PARAMS_DEFAULT_BLOCK_SIZE = 0
CREATE_VIRTUAL_DISK_FLAG_NONE = 0
CREATE_VIRTUAL_DISK_FLAG_FULL_PHYSICAL_ALLOCATION = 1
MERGE_VIRTUAL_DISK_VERSION_1 = 1
MERGE_VIRTUAL_DISK_FLAG_NONE = 0x00000000
GET_VIRTUAL_DISK_INFO_SIZE = 1
GET_VIRTUAL_DISK_INFO_PARENT_LOCATION = 3
GET_VIRTUAL_DISK_INFO_VIRTUAL_STORAGE_TYPE = 6
GET_VIRTUAL_DISK_INFO_PROVIDER_SUBTYPE = 7
SET_VIRTUAL_DISK_INFO_PARENT_PATH = 1
FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
ERROR_VHD_INVALID_TYPE = 0xC03A001B
class VHDUtils(object):
def __init__(self):
self._ext_device_id_map = {
'vhd': VIRTUAL_STORAGE_TYPE_DEVICE_VHD,
'vhdx': VIRTUAL_STORAGE_TYPE_DEVICE_VHDX}
self.create_virtual_disk_flags = {
constants.VHD_TYPE_FIXED: (
CREATE_VIRTUAL_DISK_FLAG_FULL_PHYSICAL_ALLOCATION),
constants.VHD_TYPE_DYNAMIC: CREATE_VIRTUAL_DISK_FLAG_NONE
}
self._vhd_info_members = {
GET_VIRTUAL_DISK_INFO_SIZE: 'Size',
GET_VIRTUAL_DISK_INFO_PARENT_LOCATION: 'ParentLocation',
GET_VIRTUAL_DISK_INFO_VIRTUAL_STORAGE_TYPE:
'VirtualStorageType',
GET_VIRTUAL_DISK_INFO_PROVIDER_SUBTYPE: 'ProviderSubtype',
}
if os.name == 'nt':
self._msft_vendor_id = (
self.get_WIN32_VIRTUAL_STORAGE_TYPE_VENDOR_MSFT())
def _run_and_check_output(self, func, *args, **kwargs):
"""Convenience helper method for running Win32 API methods."""
ignored_error_codes = kwargs.pop('ignored_error_codes', [])
ret_val = func(*args, **kwargs)
# The VHD Win32 API functions return non-zero error codes
# in case of failure.
if ret_val and ret_val not in ignored_error_codes:
error_message = self._get_error_message(ret_val)
func_name = getattr(func, '__name__', '')
err = (_("Executing Win32 API function %(func_name)s failed. "
"Error code: %(error_code)s. "
"Error message: %(error_message)s") %
{'func_name': func_name,
'error_code': ret_val,
'error_message': error_message})
LOG.error(err, exc_info=(sys.exc_info() is not None))
raise exception.VolumeBackendAPIException(err)
@staticmethod
def _get_error_message(error_code):
message_buffer = ctypes.c_char_p()
kernel32.FormatMessageA(
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER |
FORMAT_MESSAGE_IGNORE_INSERTS,
None, error_code, 0, ctypes.byref(message_buffer), 0, None)
error_message = message_buffer.value
kernel32.LocalFree(message_buffer)
return error_message
@staticmethod
def get_WIN32_VIRTUAL_STORAGE_TYPE_VENDOR_MSFT():
guid = Win32_GUID()
guid.Data1 = 0xec984aec
guid.Data2 = 0xa0f9
guid.Data3 = 0x47e9
ByteArray8 = wintypes.BYTE * 8
guid.Data4 = ByteArray8(0x90, 0x1f, 0x71, 0x41, 0x5a, 0x66, 0x34, 0x5b)
return guid
def _open(self, vhd_path, open_flag=OPEN_VIRTUAL_DISK_FLAG_NONE,
open_access_mask=VIRTUAL_DISK_ACCESS_ALL,
open_params=0):
device_id = self._get_device_id_by_path(vhd_path)
vst = Win32_VIRTUAL_STORAGE_TYPE()
vst.DeviceId = device_id
vst.VendorId = self._msft_vendor_id
handle = wintypes.HANDLE()
self._run_and_check_output(virtdisk.OpenVirtualDisk,
ctypes.byref(vst),
ctypes.c_wchar_p(vhd_path),
open_access_mask,
open_flag,
open_params,
ctypes.byref(handle))
return handle
def _close(self, handle):
kernel32.CloseHandle(handle)
def _get_device_id_by_path(self, vhd_path):
ext = os.path.splitext(vhd_path)[1][1:].lower()
device_id = self._ext_device_id_map.get(ext)
if not device_id:
raise exception.VolumeBackendAPIException(
_("Unsupported virtual disk extension: %s") % ext)
return device_id
def resize_vhd(self, vhd_path, new_max_size):
handle = self._open(vhd_path)
params = Win32_RESIZE_VIRTUAL_DISK_PARAMETERS()
params.Version = RESIZE_VIRTUAL_DISK_VERSION_1
params.NewSize = new_max_size
try:
self._run_and_check_output(virtdisk.ResizeVirtualDisk,
handle,
RESIZE_VIRTUAL_DISK_FLAG_NONE,
ctypes.byref(params),
None)
finally:
self._close(handle)
def merge_vhd(self, vhd_path):
open_params = Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V1()
open_params.Version = OPEN_VIRTUAL_DISK_VERSION_1
open_params.RWDepth = 2
handle = self._open(vhd_path,
open_params=ctypes.byref(open_params))
params = Win32_MERGE_VIRTUAL_DISK_PARAMETERS()
params.Version = MERGE_VIRTUAL_DISK_VERSION_1
params.MergeDepth = 1
try:
self._run_and_check_output(virtdisk.MergeVirtualDisk,
handle,
MERGE_VIRTUAL_DISK_FLAG_NONE,
ctypes.byref(params),
None)
finally:
self._close(handle)
def _create_vhd(self, new_vhd_path, new_vhd_type, src_path=None,
max_internal_size=0, parent_path=None):
new_device_id = self._get_device_id_by_path(new_vhd_path)
vst = Win32_VIRTUAL_STORAGE_TYPE()
vst.DeviceId = new_device_id
vst.VendorId = self._msft_vendor_id
params = Win32_CREATE_VIRTUAL_DISK_PARAMETERS()
params.Version = CREATE_VIRTUAL_DISK_VERSION_2
params.UniqueId = Win32_GUID()
params.BlockSizeInBytes = CREATE_VHD_PARAMS_DEFAULT_BLOCK_SIZE
params.SectorSizeInBytes = 0x200
params.PhysicalSectorSizeInBytes = 0x200
params.OpenFlags = OPEN_VIRTUAL_DISK_FLAG_NONE
params.ResiliencyGuid = Win32_GUID()
params.MaximumSize = max_internal_size
params.ParentPath = parent_path
params.ParentVirtualStorageType = Win32_VIRTUAL_STORAGE_TYPE()
if src_path:
src_device_id = self._get_device_id_by_path(src_path)
params.SourcePath = src_path
params.SourceVirtualStorageType = Win32_VIRTUAL_STORAGE_TYPE()
params.SourceVirtualStorageType.DeviceId = src_device_id
params.SourceVirtualStorageType.VendorId = self._msft_vendor_id
handle = wintypes.HANDLE()
create_virtual_disk_flag = self.create_virtual_disk_flags.get(
new_vhd_type)
try:
self._run_and_check_output(virtdisk.CreateVirtualDisk,
ctypes.byref(vst),
ctypes.c_wchar_p(new_vhd_path),
VIRTUAL_DISK_ACCESS_NONE,
None,
create_virtual_disk_flag,
0,
ctypes.byref(params),
None,
ctypes.byref(handle))
finally:
self._close(handle)
def get_vhd_info(self, vhd_path, info_members=None):
vhd_info = {}
info_members = info_members or self._vhd_info_members
handle = self._open(vhd_path,
open_access_mask=VIRTUAL_DISK_ACCESS_GET_INFO)
try:
for member in info_members:
info = self._get_vhd_info_member(handle, member)
vhd_info.update(info)
finally:
self._close(handle)
return vhd_info
def _get_vhd_info_member(self, vhd_file, info_member):
virt_disk_info = Win32_GET_VIRTUAL_DISK_INFO_PARAMETERS()
virt_disk_info.VERSION = ctypes.c_uint(info_member)
infoSize = ctypes.sizeof(virt_disk_info)
virtdisk.GetVirtualDiskInformation.restype = wintypes.DWORD
# Note(lpetrut): If the vhd has no parent image, this will
# return an error. No need to raise an exception in this case.
ignored_error_codes = []
if info_member == GET_VIRTUAL_DISK_INFO_PARENT_LOCATION:
ignored_error_codes.append(ERROR_VHD_INVALID_TYPE)
self._run_and_check_output(virtdisk.GetVirtualDiskInformation,
vhd_file,
ctypes.byref(ctypes.c_ulong(infoSize)),
ctypes.byref(virt_disk_info),
0,
ignored_error_codes=ignored_error_codes)
return self._parse_vhd_info(virt_disk_info, info_member)
def _parse_vhd_info(self, virt_disk_info, info_member):
vhd_info = {}
vhd_info_member = self._vhd_info_members[info_member]
info = getattr(virt_disk_info.VhdInfo, vhd_info_member)
if hasattr(info, '_fields_'):
for field in info._fields_:
vhd_info[field[0]] = getattr(info, field[0])
else:
vhd_info[vhd_info_member] = info
return vhd_info
def get_vhd_size(self, vhd_path):
"""Return vhd size.
Returns a dict containing the virtual size, physical size,
block size and sector size of the vhd.
"""
size = self.get_vhd_info(vhd_path,
[GET_VIRTUAL_DISK_INFO_SIZE])
return size
def get_vhd_parent_path(self, vhd_path):
vhd_info = self.get_vhd_info(vhd_path,
[GET_VIRTUAL_DISK_INFO_PARENT_LOCATION])
parent_path = vhd_info['ParentLocationBuffer']
if len(parent_path) > 0:
return parent_path
return None
def create_dynamic_vhd(self, path, max_internal_size):
self._create_vhd(path,
constants.VHD_TYPE_DYNAMIC,
max_internal_size=max_internal_size)
def convert_vhd(self, src, dest,
vhd_type=constants.VHD_TYPE_DYNAMIC):
self._create_vhd(dest, vhd_type, src_path=src)
def create_differencing_vhd(self, path, parent_path):
self._create_vhd(path,
constants.VHD_TYPE_DIFFERENCING,
parent_path=parent_path)
def reconnect_parent(self, child_path, parent_path):
open_params = Win32_OPEN_VIRTUAL_DISK_PARAMETERS_V2()
open_params.Version = OPEN_VIRTUAL_DISK_VERSION_2
open_params.GetInfoOnly = False
handle = self._open(
child_path,
open_flag=OPEN_VIRTUAL_DISK_FLAG_NO_PARENTS,
open_access_mask=VIRTUAL_DISK_ACCESS_NONE,
open_params=ctypes.byref(open_params))
params = Win32_SET_VIRTUAL_DISK_INFO_PARAMETERS()
params.Version = SET_VIRTUAL_DISK_INFO_PARENT_PATH
params.ParentFilePath = parent_path
try:
self._run_and_check_output(virtdisk.SetVirtualDiskInformation,
handle,
ctypes.byref(params))
finally:
self._close(handle)

View File

@ -19,17 +19,18 @@ This driver requires ISCSI target role installed
"""
import contextlib
import os
from os_win import utilsfactory
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import fileutils
from oslo_utils import units
from oslo_utils import uuidutils
from cinder.image import image_utils
from cinder.volume import driver
from cinder.volume.drivers.windows import constants
from cinder.volume.drivers.windows import vhdutils
from cinder.volume.drivers.windows import windows_utils
from cinder.volume import utils
LOG = logging.getLogger(__name__)
@ -55,28 +56,58 @@ class WindowsDriver(driver.ISCSIDriver):
if self.configuration:
self.configuration.append_config_values(windows_opts)
self._vhdutils = utilsfactory.get_vhdutils()
self._tgt_utils = utilsfactory.get_iscsi_target_utils()
self._hostutils = utilsfactory.get_hostutils()
def do_setup(self, context):
"""Setup the Windows Volume driver.
Called one time by the manager after the driver is loaded.
Validate the flags we care about
"""
self.utils = windows_utils.WindowsUtils()
self.vhdutils = vhdutils.VHDUtils()
fileutils.ensure_tree(self.configuration.windows_iscsi_lun_path)
fileutils.ensure_tree(CONF.image_conversion_dir)
def check_for_setup_error(self):
"""Check that the driver is working and can communicate."""
self.utils.check_for_setup_error()
self._tgt_utils.get_portal_locations(available_only=True,
fail_if_none_found=True)
def _get_host_information(self, volume):
"""Getting the portal and port information."""
# TODO(lpetrut): properly handle multiple existing portals, also
# use the iSCSI traffic addresses config options.
target_name = self._get_target_name(volume)
available_portal_location = self._tgt_utils.get_portal_locations()[0]
properties = self._tgt_utils.get_target_information(target_name)
# Note(lpetrut): the WT_Host CHAPSecret field cannot be accessed
# for security reasons.
auth = volume['provider_auth']
if auth:
(auth_method, auth_username, auth_secret) = auth.split()
properties['auth_method'] = auth_method
properties['auth_username'] = auth_username
properties['auth_password'] = auth_secret
properties['target_discovered'] = False
properties['target_portal'] = available_portal_location
properties['target_lun'] = 0
properties['volume_id'] = volume['id']
return properties
def initialize_connection(self, volume, connector):
"""Driver entry point to attach a volume to an instance."""
initiator_name = connector['initiator']
target_name = volume['provider_location']
self.utils.associate_initiator_with_iscsi_target(initiator_name,
target_name)
self._tgt_utils.associate_initiator_with_iscsi_target(initiator_name,
target_name)
properties = self.utils.get_host_information(volume, target_name)
properties = self._get_host_information(volume)
return {
'driver_volume_type': 'iscsi',
@ -91,25 +122,32 @@ class WindowsDriver(driver.ISCSIDriver):
"""
initiator_name = connector['initiator']
target_name = volume['provider_location']
self.utils.delete_iscsi_target(initiator_name, target_name)
self._tgt_utils.deassociate_initiator(initiator_name, target_name)
def create_volume(self, volume):
"""Driver entry point for creating a new volume."""
vhd_path = self.local_path(volume)
vol_name = volume['name']
vol_size = volume['size']
vol_size_mb = volume['size'] * 1024
self.utils.create_volume(vhd_path, vol_name, vol_size)
self._tgt_utils.create_wt_disk(vhd_path, vol_name,
size_mb=vol_size_mb)
def local_path(self, volume, format=None):
return self.utils.local_path(volume, format)
def local_path(self, volume, disk_format=None):
base_vhd_folder = self.configuration.windows_iscsi_lun_path
if not disk_format:
disk_format = self._tgt_utils.get_supported_disk_format()
disk_fname = "%s.%s" % (volume['name'], disk_format)
return os.path.join(base_vhd_folder, disk_fname)
def delete_volume(self, volume):
"""Driver entry point for destroying existing volumes."""
vol_name = volume['name']
vhd_path = self.local_path(volume)
self.utils.delete_volume(vol_name, vhd_path)
self._tgt_utils.remove_wt_disk(vol_name)
fileutils.delete_if_exists(vhd_path)
def create_snapshot(self, snapshot):
"""Driver entry point for creating a snapshot."""
@ -117,146 +155,150 @@ class WindowsDriver(driver.ISCSIDriver):
vol_name = snapshot['volume_name']
snapshot_name = snapshot['name']
self.utils.create_snapshot(vol_name, snapshot_name)
self._tgt_utils.create_snapshot(vol_name, snapshot_name)
def create_volume_from_snapshot(self, volume, snapshot):
"""Driver entry point for exporting snapshots as volumes."""
snapshot_name = snapshot['name']
self.utils.create_volume_from_snapshot(volume, snapshot_name)
vol_name = volume['name']
vhd_path = self.local_path(volume)
self._tgt_utils.export_snapshot(snapshot_name, vhd_path)
self._tgt_utils.import_wt_disk(vhd_path, vol_name)
def delete_snapshot(self, snapshot):
"""Driver entry point for deleting a snapshot."""
snapshot_name = snapshot['name']
self.utils.delete_snapshot(snapshot_name)
self._tgt_utils.delete_snapshot(snapshot_name)
def ensure_export(self, context, volume):
# iSCSI targets exported by WinTarget persist after host reboot.
pass
def _get_target_name(self, volume):
return "%s%s" % (self.configuration.iscsi_target_prefix,
volume['name'])
def create_export(self, context, volume, connector):
"""Driver entry point to get the export info for a new volume."""
# Since the iSCSI targets are not reused, being deleted when the
# volume is detached, we should clean up existing targets before
# creating a new one.
self.remove_export(context, volume)
target_name = self._get_target_name(volume)
updates = {}
target_name = "%s%s" % (self.configuration.iscsi_target_prefix,
volume['name'])
updates = {'provider_location': target_name}
self.utils.create_iscsi_target(target_name)
if not self._tgt_utils.iscsi_target_exists(target_name):
self._tgt_utils.create_iscsi_target(target_name)
updates['provider_location'] = target_name
if self.configuration.use_chap_auth:
chap_username = (self.configuration.chap_username or
utils.generate_username())
chap_password = (self.configuration.chap_password or
utils.generate_password())
if self.configuration.use_chap_auth:
chap_username = (self.configuration.chap_username or
utils.generate_username())
chap_password = (self.configuration.chap_password or
utils.generate_password())
self.utils.set_chap_credentials(target_name,
chap_username,
chap_password)
self._tgt_utils.set_chap_credentials(target_name,
chap_username,
chap_password)
updates['provider_auth'] = ' '.join(('CHAP',
chap_username,
chap_password))
# Get the disk to add
vol_name = volume['name']
self.utils.add_disk_to_target(vol_name, target_name)
updates['provider_auth'] = ' '.join(('CHAP',
chap_username,
chap_password))
# This operation is idempotent
self._tgt_utils.add_disk_to_target(volume['name'], target_name)
return updates
def remove_export(self, context, volume):
"""Driver entry point to remove an export for a volume."""
target_name = "%s%s" % (self.configuration.iscsi_target_prefix,
volume['name'])
self.utils.remove_iscsi_target(target_name)
target_name = self._get_target_name(volume)
self._tgt_utils.delete_iscsi_target(target_name)
def copy_image_to_volume(self, context, volume, image_service, image_id):
"""Fetch the image from image_service and create a volume using it."""
# Convert to VHD and file back to VHD
vhd_type = self.utils.get_supported_vhd_type()
vhd_type = self._tgt_utils.get_supported_vhd_type()
with image_utils.temporary_file(suffix='.vhd') as tmp:
volume_path = self.local_path(volume)
image_utils.fetch_to_vhd(context, image_service, image_id, tmp,
self.configuration.volume_dd_blocksize)
# The vhd must be disabled and deleted before being replaced with
# the desired image.
self.utils.change_disk_status(volume['name'], False)
self._tgt_utils.change_wt_disk_status(volume['name'],
enabled=False)
os.unlink(volume_path)
self.vhdutils.convert_vhd(tmp, volume_path,
vhd_type)
self.vhdutils.resize_vhd(volume_path,
volume['size'] << 30)
self.utils.change_disk_status(volume['name'], True)
self._vhdutils.convert_vhd(tmp, volume_path,
vhd_type)
self._vhdutils.resize_vhd(volume_path,
volume['size'] << 30,
is_file_max_size=False)
self._tgt_utils.change_wt_disk_status(volume['name'],
enabled=True)
@contextlib.contextmanager
def _temporary_snapshot(self, volume_name):
try:
snap_uuid = uuidutils.generate_uuid()
snapshot_name = '%s-tmp-snapshot-%s' % (volume_name, snap_uuid)
self._tgt_utils.create_snapshot(volume_name, snapshot_name)
yield snapshot_name
finally:
self._tgt_utils.delete_snapshot(snapshot_name)
def copy_volume_to_image(self, context, volume, image_service, image_meta):
"""Copy the volume to the specified image."""
disk_format = self.utils.get_supported_format()
if not os.path.exists(self.configuration.image_conversion_dir):
fileutils.ensure_tree(self.configuration.image_conversion_dir)
disk_format = self._tgt_utils.get_supported_disk_format()
temp_vhd_path = os.path.join(self.configuration.image_conversion_dir,
str(image_meta['id']) + '.' + disk_format)
upload_image = temp_vhd_path
try:
self.utils.copy_vhd_disk(self.local_path(volume), temp_vhd_path)
# qemu-img does not yet fully support vhdx format, so we'll first
# convert the image to vhd before attempting upload
if disk_format == 'vhdx':
upload_image = upload_image[:-1]
self.vhdutils.convert_vhd(temp_vhd_path, upload_image,
constants.VHD_TYPE_DYNAMIC)
image_utils.upload_volume(context, image_service, image_meta,
upload_image, 'vhd')
with self._temporary_snapshot(volume['name']) as tmp_snap_name:
# qemu-img cannot access VSS snapshots, for which reason it
# must be exported first.
self._tgt_utils.export_snapshot(tmp_snap_name, temp_vhd_path)
image_utils.upload_volume(context, image_service, image_meta,
temp_vhd_path, 'vhd')
finally:
fileutils.delete_if_exists(temp_vhd_path)
fileutils.delete_if_exists(upload_image)
def create_cloned_volume(self, volume, src_vref):
"""Creates a clone of the specified volume."""
src_vol_name = src_vref['name']
vol_name = volume['name']
vol_size = volume['size']
src_vol_size = src_vref['size']
new_vhd_path = self.local_path(volume)
src_vhd_path = self.local_path(src_vref)
self.utils.copy_vhd_disk(src_vhd_path,
new_vhd_path)
with self._temporary_snapshot(src_vol_name) as tmp_snap_name:
self._tgt_utils.export_snapshot(tmp_snap_name, new_vhd_path)
self._vhdutils.resize_vhd(new_vhd_path, vol_size << 30,
is_file_max_size=False)
if self.utils.is_resize_needed(new_vhd_path, vol_size, src_vol_size):
self.vhdutils.resize_vhd(new_vhd_path, vol_size << 30)
self._tgt_utils.import_wt_disk(new_vhd_path, vol_name)
self.utils.import_wt_disk(new_vhd_path, vol_name)
def _get_capacity_info(self):
drive = os.path.splitdrive(
self.configuration.windows_iscsi_lun_path)[0]
(size, free_space) = self._hostutils.get_volume_info(drive)
def get_volume_stats(self, refresh=False):
"""Get volume stats.
If 'refresh' is True, run update the stats first.
"""
if refresh:
self._update_volume_stats()
return self._stats
total_gb = size / units.Gi
free_gb = free_space / units.Gi
return (total_gb, free_gb)
def _update_volume_stats(self):
"""Retrieve stats info for Windows device."""
LOG.debug("Updating volume stats")
total_gb, free_gb = self._get_capacity_info()
data = {}
backend_name = self.__class__.__name__
if self.configuration:
backend_name = self.configuration.safe_get('volume_backend_name')
backend_name = self.configuration.safe_get('volume_backend_name')
data["volume_backend_name"] = backend_name or self.__class__.__name__
data["vendor_name"] = 'Microsoft'
data["driver_version"] = self.VERSION
data["storage_protocol"] = 'iSCSI'
data['total_capacity_gb'] = 'infinite'
data['free_capacity_gb'] = 'infinite'
data['reserved_percentage'] = 100
data['total_capacity_gb'] = total_gb
data['free_capacity_gb'] = free_gb
data['reserved_percentage'] = self.configuration.reserved_percentage
data['QoS_support'] = False
self._stats = data
def extend_volume(self, volume, new_size):
@ -264,5 +306,6 @@ class WindowsDriver(driver.ISCSIDriver):
old_size = volume['size']
LOG.debug("Extend volume from %(old_size)s GB to %(new_size)s GB.",
{'old_size': old_size, 'new_size': new_size})
additional_size = (new_size - old_size) * 1024
self.utils.extend(volume['name'], additional_size)
additional_size_mb = (new_size - old_size) * 1024
self._tgt_utils.extend_wt_disk(volume['name'], additional_size_mb)

View File

@ -1,454 +0,0 @@
# Copyright 2013 Pedro Navarro Perez
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Utility class for Windows Storage Server 2012 volume related operations.
"""
import ctypes
import os
import sys
from oslo_config import cfg
from oslo_log import log as logging
import six
from cinder import exception
from cinder.i18n import _, _LI
from cinder.volume.drivers.windows import constants
# Check needed for unit testing on Unix
if sys.platform == 'win32':
import wmi
from ctypes import wintypes
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class WindowsUtils(object):
"""Executes volume driver commands on Windows Storage server."""
def __init__(self, *args, **kwargs):
# Set the flags
self._conn_wmi = wmi.WMI(moniker='//./root/wmi')
self._conn_cimv2 = wmi.WMI(moniker='//./root/cimv2')
def check_for_setup_error(self):
"""Check that the driver is working and can communicate.
Invokes the portal and checks that is listening ISCSI traffic.
"""
try:
wt_portal = self._conn_wmi.WT_Portal()[0]
listen = wt_portal.Listen
except wmi.x_wmi as exc:
err_msg = (_('check_for_setup_error: the state of the WT Portal '
'could not be verified. WMI exception: %s')
% six.text_type(exc))
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
if not listen:
err_msg = (_('check_for_setup_error: there is no ISCSI traffic '
'listening.'))
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def get_host_information(self, volume, target_name):
"""Getting the portal and port information."""
try:
wt_portal = self._conn_wmi.WT_Portal()[0]
except wmi.x_wmi as exc:
err_msg = (_('get_host_information: the state of the WT Portal '
'could not be verified. WMI exception: %s')
% six.text_type(exc))
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
(address, port) = (wt_portal.Address, wt_portal.Port)
# Getting the host information
try:
hosts = self._conn_wmi.WT_Host(Hostname=target_name)
host = hosts[0]
except wmi.x_wmi as exc:
err_msg = (_('get_host_information: the ISCSI target information '
'could not be retrieved. WMI exception: %s')
% six.text_type(exc))
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
properties = {}
properties['target_discovered'] = False
properties['target_portal'] = '%s:%s' % (address, port)
properties['target_iqn'] = host.TargetIQN
properties['target_lun'] = 0
properties['volume_id'] = volume['id']
auth = volume['provider_auth']
if auth:
(auth_method, auth_username, auth_secret) = auth.split()
properties['auth_method'] = auth_method
properties['auth_username'] = auth_username
properties['auth_password'] = auth_secret
return properties
def associate_initiator_with_iscsi_target(self, initiator_name,
target_name):
"""Sets information used by the iSCSI target entry."""
try:
cl = self._conn_wmi.__getattr__("WT_IDMethod")
wt_idmethod = cl.new()
wt_idmethod.HostName = target_name
# Identification method is IQN
wt_idmethod.Method = 4
wt_idmethod.Value = initiator_name
wt_idmethod.put()
except wmi.x_wmi as exc:
err_msg = (_('associate_initiator_with_iscsi_target: an '
'association between initiator: %(init)s and '
'target name: %(target)s could not be established. '
'WMI exception: %(wmi_exc)s') %
{'init': initiator_name, 'target': target_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def delete_iscsi_target(self, initiator_name, target_name):
"""Removes iSCSI targets to hosts."""
try:
wt_idmethod = self._conn_wmi.WT_IDMethod(HostName=target_name,
Method=4,
Value=initiator_name)[0]
wt_idmethod.Delete_()
except wmi.x_wmi as exc:
err_msg = (_(
'delete_iscsi_target: error when deleting the iscsi target '
'associated with target name: %(target)s . WMI '
'exception: %(wmi_exc)s') % {'target': target_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def create_volume(self, vhd_path, vol_name, vol_size=None):
"""Creates a volume."""
try:
cl = self._conn_wmi.__getattr__("WT_Disk")
if vol_size:
size_mb = vol_size * 1024
else:
size_mb = None
cl.NewWTDisk(DevicePath=vhd_path,
Description=vol_name,
SizeInMB=size_mb)
except wmi.x_wmi as exc:
err_msg = (_(
'create_volume: error when creating the volume name: '
'%(vol_name)s . WMI exception: '
'%(wmi_exc)s') % {'vol_name': vol_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def import_wt_disk(self, vhd_path, vol_name):
"""Import a vhd/x image to be used by Windows iSCSI targets"""
try:
self._conn_wmi.WT_Disk.ImportWTDisk(DevicePath=vhd_path,
Description=vol_name)
except wmi.x_wmi as exc:
err_msg = (_("Failed to import disk: %(vhd_path)s. "
"WMI exception: %(exc)s") %
{'vhd_path': vhd_path,
'exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def change_disk_status(self, vol_name, enabled):
try:
cl = self._conn_wmi.WT_Disk(Description=vol_name)[0]
cl.Enabled = enabled
cl.put()
except wmi.x_wmi as exc:
err_msg = (_(
'Error changing disk status: '
'%(vol_name)s . WMI exception: '
'%(wmi_exc)s') % {'vol_name': vol_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def delete_volume(self, vol_name, vhd_path):
"""Driver entry point for destroying existing volumes."""
try:
disk = self._conn_wmi.WT_Disk(Description=vol_name)
if not disk:
LOG.debug('Skipping deleting disk %s as it does not '
'exist.', vol_name)
return
wt_disk = disk[0]
wt_disk.Delete_()
vhdfiles = self._conn_cimv2.query(
"Select * from CIM_DataFile where Name = '" +
vhd_path + "'")
if len(vhdfiles) > 0:
vhdfiles[0].Delete()
except wmi.x_wmi as exc:
err_msg = (_(
'delete_volume: error when deleting the volume name: '
'%(vol_name)s . WMI exception: '
'%(wmi_exc)s') % {'vol_name': vol_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def create_snapshot(self, vol_name, snapshot_name):
"""Driver entry point for creating a snapshot."""
try:
wt_disk = self._conn_wmi.WT_Disk(Description=vol_name)[0]
# API Calls gets Generic Failure
cl = self._conn_wmi.__getattr__("WT_Snapshot")
disk_id = wt_disk.WTD
out = cl.Create(WTD=disk_id)
# Setting description since it used as a KEY
wt_snapshot_created = self._conn_wmi.WT_Snapshot(Id=out[0])[0]
wt_snapshot_created.Description = snapshot_name
wt_snapshot_created.put()
except wmi.x_wmi as exc:
err_msg = (_(
'create_snapshot: error when creating the snapshot name: '
'%(vol_name)s . WMI exception: '
'%(wmi_exc)s') % {'vol_name': snapshot_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def create_volume_from_snapshot(self, volume, snap_name):
"""Driver entry point for exporting snapshots as volumes."""
try:
vol_name = volume['name']
vol_path = self.local_path(volume)
wt_snapshot = self._conn_wmi.WT_Snapshot(Description=snap_name)[0]
disk_id = wt_snapshot.Export()[0]
# This export is read-only, so it needs to be copied
# to another disk.
wt_disk = self._conn_wmi.WT_Disk(WTD=disk_id)[0]
wt_disk.Description = '%s-temp' % vol_name
wt_disk.put()
src_path = wt_disk.DevicePath
self.copy(src_path, vol_path)
self.create_volume(vol_path, vol_name)
wt_disk.Delete_()
except wmi.x_wmi as exc:
err_msg = (_(
'create_volume_from_snapshot: error when creating the volume '
'name: %(vol_name)s from snapshot name: %(snap_name)s. WMI '
'exception: %(wmi_exc)s') % {'vol_name': vol_name,
'snap_name': snap_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def delete_snapshot(self, snap_name):
"""Driver entry point for deleting a snapshot."""
try:
wt_snapshot = self._conn_wmi.WT_Snapshot(Description=snap_name)[0]
wt_snapshot.Delete_()
except wmi.x_wmi as exc:
err_msg = (_(
'delete_snapshot: error when deleting the snapshot name: '
'%(snap_name)s . WMI exception: '
'%(wmi_exc)s') % {'snap_name': snap_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def create_iscsi_target(self, target_name):
"""Creates ISCSI target."""
try:
self._conn_wmi.WT_Host.NewHost(HostName=target_name)
except wmi.x_wmi as exc:
excep_info = exc.com_error.excepinfo[2]
if excep_info.find(u'The file exists') != -1:
err_msg = (_(
'create_iscsi_target: error when creating iscsi target: '
'%(tar_name)s . WMI exception: '
'%(wmi_exc)s') % {'tar_name': target_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
else:
LOG.info(_LI('The iSCSI target %(target_name)s already '
'exists.'), {'target_name': target_name})
def remove_iscsi_target(self, target_name):
"""Removes ISCSI target."""
try:
host = self._conn_wmi.WT_Host(HostName=target_name)
if not host:
LOG.debug('Skipping removing target %s as it does not '
'exist.', target_name)
return
wt_host = host[0]
wt_host.RemoveAllWTDisks()
wt_host.Delete_()
except wmi.x_wmi as exc:
err_msg = (_(
'remove_iscsi_target: error when deleting iscsi target: '
'%(tar_name)s . WMI exception: '
'%(wmi_exc)s') % {'tar_name': target_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def set_chap_credentials(self, target_name, chap_username, chap_password):
try:
wt_host = self._conn_wmi.WT_Host(HostName=target_name)[0]
self._wmi_obj_set_attr(wt_host, 'EnableCHAP', True)
self._wmi_obj_set_attr(wt_host, 'CHAPUserName', chap_username)
self._wmi_obj_set_attr(wt_host, 'CHAPSecret', chap_password)
wt_host.put()
except wmi.x_wmi as exc:
err_msg = (_('Failed to set CHAP credentials on '
'target %(target_name)s. WMI exception: %(wmi_exc)s')
% {'target_name': target_name,
'wmi_exc': exc})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
@staticmethod
def _wmi_obj_set_attr(wmi_obj, key, value):
# Due to a bug in the python WMI module, some wmi object attributes
# cannot be modified. This method is used as a workaround.
wmi_property = getattr(wmi_obj, 'wmi_property')
wmi_property(key).set(value)
def add_disk_to_target(self, vol_name, target_name):
"""Adds the disk to the target."""
try:
q = self._conn_wmi.WT_Disk(Description=vol_name)
wt_disk = q[0]
wt_host = self._conn_wmi.WT_Host(HostName=target_name)[0]
wt_host.AddWTDisk(wt_disk.WTD)
except wmi.x_wmi as exc:
err_msg = (_(
'add_disk_to_target: error adding disk associated to volume : '
'%(vol_name)s to the target name: %(tar_name)s . WMI '
'exception: %(wmi_exc)s') % {'tar_name': target_name,
'vol_name': vol_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def copy_vhd_disk(self, source_path, destination_path):
"""Copy the vhd disk from source path to destination path."""
# Note: As WQL is a small subset of SQL which does not allow multiple
# queries or comments, WQL queries are not exposed to WQL injection.
vhdfiles = self._conn_cimv2.query(
"Select * from CIM_DataFile where Name = '%s'" % source_path)
if len(vhdfiles) > 0:
ret_val = vhdfiles[0].Copy(destination_path)[0]
if ret_val:
err_msg = (
_('Could not copy virtual disk %(src_path)s '
'to %(dest_path)s. Error code: %(error_code)s') %
{'src_path': source_path,
'dest_path': destination_path,
'error_code': ret_val})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
else:
err_msg = (
_('Could not copy virtual disk %(src_path)s '
'to %(dest_path)s. Could not find source path.') %
{'src_path': source_path,
'dest_path': destination_path})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def is_resize_needed(self, vhd_path, new_size, old_size):
if new_size > old_size:
return True
elif old_size > new_size:
err_msg = (_("Cannot resize image %(vhd_path)s "
"to a smaller size. "
"Image size: %(old_size)s, "
"Requested size: %(new_size)s") %
{'vhd_path': vhd_path,
'old_size': old_size,
'new_size': new_size})
raise exception.VolumeBackendAPIException(data=err_msg)
return False
def extend(self, vol_name, additional_size):
"""Extend an existing volume."""
try:
q = self._conn_wmi.WT_Disk(Description=vol_name)
wt_disk = q[0]
wt_disk.Extend(additional_size)
except wmi.x_wmi as exc:
err_msg = (_(
'extend: error when extending the volume: %(vol_name)s .WMI '
'exception: %(wmi_exc)s') % {'vol_name': vol_name,
'wmi_exc': six.text_type(exc)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def local_path(self, volume, format=None):
base_vhd_folder = CONF.windows_iscsi_lun_path
if not os.path.exists(base_vhd_folder):
LOG.debug('Creating folder: %s', base_vhd_folder)
os.makedirs(base_vhd_folder)
if not format:
format = self.get_supported_format()
return os.path.join(base_vhd_folder, str(volume['name']) + "." +
format)
def check_min_windows_version(self, major, minor, build=0):
version_str = self.get_windows_version()
return list(map(int, version_str.split('.'))) >= [major, minor, build]
def get_windows_version(self):
return self._conn_cimv2.Win32_OperatingSystem()[0].Version
def get_supported_format(self):
if self.check_min_windows_version(6, 3):
return 'vhdx'
else:
return 'vhd'
def get_supported_vhd_type(self):
if self.check_min_windows_version(6, 3):
return constants.VHD_TYPE_DYNAMIC
else:
return constants.VHD_TYPE_FIXED
def copy(self, src, dest):
# With large files this is 2x-3x faster than shutil.copy(src, dest),
# especially with UNC targets.
kernel32 = ctypes.windll.kernel32
kernel32.CopyFileW.restype = wintypes.BOOL
retcode = kernel32.CopyFileW(ctypes.c_wchar_p(src),
ctypes.c_wchar_p(dest),
wintypes.BOOL(True))
if not retcode:
raise IOError(_('The file copy from %(src)s to %(dest)s failed.')
% {'src': src, 'dest': dest})