Update migrate_volume API to use versionedobjects

This patch updates the migrate_volume, migrate_volume_completion,
and update_migrated_volume APIs to use volume versionedobjects.
The changes are backward compatible with older RPC clients.  Only
the core cinder code is changed; updating the individual drivers
is left to each driver maintainer.

Note that this patch DOES NOT convert every access to object dot
notation, since doing so would greatly increase the size of the
patch.  The remaining conversions will be made in subsequent
patches.

Change-Id: I21fe68193c934a7ef3688274ab35f664a08cac7e
Partial-Implements: blueprint cinder-objects
Closes-Bug: #1521085
Author: Thang Pham
Date:   2015-09-29 07:12:35 -07:00
parent 1b3a5ea893
commit 12e4d9236d
9 changed files with 338 additions and 224 deletions
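
For reference, the backward-compatibility approach used throughout this
patch follows the usual oslo.messaging pattern: the RPC client asks
can_send_version() whether the newer message version may be sent and
either attaches the volume object or falls back to the old id-only
signature, while the manager accepts an optional volume argument and
looks the object up by id when an older client omits it.  The sketch
below only illustrates that pattern; FakeRpcClient, SchedulerAPISketch,
and ManagerSketch are illustrative stand-ins, not the actual Cinder or
oslo.messaging classes.

    # Minimal, self-contained sketch of the compatibility pattern.

    class FakeRpcClient(object):
        """Stand-in for an oslo.messaging RPCClient with a pinned cap."""

        def __init__(self, supports_objects):
            self._supports_objects = supports_objects

        def can_send_version(self, version):
            # The real client compares 'version' against the negotiated
            # version cap; here we just return a preset answer.
            return self._supports_objects

    class SchedulerAPISketch(object):
        """Caller side: pick the message version and arguments to send."""

        def __init__(self, client):
            self.client = client

        def migrate_volume_to_host_args(self, volume_id, volume):
            msg_args = {'volume_id': volume_id}
            if self.client.can_send_version('1.11'):
                version = '1.11'
                msg_args['volume'] = volume   # new servers get the object
            else:
                version = '1.3'               # old servers get only the id
            return version, msg_args

    class ManagerSketch(object):
        """Callee side: tolerate old callers that omit the object."""

        def __init__(self, volumes_by_id):
            self._volumes_by_id = volumes_by_id

        def migrate_volume_to_host(self, context, volume_id, volume=None):
            if volume is None:
                # Older client: mimic the old behavior and load the object,
                # standing in for objects.Volume.get_by_id(context, id).
                volume = self._volumes_by_id[volume_id]
            return volume

    # Usage: an old version cap drops the object, a new one keeps it.
    new_style = SchedulerAPISketch(FakeRpcClient(supports_objects=True))
    old_style = SchedulerAPISketch(FakeRpcClient(supports_objects=False))
    assert new_style.migrate_volume_to_host_args('vol-1', 'obj')[0] == '1.11'
    assert old_style.migrate_volume_to_host_args('vol-1', 'obj')[0] == '1.3'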

@@ -56,7 +56,7 @@ LOG = logging.getLogger(__name__)
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
RPC_API_VERSION = '1.10'
RPC_API_VERSION = '1.11'
target = messaging.Target(version=RPC_API_VERSION)
@@ -148,13 +148,18 @@ class SchedulerManager(manager.Manager):
def migrate_volume_to_host(self, context, topic, volume_id, host,
force_host_copy, request_spec,
filter_properties=None):
filter_properties=None, volume=None):
"""Ensure that the host exists and can accept the volume."""
self._wait_for_scheduler()
# FIXME(thangp): Remove this in v2.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the
# volume by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
def _migrate_volume_set_error(self, context, ex, request_spec):
volume = db.volume_get(context, request_spec['volume_id'])
if volume.status == 'maintenance':
previous_status = (
volume.previous_status or 'maintenance')
@@ -176,8 +181,7 @@ class SchedulerManager(manager.Manager):
with excutils.save_and_reraise_exception():
_migrate_volume_set_error(self, context, ex, request_spec)
else:
volume_ref = db.volume_get(context, volume_id)
volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
volume_rpcapi.VolumeAPI().migrate_volume(context, volume,
tgt_host,
force_host_copy)

@@ -44,6 +44,8 @@ class SchedulerAPI(object):
1.8 - Add sending object over RPC in create_consistencygroup method
1.9 - Adds support for sending objects over RPC in create_volume()
1.10 - Adds support for sending objects over RPC in retype()
1.11 - Adds support for sending objects over RPC in
migrate_volume_to_host()
"""
RPC_API_VERSION = '1.0'
@@ -95,17 +97,20 @@ class SchedulerAPI(object):
def migrate_volume_to_host(self, ctxt, topic, volume_id, host,
force_host_copy=False, request_spec=None,
filter_properties=None):
cctxt = self.client.prepare(version='1.3')
filter_properties=None, volume=None):
request_spec_p = jsonutils.to_primitive(request_spec)
return cctxt.cast(ctxt, 'migrate_volume_to_host',
topic=topic,
volume_id=volume_id,
host=host,
force_host_copy=force_host_copy,
request_spec=request_spec_p,
filter_properties=filter_properties)
msg_args = {'topic': topic, 'volume_id': volume_id,
'host': host, 'force_host_copy': force_host_copy,
'request_spec': request_spec_p,
'filter_properties': filter_properties}
if self.client.can_send_version('1.11'):
version = '1.11'
msg_args['volume'] = volume
else:
version = '1.3'
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args)
def retype(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None, volume=None):

@@ -801,7 +801,7 @@ class AdminActionsTest(test.TestCase):
force_host_copy=False):
admin_ctx = context.get_admin_context()
# build request to migrate to host
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume.id)
req.method = 'POST'
req.headers['content-type'] = 'application/json'
body = {'os-migrate_volume': {'host': host,
@@ -811,7 +811,7 @@ class AdminActionsTest(test.TestCase):
resp = req.get_response(app())
# verify status
self.assertEqual(expected_status, resp.status_int)
volume = db.volume_get(admin_ctx, volume['id'])
volume = objects.Volume.get_by_id(admin_ctx, volume.id)
return volume
def test_migrate_volume_success(self):

@@ -117,7 +117,9 @@ class SchedulerRpcAPITestCase(test.TestCase):
version='1.2')
can_send_version.assert_called_once_with('1.9')
def test_migrate_volume_to_host(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
def test_migrate_volume_to_host(self, can_send_version):
self._test_scheduler_api('migrate_volume_to_host',
rpc_method='cast',
topic='topic',
@@ -126,7 +128,24 @@ class SchedulerRpcAPITestCase(test.TestCase):
force_host_copy=True,
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='1.11')
can_send_version.assert_called_once_with('1.11')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_migrate_volume_to_host_old(self, can_send_version):
self._test_scheduler_api('migrate_volume_to_host',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
host='host',
force_host_copy=True,
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='1.3')
can_send_version.assert_called_once_with('1.11')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)

@@ -4183,22 +4183,20 @@ class VolumeTestCase(BaseVolumeTestCase):
def test_clean_temporary_volume(self):
def fake_delete_volume(ctxt, volume):
db.volume_destroy(ctxt, volume['id'])
volume.destroy()
fake_volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host)
host=CONF.host,
migration_status='migrating')
fake_new_volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host)
# Check when the migrated volume is in migration
db.volume_update(self.context, fake_volume['id'],
{'migration_status': 'migrating'})
# 1. Only clean the db
self.volume._clean_temporary_volume(self.context, fake_volume['id'],
fake_new_volume['id'],
self.volume._clean_temporary_volume(self.context, fake_volume,
fake_new_volume,
clean_db_only=True)
self.assertRaises(exception.VolumeNotFound,
db.volume_get, self.context,
fake_new_volume['id'])
fake_new_volume.id)
# 2. Delete the backend storage
fake_new_volume = tests_utils.create_volume(self.context, size=1,
@@ -4207,23 +4205,23 @@ class VolumeTestCase(BaseVolumeTestCase):
mock_delete_volume:
mock_delete_volume.side_effect = fake_delete_volume
self.volume._clean_temporary_volume(self.context,
fake_volume['id'],
fake_new_volume['id'],
fake_volume,
fake_new_volume,
clean_db_only=False)
self.assertRaises(exception.VolumeNotFound,
db.volume_get, self.context,
fake_new_volume['id'])
fake_new_volume.id)
# Check when the migrated volume is not in migration
fake_new_volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host)
db.volume_update(self.context, fake_volume['id'],
{'migration_status': 'non-migrating'})
self.volume._clean_temporary_volume(self.context, fake_volume['id'],
fake_new_volume['id'])
fake_volume.migration_status = 'non-migrating'
fake_volume.save()
self.volume._clean_temporary_volume(self.context, fake_volume,
fake_new_volume)
volume = db.volume_get(context.get_admin_context(),
fake_new_volume['id'])
self.assertIsNone(volume['migration_status'])
fake_new_volume.id)
self.assertIsNone(volume.migration_status)
def test_update_volume_readonly_flag(self):
"""Test volume readonly flag can be updated at API level."""
@@ -4323,13 +4321,14 @@ class VolumeMigrationTestCase(VolumeTestCase):
host=CONF.host,
migration_status='migrating')
host_obj = {'host': 'newhost', 'capabilities': {}}
self.volume.migrate_volume(self.context, volume['id'],
host_obj, False)
self.volume.migrate_volume(self.context, volume.id, host_obj, False,
volume=volume)
# check volume properties
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEqual('newhost', volume['host'])
self.assertEqual('success', volume['migration_status'])
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('newhost', volume.host)
self.assertEqual('success', volume.migration_status)
def _fake_create_volume(self, ctxt, volume, host, req_spec, filters,
allow_reschedule=True):
@@ -4351,12 +4350,14 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
False)
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
False,
volume=volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume.migration_status)
self.assertEqual('available', volume.status)
@mock.patch('cinder.compute.API')
@mock.patch('cinder.volume.manager.VolumeManager.'
@@ -4366,7 +4367,10 @@ class VolumeMigrationTestCase(VolumeTestCase):
migrate_volume_completion,
nova_api):
fake_volume_id = 'fake_volume_id'
fake_new_volume = {'status': 'available', 'id': fake_volume_id}
fake_db_new_volume = {'status': 'available', 'id': fake_volume_id}
fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume)
new_volume_obj = fake_volume.fake_volume_obj(self.context,
**fake_new_volume)
host_obj = {'host': 'newhost', 'capabilities': {}}
volume_get.return_value = fake_new_volume
update_server_volume = nova_api.return_value.update_server_volume
@@ -4377,12 +4381,10 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.volume._migrate_volume_generic(self.context, volume,
host_obj, None)
mock_copy_volume.assert_called_with(self.context, volume,
fake_new_volume,
new_volume_obj,
remote='dest')
migrate_volume_completion.assert_called_with(self.context,
volume['id'],
fake_new_volume['id'],
error=False)
migrate_volume_completion.assert_called_with(
self.context, volume.id, new_volume_obj.id, error=False)
self.assertFalse(update_server_volume.called)
@mock.patch('cinder.compute.API')
@@ -4421,6 +4423,7 @@ class VolumeMigrationTestCase(VolumeTestCase):
rpc_delete_volume,
update_migrated_volume):
fake_volume = tests_utils.create_volume(self.context, size=1,
previous_status='available',
host=CONF.host)
host_obj = {'host': 'newhost', 'capabilities': {}}
@@ -4430,12 +4433,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
mock.patch.object(self.volume.driver, 'delete_volume') as \
delete_volume:
create_volume.side_effect = self._fake_create_volume
self.volume.migrate_volume(self.context, fake_volume['id'],
host_obj, True)
volume = db.volume_get(context.get_admin_context(),
fake_volume['id'])
self.assertEqual('newhost', volume['host'])
self.assertEqual('success', volume['migration_status'])
self.volume.migrate_volume(self.context, fake_volume.id,
host_obj, True, volume=fake_volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
fake_volume.id)
self.assertEqual('newhost', volume.host)
self.assertEqual('success', volume.migration_status)
self.assertFalse(mock_migrate_volume.called)
self.assertFalse(delete_volume.called)
self.assertTrue(rpc_delete_volume.called)
@@ -4461,12 +4464,14 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
True)
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
True,
volume=volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume.migration_status)
self.assertEqual('available', volume.status)
@mock.patch('cinder.db.volume_update')
def test_update_migrated_volume(self, volume_update):
@@ -4474,7 +4479,8 @@ class VolumeMigrationTestCase(VolumeTestCase):
fake_new_host = 'fake_new_host'
fake_update = {'_name_id': 'updated_id',
'provider_location': 'updated_location'}
fake_elevated = 'fake_elevated'
fake_elevated = context.RequestContext('fake', self.project_id,
is_admin=True)
volume = tests_utils.create_volume(self.context, size=1,
status='available',
host=fake_host)
@@ -4484,13 +4490,13 @@ class VolumeMigrationTestCase(VolumeTestCase):
provider_location='fake_provider_location',
_name_id='fake_name_id',
host=fake_new_host)
new_volume['_name_id'] = 'fake_name_id'
new_volume['provider_location'] = 'fake_provider_location'
fake_update_error = {'_name_id': new_volume['_name_id'],
new_volume._name_id = 'fake_name_id'
new_volume.provider_location = 'fake_provider_location'
fake_update_error = {'_name_id': new_volume._name_id,
'provider_location':
new_volume['provider_location']}
expected_update = {'_name_id': volume['_name_id'],
'provider_location': volume['provider_location']}
new_volume.provider_location}
expected_update = {'_name_id': volume._name_id,
'provider_location': volume.provider_location}
with mock.patch.object(self.volume.driver,
'update_migrated_volume') as migrate_update,\
mock.patch.object(self.context, 'elevated') as elevated:
@@ -4499,19 +4505,23 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.volume.update_migrated_volume(self.context, volume,
new_volume, 'available')
volume_update.assert_has_calls((
mock.call(fake_elevated, new_volume['id'], expected_update),
mock.call(fake_elevated, volume['id'], fake_update)))
mock.call(fake_elevated, new_volume.id, expected_update),
mock.call(fake_elevated, volume.id, fake_update)))
# Test the case for update_migrated_volume not implemented
# for the driver.
migrate_update.reset_mock()
volume_update.reset_mock()
# Reset the volume objects to their original value, since they
# were changed in the last call.
new_volume._name_id = 'fake_name_id'
new_volume.provider_location = 'fake_provider_location'
migrate_update.side_effect = NotImplementedError
self.volume.update_migrated_volume(self.context, volume,
new_volume, 'available')
volume_update.assert_has_calls((
mock.call(fake_elevated, new_volume['id'], expected_update),
mock.call(fake_elevated, volume['id'], fake_update_error)))
mock.call(fake_elevated, new_volume.id, fake_update),
mock.call(fake_elevated, volume.id, fake_update_error)))
def test_migrate_volume_generic_create_volume_error(self):
self.expected_status = 'error'
@@ -4530,10 +4540,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(exception.VolumeMigrationFailed,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
True)
volume = db.volume_get(context.get_admin_context(), volume['id'])
True,
volume=volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
self.assertTrue(clean_temporary_volume.called)
@@ -4558,10 +4570,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(exception.VolumeMigrationFailed,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
True)
volume = db.volume_get(context.get_admin_context(), volume['id'])
True,
volume=volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
self.assertTrue(clean_temporary_volume.called)
@@ -4588,10 +4602,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
True)
volume = db.volume_get(context.get_admin_context(), volume['id'])
True,
volume=volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
@@ -4634,9 +4650,10 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
True)
True,
volume=volume)
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
@@ -4649,7 +4666,7 @@ class VolumeMigrationTestCase(VolumeTestCase):
previous_status='available'):
def fake_attach_volume(ctxt, volume, instance_uuid, host_name,
mountpoint, mode):
tests_utils.attach_volume(ctxt, volume['id'],
tests_utils.attach_volume(ctxt, volume.id,
instance_uuid, host_name,
'/dev/vda')
@@ -4661,12 +4678,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
previous_status=previous_status)
attachment_id = None
if status == 'in-use':
vol = tests_utils.attach_volume(self.context, old_volume['id'],
vol = tests_utils.attach_volume(self.context, old_volume.id,
instance_uuid, attached_host,
'/dev/vda')
self.assertEqual('in-use', vol['status'])
attachment_id = vol['volume_attachment'][0]['id']
target_status = 'target:%s' % old_volume['id']
target_status = 'target:%s' % old_volume.id
new_host = CONF.host + 'new'
new_volume = tests_utils.create_volume(self.context, size=0,
host=new_host,
@@ -4681,16 +4698,18 @@ class VolumeMigrationTestCase(VolumeTestCase):
'update_migrated_volume'),\
mock.patch.object(self.volume.driver, 'attach_volume'):
mock_attach_volume.side_effect = fake_attach_volume
self.volume.migrate_volume_completion(self.context, old_volume[
'id'], new_volume['id'])
after_new_volume = db.volume_get(self.context, new_volume.id)
after_old_volume = db.volume_get(self.context, old_volume.id)
self.volume.migrate_volume_completion(self.context, old_volume.id,
new_volume.id)
after_new_volume = objects.Volume.get_by_id(self.context,
new_volume.id)
after_old_volume = objects.Volume.get_by_id(self.context,
old_volume.id)
if status == 'in-use':
mock_detach_volume.assert_called_with(self.context,
old_volume['id'],
old_volume.id,
attachment_id)
attachment = db.volume_attachment_get_by_instance_uuid(
self.context, old_volume['id'], instance_uuid)
self.context, old_volume.id, instance_uuid)
self.assertIsNotNone(attachment)
self.assertEqual(attached_host, attachment['attached_host'])
self.assertEqual(instance_uuid, attachment['instance_uuid'])
@@ -4865,10 +4884,11 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.volume.driver._initialized = False
self.assertRaises(exception.DriverNotInitialized,
self.volume.migrate_volume,
self.context, volume['id'],
host_obj, True)
self.context, volume.id, host_obj, True,
volume=volume)
volume = db.volume_get(context.get_admin_context(), volume['id'])
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume.migration_status)
# lets cleanup the mess.

@@ -146,7 +146,6 @@ class VolumeRpcAPITestCase(test.TestCase):
expected_msg['host'] = dest_host_dict
if 'new_volume' in expected_msg:
volume = expected_msg['new_volume']
del expected_msg['new_volume']
expected_msg['new_volume_id'] = volume['id']
if 'host' in kwargs:
@@ -392,7 +391,9 @@ class VolumeRpcAPITestCase(test.TestCase):
version='1.14')
can_send_version.assert_called_once_with('1.35')
def test_migrate_volume(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
def test_migrate_volume(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
@@ -400,18 +401,49 @@ class VolumeRpcAPITestCase(test.TestCase):
dest_host = FakeHost()
self._test_volume_api('migrate_volume',
rpc_method='cast',
volume=self.fake_volume,
volume=self.fake_volume_obj,
dest_host=dest_host,
force_host_copy=True,
version='1.36')
can_send_version.assert_called_once_with('1.36')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_migrate_volume_old(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
self.capabilities = {}
dest_host = FakeHost()
self._test_volume_api('migrate_volume',
rpc_method='cast',
volume=self.fake_volume_obj,
dest_host=dest_host,
force_host_copy=True,
version='1.8')
can_send_version.assert_called_once_with('1.36')
def test_migrate_volume_completion(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
def test_migrate_volume_completion(self, can_send_version):
self._test_volume_api('migrate_volume_completion',
rpc_method='call',
volume=self.fake_volume,
new_volume=self.fake_volume,
volume=self.fake_volume_obj,
new_volume=self.fake_volume_obj,
error=False,
version='1.36')
can_send_version.assert_called_once_with('1.36')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_migrate_volume_completion_old(self, can_send_version):
self._test_volume_api('migrate_volume_completion',
rpc_method='call',
volume=self.fake_volume_obj,
new_volume=self.fake_volume_obj,
error=False,
version='1.10')
can_send_version.assert_called_once_with('1.36')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)

@@ -1275,39 +1275,38 @@ class API(base.Base):
lock_volume):
"""Migrate the volume to the specified host."""
if volume['status'] not in ['available', 'in-use']:
if volume.status not in ['available', 'in-use']:
msg = _('Volume %(vol_id)s status must be available or in-use, '
'but current status is: '
'%(vol_status)s.') % {'vol_id': volume['id'],
'vol_status': volume['status']}
'%(vol_status)s.') % {'vol_id': volume.id,
'vol_status': volume.status}
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# Make sure volume is not part of a migration.
if self._is_volume_migrating(volume):
msg = _("Volume %s is already part of an active "
"migration.") % volume['id']
"migration.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# We only handle volumes without snapshots for now
snaps = objects.SnapshotList.get_all_for_volume(context, volume['id'])
snaps = objects.SnapshotList.get_all_for_volume(context, volume.id)
if snaps:
msg = _("Volume %s must not have snapshots.") % volume['id']
msg = _("Volume %s must not have snapshots.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# We only handle non-replicated volumes for now
rep_status = volume['replication_status']
if rep_status is not None and rep_status != 'disabled':
msg = _("Volume %s must not be replicated.") % volume['id']
if (volume.replication_status is not None and
volume.replication_status != 'disabled'):
msg = _("Volume %s must not be replicated.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
cg_id = volume.get('consistencygroup_id', None)
if cg_id:
if volume.consistencygroup_id:
msg = _("Volume %s must not be part of a consistency "
"group.") % volume['id']
"group.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
@@ -1327,7 +1326,7 @@ class API(base.Base):
raise exception.InvalidHost(reason=msg)
# Make sure the destination host is different than the current one
if host == volume['host']:
if host == volume.host:
msg = _('Destination host must be different '
'than the current host.')
LOG.error(msg)
@@ -1340,27 +1339,27 @@ class API(base.Base):
# that this volume is in maintenance mode, and no action is allowed
# on this volume, e.g. attach, detach, retype, migrate, etc.
updates = {'migration_status': 'starting',
'previous_status': volume['status']}
if lock_volume and volume['status'] == 'available':
'previous_status': volume.status}
if lock_volume and volume.status == 'available':
updates['status'] = 'maintenance'
self.update(context, volume, updates)
# Call the scheduler to ensure that the host exists and that it can
# accept the volume
volume_type = {}
volume_type_id = volume['volume_type_id']
if volume_type_id:
if volume.volume_type_id:
volume_type = volume_types.get_volume_type(context.elevated(),
volume_type_id)
volume.volume_type_id)
request_spec = {'volume_properties': volume,
'volume_type': volume_type,
'volume_id': volume['id']}
'volume_id': volume.id}
self.scheduler_rpcapi.migrate_volume_to_host(context,
CONF.volume_topic,
volume['id'],
volume.id,
host,
force_host_copy,
request_spec)
request_spec,
volume=volume)
LOG.info(_LI("Migrate volume request issued successfully."),
resource=volume)
@@ -1368,34 +1367,34 @@ class API(base.Base):
def migrate_volume_completion(self, context, volume, new_volume, error):
# This is a volume swap initiated by Nova, not Cinder. Nova expects
# us to return the new_volume_id.
if not (volume['migration_status'] or new_volume['migration_status']):
if not (volume.migration_status or new_volume.migration_status):
# Don't need to do migration, but still need to finish the
# volume attach and detach so volumes don't end in 'attaching'
# and 'detaching' state
attachments = volume['volume_attachment']
attachments = volume.volume_attachment
for attachment in attachments:
self.detach(context, volume, attachment['id'])
self.detach(context, volume, attachment.id)
self.attach(context, new_volume,
attachment['instance_uuid'],
attachment['attached_host'],
attachment['mountpoint'],
attachment.instance_uuid,
attachment.attached_host,
attachment.mountpoint,
'rw')
return new_volume['id']
return new_volume.id
if not volume['migration_status']:
if not volume.migration_status:
msg = _('Source volume not mid-migration.')
raise exception.InvalidVolume(reason=msg)
if not new_volume['migration_status']:
if not new_volume.migration_status:
msg = _('Destination volume not mid-migration.')
raise exception.InvalidVolume(reason=msg)
expected_status = 'target:%s' % volume['id']
if not new_volume['migration_status'] == expected_status:
expected_status = 'target:%s' % volume.id
if not new_volume.migration_status == expected_status:
msg = (_('Destination has migration_status %(stat)s, expected '
'%(exp)s.') % {'stat': new_volume['migration_status'],
'%(exp)s.') % {'stat': new_volume.migration_status,
'exp': expected_status})
raise exception.InvalidVolume(reason=msg)

@@ -197,7 +197,7 @@ def locked_snapshot_operation(f):
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '1.35'
RPC_API_VERSION = '1.36'
target = messaging.Target(version=RPC_API_VERSION)
@@ -1626,35 +1626,38 @@ class VolumeManager(manager.SchedulerDependentManager):
# Wait for new_volume to become ready
starttime = time.time()
deadline = starttime + CONF.migration_create_volume_timeout_secs
new_volume = self.db.volume_get(ctxt, new_volume['id'])
# TODO(thangp): Replace get_by_id with refresh when it is available
new_volume = objects.Volume.get_by_id(ctxt, new_volume.id)
tries = 0
while new_volume['status'] != 'available':
while new_volume.status != 'available':
tries += 1
now = time.time()
if new_volume['status'] == 'error':
if new_volume.status == 'error':
msg = _("failed to create new_volume on destination host")
self._clean_temporary_volume(ctxt, volume['id'],
new_volume['id'],
self._clean_temporary_volume(ctxt, volume,
new_volume,
clean_db_only=True)
raise exception.VolumeMigrationFailed(reason=msg)
elif now > deadline:
msg = _("timeout creating new_volume on destination host")
self._clean_temporary_volume(ctxt, volume['id'],
new_volume['id'],
self._clean_temporary_volume(ctxt, volume,
new_volume,
clean_db_only=True)
raise exception.VolumeMigrationFailed(reason=msg)
else:
time.sleep(tries ** 2)
new_volume = self.db.volume_get(ctxt, new_volume['id'])
# TODO(thangp): Replace get_by_id with refresh when it is
# available
new_volume = objects.Volume.get_by_id(ctxt, new_volume.id)
# Copy the source volume to the destination volume
try:
attachments = volume['volume_attachment']
attachments = volume.volume_attachment
if not attachments:
self._copy_volume_data(ctxt, volume, new_volume, remote='dest')
# The above call is synchronous so we complete the migration
self.migrate_volume_completion(ctxt, volume['id'],
new_volume['id'],
self.migrate_volume_completion(ctxt, volume.id,
new_volume.id,
error=False)
else:
nova_api = compute.API()
@@ -1663,58 +1666,63 @@ class VolumeManager(manager.SchedulerDependentManager):
for attachment in attachments:
instance_uuid = attachment['instance_uuid']
nova_api.update_server_volume(ctxt, instance_uuid,
volume['id'],
new_volume['id'])
volume.id,
new_volume.id)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Failed to copy volume %(vol1)s to %(vol2)s"),
{'vol1': volume['id'], 'vol2': new_volume['id']})
self._clean_temporary_volume(ctxt, volume['id'],
new_volume['id'])
{'vol1': volume.id, 'vol2': new_volume.id})
self._clean_temporary_volume(ctxt, volume,
new_volume)
def _clean_temporary_volume(self, ctxt, volume_id, new_volume_id,
def _clean_temporary_volume(self, ctxt, volume, new_volume,
clean_db_only=False):
volume = self.db.volume_get(ctxt, volume_id)
# If we're in the migrating phase, we need to cleanup
# destination volume because source volume is remaining
if volume['migration_status'] == 'migrating':
if volume.migration_status == 'migrating':
try:
if clean_db_only:
# The temporary volume is not created, only DB data
# is created
self.db.volume_destroy(ctxt, new_volume_id)
new_volume.destroy()
else:
# The temporary volume is already created
rpcapi = volume_rpcapi.VolumeAPI()
volume = self.db.volume_get(ctxt, new_volume_id)
rpcapi.delete_volume(ctxt, volume)
rpcapi.delete_volume(ctxt, new_volume)
except exception.VolumeNotFound:
LOG.info(_LI("Couldn't find the temporary volume "
"%(vol)s in the database. There is no need "
"to clean up this volume."),
{'vol': new_volume_id})
{'vol': new_volume.id})
else:
# If we're in the completing phase don't delete the
# destination because we may have already deleted the
# source! But the migration_status in database should
# be cleared to handle volume after migration failure
try:
updates = {'migration_status': None}
self.db.volume_update(ctxt, new_volume_id, updates)
new_volume.migration_status = None
new_volume.save()
except exception.VolumeNotFound:
LOG.info(_LI("Couldn't find destination volume "
"%(vol)s in the database. The entry might be "
"successfully deleted during migration "
"completion phase."),
{'vol': new_volume_id})
{'vol': new_volume.id})
LOG.warning(_LW("Failed to migrate volume. The destination "
"volume %(vol)s is not deleted since the "
"source volume may have been deleted."),
{'vol': new_volume_id})
{'vol': new_volume.id})
def migrate_volume_completion(self, ctxt, volume_id, new_volume_id,
error=False):
error=False, volume=None, new_volume=None):
# FIXME(thangp): Remove this in v2.0 of RPC API.
if volume is None or new_volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(ctxt, volume_id)
new_volume = objects.Volume.get_by_id(ctxt, new_volume_id)
try:
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
@@ -1722,37 +1730,36 @@ class VolumeManager(manager.SchedulerDependentManager):
utils.require_driver_initialized(self.driver)
except exception.DriverNotInitialized:
with excutils.save_and_reraise_exception():
self.db.volume_update(ctxt, volume_id,
{'migration_status': 'error'})
volume.migration_status = 'error'
volume.save()
LOG.debug("migrate_volume_completion: completing migration for "
"volume %(vol1)s (temporary volume %(vol2)s",
{'vol1': volume_id, 'vol2': new_volume_id})
volume = self.db.volume_get(ctxt, volume_id)
new_volume = self.db.volume_get(ctxt, new_volume_id)
{'vol1': volume.id, 'vol2': new_volume.id})
rpcapi = volume_rpcapi.VolumeAPI()
orig_volume_status = volume['previous_status']
orig_volume_status = volume.previous_status
if error:
LOG.info(_LI("migrate_volume_completion is cleaning up an error "
"for volume %(vol1)s (temporary volume %(vol2)s"),
{'vol1': volume['id'], 'vol2': new_volume['id']})
{'vol1': volume['id'], 'vol2': new_volume.id})
rpcapi.delete_volume(ctxt, new_volume)
updates = {'migration_status': 'error',
'status': orig_volume_status}
self.db.volume_update(ctxt, volume_id, updates)
return volume_id
volume.update(updates)
volume.save()
return volume.id
self.db.volume_update(ctxt, volume_id,
{'migration_status': 'completing'})
volume.migration_status = 'completing'
volume.save()
# Detach the source volume (if it fails, don't fail the migration)
try:
if orig_volume_status == 'in-use':
attachments = volume['volume_attachment']
attachments = volume.volume_attachment
for attachment in attachments:
self.detach_volume(ctxt, volume_id, attachment['id'])
self.detach_volume(ctxt, volume.id, attachment['id'])
except Exception as ex:
LOG.error(_LE("Detach migration source volume failed: %(err)s"),
{'err': ex}, resource=volume)
@@ -1767,20 +1774,21 @@ class VolumeManager(manager.SchedulerDependentManager):
# Swap src and dest DB records so we can continue using the src id and
# asynchronously delete the destination id
__, updated_new = self.db.finish_volume_migration(
ctxt, volume_id, new_volume_id)
ctxt, volume.id, new_volume.id)
updates = {'status': orig_volume_status,
'previous_status': volume['status'],
'previous_status': volume.status,
'migration_status': 'success'}
if orig_volume_status == 'in-use':
attachments = volume['volume_attachment']
attachments = volume.volume_attachment
for attachment in attachments:
rpcapi.attach_volume(ctxt, volume,
attachment['instance_uuid'],
attachment['attached_host'],
attachment['mountpoint'],
'rw')
self.db.volume_update(ctxt, volume_id, updates)
volume.update(updates)
volume.save()
# Asynchronous deletion of the source volume in the back-end (now
# pointed by the target volume id)
@@ -1789,15 +1797,21 @@ class VolumeManager(manager.SchedulerDependentManager):
except Exception as ex:
LOG.error(_LE('Failed to request async delete of migration source '
'vol %(vol)s: %(err)s'),
{'vol': volume_id, 'err': ex})
{'vol': volume.id, 'err': ex})
LOG.info(_LI("Complete-Migrate volume completed successfully."),
resource=volume)
return volume['id']
return volume.id
def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False,
new_type_id=None):
new_type_id=None, volume=None):
"""Migrate the volume to the specified host (called on source host)."""
# FIXME(thangp): Remove this in v2.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(ctxt, volume_id)
try:
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
@@ -1805,54 +1819,54 @@ class VolumeManager(manager.SchedulerDependentManager):
utils.require_driver_initialized(self.driver)
except exception.DriverNotInitialized:
with excutils.save_and_reraise_exception():
self.db.volume_update(ctxt, volume_id,
{'migration_status': 'error'})
volume.migration_status = 'error'
volume.save()
volume_ref = self.db.volume_get(ctxt, volume_id)
model_update = None
moved = False
status_update = None
if volume_ref['status'] in ('retyping', 'maintenance'):
status_update = {'status': volume_ref['previous_status']}
if volume.status in ('retyping', 'maintenance'):
status_update = {'status': volume.previous_status}
self.db.volume_update(ctxt, volume_ref['id'],
{'migration_status': 'migrating'})
volume.migration_status = 'migrating'
volume.save()
if not force_host_copy and new_type_id is None:
try:
LOG.debug("Issue driver.migrate_volume.", resource=volume_ref)
LOG.debug("Issue driver.migrate_volume.", resource=volume)
moved, model_update = self.driver.migrate_volume(ctxt,
volume_ref,
volume,
host)
if moved:
updates = {'host': host['host'],
'migration_status': 'success',
'previous_status': volume_ref['status']}
'previous_status': volume.status}
if status_update:
updates.update(status_update)
if model_update:
updates.update(model_update)
volume_ref = self.db.volume_update(ctxt,
volume_ref['id'],
updates)
volume.update(updates)
volume.save()
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': 'error'}
if status_update:
updates.update(status_update)
self.db.volume_update(ctxt, volume_ref['id'], updates)
volume.update(updates)
volume.save()
if not moved:
try:
self._migrate_volume_generic(ctxt, volume_ref, host,
self._migrate_volume_generic(ctxt, volume, host,
new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': 'error'}
if status_update:
updates.update(status_update)
self.db.volume_update(ctxt, volume_ref['id'], updates)
volume.update(updates)
volume.save()
LOG.info(_LI("Migrate volume completed successfully."),
resource=volume_ref)
resource=volume)
@periodic_task.periodic_task
def _report_driver_status(self, context):
@@ -3088,14 +3102,16 @@ class VolumeManager(manager.SchedulerDependentManager):
def update_migrated_volume(self, ctxt, volume, new_volume,
volume_status):
"""Finalize migration process on backend device."""
# FIXME(thangp): Remove this in v2.0 of RPC API.
if (not isinstance(volume, objects.Volume) or
not isinstance(new_volume, objects.Volume)):
volume = objects.Volume.get_by_id(ctxt, volume['id'])
new_volume = objects.Volume.get_by_id(ctxt, new_volume['id'])
model_update = None
# This is temporary fix for bug 1491210.
volume = self.db.volume_get(ctxt, volume['id'])
new_volume = self.db.volume_get(ctxt, new_volume['id'])
model_update_default = {'_name_id': new_volume['_name_id'] or
new_volume['id'],
model_update_default = {'_name_id': new_volume.name_id,
'provider_location':
new_volume['provider_location']}
new_volume.provider_location}
try:
model_update = self.driver.update_migrated_volume(ctxt,
volume,
@@ -3119,17 +3135,19 @@ class VolumeManager(manager.SchedulerDependentManager):
if volume.get('volume_metadata'):
model_update_new[key] = {
metadata['key']: metadata['value']
for metadata in volume.get('volume_metadata')}
for metadata in volume.volume_metadata}
elif key == 'admin_metadata':
model_update_new[key] = {
metadata['key']: metadata['value']
for metadata in volume.get('volume_admin_metadata')}
for metadata in volume.volume_admin_metadata}
else:
model_update_new[key] = volume[key]
self.db.volume_update(ctxt.elevated(), new_volume['id'],
model_update_new)
self.db.volume_update(ctxt.elevated(), volume['id'],
model_update_default)
with new_volume.obj_as_admin():
new_volume.update(model_update_new)
new_volume.save()
with volume.obj_as_admin():
volume.update(model_update_default)
volume.save()
# Replication V2 methods
def enable_replication(self, context, volume):

@@ -83,6 +83,8 @@ class VolumeAPI(object):
1.33 - Adds support for sending objects over RPC in delete_volume().
1.34 - Adds support for sending objects over RPC in retype().
1.35 - Adds support for sending objects over RPC in extend_volume().
1.36 - Adds support for sending objects over RPC in migrate_volume(),
migrate_volume_completion(), and update_migrated_volume().
"""
BASE_RPC_API_VERSION = '1.0'
@@ -246,20 +248,35 @@ class VolumeAPI(object):
cctxt.cast(ctxt, 'extend_volume', **msg_args)
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.8')
new_host = utils.extract_host(volume.host)
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
cctxt.cast(ctxt, 'migrate_volume', volume_id=volume['id'],
host=host_p, force_host_copy=force_host_copy)
msg_args = {'volume_id': volume.id, 'host': host_p,
'force_host_copy': force_host_copy}
if self.client.can_send_version('1.36'):
version = '1.36'
msg_args['volume'] = volume
else:
version = '1.8'
cctxt = self.client.prepare(server=new_host, version=version)
cctxt.cast(ctxt, 'migrate_volume', **msg_args)
def migrate_volume_completion(self, ctxt, volume, new_volume, error):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.10')
return cctxt.call(ctxt, 'migrate_volume_completion',
volume_id=volume['id'],
new_volume_id=new_volume['id'],
error=error)
new_host = utils.extract_host(volume.host)
msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id,
'error': error}
if self.client.can_send_version('1.36'):
version = '1.36'
msg_args['volume'] = volume
msg_args['new_volume'] = new_volume
else:
version = '1.10'
cctxt = self.client.prepare(server=new_host, version=version)
return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args)
def retype(self, ctxt, volume, new_type_id, dest_host,
migration_policy='never', reservations=None):
@@ -296,7 +313,7 @@ class VolumeAPI(object):
def update_migrated_volume(self, ctxt, volume, new_volume,
original_volume_status):
host = utils.extract_host(new_volume['host'])
cctxt = self.client.prepare(server=host, version='1.19')
cctxt = self.client.prepare(server=host, version='1.36')
cctxt.call(ctxt,
'update_migrated_volume',
volume=volume,