Add entry create and cast tasks to manage workflow

This change adds a manage-existing API taskflow. The flow is
used in the volume API to provide a complete task flow for the
manage-existing process. Any error that occurs during the manage
flow sets the volume to the 'error' state.

Entry creation has been moved from the volume API into EntryCreateTask.
A ManageCastTask has also been added to cast the manage request to the
scheduler.

Related-Bug: #1364550
Change-Id: I12a4311953c1c86d584b5bf2fe2888e5b5127d43
This commit is contained in:
Anton Arefiev 2015-02-09 15:30:45 +02:00
parent d84c501a60
commit a1e4ad9ff2
8 changed files with 330 additions and 85 deletions

@ -647,6 +647,11 @@ class ManageExistingInvalidReference(CinderException):
"reference %(existing_ref)s: %(reason)s")
class ManageExistingAlreadyManaged(CinderException):
    """Raised when a manage-existing request targets a volume that is
    already managed by Cinder (keyed by %(volume_ref)s).
    """
    message = _("Unable to manage existing volume. "
                "Volume %(volume_ref)s already managed.")
class ReplicationError(CinderException):
    """Raised when a replication operation fails for a volume; carries the
    volume id and a human-readable reason.
    """
    message = _("Volume %(volume_id)s replication "
                "error: %(reason)s")

@ -0,0 +1,62 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class FakeVolumeAPI(object):
    """Volume rpcapi stub that asserts each create_volume cast.

    Holds the request_spec the test expects and checks, via the running
    test case, that every cast carries that spec and matching ids.
    """

    def __init__(self, expected_spec, test_inst):
        # expected_spec: the request_spec every cast must carry.
        # test_inst: the running TestCase, used to perform assertions.
        self.expected_spec = expected_spec
        self.test_inst = test_inst

    def create_volume(self, ctxt, volume, host,
                      request_spec, filter_properties,
                      allow_reschedule=True,
                      snapshot_id=None, image_id=None,
                      source_volid=None,
                      source_replicaid=None):
        check = self.test_inst.assertEqual
        check(self.expected_spec, request_spec)
        # Each id keyword must equal the matching request_spec entry.
        for key, value in (('source_volid', source_volid),
                           ('snapshot_id', snapshot_id),
                           ('image_id', image_id),
                           ('source_replicaid', source_replicaid)):
            check(request_spec[key], value)
class FakeSchedulerRpcAPI(object):
    """Scheduler rpcapi stub that asserts the request_spec it receives."""

    def __init__(self, expected_spec, test_inst):
        # expected_spec: the request_spec every call must carry.
        # test_inst: the running TestCase, used to perform assertions.
        self.expected_spec = expected_spec
        self.test_inst = test_inst

    def _check(self, request_spec):
        # Shared assertion: only the request_spec matters to these tests.
        self.test_inst.assertEqual(self.expected_spec, request_spec)

    def create_volume(self, ctxt, volume, volume_ref, snapshot_id=None,
                      image_id=None, request_spec=None,
                      filter_properties=None):
        self._check(request_spec)

    def manage_existing(self, context, volume_topic, volume_id,
                        request_spec=None):
        self._check(request_spec)
class FakeDb(object):
    """Database stub returning canned rows for the flow tests."""

    def volume_get(self, *args, **kwargs):
        return dict(host='barf')

    def volume_update(self, *args, **kwargs):
        # Host deliberately reversed from volume_get's so tests can
        # tell which call produced a given result.
        return dict(host='farb')

    def snapshot_get(self, *args, **kwargs):
        return dict(volume_id=1)

    def consistencygroup_get(self, *args, **kwargs):
        return dict(consistencygroup_id=1)

@ -14,8 +14,6 @@
# under the License.
""" Tests for create_volume TaskFlow """
import time
import mock
from cinder import context
@ -23,57 +21,11 @@ from cinder import exception
from cinder import test
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.volume.flows import fake_volume_api
from cinder.volume.flows.api import create_volume
from cinder.volume.flows.manager import create_volume as create_volume_manager
class fake_scheduler_rpc_api(object):
    # NOTE(review): local duplicate of the FakeSchedulerRpcAPI stub that
    # this change moves to cinder.tests.unit.volume.flows.fake_volume_api;
    # kept here only for the diff's removed side.
    def __init__(self, expected_spec, test_inst):
        # expected_spec: the request_spec every call must carry.
        # test_inst: the running TestCase, used for assertions.
        self.expected_spec = expected_spec
        self.test_inst = test_inst

    def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
                      image_id=None, request_spec=None,
                      filter_properties=None):
        # Only the request_spec is checked; the other args are ignored.
        self.test_inst.assertEqual(self.expected_spec, request_spec)
