Create Consistency Group from CG Snapshot API

This patch addressed the following:
* Added a new create Consistency Group from CG Snapshot API.
  - Note this is separate from the Create Consistency Group
    API which requires volume types as the input.
* Added a corresponding driver API.

Conflicts:
	cinder/volume/rpcapi.py

Partial-Implements: blueprint consistency-groups-kilo-update
Change-Id: I3a5f55d9dfd3fd4d70833824b29ebbd71986c143
This commit is contained in:
Xing Yang 2015-02-23 21:38:14 -05:00
parent 1a62a6e60f
commit adb4c80be8
16 changed files with 853 additions and 50 deletions

View File

@ -42,6 +42,15 @@ def make_consistencygroup(elem):
elem.set('description')
def make_consistencygroup_from_src(elem):
elem.set('id')
elem.set('status')
elem.set('created_at')
elem.set('name')
elem.set('description')
elem.set('cgsnapshot_id')
class ConsistencyGroupTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('consistencygroup',
@ -63,6 +72,16 @@ class ConsistencyGroupsTemplate(xmlutil.TemplateBuilder):
return xmlutil.MasterTemplate(root, 1, nsmap={alias: namespace})
class ConsistencyGroupFromSrcTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('consistencygroup-from-src',
selector='consistencygroup-from-src')
make_consistencygroup_from_src(root)
alias = Consistencygroups.alias
namespace = Consistencygroups.namespace
return xmlutil.MasterTemplate(root, 1, nsmap={alias: namespace})
class CreateDeserializer(wsgi.MetadataXMLDeserializer):
def default(self, string):
dom = utils.safe_minidom_parse_string(string)
@ -85,6 +104,27 @@ class CreateDeserializer(wsgi.MetadataXMLDeserializer):
return consistencygroup
class CreateFromSrcDeserializer(wsgi.MetadataXMLDeserializer):
def default(self, string):
dom = utils.safe_minidom_parse_string(string)
consistencygroup = self._extract_consistencygroup(dom)
retval = {'body': {'consistencygroup-from-src': consistencygroup}}
return retval
def _extract_consistencygroup(self, node):
consistencygroup = {}
consistencygroup_node = self.find_first_child_named(
node, 'consistencygroup-from-src')
attributes = ['cgsnapshot', 'name', 'description']
for attr in attributes:
if consistencygroup_node.getAttribute(attr):
consistencygroup[attr] = (
consistencygroup_node.getAttribute(attr))
return consistencygroup
class ConsistencyGroupsController(wsgi.Controller):
"""The ConsistencyGroups API controller for the OpenStack API."""
@ -201,6 +241,58 @@ class ConsistencyGroupsController(wsgi.Controller):
dict(new_consistencygroup.iteritems()))
return retval
@wsgi.response(202)
@wsgi.serializers(xml=ConsistencyGroupFromSrcTemplate)
@wsgi.deserializers(xml=CreateFromSrcDeserializer)
def create_from_src(self, req, body):
"""Create a new consistency group from a source.
The source can be a snapshot. It could be extended
in the future to support other sources. Note that
this does not require volume_types as the "create"
API above.
"""
LOG.debug('Creating new consistency group %s.', body)
if not self.is_valid_body(body, 'consistencygroup-from-src'):
raise exc.HTTPBadRequest()
context = req.environ['cinder.context']
try:
consistencygroup = body['consistencygroup-from-src']
except KeyError:
msg = _("Incorrect request body format.")
raise exc.HTTPBadRequest(explanation=msg)
name = consistencygroup.get('name', None)
description = consistencygroup.get('description', None)
cgsnapshot_id = consistencygroup.get('cgsnapshot_id', None)
if not cgsnapshot_id:
msg = _("Cgsnapshot id must be provided to create "
"consistency group %(name)s from source.") % {'name': name}
raise exc.HTTPBadRequest(explanation=msg)
LOG.info(_LI("Creating consistency group %(name)s from cgsnapshot "
"%(snap)s."),
{'name': name, 'snap': cgsnapshot_id},
context=context)
try:
new_consistencygroup = self.consistencygroup_api.create_from_src(
context, name, description, cgsnapshot_id)
except exception.InvalidConsistencyGroup as error:
raise exc.HTTPBadRequest(explanation=error.msg)
except exception.CgSnapshotNotFound as error:
raise exc.HTTPBadRequest(explanation=error.msg)
except exception.ConsistencyGroupNotFound as error:
raise exc.HTTPNotFound(explanation=error.msg)
except exception.CinderException as error:
raise exc.HTTPBadRequest(explanation=error.msg)
retval = self._view_builder.summary(
req,
dict(new_consistencygroup.iteritems()))
return retval
@wsgi.serializers(xml=ConsistencyGroupTemplate)
def update(self, req, id, body):
"""Update the consistency group.
@ -273,7 +365,7 @@ class Consistencygroups(extensions.ExtensionDescriptor):
resources = []
res = extensions.ResourceExtension(
Consistencygroups.alias, ConsistencyGroupsController(),
collection_actions={'detail': 'GET'},
collection_actions={'detail': 'GET', 'create_from_src': 'POST'},
member_actions={'delete': 'POST', 'update': 'PUT'})
resources.append(res)
return resources

View File

@ -109,6 +109,7 @@ class API(base.Base):
cg_volume_types, availability_zone=None):
check_policy(context, 'create')
volume_type_list = None
volume_type_list = cg_volume_types.split(',')
@ -159,6 +160,113 @@ class API(base.Base):
return group
def create_from_src(self, context, name, description, cgsnapshot_id):
check_policy(context, 'create')
cgsnapshot = None
orig_cg = None
if cgsnapshot_id:
cgsnapshot = self.db.cgsnapshot_get(context, cgsnapshot_id)
if cgsnapshot:
orig_cg = self.db.consistencygroup_get(
context,
cgsnapshot['consistencygroup_id'])
options = {'user_id': context.user_id,
'project_id': context.project_id,
'status': "creating",
'name': name,
'description': description,
'cgsnapshot_id': cgsnapshot_id}
if orig_cg:
options['volume_type_id'] = orig_cg.get('volume_type_id')
options['availability_zone'] = orig_cg.get('availability_zone')
options['host'] = orig_cg.get('host')
group = None
try:
group = self.db.consistencygroup_create(context, options)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Error occurred when creating consistency group"
" %(cg)s from cgsnapshot %(cgsnap)s."),
{'cg': name, 'cgsnap': cgsnapshot_id})
# Update quota for consistencygroups
self.update_quota(context, group['id'], 1)
if not group['host']:
msg = _("No host to create consistency group %s.") % group['id']
LOG.error(msg)
raise exception.InvalidConsistencyGroup(reason=msg)
self._create_cg_from_cgsnapshot(context, group, cgsnapshot)
return group
def _create_cg_from_cgsnapshot(self, context, group, cgsnapshot):
try:
snapshots = self.db.snapshot_get_all_for_cgsnapshot(
context, cgsnapshot['id'])
if not snapshots:
msg = _("Cgsnahost is empty. No consistency group "
"will be created.")
raise exception.InvalidConsistencyGroup(reason=msg)
for snapshot in snapshots:
kwargs = {}
kwargs['availability_zone'] = group.get('availability_zone')
kwargs['cgsnapshot'] = cgsnapshot
kwargs['consistencygroup'] = group
kwargs['snapshot'] = snapshot
volume_type_id = snapshot.get('volume_type_id')
if volume_type_id:
kwargs['volume_type'] = volume_types.get_volume_type(
context, volume_type_id)
# Since cgsnapshot is passed in, the following call will
# create a db entry for the volume, but will not call the
# volume manager to create a real volume in the backend yet.
# If error happens, taskflow will handle rollback of quota
# and removal of volume entry in the db.
try:
self.volume_api.create(context,
snapshot['volume_size'],
None,
None,
**kwargs)
except exception.CinderException:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Error occurred when creating volume "
"entry from snapshot in the process of "
"creating consistency group %(group)s "
"from cgsnapshot %(cgsnap)s."),
{'group': group['id'],
'cgsnap': cgsnapshot['id']})
except Exception:
with excutils.save_and_reraise_exception():
try:
self.db.consistencygroup_destroy(context.elevated(),
group['id'])
finally:
LOG.error(_LE("Error occurred when creating consistency "
"group %(group)s from cgsnapshot "
"%(cgsnap)s."),
{'group': group['id'],
'cgsnap': cgsnapshot['id']})
volumes = self.db.volume_get_all_by_group(context,
group['id'])
for vol in volumes:
# Update the host field for the volume.
self.db.volume_update(context, vol['id'],
{'host': group.get('host')})
self.volume_rpcapi.create_consistencygroup_from_src(
context, group, group['host'], cgsnapshot)
def _cast_create_consistencygroup(self, context, group_id,
request_spec_list,
filter_properties_list):

View File

@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column
from sqlalchemy import MetaData, String, Table
from cinder.i18n import _LE
from cinder.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def upgrade(migrate_engine):
"""Add cgsnapshot_id column to consistencygroups."""
meta = MetaData()
meta.bind = migrate_engine
consistencygroups = Table('consistencygroups', meta, autoload=True)
cgsnapshot_id = Column('cgsnapshot_id', String(36))
try:
consistencygroups.create_column(cgsnapshot_id)
consistencygroups.update().values(cgsnapshot_id=None).execute()
except Exception:
LOG.error(_LE("Adding cgsnapshot_id column to consistencygroups "
"table failed."))
raise
def downgrade(migrate_engine):
"""Remove cgsnapshot_id column from consistencygroups."""
meta = MetaData()
meta.bind = migrate_engine
consistencygroups = Table('consistencygroups', meta, autoload=True)
cgsnapshot_id = consistencygroups.columns.cgsnapshot_id
try:
consistencygroups.drop_column(cgsnapshot_id)
except Exception:
LOG.error(_LE("Dropping cgsnapshot_id column from consistencygroups "
"table failed."))
raise

View File

@ -79,6 +79,7 @@ class ConsistencyGroup(BASE, CinderBase):
description = Column(String(255))
volume_type_id = Column(String(255))
status = Column(String(255))
cgsnapshot_id = Column(String(36))
class Cgsnapshot(BASE, CinderBase):

View File

@ -26,10 +26,13 @@ import webob
import cinder.consistencygroup
from cinder import context
from cinder import db
from cinder import exception
from cinder.i18n import _
from cinder import test
from cinder.tests.api import fakes
from cinder.tests.api.v2 import stubs
from cinder.tests import utils
from cinder.volume import api as volume_api
class ConsistencyGroupsAPITestCase(test.TestCase):
@ -673,3 +676,193 @@ class ConsistencyGroupsAPITestCase(test.TestCase):
self.assertEqual(msg, res_dict['badRequest']['message'])
db.consistencygroup_destroy(ctxt.elevated(), consistencygroup_id)
def test_create_consistencygroup_from_src(self):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
ctxt = context.RequestContext('fake', 'fake', auth_token=True)
consistencygroup_id = utils.create_consistencygroup(ctxt)['id']
volume_id = utils.create_volume(
ctxt,
consistencygroup_id=consistencygroup_id)['id']
cgsnapshot_id = utils.create_cgsnapshot(
ctxt,
consistencygroup_id=consistencygroup_id)['id']
snapshot_id = utils.create_snapshot(
ctxt,
volume_id,
cgsnapshot_id=cgsnapshot_id,
status='available')['id']
test_cg_name = 'test cg'
body = {"consistencygroup-from-src": {"name": test_cg_name,
"description":
"Consistency Group 1",
"cgsnapshot_id": cgsnapshot_id}}
req = webob.Request.blank('/v2/fake/consistencygroups/create_from_src')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(202, res.status_int)
self.assertIn('id', res_dict['consistencygroup'])
self.assertEqual(test_cg_name, res_dict['consistencygroup']['name'])
db.consistencygroup_destroy(ctxt.elevated(),
res_dict['consistencygroup']['id'])
db.snapshot_destroy(ctxt.elevated(), snapshot_id)
db.cgsnapshot_destroy(ctxt.elevated(), cgsnapshot_id)
db.volume_destroy(ctxt.elevated(), volume_id)
db.consistencygroup_destroy(ctxt.elevated(), consistencygroup_id)
def test_create_consistencygroup_from_src_invalid_body(self):
name = 'cg1'
body = {"invalid": {"name": name,
"description":
"Consistency Group 1", }}
req = webob.Request.blank('/v2/fake/consistencygroups/create_from_src')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(400, res.status_int)
self.assertEqual(400, res_dict['badRequest']['code'])
msg = (_('The server could not comply with the request since '
'it is either malformed or otherwise incorrect.'))
self.assertEqual(msg, res_dict['badRequest']['message'])
def test_create_consistencygroup_from_src_no_cgsnapshot_id(self):
name = 'cg1'
body = {"consistencygroup-from-src": {"name": name,
"description":
"Consistency Group 1", }}
req = webob.Request.blank('/v2/fake/consistencygroups/create_from_src')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(400, res.status_int)
self.assertEqual(400, res_dict['badRequest']['code'])
msg = (_('Cgsnapshot id must be provided to create '
'consistency group %s from source.') % name)
self.assertEqual(msg, res_dict['badRequest']['message'])
def test_create_consistencygroup_from_src_no_host(self):
ctxt = context.RequestContext('fake', 'fake', auth_token=True)
consistencygroup_id = utils.create_consistencygroup(
ctxt,
host=None)['id']
volume_id = utils.create_volume(
ctxt,
consistencygroup_id=consistencygroup_id)['id']
cgsnapshot_id = utils.create_cgsnapshot(
ctxt,
consistencygroup_id=consistencygroup_id)['id']
snapshot_id = utils.create_snapshot(
ctxt,
volume_id,
cgsnapshot_id=cgsnapshot_id,
status='available')['id']
test_cg_name = 'test cg'
body = {"consistencygroup-from-src": {"name": test_cg_name,
"description":
"Consistency Group 1",
"cgsnapshot_id": cgsnapshot_id}}
req = webob.Request.blank('/v2/fake/consistencygroups/create_from_src')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(400, res.status_int)
self.assertEqual(400, res_dict['badRequest']['code'])
msg = _('Invalid ConsistencyGroup: No host to create consistency '
'group')
self.assertIn(msg, res_dict['badRequest']['message'])
db.snapshot_destroy(ctxt.elevated(), snapshot_id)
db.cgsnapshot_destroy(ctxt.elevated(), cgsnapshot_id)
db.volume_destroy(ctxt.elevated(), volume_id)
db.consistencygroup_destroy(ctxt.elevated(), consistencygroup_id)
def test_create_consistencygroup_from_src_cgsnapshot_empty(self):
ctxt = context.RequestContext('fake', 'fake', auth_token=True)
consistencygroup_id = utils.create_consistencygroup(
ctxt)['id']
volume_id = utils.create_volume(
ctxt,
consistencygroup_id=consistencygroup_id)['id']
cgsnapshot_id = utils.create_cgsnapshot(
ctxt,
consistencygroup_id=consistencygroup_id)['id']
test_cg_name = 'test cg'
body = {"consistencygroup-from-src": {"name": test_cg_name,
"description":
"Consistency Group 1",
"cgsnapshot_id": cgsnapshot_id}}
req = webob.Request.blank('/v2/fake/consistencygroups/create_from_src')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(400, res.status_int)
self.assertEqual(400, res_dict['badRequest']['code'])
msg = _("Invalid ConsistencyGroup: Cgsnahost is empty. No "
"consistency group will be created.")
self.assertIn(msg, res_dict['badRequest']['message'])
db.cgsnapshot_destroy(ctxt.elevated(), cgsnapshot_id)
db.volume_destroy(ctxt.elevated(), volume_id)
db.consistencygroup_destroy(ctxt.elevated(), consistencygroup_id)
@mock.patch.object(volume_api.API, 'create',
side_effect=exception.CinderException(
'Create volume failed.'))
def test_create_consistencygroup_from_src_create_volume_failed(
self, mock_create):
ctxt = context.RequestContext('fake', 'fake', auth_token=True)
consistencygroup_id = utils.create_consistencygroup(ctxt)['id']
volume_id = utils.create_volume(
ctxt,
consistencygroup_id=consistencygroup_id)['id']
cgsnapshot_id = utils.create_cgsnapshot(
ctxt,
consistencygroup_id=consistencygroup_id)['id']
snapshot_id = utils.create_snapshot(
ctxt,
volume_id,
cgsnapshot_id=cgsnapshot_id,
status='available')['id']
test_cg_name = 'test cg'
body = {"consistencygroup-from-src": {"name": test_cg_name,
"description":
"Consistency Group 1",
"cgsnapshot_id": cgsnapshot_id}}
req = webob.Request.blank('/v2/fake/consistencygroups/create_from_src')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(400, res.status_int)
self.assertEqual(400, res_dict['badRequest']['code'])
msg = _("Create volume failed.")
self.assertEqual(msg, res_dict['badRequest']['message'])
db.snapshot_destroy(ctxt.elevated(), snapshot_id)
db.cgsnapshot_destroy(ctxt.elevated(), cgsnapshot_id)
db.volume_destroy(ctxt.elevated(), volume_id)
db.consistencygroup_destroy(ctxt.elevated(), consistencygroup_id)

View File

@ -92,7 +92,8 @@ class CreateVolumeFlowTestCase(test.TestCase):
'snapshot_id': None,
'image_id': None,
'source_replicaid': None,
'consistencygroup_id': None}
'consistencygroup_id': None,
'cgsnapshot_id': None}
task = create_volume.VolumeCastTask(
fake_scheduler_rpc_api(spec, self),
@ -106,7 +107,8 @@ class CreateVolumeFlowTestCase(test.TestCase):
'snapshot_id': 3,
'image_id': 4,
'source_replicaid': 5,
'consistencygroup_id': 5}
'consistencygroup_id': 5,
'cgsnapshot_id': None}
task = create_volume.VolumeCastTask(
fake_scheduler_rpc_api(spec, self),

View File

@ -722,6 +722,15 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
snapshots = db_utils.get_table(engine, 'snapshots')
self.assertNotIn('provider_id', snapshots.c)
def _check_037(self, engine, data):
consistencygroups = db_utils.get_table(engine, 'consistencygroups')
self.assertIsInstance(consistencygroups.c.cgsnapshot_id.type,
sqlalchemy.types.VARCHAR)
def _post_downgrade_037(self, engine):
consistencygroups = db_utils.get_table(engine, 'consistencygroups')
self.assertNotIn('cgsnapshot_id', consistencygroups.c)
def test_walk_versions(self):
self.walk_versions(True, False)

View File

@ -3678,6 +3678,99 @@ class VolumeTestCase(BaseVolumeTestCase):
self.volume.db.volume_get.reset_mock()
self.volume.db.volume_get = volume_get_orig
@mock.patch.object(driver.VolumeDriver,
"create_consistencygroup",
return_value={'status': 'available'})
@mock.patch.object(driver.VolumeDriver,
"delete_consistencygroup",
return_value=({'status': 'deleted'}, []))
@mock.patch.object(driver.VolumeDriver,
"create_cgsnapshot",
return_value={'status': 'available'})
@mock.patch.object(driver.VolumeDriver,
"delete_cgsnapshot",
return_value=({'status': 'deleted'}, []))
@mock.patch.object(driver.VolumeDriver,
"create_consistencygroup_from_src",
return_value=(None, None))
def test_create_consistencygroup_from_src(self, mock_create_from_src,
mock_delete_cgsnap,
mock_create_cgsnap,
mock_delete_cg, mock_create_cg):
"""Test consistencygroup can be created and deleted."""
group = tests_utils.create_consistencygroup(
self.context,
availability_zone=CONF.storage_availability_zone,
volume_type='type1,type2')
group_id = group['id']
volume = tests_utils.create_volume(
self.context,
consistencygroup_id=group_id,
**self.volume_params)
volume_id = volume['id']
cgsnapshot_returns = self._create_cgsnapshot(group_id, volume_id)
cgsnapshot_id = cgsnapshot_returns[0]['id']
snapshot_id = cgsnapshot_returns[1]['id']
group2 = tests_utils.create_consistencygroup(
self.context,
availability_zone=CONF.storage_availability_zone,
volume_type='type1,type2',
cgsnapshot_id=cgsnapshot_id)
group2_id = group2['id']
volume2 = tests_utils.create_volume(
self.context,
consistencygroup_id=group2_id,
snapshot_id=snapshot_id,
**self.volume_params)
volume2_id = volume2['id']
self.volume.create_volume(self.context, volume2_id)
self.volume.create_consistencygroup_from_src(
self.context, group2_id, cgsnapshot_id=cgsnapshot_id)
cg2 = db.consistencygroup_get(
self.context,
group2_id)
expected = {
'status': 'available',
'name': 'test_cg',
'availability_zone': 'nova',
'tenant_id': 'fake',
'created_at': 'DONTCARE',
'user_id': 'fake',
'consistencygroup_id': group2_id
}
self.assertEqual('available', cg2['status'])
self.assertEqual(6, len(fake_notifier.NOTIFICATIONS))
msg = fake_notifier.NOTIFICATIONS[2]
self.assertEqual('consistencygroup.create.start', msg['event_type'])
self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[4]
self.assertEqual('consistencygroup.create.end', msg['event_type'])
self.assertDictMatch(expected, msg['payload'])
self.volume.delete_consistencygroup(self.context, group2_id)
self.assertEqual(len(fake_notifier.NOTIFICATIONS), 10)
msg = fake_notifier.NOTIFICATIONS[6]
self.assertEqual(msg['event_type'], 'consistencygroup.delete.start')
expected['status'] = 'available'
self.assertDictMatch(expected, msg['payload'])
msg = fake_notifier.NOTIFICATIONS[8]
self.assertEqual(msg['event_type'], 'consistencygroup.delete.end')
self.assertDictMatch(expected, msg['payload'])
cg2 = db.consistencygroup_get(
context.get_admin_context(read_deleted='yes'),
group2_id)
self.assertEqual('deleted', cg2['status'])
self.assertRaises(exception.NotFound,
db.consistencygroup_get,
self.context,
group2_id)
self.volume.delete_cgsnapshot(self.context, cgsnapshot_id)
self.volume.delete_consistencygroup(self.context, group_id)
@staticmethod
def _create_cgsnapshot(group_id, volume_id, size='0'):
"""Create a cgsnapshot object."""

View File

@ -159,6 +159,7 @@ class VolumeRpcAPITestCase(test.TestCase):
source_volid='fake_src_id',
source_replicaid='fake_replica_id',
consistencygroup_id='fake_cg_id',
cgsnapshot_id=None,
version='1.4')
def test_create_volume_serialization(self):
@ -175,6 +176,7 @@ class VolumeRpcAPITestCase(test.TestCase):
source_volid='fake_src_id',
source_replicaid='fake_replica_id',
consistencygroup_id='fake_cg_id',
cgsnapshot_id=None,
version='1.4')
def test_delete_volume(self):

View File

@ -87,6 +87,7 @@ def create_consistencygroup(ctxt,
status='available',
availability_zone='fake_az',
volume_type_id=None,
cgsnapshot_id=None,
**kwargs):
"""Create a consistencygroup object in the DB."""
cg = {}

View File

@ -162,7 +162,8 @@ class API(base.Base):
image_id=None, volume_type=None, metadata=None,
availability_zone=None, source_volume=None,
scheduler_hints=None,
source_replica=None, consistencygroup=None):
source_replica=None, consistencygroup=None,
cgsnapshot=None):
# NOTE(jdg): we can have a create without size if we're
# doing a create from snap or volume. Currently
@ -180,7 +181,7 @@ class API(base.Base):
'than zero).') % size
raise exception.InvalidInput(reason=msg)
if consistencygroup:
if consistencygroup and not cgsnapshot:
if not volume_type:
msg = _("volume_type must be provided when creating "
"a volume in a consistency group.")
@ -235,15 +236,22 @@ class API(base.Base):
'key_manager': self.key_manager,
'source_replica': source_replica,
'optional_args': {'is_quota_committed': False},
'consistencygroup': consistencygroup
'consistencygroup': consistencygroup,
'cgsnapshot': cgsnapshot,
}
try:
flow_engine = create_volume.get_flow(self.scheduler_rpcapi,
self.volume_rpcapi,
self.db,
self.image_service,
availability_zones,
create_what)
if cgsnapshot:
flow_engine = create_volume.get_flow_no_rpc(self.db,
self.image_service,
availability_zones,
create_what)
else:
flow_engine = create_volume.get_flow(self.scheduler_rpcapi,
self.volume_rpcapi,
self.db,
self.image_service,
availability_zones,
create_what)
except Exception:
msg = _('Failed to create api volume flow.')
LOG.exception(msg)

View File

@ -1111,6 +1111,34 @@ class VolumeDriver(ConsistencyGroupVD, TransferVD, ManageableVD, ExtendVD,
"""Creates a consistencygroup."""
raise NotImplementedError()
def create_consistencygroup_from_src(self, context, group, volumes,
cgsnapshot=None, snapshots=None):
"""Creates a consistencygroup from source.
:param context: the context of the caller.
:param group: the dictionary of the consistency group to be created.
:param volumes: a list of volume dictionaries in the group.
:param cgsnapshot: the dictionary of the cgsnapshot as source.
:param snapshots: a list of snapshot dictionaries in the cgsnapshot.
:return model_update, volumes_model_update
Currently the source can only be cgsnapshot.
param volumes is retrieved directly from the db. It is a list of
cinder.db.sqlalchemy.models.Volume to be precise. It cannot be
assigned to volumes_model_update. volumes_model_update is a list of
dictionaries. It has to be built by the driver. An entry will be
in this format: ['id': xxx, 'status': xxx, ......]. model_update
will be in this format: ['status': xxx, ......].
To be consistent with other volume operations, the manager will
assume the operation is successful if no exception is thrown by
the driver. For a successful operation, the driver can either build
the model_update and volumes_model_update and return them or
return None, None.
"""
raise NotImplementedError()
def delete_consistencygroup(self, context, group):
"""Deletes a consistency group."""
raise NotImplementedError()

View File

@ -40,8 +40,9 @@ QUOTAS = quota.QUOTAS
# from, 'error' being the common example.
SNAPSHOT_PROCEED_STATUS = ('available',)
SRC_VOL_PROCEED_STATUS = ('available', 'in-use',)
REPLICA_PROCEED_STATUS = ('active', 'active-stopped')
CG_PROCEED_STATUS = ('available',)
REPLICA_PROCEED_STATUS = ('active', 'active-stopped',)
CG_PROCEED_STATUS = ('available', 'creating',)
CGSNAPSHOT_PROCEED_STATUS = ('available',)
class ExtractVolumeRequestTask(flow_utils.CinderTask):
@ -61,7 +62,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
default_provides = set(['availability_zone', 'size', 'snapshot_id',
'source_volid', 'volume_type', 'volume_type_id',
'encryption_key_id', 'source_replicaid',
'consistencygroup_id'])
'consistencygroup_id', 'cgsnapshot_id'])
def __init__(self, image_service, availability_zones, **kwargs):
super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION],
@ -87,6 +88,24 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
consistencygroup_id = consistencygroup['id']
return consistencygroup_id
@staticmethod
def _extract_cgsnapshot(cgsnapshot):
"""Extracts the cgsnapshot id from the provided cgsnapshot.
This function validates the input cgsnapshot dict and checks that
the status of that cgsnapshot is valid for creating a cg from.
"""
cgsnapshot_id = None
if cgsnapshot:
if cgsnapshot['status'] not in CGSNAPSHOT_PROCEED_STATUS:
msg = _("Originating CGSNAPSHOT status must be one"
" of '%s' values")
msg = msg % (", ".join(CGSNAPSHOT_PROCEED_STATUS))
raise exception.InvalidCgSnapshot(reason=msg)
cgsnapshot_id = cgsnapshot['id']
return cgsnapshot_id
@staticmethod
def _extract_snapshot(snapshot):
"""Extracts the snapshot id from the provided snapshot (if provided).
@ -379,7 +398,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
def execute(self, context, size, snapshot, image_id, source_volume,
availability_zone, volume_type, metadata,
key_manager, source_replica,
consistencygroup):
consistencygroup, cgsnapshot):
utils.check_exclusive_options(snapshot=snapshot,
imageRef=image_id,
@ -393,6 +412,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
source_replicaid = self._extract_source_replica(source_replica)
size = self._extract_size(size, source_volume, snapshot)
consistencygroup_id = self._extract_consistencygroup(consistencygroup)
cgsnapshot_id = self._extract_cgsnapshot(cgsnapshot)
self._check_image_metadata(context, image_id, size)
@ -445,6 +465,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
'qos_specs': specs,
'source_replicaid': source_replicaid,
'consistencygroup_id': consistencygroup_id,
'cgsnapshot_id': cgsnapshot_id,
}
@ -460,7 +481,8 @@ class EntryCreateTask(flow_utils.CinderTask):
requires = ['availability_zone', 'description', 'metadata',
'name', 'reservations', 'size', 'snapshot_id',
'source_volid', 'volume_type_id', 'encryption_key_id',
'source_replicaid', 'consistencygroup_id', ]
'source_replicaid', 'consistencygroup_id',
'cgsnapshot_id', ]
super(EntryCreateTask, self).__init__(addons=[ACTION],
requires=requires)
self.db = db
@ -672,7 +694,7 @@ class VolumeCastTask(flow_utils.CinderTask):
requires = ['image_id', 'scheduler_hints', 'snapshot_id',
'source_volid', 'volume_id', 'volume_type',
'volume_properties', 'source_replicaid',
'consistencygroup_id']
'consistencygroup_id', 'cgsnapshot_id', ]
super(VolumeCastTask, self).__init__(addons=[ACTION],
requires=requires)
self.volume_rpcapi = volume_rpcapi
@ -687,6 +709,7 @@ class VolumeCastTask(flow_utils.CinderTask):
image_id = request_spec['image_id']
group_id = request_spec['consistencygroup_id']
host = None
cgsnapshot_id = request_spec['cgsnapshot_id']
if group_id:
group = self.db.consistencygroup_get(context, group_id)
@ -726,18 +749,19 @@ class VolumeCastTask(flow_utils.CinderTask):
now = timeutils.utcnow()
values = {'host': host, 'scheduled_at': now}
volume_ref = self.db.volume_update(context, volume_id, values)
self.volume_rpcapi.create_volume(
context,
volume_ref,
volume_ref['host'],
request_spec,
filter_properties,
allow_reschedule=False,
snapshot_id=snapshot_id,
image_id=image_id,
source_volid=source_volid,
source_replicaid=source_replicaid,
consistencygroup_id=group_id)
if not cgsnapshot_id:
self.volume_rpcapi.create_volume(
context,
volume_ref,
volume_ref['host'],
request_spec,
filter_properties,
allow_reschedule=False,
snapshot_id=snapshot_id,
image_id=image_id,
source_volid=source_volid,
source_replicaid=source_replicaid,
consistencygroup_id=group_id)
def execute(self, context, **kwargs):
scheduler_hints = kwargs.pop('scheduler_hints', None)
@ -796,3 +820,33 @@ def get_flow(scheduler_rpcapi, volume_rpcapi, db_api,
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(api_flow, store=create_what)
def get_flow_no_rpc(db_api, image_service_api, availability_zones,
create_what):
"""Constructs and returns the api entrypoint flow.
This flow will do the following:
1. Inject keys & values for dependent tasks.
2. Extracts and validates the input keys & values.
3. Reserves the quota (reverts quota on any failures).
4. Creates the database entry.
5. Commits the quota.
"""
flow_name = ACTION.replace(":", "_") + "_api"
api_flow = linear_flow.Flow(flow_name)
api_flow.add(ExtractVolumeRequestTask(
image_service_api,
availability_zones,
rebind={'size': 'raw_size',
'availability_zone': 'raw_availability_zone',
'volume_type': 'raw_volume_type'}))
api_flow.add(QuotaReserveTask(),
EntryCreateTask(db_api),
QuotaCommitTask())
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(api_flow, store=create_what)

View File

@ -716,7 +716,7 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
allow_reschedule, reschedule_context, request_spec,
filter_properties, snapshot_id=None, image_id=None,
source_volid=None, source_replicaid=None,
consistencygroup_id=None):
consistencygroup_id=None, cgsnapshot_id=None):
"""Constructs and returns the manager entrypoint flow.
This flow will do the following:
@ -748,6 +748,7 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
'volume_id': volume_id,
'source_replicaid': source_replicaid,
'consistencygroup_id': consistencygroup_id,
'cgsnapshot_id': cgsnapshot_id,
}
volume_flow.add(ExtractVolumeRefTask(db, host))

View File

@ -73,6 +73,7 @@ LOG = logging.getLogger(__name__)
QUOTAS = quota.QUOTAS
CGQUOTAS = quota.CGQUOTAS
VALID_REMOVE_VOL_FROM_CG_STATUS = ('available', 'in-use',)
VALID_CREATE_CG_SRC_SNAP_STATUS = ('available',)
volume_manager_opts = [
cfg.StrOpt('volume_driver',
@ -161,7 +162,7 @@ def locked_snapshot_operation(f):
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '1.21'
RPC_API_VERSION = '1.22'
target = messaging.Target(version=RPC_API_VERSION)
@ -362,7 +363,8 @@ class VolumeManager(manager.SchedulerDependentManager):
def create_volume(self, context, volume_id, request_spec=None,
filter_properties=None, allow_reschedule=True,
snapshot_id=None, image_id=None, source_volid=None,
source_replicaid=None, consistencygroup_id=None):
source_replicaid=None, consistencygroup_id=None,
cgsnapshot_id=None):
"""Creates the volume."""
context_elevated = context.elevated()
@ -387,7 +389,8 @@ class VolumeManager(manager.SchedulerDependentManager):
image_id=image_id,
source_volid=source_volid,
source_replicaid=source_replicaid,
consistencygroup_id=consistencygroup_id)
consistencygroup_id=consistencygroup_id,
cgsnapshot_id=cgsnapshot_id)
except Exception:
LOG.exception(_LE("Failed to create manager volume flow"))
raise exception.CinderException(
@ -425,19 +428,7 @@ class VolumeManager(manager.SchedulerDependentManager):
# Fetch created volume from storage
vol_ref = flow_engine.storage.fetch('volume')
# Update volume stats
pool = vol_utils.extract_host(vol_ref['host'], 'pool')
if pool is None:
# Legacy volume, put them into default pool
pool = self.driver.configuration.safe_get(
'volume_backend_name') or vol_utils.extract_host(
vol_ref['host'], 'pool', True)
try:
self.stats['pools'][pool]['allocated_capacity_gb'] \
+= vol_ref['size']
except KeyError:
self.stats['pools'][pool] = dict(
allocated_capacity_gb=vol_ref['size'])
self._update_allocated_capacity(vol_ref)
return vol_ref['id']
@ -1748,6 +1739,163 @@ class VolumeManager(manager.SchedulerDependentManager):
return group_ref['id']
def create_consistencygroup_from_src(self, context, group_id,
cgsnapshot_id=None):
"""Creates the consistency group from source.
Currently the source can only be a cgsnapshot.
"""
group_ref = self.db.consistencygroup_get(context, group_id)
try:
volumes = self.db.volume_get_all_by_group(
context, group_id)
cgsnapshot = None
snapshots = None
if cgsnapshot_id:
try:
cgsnapshot = self.db.cgsnapshot_get(context, cgsnapshot_id)
except exception.CgSnapshotNotFound:
LOG.error(_LE("Cannot create consistency group %(group)s "
"because cgsnapshot %(snap)s cannot be "
"found."),
{'group': group_id,
'snap': cgsnapshot_id})
raise
if cgsnapshot:
snapshots = self.db.snapshot_get_all_for_cgsnapshot(
context, cgsnapshot_id)
for snap in snapshots:
if (snap['status'] not in
VALID_CREATE_CG_SRC_SNAP_STATUS):
msg = (_("Cannot create consistency group "
"%(group)s because snapshot %(snap)s is "
"not in a valid state. Valid states are: "
"%(valid)s.") %
{'group': group_id,
'snap': snap['id'],
'valid': VALID_CREATE_CG_SRC_SNAP_STATUS})
raise exception.InvalidConsistencyGroup(reason=msg)
self._notify_about_consistencygroup_usage(
context, group_ref, "create.start")
utils.require_driver_initialized(self.driver)
LOG.info(_LI("Consistency group %(group)s: creating from source "
"cgsnapshot %(snap)s."),
{'group': group_id,
'snap': cgsnapshot_id})
model_update, volumes_model_update = (
self.driver.create_consistencygroup_from_src(
context, group_ref, volumes, cgsnapshot, snapshots))
if volumes_model_update:
for update in volumes_model_update:
self.db.volume_update(context, update['id'], update)
if model_update:
group_ref = self.db.consistencygroup_update(
context, group_id, model_update)
except Exception:
with excutils.save_and_reraise_exception():
self.db.consistencygroup_update(
context,
group_id,
{'status': 'error'})
LOG.error(_LE("Consistency group %(group)s: create from "
"source cgsnapshot %(snap)s failed."),
{'group': group_id,
'snap': cgsnapshot_id})
# Update volume status to 'error' as well.
for vol in volumes:
self.db.volume_update(
context, vol['id'], {'status': 'error'})
now = timeutils.utcnow()
status = 'available'
for vol in volumes:
update = {'status': status, 'created_at': now}
self._update_volume_from_src(context, vol, update,
group_id=group_id)
self._update_allocated_capacity(vol)
self.db.consistencygroup_update(context,
group_id,
{'status': status,
'created_at': now})
LOG.info(_LI("Consistency group %(group)s: created successfully "
"from source cgsnapshot %(snap)s."),
{'group': group_id,
'snap': cgsnapshot_id})
self._notify_about_consistencygroup_usage(
context, group_ref, "create.end")
return group_ref['id']
def _update_volume_from_src(self, context, vol, update, group_id=None):
try:
snapshot_ref = self.db.snapshot_get(context,
vol['snapshot_id'])
orig_vref = self.db.volume_get(context,
snapshot_ref['volume_id'])
if orig_vref.bootable:
update['bootable'] = True
self.db.volume_glance_metadata_copy_to_volume(
context, vol['id'], vol['snapshot_id'])
except exception.SnapshotNotFound:
LOG.error(_LE("Source snapshot %(snapshot_id)s cannot be found."),
{'snapshot_id': vol['snapshot_id']})
self.db.volume_update(context, vol['id'],
{'status': 'error'})
if group_id:
self.db.consistencygroup_update(
context, group_id, {'status': 'error'})
raise
except exception.VolumeNotFound:
LOG.error(_LE("The source volume %(volume_id)s "
"cannot be found."),
{'volume_id': snapshot_ref['volume_id']})
self.db.volume_update(context, vol['id'],
{'status': 'error'})
if group_id:
self.db.consistencygroup_update(
context, group_id, {'status': 'error'})
raise
except exception.CinderException as ex:
LOG.error(_LE("Failed to update %(volume_id)s"
" metadata using the provided snapshot"
" %(snapshot_id)s metadata.") %
{'volume_id': vol['id'],
'snapshot_id': vol['snapshot_id']})
self.db.volume_update(context, vol['id'],
{'status': 'error'})
if group_id:
self.db.consistencygroup_update(
context, group_id, {'status': 'error'})
raise exception.MetadataCopyFailure(reason=ex)
self.db.volume_update(context, vol['id'], update)
def _update_allocated_capacity(self, vol):
# Update allocated capacity in volume stats
pool = vol_utils.extract_host(vol['host'], 'pool')
if pool is None:
# Legacy volume, put them into default pool
pool = self.driver.configuration.safe_get(
'volume_backend_name') or vol_utils.extract_host(
vol['host'], 'pool', True)
try:
self.stats['pools'][pool]['allocated_capacity_gb'] += (
vol['size'])
except KeyError:
self.stats['pools'][pool] = dict(
allocated_capacity_gb=vol['size'])
def delete_consistencygroup(self, context, group_id):
"""Deletes consistency group and the volumes in the group."""
context = context.elevated()

View File

@ -62,6 +62,7 @@ class VolumeAPI(object):
1.20 - Adds support for sending objects over RPC in create_snapshot()
and delete_snapshot()
1.21 - Adds update_consistencygroup.
1.22 - Adds create_consistencygroup_from_src.
'''
BASE_RPC_API_VERSION = '1.0'
@ -71,7 +72,7 @@ class VolumeAPI(object):
target = messaging.Target(topic=CONF.volume_topic,
version=self.BASE_RPC_API_VERSION)
serializer = objects_base.CinderObjectSerializer()
self.client = rpc.get_client(target, '1.21', serializer=serializer)
self.client = rpc.get_client(target, '1.22', serializer=serializer)
def create_consistencygroup(self, ctxt, group, host):
new_host = utils.extract_host(host)
@ -94,6 +95,14 @@ class VolumeAPI(object):
add_volumes=add_volumes,
remove_volumes=remove_volumes)
def create_consistencygroup_from_src(self, ctxt, group, host,
cgsnapshot=None):
new_host = utils.extract_host(host)
cctxt = self.client.prepare(server=new_host, version='1.22')
cctxt.cast(ctxt, 'create_consistencygroup_from_src',
group_id=group['id'],
cgsnapshot_id=cgsnapshot['id'])
def create_cgsnapshot(self, ctxt, group, cgsnapshot):
host = utils.extract_host(group['host'])
@ -114,7 +123,8 @@ class VolumeAPI(object):
snapshot_id=None, image_id=None,
source_replicaid=None,
source_volid=None,
consistencygroup_id=None):
consistencygroup_id=None,
cgsnapshot_id=None):
new_host = utils.extract_host(host)
cctxt = self.client.prepare(server=new_host, version='1.4')
@ -128,7 +138,8 @@ class VolumeAPI(object):
image_id=image_id,
source_replicaid=source_replicaid,
source_volid=source_volid,
consistencygroup_id=consistencygroup_id)
consistencygroup_id=consistencygroup_id,
cgsnapshot_id=cgsnapshot_id)
def delete_volume(self, ctxt, volume, unmanage_only=False):
new_host = utils.extract_host(volume['host'])