Refactoring of manager's create_volume flow
This commit resolves a TODO item (bulk metadata create), rephrases some comments, and uses new TaskFlow functionality to save the results of the revert command, so that the decision can be made in the c-vol manager instead of relying on the ugly workaround of injecting the information into the raised exception.

Partial-Implements: blueprint taskflow-refactoring
Depends-On: I82ebd0102aa5f50d98d9d6b48b13cd659d6dbcec
Change-Id: Ia4739c87ca83ae725e16256e3f2c596c5e6d935b
parent 8f18900d2e
commit 83d5b25ed0
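The TaskFlow mechanism the commit relies on can be seen in isolation in the following sketch. It is a minimal sketch, not Cinder code: the task names and the RuntimeError are invented, and it assumes a taskflow release that already stores revert results (the Depends-On change above). The point is that a task's revert() return value is saved by the engine and can be read back by task name, which replaces the old trick of attaching a rescheduled attribute to the raised exception.

    # Minimal, self-contained sketch (not Cinder code): read a task's revert
    # result from the engine's storage after the flow has been reverted.
    import taskflow.engines
    from taskflow import task
    from taskflow.patterns import linear_flow


    class RescheduleOnFailure(task.Task):
        """Stand-in for OnFailureRescheduleTask; only the revert result matters."""

        def execute(self):
            pass

        def revert(self, result, flow_failures, **kwargs):
            # Pretend rescheduling happened; this boolean becomes the task's
            # saved revert result.
            return True


    class FailingTask(task.Task):
        def execute(self):
            # Force the whole flow to revert.
            raise RuntimeError("boom")


    flow = linear_flow.Flow("demo").add(
        RescheduleOnFailure("reschedule"),
        FailingTask("fail"),
    )
    engine = taskflow.engines.load(flow)
    try:
        engine.run()
    except RuntimeError:
        # The failure still reaches the caller; the flag is fetched from
        # storage by task name instead of from the exception object.
        rescheduled = engine.storage.get_revert_result("reschedule")
        print(rescheduled)  # True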
@@ -639,6 +639,12 @@ def volume_glance_metadata_create(context, volume_id, key, value):
                                               value)
 
 
+def volume_glance_metadata_bulk_create(context, volume_id, metadata):
+    """Add Glance metadata for specified volume (multiple pairs)."""
+    return IMPL.volume_glance_metadata_bulk_create(context, volume_id,
+                                                   metadata)
+
+
 def volume_glance_metadata_get_all(context):
     """Return the glance metadata for all volumes."""
     return IMPL.volume_glance_metadata_get_all(context)
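A hypothetical call site for the new DB API wrapper above (not part of this commit; the volume id is made up and a configured Cinder database is assumed): all key/value pairs are recorded in one call instead of one call per pair.

    from cinder import context
    from cinder import db

    ctxt = context.get_admin_context()
    volume_id = "11111111-2222-3333-4444-555555555555"  # assumed existing volume
    image_properties = {"image_name": "cirros-0.3.4", "min_disk": "1"}

    # Single DB API call for the whole dictionary of Glance metadata.
    db.volume_glance_metadata_bulk_create(ctxt, volume_id, image_properties)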
@@ -3154,6 +3154,34 @@ def volume_glance_metadata_create(context, volume_id, key, value):
     return
 
 
+@require_context
+@require_volume_exists
+def volume_glance_metadata_bulk_create(context, volume_id, metadata):
+    """Update the Glance metadata for a volume by adding new key:value pairs.
+
+    This API does not support changing the value of a key once it has been
+    created.
+    """
+
+    session = get_session()
+    with session.begin():
+        for (key, value) in metadata.items():
+            rows = session.query(models.VolumeGlanceMetadata).\
+                filter_by(volume_id=volume_id).\
+                filter_by(key=key).\
+                filter_by(deleted=False).all()
+
+            if len(rows) > 0:
+                raise exception.GlanceMetadataExists(key=key,
+                                                     volume_id=volume_id)
+
+            vol_glance_metadata = models.VolumeGlanceMetadata()
+            vol_glance_metadata.volume_id = volume_id
+            vol_glance_metadata.key = key
+            vol_glance_metadata.value = six.text_type(value)
+            session.add(vol_glance_metadata)
+
+
 @require_context
 @require_snapshot_exists
 def volume_glance_metadata_copy_to_snapshot(context, snapshot_id, volume_id):
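Because the duplicate-key check and the inserts run inside one session.begin() block, a duplicate key aborts the whole batch: GlanceMetadataExists propagates and the transaction rolls back, so none of the pairs from that call are persisted. An illustrative continuation of the sketch above (same assumed names):

    from cinder import exception

    try:
        db.volume_glance_metadata_bulk_create(ctxt, volume_id,
                                              {"min_disk": "2", "min_ram": "0"})
    except exception.GlanceMetadataExists:
        # "min_disk" already exists for this volume, so neither pair was stored.
        pass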
@@ -40,9 +40,11 @@ class CinderTask(task.Task):
     """
 
     def __init__(self, addons=None, **kwargs):
-        super(CinderTask, self).__init__(_make_task_name(self.__class__,
-                                                         addons),
-                                         **kwargs)
+        super(CinderTask, self).__init__(self.make_name(addons), **kwargs)
+
+    @classmethod
+    def make_name(cls, addons=None):
+        return _make_task_name(cls, addons)
 
 
 class DynamicLogListener(logging_listener.DynamicLoggingListener):
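The reason make_name() is introduced as a classmethod shows up in the manager hunk further down: code that never instantiates the task can still compute the exact name the task was registered under and use it as a key into the engine's storage. A simplified stand-in follows; the _make_task_name helper below is an illustrative approximation, not the module's real implementation.

    from taskflow import task


    def _make_task_name(cls, addons=None):
        # Illustrative approximation: module-qualified class name, optionally
        # suffixed with the addons.
        name = ".".join([cls.__module__, cls.__name__])
        if addons:
            name += ";" + ", ".join(str(a) for a in addons)
        return name


    class DemoTask(task.Task):
        def __init__(self, addons=None, **kwargs):
            super(DemoTask, self).__init__(self.make_name(addons), **kwargs)

        @classmethod
        def make_name(cls, addons=None):
            return _make_task_name(cls, addons)

        def execute(self):
            pass


    # The same name is available with and without an instance, so a caller can
    # look up this task's results in an engine's storage without holding a
    # reference to the task object.
    assert DemoTask(addons=["x"]).name == DemoTask.make_name(["x"])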
@@ -49,6 +49,8 @@ IMAGE_ATTRIBUTES = (
 class OnFailureRescheduleTask(flow_utils.CinderTask):
     """Triggers a rescheduling request to be sent when reverting occurs.
 
+    If rescheduling doesn't occur this task errors out the volume.
+
     Reversion strategy: Triggers the rescheduling mechanism whereby a cast gets
     sent to the scheduler rpc api to allow for an attempt X of Y for scheduling
     this volume elsewhere.
@@ -88,6 +90,31 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
     def execute(self, **kwargs):
         pass
 
+    def _pre_reschedule(self, context, volume_id):
+        """Actions that happen before the rescheduling attempt occur here."""
+
+        try:
+            # Update volume's timestamp and host.
+            #
+            # NOTE(harlowja): this is awkward to be done here, shouldn't
+            # this happen at the scheduler itself and not before it gets
+            # sent to the scheduler? (since what happens if it never gets
+            # there??). It's almost like we need a status of 'on-the-way-to
+            # scheduler' in the future.
+            # We don't need to update the volume's status to creating, since
+            # we haven't changed it to error.
+            update = {
+                'scheduled_at': timeutils.utcnow(),
+                'host': None,
+            }
+            LOG.debug("Updating volume %(volume_id)s with %(update)s.",
+                      {'update': update, 'volume_id': volume_id})
+            self.db.volume_update(context, volume_id, update)
+        except exception.CinderException:
+            # Don't let updating the state cause the rescheduling to fail.
+            LOG.exception(_LE("Volume %s: update volume state failed."),
+                          volume_id)
+
     def _reschedule(self, context, cause, request_spec, filter_properties,
                     volume_id):
         """Actions that happen during the rescheduling attempt occur here."""
@@ -122,47 +149,18 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
 
         LOG.debug("Volume %s: re-scheduled", volume_id)
 
-    def _pre_reschedule(self, context, volume_id):
-        """Actions that happen before the rescheduling attempt occur here."""
-
-        try:
-            # Update volume's timestamp and host.
-            #
-            # NOTE(harlowja): this is awkward to be done here, shouldn't
-            # this happen at the scheduler itself and not before it gets
-            # sent to the scheduler? (since what happens if it never gets
-            # there??). It's almost like we need a status of 'on-the-way-to
-            # scheduler' in the future.
-            # We don't need to update the volume's status to creating, since
-            # we haven't changed it to error.
-            update = {
-                'scheduled_at': timeutils.utcnow(),
-                'host': None
-            }
-            LOG.debug("Updating volume %(volume_id)s with %(update)s.",
-                      {'update': update, 'volume_id': volume_id})
-            self.db.volume_update(context, volume_id, update)
-        except exception.CinderException:
-            # Don't let updating the state cause the rescheduling to fail.
-            LOG.exception(_LE("Volume %s: update volume state failed."),
-                          volume_id)
-
-    def revert(self, context, result, flow_failures, **kwargs):
-        volume_id = kwargs['volume_id']
+    def revert(self, context, result, flow_failures, volume_id, **kwargs):
+        # NOTE(dulek): Revert is occurring and the manager needs to know if
+        # rescheduling happened. We're returning a boolean flag that will
+        # indicate that. It will be available in the flow engine's storage
+        # through the get_revert_result method.
 
         # If do not want to be rescheduled, just set the volume's status to
         # error and return.
         if not self.do_reschedule:
             common.error_out_volume(context, self.db, volume_id)
             LOG.error(_LE("Volume %s: create failed"), volume_id)
-            return
-
-        # NOTE(dulek): Revert is occurring and manager need to know if
-        # rescheduling happened. We're injecting this information into
-        # exception that will be caught there. This is ugly and we need
-        # TaskFlow to support better way of returning data from reverted flow.
-        cause = list(flow_failures.values())[0]
-        cause.exception.rescheduled = False
+            return False
 
         # Check if we have a cause which can tell us not to reschedule and
         # set the volume's status to error.
@@ -170,20 +168,22 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
             if failure.check(*self.no_reschedule_types):
                 common.error_out_volume(context, self.db, volume_id)
                 LOG.error(_LE("Volume %s: create failed"), volume_id)
-                return
+                return False
 
         # Use a different context when rescheduling.
         if self.reschedule_context:
+            cause = list(flow_failures.values())[0]
             context = self.reschedule_context
             try:
                 self._pre_reschedule(context, volume_id)
-                self._reschedule(context, cause, **kwargs)
+                self._reschedule(context, cause, volume_id=volume_id, **kwargs)
                 self._post_reschedule(context, volume_id)
-                # Inject information that we rescheduled
-                cause.exception.rescheduled = True
+                return True
             except exception.CinderException:
                 LOG.exception(_LE("Volume %s: rescheduling failed"), volume_id)
 
+        return False
+
 
 class ExtractVolumeRefTask(flow_utils.CinderTask):
     """Extracts volume reference for given volume id."""
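One property of the revert-result approach worth spelling out (this continues the standalone sketch near the top of the page; RescheduleOnFailure, FailingTask and flow are assumed to still be in scope): returning a value from revert() does not swallow the failure. The engine still re-raises the original exception, which is why the manager further down can drop its hasattr(e, 'rescheduled') handling without losing error propagation and only consult storage for the flag.

    engine = taskflow.engines.load(flow)
    try:
        engine.run()
    except RuntimeError:
        # The original failure still reaches the caller unchanged ...
        pass
    # ... while the flag lives in the engine's storage, keyed by task name,
    # not on the exception object.
    assert engine.storage.get_revert_result("reschedule") is True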
@@ -206,11 +206,11 @@ class ExtractVolumeRefTask(flow_utils.CinderTask):
         return volume_ref
 
     def revert(self, context, volume_id, result, **kwargs):
-        if isinstance(result, ft.Failure):
+        if isinstance(result, ft.Failure) or not self.set_error:
             return
-        if self.set_error:
-            common.error_out_volume(context, self.db, volume_id)
-            LOG.error(_LE("Volume %s: create failed"), volume_id)
+
+        common.error_out_volume(context, self.db, volume_id)
+        LOG.error(_LE("Volume %s: create failed"), volume_id)
 
 
 class ExtractVolumeSpecTask(flow_utils.CinderTask):
@@ -561,21 +561,14 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
             if value is not None:
                 property_metadata[key] = value
 
-        # NOTE(harlowja): The best way for this to happen would be in bulk,
-        # but that doesn't seem to exist (yet), so we go through one by one
-        # which means we can have partial create/update failure.
         volume_metadata = dict(property_metadata)
         volume_metadata.update(base_metadata)
         LOG.debug("Creating volume glance metadata for volume %(volume_id)s"
                   " backed by image %(image_id)s with: %(vol_metadata)s.",
                   {'volume_id': volume_id, 'image_id': image_id,
                    'vol_metadata': volume_metadata})
-        for (key, value) in volume_metadata.items():
-            try:
-                self.db.volume_glance_metadata_create(context, volume_id,
-                                                      key, value)
-            except exception.GlanceMetadataExists:
-                pass
+        self.db.volume_glance_metadata_bulk_create(context, volume_id,
+                                                   volume_metadata)
 
     def _create_from_image(self, context, volume_ref,
                            image_location, image_id, image_meta,
@@ -710,7 +703,7 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
             # TODO(harlowja): is it acceptable to only log if this fails??
             # or are there other side-effects that this will cause if the
             # status isn't updated correctly (aka it will likely be stuck in
-            # 'building' if this fails)??
+            # 'creating' if this fails)??
             volume_ref = self.db.volume_update(context, volume_id, update)
             # Now use the parent to notify.
             super(CreateVolumeOnFinishTask, self).execute(context, volume_ref)
@@ -737,7 +730,7 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
     3. Selects 1 of 2 activated only on *failure* tasks (one to update the db
        status & notify or one to update the db status & notify & *reschedule*).
     4. Extracts a volume specification from the provided inputs.
-    5. Notifies that the volume has start to be created.
+    5. Notifies that the volume has started to be created.
     6. Creates a volume from the extracted volume specification.
     7. Attaches a on-success *only* task that notifies that the volume creation
        has ended and performs further database status updates.
@@ -761,7 +754,7 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
     retry = filter_properties.get('retry', None)
 
     # Always add OnFailureRescheduleTask and we handle the change of volume's
-    # status when revert task flow. Meanwhile, no need to revert process of
+    # status when reverting the flow. Meanwhile, no need to revert process of
     # ExtractVolumeRefTask.
     do_reschedule = allow_reschedule and request_spec and retry
     volume_flow.add(OnFailureRescheduleTask(reschedule_context, db,
@@ -466,24 +466,31 @@ class VolumeManager(manager.SchedulerDependentManager):
         # NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
         # decide if allocated_capacity should be incremented.
         rescheduled = False
+        vol_ref = None
 
         try:
             if locked_action is None:
                 _run_flow()
             else:
                 _run_flow_locked()
-        except Exception as e:
-            if hasattr(e, 'rescheduled'):
-                rescheduled = e.rescheduled
-            raise
         finally:
             try:
                 vol_ref = flow_engine.storage.fetch('volume_ref')
-            except tfe.NotFound as e:
-                # Flow was reverted, fetching volume_ref from the DB.
-                vol_ref = self.db.volume_get(context, volume_id)
+            except tfe.NotFound:
+                # If there's no vol_ref, then the flow was reverted. Let's
+                # check whether rescheduling occurred.
+                try:
+                    rescheduled = flow_engine.storage.get_revert_result(
+                        create_volume.OnFailureRescheduleTask.make_name(
+                            [create_volume.ACTION]))
+                except tfe.NotFound:
+                    pass
 
             if not rescheduled:
+                if not vol_ref:
+                    # Flow was reverted and not rescheduled, fetching
+                    # volume_ref from the DB, because it will be needed.
+                    vol_ref = self.db.volume_get(context, volume_id)
                 # NOTE(dulek): Volume wasn't rescheduled so we need to update
                 # volume stats as these are decremented on delete.
                 self._update_allocated_capacity(vol_ref)