class fake_volume_api(object):
    # NOTE(review): local duplicate of the FakeVolumeAPI stub that this
    # change moves to cinder.tests.unit.volume.flows.fake_volume_api;
    # kept here only for the diff's removed side.
    def __init__(self, expected_spec, test_inst):
        # expected_spec: the request_spec every cast must carry.
        # test_inst: the running TestCase, used for assertions.
        self.expected_spec = expected_spec
        self.test_inst = test_inst

    def create_volume(self, ctxt, volume, host,
                      request_spec, filter_properties,
                      allow_reschedule=True,
                      snapshot_id=None, image_id=None,
                      source_volid=None,
                      source_replicaid=None):
        # Assert the spec itself, then that each id keyword matches the
        # corresponding entry inside the spec.
        self.test_inst.assertEqual(self.expected_spec, request_spec)
        self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
        self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
        self.test_inst.assertEqual(request_spec['image_id'], image_id)
        self.test_inst.assertEqual(request_spec['source_replicaid'],
                                   source_replicaid)
class fake_db(object):
    # NOTE(review): local duplicate of the FakeDb stub that this change
    # moves to cinder.tests.unit.volume.flows.fake_volume_api; kept here
    # only for the diff's removed side.
    def volume_get(self, *args, **kwargs):
        return {'host': 'barf'}

    def volume_update(self, *args, **kwargs):
        # Host deliberately reversed from volume_get's so tests can tell
        # which call produced a given result.
        return {'host': 'farb'}

    def snapshot_get(self, *args, **kwargs):
        return {'volume_id': 1}

    def consistencygroup_get(self, *args, **kwargs):
        return {'consistencygroup_id': 1}
class CreateVolumeFlowTestCase(test.TestCase):
def time_inc(self):
@ -88,9 +40,9 @@ class CreateVolumeFlowTestCase(test.TestCase):
# Ensure that time.time() always returns more than the last time it was
# called to avoid div by zero errors.
self.counter = float(0)
self.stubs.Set(time, 'time', self.time_inc)
def test_cast_create_volume(self):
@mock.patch('time.time', side_effect=time_inc)
def test_cast_create_volume(self, mock_time):
props = {}
spec = {'volume_id': None,
@ -101,10 +53,11 @@ class CreateVolumeFlowTestCase(test.TestCase):
'consistencygroup_id': None,
'cgsnapshot_id': None}
# Fake objects assert specs
task = create_volume.VolumeCastTask(
fake_scheduler_rpc_api(spec, self),
fake_volume_api(spec, self),
fake_db())
fake_volume_api.FakeSchedulerRpcAPI(spec, self),
fake_volume_api.FakeVolumeAPI(spec, self),
fake_volume_api.FakeDb())
task._cast_create_volume(self.ctxt, spec, props)
@ -116,10 +69,11 @@ class CreateVolumeFlowTestCase(test.TestCase):
'consistencygroup_id': 5,
'cgsnapshot_id': None}
# Fake objects assert specs
task = create_volume.VolumeCastTask(
fake_scheduler_rpc_api(spec, self),
fake_volume_api(spec, self),
fake_db())
fake_volume_api.FakeSchedulerRpcAPI(spec, self),
fake_volume_api.FakeVolumeAPI(spec, self),
fake_volume_api.FakeDb())
task._cast_create_volume(self.ctxt, spec, props)

