diff --git a/cinder/exception.py b/cinder/exception.py index 56aa9a89d15..127504f75d4 100644 --- a/cinder/exception.py +++ b/cinder/exception.py @@ -647,6 +647,11 @@ class ManageExistingInvalidReference(CinderException): "reference %(existing_ref)s: %(reason)s") +class ManageExistingAlreadyManaged(CinderException): + message = _("Unable to manage existing volume. " + "Volume %(volume_ref)s already managed.") + + class ReplicationError(CinderException): message = _("Volume %(volume_id)s replication " "error: %(reason)s") diff --git a/cinder/tests/unit/volume/flows/__init__.py b/cinder/tests/unit/volume/flows/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/cinder/tests/unit/volume/flows/fake_volume_api.py b/cinder/tests/unit/volume/flows/fake_volume_api.py new file mode 100644 index 00000000000..d424758c1c5 --- /dev/null +++ b/cinder/tests/unit/volume/flows/fake_volume_api.py @@ -0,0 +1,62 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class FakeVolumeAPI(object): + def __init__(self, expected_spec, test_inst): + self.expected_spec = expected_spec + self.test_inst = test_inst + + def create_volume(self, ctxt, volume, host, + request_spec, filter_properties, + allow_reschedule=True, + snapshot_id=None, image_id=None, + source_volid=None, + source_replicaid=None): + + self.test_inst.assertEqual(self.expected_spec, request_spec) + self.test_inst.assertEqual(request_spec['source_volid'], source_volid) + self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id) + self.test_inst.assertEqual(request_spec['image_id'], image_id) + self.test_inst.assertEqual(request_spec['source_replicaid'], + source_replicaid) + + +class FakeSchedulerRpcAPI(object): + def __init__(self, expected_spec, test_inst): + self.expected_spec = expected_spec + self.test_inst = test_inst + + def create_volume(self, ctxt, volume, volume_ref, snapshot_id=None, + image_id=None, request_spec=None, + filter_properties=None): + + self.test_inst.assertEqual(self.expected_spec, request_spec) + + def manage_existing(self, context, volume_topic, volume_id, + request_spec=None): + self.test_inst.assertEqual(self.expected_spec, request_spec) + + +class FakeDb(object): + + def volume_get(self, *args, **kwargs): + return {'host': 'barf'} + + def volume_update(self, *args, **kwargs): + return {'host': 'farb'} + + def snapshot_get(self, *args, **kwargs): + return {'volume_id': 1} + + def consistencygroup_get(self, *args, **kwargs): + return {'consistencygroup_id': 1} diff --git a/cinder/tests/unit/test_create_volume_flow.py b/cinder/tests/unit/volume/flows/test_create_volume_flow.py similarity index 69% rename from cinder/tests/unit/test_create_volume_flow.py rename to cinder/tests/unit/volume/flows/test_create_volume_flow.py index 2847be31252..5ec263d0bce 100644 --- a/cinder/tests/unit/test_create_volume_flow.py +++ b/cinder/tests/unit/volume/flows/test_create_volume_flow.py @@ -14,8 +14,6 @@ # under the License. """ Tests for create_volume TaskFlow """ -import time - import mock from cinder import context @@ -23,57 +21,11 @@ from cinder import exception from cinder import test from cinder.tests.unit import fake_snapshot from cinder.tests.unit import fake_volume +from cinder.tests.unit.volume.flows import fake_volume_api from cinder.volume.flows.api import create_volume from cinder.volume.flows.manager import create_volume as create_volume_manager -class fake_scheduler_rpc_api(object): - def __init__(self, expected_spec, test_inst): - self.expected_spec = expected_spec - self.test_inst = test_inst - - def create_volume(self, ctxt, topic, volume_id, snapshot_id=None, - image_id=None, request_spec=None, - filter_properties=None): - - self.test_inst.assertEqual(self.expected_spec, request_spec) - - -class fake_volume_api(object): - def __init__(self, expected_spec, test_inst): - self.expected_spec = expected_spec - self.test_inst = test_inst - - def create_volume(self, ctxt, volume, host, - request_spec, filter_properties, - allow_reschedule=True, - snapshot_id=None, image_id=None, - source_volid=None, - source_replicaid=None): - - self.test_inst.assertEqual(self.expected_spec, request_spec) - self.test_inst.assertEqual(request_spec['source_volid'], source_volid) - self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id) - self.test_inst.assertEqual(request_spec['image_id'], image_id) - self.test_inst.assertEqual(request_spec['source_replicaid'], - source_replicaid) - - -class fake_db(object): - - def volume_get(self, *args, **kwargs): - return {'host': 'barf'} - - def volume_update(self, *args, **kwargs): - return {'host': 'farb'} - - def snapshot_get(self, *args, **kwargs): - return {'volume_id': 1} - - def consistencygroup_get(self, *args, **kwargs): - return {'consistencygroup_id': 1} - - class CreateVolumeFlowTestCase(test.TestCase): def time_inc(self): @@ -88,9 +40,9 @@ class CreateVolumeFlowTestCase(test.TestCase): # Ensure that time.time() always returns more than the last time it was # called to avoid div by zero errors. self.counter = float(0) - self.stubs.Set(time, 'time', self.time_inc) - def test_cast_create_volume(self): + @mock.patch('time.time', side_effect=time_inc) + def test_cast_create_volume(self, mock_time): props = {} spec = {'volume_id': None, @@ -101,10 +53,11 @@ class CreateVolumeFlowTestCase(test.TestCase): 'consistencygroup_id': None, 'cgsnapshot_id': None} + # Fake objects assert specs task = create_volume.VolumeCastTask( - fake_scheduler_rpc_api(spec, self), - fake_volume_api(spec, self), - fake_db()) + fake_volume_api.FakeSchedulerRpcAPI(spec, self), + fake_volume_api.FakeVolumeAPI(spec, self), + fake_volume_api.FakeDb()) task._cast_create_volume(self.ctxt, spec, props) @@ -116,10 +69,11 @@ class CreateVolumeFlowTestCase(test.TestCase): 'consistencygroup_id': 5, 'cgsnapshot_id': None} + # Fake objects assert specs task = create_volume.VolumeCastTask( - fake_scheduler_rpc_api(spec, self), - fake_volume_api(spec, self), - fake_db()) + fake_volume_api.FakeSchedulerRpcAPI(spec, self), + fake_volume_api.FakeVolumeAPI(spec, self), + fake_volume_api.FakeDb()) task._cast_create_volume(self.ctxt, spec, props) diff --git a/cinder/tests/unit/volume/flows/test_manage_volume_flow.py b/cinder/tests/unit/volume/flows/test_manage_volume_flow.py new file mode 100644 index 00000000000..be32af93336 --- /dev/null +++ b/cinder/tests/unit/volume/flows/test_manage_volume_flow.py @@ -0,0 +1,70 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" Tests for manage_existing TaskFlow """ + +import mock + +from cinder import context +from cinder import test +from cinder.tests.unit.volume.flows import fake_volume_api +from cinder.volume.flows.api import manage_existing + + +class ManageVolumeFlowTestCase(test.TestCase): + + def setUp(self): + super(ManageVolumeFlowTestCase, self).setUp() + self.ctxt = context.get_admin_context() + self.counter = float(0) + + def test_cast_manage_existing(self): + + volume = mock.MagicMock(return_value=None) + spec = { + 'name': 'name', + 'description': 'description', + 'host': 'host', + 'ref': 'ref', + 'volume_type': 'volume_type', + 'metadata': 'metadata', + 'availability_zone': 'availability_zone', + 'bootable': 'bootable'} + + # Fake objects assert specs + task = manage_existing.ManageCastTask( + fake_volume_api.FakeSchedulerRpcAPI(spec, self), + fake_volume_api.FakeDb()) + + create_what = spec.copy() + create_what.update({'volume': volume}) + task.execute(self.ctxt, **create_what) + + volume = mock.MagicMock(return_value={'id': 1}) + + spec = { + 'name': 'name', + 'description': 'description', + 'host': 'host', + 'ref': 'ref', + 'volume_type': 'volume_type', + 'metadata': 'metadata', + 'availability_zone': 'availability_zone', + 'bootable': 'bootable'} + + # Fake objects assert specs + task = manage_existing.ManageCastTask( + fake_volume_api.FakeSchedulerRpcAPI(spec, self), + fake_volume_api.FakeDb()) + + create_what = spec.copy() + create_what.update({'volume': volume}) + task.execute(self.ctxt, **create_what) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 11abff87e5c..ebb1511fc9b 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -45,6 +45,7 @@ from cinder import quota_utils from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder import utils from cinder.volume.flows.api import create_volume +from cinder.volume.flows.api import manage_existing from cinder.volume import qos_specs from cinder.volume import rpcapi as volume_rpcapi from cinder.volume import utils as volume_utils @@ -1451,36 +1452,36 @@ class API(base.Base): LOG.error(_LE('Unable to find service for given host.')) availability_zone = service.get('availability_zone') - volume_type_id = volume_type['id'] if volume_type else None - volume_properties = { - 'size': 0, - 'user_id': context.user_id, - 'project_id': context.project_id, - 'status': 'creating', - 'attach_status': 'detached', - # Rename these to the internal name. - 'display_description': description, - 'display_name': name, + manage_what = { + 'context': context, + 'name': name, + 'description': description, 'host': host, - 'availability_zone': availability_zone, - 'volume_type_id': volume_type_id, + 'ref': ref, + 'volume_type': volume_type, 'metadata': metadata, - 'bootable': bootable + 'availability_zone': availability_zone, + 'bootable': bootable, } - # Call the scheduler to ensure that the host exists and that it can - # accept the volume - volume = self.db.volume_create(context, volume_properties) - request_spec = {'volume_properties': volume, - 'volume_type': volume_type, - 'volume_id': volume['id'], - 'ref': ref} - self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic, - volume['id'], - request_spec=request_spec) - LOG.info(_LI("Manage volume request issued successfully."), - resource=volume) - return volume + try: + flow_engine = manage_existing.get_flow(self.scheduler_rpcapi, + self.db, + manage_what) + except Exception: + msg = _('Failed to manage api volume flow.') + LOG.exception(msg) + raise exception.CinderException(msg) + + # Attaching this listener will capture all of the notifications that + # taskflow sends out and redirect them to a more useful log for + # cinder's debugging (or error reporting) usage. + with flow_utils.DynamicLogListener(flow_engine, logger=LOG): + flow_engine.run() + vol_ref = flow_engine.storage.fetch('volume') + LOG.info(_LI("Manage volume request issued successfully."), + resource=vol_ref) + return vol_ref class HostAPI(base.Base): diff --git a/cinder/volume/flows/api/manage_existing.py b/cinder/volume/flows/api/manage_existing.py new file mode 100644 index 00000000000..6b9017bd806 --- /dev/null +++ b/cinder/volume/flows/api/manage_existing.py @@ -0,0 +1,153 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_config import cfg +from oslo_log import log as logging +import taskflow.engines +from taskflow.patterns import linear_flow +from taskflow.types import failure as ft + +from cinder import exception +from cinder import flow_utils +from cinder.i18n import _LE +from cinder.volume.flows import common + +LOG = logging.getLogger(__name__) + +ACTION = 'volume:manage_existing' +CONF = cfg.CONF + + +class EntryCreateTask(flow_utils.CinderTask): + """Creates an entry for the given volume creation in the database. + + Reversion strategy: remove the volume_id created from the database. + """ + default_provides = set(['volume_properties', 'volume']) + + def __init__(self, db): + requires = ['availability_zone', 'description', 'metadata', + 'name', 'host', 'bootable', 'volume_type', 'ref'] + super(EntryCreateTask, self).__init__(addons=[ACTION], + requires=requires) + self.db = db + + def execute(self, context, **kwargs): + """Creates a database entry for the given inputs and returns details. + + Accesses the database and creates a new entry for the to be created + volume using the given volume properties which are extracted from the + input kwargs. + """ + volume_type = kwargs.pop('volume_type') + volume_type_id = volume_type['id'] if volume_type else None + + volume_properties = { + 'size': 0, + 'user_id': context.user_id, + 'project_id': context.project_id, + 'status': 'creating', + 'attach_status': 'detached', + # Rename these to the internal name. + 'display_description': kwargs.pop('description'), + 'display_name': kwargs.pop('name'), + 'host': kwargs.pop('host'), + 'availability_zone': kwargs.pop('availability_zone'), + 'volume_type_id': volume_type_id, + 'metadata': kwargs.pop('metadata'), + 'bootable': kwargs.pop('bootable'), + } + + volume = self.db.volume_create(context, volume_properties) + + return { + 'volume_properties': volume_properties, + # NOTE(harlowja): it appears like further usage of this volume + # result actually depend on it being a sqlalchemy object and not + # just a plain dictionary so that's why we are storing this here. + # + # In the future where this task results can be serialized and + # restored automatically for continued running we will need to + # resolve the serialization & recreation of this object since raw + # sqlalchemy objects can't be serialized. + 'volume': volume, + } + + def revert(self, context, result, optional_args, **kwargs): + # We never produced a result and therefore can't destroy anything. + if isinstance(result, ft.Failure): + return + + vol_id = result['volume_id'] + try: + self.db.volume_destroy(context.elevated(), vol_id) + except exception.CinderException: + LOG.exception(_LE("Failed destroying volume entry: %s."), vol_id) + + +class ManageCastTask(flow_utils.CinderTask): + """Performs a volume manage cast to the scheduler and to the volume manager. + + This which will signal a transition of the api workflow to another child + and/or related workflow. + """ + + def __init__(self, scheduler_rpcapi, db): + requires = ['volume', 'volume_properties', 'volume_type', 'ref'] + super(ManageCastTask, self).__init__(addons=[ACTION], + requires=requires) + self.scheduler_rpcapi = scheduler_rpcapi + self.db = db + + def execute(self, context, **kwargs): + volume = kwargs.pop('volume') + request_spec = kwargs.copy() + + # Call the scheduler to ensure that the host exists and that it can + # accept the volume + self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic, + volume['id'], + request_spec=request_spec) + + def revert(self, context, result, flow_failures, **kwargs): + # Restore the source volume status and set the volume to error status. + volume_id = kwargs['volume_id'] + common.error_out_volume(context, self.db, volume_id) + LOG.error(_LE("Volume %s: manage failed."), volume_id) + exc_info = False + if all(flow_failures[-1].exc_info): + exc_info = flow_failures[-1].exc_info + LOG.error(_LE('Unexpected build error:'), exc_info=exc_info) + + +def get_flow(scheduler_rpcapi, db_api, create_what): + """Constructs and returns the api entrypoint flow. + + This flow will do the following: + + 1. Inject keys & values for dependent tasks. + 2. Extracts and validates the input keys & values. + 3. Creates the database entry. + 4. Casts to volume manager and scheduler for further processing. + """ + + flow_name = ACTION.replace(":", "_") + "_api" + api_flow = linear_flow.Flow(flow_name) + + # This will cast it out to either the scheduler or volume manager via + # the rpc apis provided. + api_flow.add(EntryCreateTask(db_api), + ManageCastTask(scheduler_rpcapi, db_api)) + + # Now load (but do not run) the flow using the provided initial data. + return taskflow.engines.load(api_flow, store=create_what) diff --git a/tox.ini b/tox.ini index ce92a0bc536..3a965365759 100644 --- a/tox.ini +++ b/tox.ini @@ -49,7 +49,6 @@ commands = cinder.tests.unit.test_cmd \ cinder.tests.unit.test_conf \ cinder.tests.unit.test_context \ - cinder.tests.unit.test_create_volume_flow \ cinder.tests.unit.test_db_api \ cinder.tests.unit.test_dellfc \ cinder.tests.unit.test_dellsc \ @@ -93,7 +92,8 @@ commands = cinder.tests.unit.test_volume_rpcapi \ cinder.tests.unit.test_volume_types \ cinder.tests.unit.test_volume_types_extra_specs \ - cinder.tests.unit.test_volume_utils + cinder.tests.unit.test_volume_utils \ + cinder.tests.unit.volume.flows.test_create_volume_flow [testenv:pep8] commands =