@ -0,0 +1,70 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" Tests for manage_existing TaskFlow """
import mock
from cinder import context
from cinder import test
from cinder.tests.unit.volume.flows import fake_volume_api
from cinder.volume.flows.api import manage_existing
class ManageVolumeFlowTestCase(test.TestCase):
    """Tests for the manage_existing API taskflow."""

    def setUp(self):
        super(ManageVolumeFlowTestCase, self).setUp()
        self.ctxt = context.get_admin_context()
        self.counter = float(0)

    @staticmethod
    def _stub_volume_spec():
        # The request spec the fake scheduler rpcapi asserts against.
        return {
            'name': 'name',
            'description': 'description',
            'host': 'host',
            'ref': 'ref',
            'volume_type': 'volume_type',
            'metadata': 'metadata',
            'availability_zone': 'availability_zone',
            'bootable': 'bootable',
        }

    def _cast_manage_existing(self, volume):
        # Build the task with fakes that assert the spec, then execute
        # the cast with the given volume added to the inputs.
        spec = self._stub_volume_spec()
        task = manage_existing.ManageCastTask(
            fake_volume_api.FakeSchedulerRpcAPI(spec, self),
            fake_volume_api.FakeDb())
        create_what = spec.copy()
        create_what.update({'volume': volume})
        task.execute(self.ctxt, **create_what)

    def test_cast_manage_existing(self):
        # Originally this body was duplicated verbatim for the two volume
        # variants; exercise both through the shared helper instead.
        self._cast_manage_existing(mock.MagicMock(return_value=None))
        self._cast_manage_existing(mock.MagicMock(return_value={'id': 1}))

@ -45,6 +45,7 @@ from cinder import quota_utils
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import utils
from cinder.volume.flows.api import create_volume
from cinder.volume.flows.api import manage_existing
from cinder.volume import qos_specs
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
@ -1451,36 +1452,36 @@ class API(base.Base):
LOG.error(_LE('Unable to find service for given host.'))
availability_zone = service.get('availability_zone')
volume_type_id = volume_type['id'] if volume_type else None
volume_properties = {
'size': 0,
'user_id': context.user_id,
'project_id': context.project_id,
'status': 'creating',
'attach_status': 'detached',
# Rename these to the internal name.
'display_description': description,
'display_name': name,
manage_what = {
'context': context,
'name': name,
'description': description,
'host': host,
'availability_zone': availability_zone,
'volume_type_id': volume_type_id,
'ref': ref,
'volume_type': volume_type,
'metadata': metadata,
'bootable': bootable
'availability_zone': availability_zone,
'bootable': bootable,
}
# Call the scheduler to ensure that the host exists and that it can
# accept the volume
volume = self.db.volume_create(context, volume_properties)
request_spec = {'volume_properties': volume,
'volume_type': volume_type,
'volume_id': volume['id'],
'ref': ref}
self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic,
volume['id'],
request_spec=request_spec)
LOG.info(_LI("Manage volume request issued successfully."),
resource=volume)
return volume
try:
flow_engine = manage_existing.get_flow(self.scheduler_rpcapi,
self.db,
manage_what)
except Exception:
msg = _('Failed to manage api volume flow.')
LOG.exception(msg)
raise exception.CinderException(msg)
# Attaching this listener will capture all of the notifications that
# taskflow sends out and redirect them to a more useful log for
# cinder's debugging (or error reporting) usage.
with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
vol_ref = flow_engine.storage.fetch('volume')
LOG.info(_LI("Manage volume request issued successfully."),
resource=vol_ref)
return vol_ref
class HostAPI(base.Base):

@ -0,0 +1,153 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
import taskflow.engines
from taskflow.patterns import linear_flow
from taskflow.types import failure as ft
from cinder import exception
from cinder import flow_utils
from cinder.i18n import _LE
from cinder.volume.flows import common
LOG = logging.getLogger(__name__)
ACTION = 'volume:manage_existing'
CONF = cfg.CONF
class EntryCreateTask(flow_utils.CinderTask):
    """Creates an entry for the given volume creation in the database.

    Reversion strategy: remove the volume entry created in the database.
    """

    default_provides = set(['volume_properties', 'volume'])

    def __init__(self, db):
        requires = ['availability_zone', 'description', 'metadata',
                    'name', 'host', 'bootable', 'volume_type', 'ref']
        super(EntryCreateTask, self).__init__(addons=[ACTION],
                                              requires=requires)
        self.db = db

    def execute(self, context, **kwargs):
        """Creates a database entry for the given inputs and returns details.

        Accesses the database and creates a new entry for the to be created
        volume using the given volume properties which are extracted from the
        input kwargs.
        """
        volume_type = kwargs.pop('volume_type')
        volume_type_id = volume_type['id'] if volume_type else None

        volume_properties = {
            # Managed-existing volumes start at size 0; the real size is
            # discovered by the driver during the manage step.
            'size': 0,
            'user_id': context.user_id,
            'project_id': context.project_id,
            'status': 'creating',
            'attach_status': 'detached',
            # Rename these to the internal name.
            'display_description': kwargs.pop('description'),
            'display_name': kwargs.pop('name'),
            'host': kwargs.pop('host'),
            'availability_zone': kwargs.pop('availability_zone'),
            'volume_type_id': volume_type_id,
            'metadata': kwargs.pop('metadata'),
            'bootable': kwargs.pop('bootable'),
        }

        volume = self.db.volume_create(context, volume_properties)

        return {
            'volume_properties': volume_properties,
            # NOTE(harlowja): it appears like further usage of this volume
            # result actually depend on it being a sqlalchemy object and not
            # just a plain dictionary so that's why we are storing this here.
            #
            # In the future where this task results can be serialized and
            # restored automatically for continued running we will need to
            # resolve the serialization & recreation of this object since raw
            # sqlalchemy objects can't be serialized.
            'volume': volume,
        }

    def revert(self, context, result, optional_args, **kwargs):
        """Destroys the volume entry created by execute(), if any."""
        # We never produced a result and therefore can't destroy anything.
        if isinstance(result, ft.Failure):
            return

        # execute() provides no 'volume_id' key in its result; the created
        # row is stored under 'volume', so take the id from there (the
        # previous result['volume_id'] lookup always raised KeyError).
        vol_id = result['volume']['id']
        try:
            self.db.volume_destroy(context.elevated(), vol_id)
        except exception.CinderException:
            LOG.exception(_LE("Failed destroying volume entry: %s."), vol_id)
class ManageCastTask(flow_utils.CinderTask):
    """Performs the volume manage cast to the scheduler.

    This signals a transition of the api workflow to another child
    and/or related workflow.
    """

    def __init__(self, scheduler_rpcapi, db):
        requires = ['volume', 'volume_properties', 'volume_type', 'ref']
        super(ManageCastTask, self).__init__(addons=[ACTION],
                                             requires=requires)
        self.scheduler_rpcapi = scheduler_rpcapi
        self.db = db

    def execute(self, context, **kwargs):
        """Casts the manage-existing request on to the scheduler."""
        volume = kwargs.pop('volume')

        # The remaining requirements (volume_properties, volume_type, ref)
        # form the request spec handed to the scheduler.
        request_spec = kwargs.copy()

        # Call the scheduler to ensure that the host exists and that it can
        # accept the volume.
        self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic,
                                              volume['id'],
                                              request_spec=request_spec)

    def revert(self, context, result, flow_failures, **kwargs):
        """Sets the volume to 'error' state and logs the failure."""
        # 'volume_id' is not among this task's requirements, so it never
        # appears in kwargs (the previous kwargs['volume_id'] lookup always
        # raised KeyError); 'volume' is required, so take the id from it.
        volume_id = kwargs['volume']['id']

        # Restore the source volume status and set the volume to error status.
        common.error_out_volume(context, self.db, volume_id)
        LOG.error(_LE("Volume %s: manage failed."), volume_id)
        exc_info = False
        if all(flow_failures[-1].exc_info):
            exc_info = flow_failures[-1].exc_info
        LOG.error(_LE('Unexpected build error:'), exc_info=exc_info)
def get_flow(scheduler_rpcapi, db_api, create_what):
    """Constructs and returns the api entrypoint flow.

    This flow will do the following:

    1. Inject keys & values for dependent tasks.
    2. Creates the database entry.
    3. Casts to the scheduler for further processing.
    """
    flow_name = ACTION.replace(":", "_") + "_api"
    api_flow = linear_flow.Flow(flow_name)

    # First create the db entry, then cast out via the provided rpc api.
    tasks = [
        EntryCreateTask(db_api),
        ManageCastTask(scheduler_rpcapi, db_api),
    ]
    api_flow.add(*tasks)

    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(api_flow, store=create_what)

@ -49,7 +49,6 @@ commands =
cinder.tests.unit.test_cmd \
cinder.tests.unit.test_conf \
cinder.tests.unit.test_context \
cinder.tests.unit.test_create_volume_flow \
cinder.tests.unit.test_db_api \
cinder.tests.unit.test_dellfc \
cinder.tests.unit.test_dellsc \
@ -93,7 +92,8 @@ commands =
cinder.tests.unit.test_volume_rpcapi \
cinder.tests.unit.test_volume_types \
cinder.tests.unit.test_volume_types_extra_specs \
cinder.tests.unit.test_volume_utils
cinder.tests.unit.test_volume_utils \
cinder.tests.unit.volume.flows.test_create_volume_flow
[testenv:pep8]
commands =