Merge "Add entry create and cast tasks to manage workflow"
This commit is contained in:
commit
690f1b24eb
@ -652,6 +652,11 @@ class ManageExistingInvalidReference(CinderException):
|
||||
"reference %(existing_ref)s: %(reason)s")
|
||||
|
||||
|
||||
class ManageExistingAlreadyManaged(CinderException):
|
||||
message = _("Unable to manage existing volume. "
|
||||
"Volume %(volume_ref)s already managed.")
|
||||
|
||||
|
||||
class ReplicationError(CinderException):
|
||||
message = _("Volume %(volume_id)s replication "
|
||||
"error: %(reason)s")
|
||||
|
0
cinder/tests/unit/volume/flows/__init__.py
Normal file
0
cinder/tests/unit/volume/flows/__init__.py
Normal file
62
cinder/tests/unit/volume/flows/fake_volume_api.py
Normal file
62
cinder/tests/unit/volume/flows/fake_volume_api.py
Normal file
@ -0,0 +1,62 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class FakeVolumeAPI(object):
|
||||
def __init__(self, expected_spec, test_inst):
|
||||
self.expected_spec = expected_spec
|
||||
self.test_inst = test_inst
|
||||
|
||||
def create_volume(self, ctxt, volume, host,
|
||||
request_spec, filter_properties,
|
||||
allow_reschedule=True,
|
||||
snapshot_id=None, image_id=None,
|
||||
source_volid=None,
|
||||
source_replicaid=None):
|
||||
|
||||
self.test_inst.assertEqual(self.expected_spec, request_spec)
|
||||
self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
|
||||
self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
|
||||
self.test_inst.assertEqual(request_spec['image_id'], image_id)
|
||||
self.test_inst.assertEqual(request_spec['source_replicaid'],
|
||||
source_replicaid)
|
||||
|
||||
|
||||
class FakeSchedulerRpcAPI(object):
|
||||
def __init__(self, expected_spec, test_inst):
|
||||
self.expected_spec = expected_spec
|
||||
self.test_inst = test_inst
|
||||
|
||||
def create_volume(self, ctxt, volume, volume_ref, snapshot_id=None,
|
||||
image_id=None, request_spec=None,
|
||||
filter_properties=None):
|
||||
|
||||
self.test_inst.assertEqual(self.expected_spec, request_spec)
|
||||
|
||||
def manage_existing(self, context, volume_topic, volume_id,
|
||||
request_spec=None):
|
||||
self.test_inst.assertEqual(self.expected_spec, request_spec)
|
||||
|
||||
|
||||
class FakeDb(object):
|
||||
|
||||
def volume_get(self, *args, **kwargs):
|
||||
return {'host': 'barf'}
|
||||
|
||||
def volume_update(self, *args, **kwargs):
|
||||
return {'host': 'farb'}
|
||||
|
||||
def snapshot_get(self, *args, **kwargs):
|
||||
return {'volume_id': 1}
|
||||
|
||||
def consistencygroup_get(self, *args, **kwargs):
|
||||
return {'consistencygroup_id': 1}
|
@ -14,8 +14,6 @@
|
||||
# under the License.
|
||||
""" Tests for create_volume TaskFlow """
|
||||
|
||||
import time
|
||||
|
||||
import mock
|
||||
|
||||
from cinder import context
|
||||
@ -23,57 +21,11 @@ from cinder import exception
|
||||
from cinder import test
|
||||
from cinder.tests.unit import fake_snapshot
|
||||
from cinder.tests.unit import fake_volume
|
||||
from cinder.tests.unit.volume.flows import fake_volume_api
|
||||
from cinder.volume.flows.api import create_volume
|
||||
from cinder.volume.flows.manager import create_volume as create_volume_manager
|
||||
|
||||
|
||||
class fake_scheduler_rpc_api(object):
|
||||
def __init__(self, expected_spec, test_inst):
|
||||
self.expected_spec = expected_spec
|
||||
self.test_inst = test_inst
|
||||
|
||||
def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
|
||||
image_id=None, request_spec=None,
|
||||
filter_properties=None):
|
||||
|
||||
self.test_inst.assertEqual(self.expected_spec, request_spec)
|
||||
|
||||
|
||||
class fake_volume_api(object):
|
||||
def __init__(self, expected_spec, test_inst):
|
||||
self.expected_spec = expected_spec
|
||||
self.test_inst = test_inst
|
||||
|
||||
def create_volume(self, ctxt, volume, host,
|
||||
request_spec, filter_properties,
|
||||
allow_reschedule=True,
|
||||
snapshot_id=None, image_id=None,
|
||||
source_volid=None,
|
||||
source_replicaid=None):
|
||||
|
||||
self.test_inst.assertEqual(self.expected_spec, request_spec)
|
||||
self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
|
||||
self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
|
||||
self.test_inst.assertEqual(request_spec['image_id'], image_id)
|
||||
self.test_inst.assertEqual(request_spec['source_replicaid'],
|
||||
source_replicaid)
|
||||
|
||||
|
||||
class fake_db(object):
|
||||
|
||||
def volume_get(self, *args, **kwargs):
|
||||
return {'host': 'barf'}
|
||||
|
||||
def volume_update(self, *args, **kwargs):
|
||||
return {'host': 'farb'}
|
||||
|
||||
def snapshot_get(self, *args, **kwargs):
|
||||
return {'volume_id': 1}
|
||||
|
||||
def consistencygroup_get(self, *args, **kwargs):
|
||||
return {'consistencygroup_id': 1}
|
||||
|
||||
|
||||
class CreateVolumeFlowTestCase(test.TestCase):
|
||||
|
||||
def time_inc(self):
|
||||
@ -88,9 +40,9 @@ class CreateVolumeFlowTestCase(test.TestCase):
|
||||
# Ensure that time.time() always returns more than the last time it was
|
||||
# called to avoid div by zero errors.
|
||||
self.counter = float(0)
|
||||
self.stubs.Set(time, 'time', self.time_inc)
|
||||
|
||||
def test_cast_create_volume(self):
|
||||
@mock.patch('time.time', side_effect=time_inc)
|
||||
def test_cast_create_volume(self, mock_time):
|
||||
|
||||
props = {}
|
||||
spec = {'volume_id': None,
|
||||
@ -101,10 +53,11 @@ class CreateVolumeFlowTestCase(test.TestCase):
|
||||
'consistencygroup_id': None,
|
||||
'cgsnapshot_id': None}
|
||||
|
||||
# Fake objects assert specs
|
||||
task = create_volume.VolumeCastTask(
|
||||
fake_scheduler_rpc_api(spec, self),
|
||||
fake_volume_api(spec, self),
|
||||
fake_db())
|
||||
fake_volume_api.FakeSchedulerRpcAPI(spec, self),
|
||||
fake_volume_api.FakeVolumeAPI(spec, self),
|
||||
fake_volume_api.FakeDb())
|
||||
|
||||
task._cast_create_volume(self.ctxt, spec, props)
|
||||
|
||||
@ -116,10 +69,11 @@ class CreateVolumeFlowTestCase(test.TestCase):
|
||||
'consistencygroup_id': 5,
|
||||
'cgsnapshot_id': None}
|
||||
|
||||
# Fake objects assert specs
|
||||
task = create_volume.VolumeCastTask(
|
||||
fake_scheduler_rpc_api(spec, self),
|
||||
fake_volume_api(spec, self),
|
||||
fake_db())
|
||||
fake_volume_api.FakeSchedulerRpcAPI(spec, self),
|
||||
fake_volume_api.FakeVolumeAPI(spec, self),
|
||||
fake_volume_api.FakeDb())
|
||||
|
||||
task._cast_create_volume(self.ctxt, spec, props)
|
||||
|
70
cinder/tests/unit/volume/flows/test_manage_volume_flow.py
Normal file
70
cinder/tests/unit/volume/flows/test_manage_volume_flow.py
Normal file
@ -0,0 +1,70 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
""" Tests for manage_existing TaskFlow """
|
||||
|
||||
import mock
|
||||
|
||||
from cinder import context
|
||||
from cinder import test
|
||||
from cinder.tests.unit.volume.flows import fake_volume_api
|
||||
from cinder.volume.flows.api import manage_existing
|
||||
|
||||
|
||||
class ManageVolumeFlowTestCase(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ManageVolumeFlowTestCase, self).setUp()
|
||||
self.ctxt = context.get_admin_context()
|
||||
self.counter = float(0)
|
||||
|
||||
def test_cast_manage_existing(self):
|
||||
|
||||
volume = mock.MagicMock(return_value=None)
|
||||
spec = {
|
||||
'name': 'name',
|
||||
'description': 'description',
|
||||
'host': 'host',
|
||||
'ref': 'ref',
|
||||
'volume_type': 'volume_type',
|
||||
'metadata': 'metadata',
|
||||
'availability_zone': 'availability_zone',
|
||||
'bootable': 'bootable'}
|
||||
|
||||
# Fake objects assert specs
|
||||
task = manage_existing.ManageCastTask(
|
||||
fake_volume_api.FakeSchedulerRpcAPI(spec, self),
|
||||
fake_volume_api.FakeDb())
|
||||
|
||||
create_what = spec.copy()
|
||||
create_what.update({'volume': volume})
|
||||
task.execute(self.ctxt, **create_what)
|
||||
|
||||
volume = mock.MagicMock(return_value={'id': 1})
|
||||
|
||||
spec = {
|
||||
'name': 'name',
|
||||
'description': 'description',
|
||||
'host': 'host',
|
||||
'ref': 'ref',
|
||||
'volume_type': 'volume_type',
|
||||
'metadata': 'metadata',
|
||||
'availability_zone': 'availability_zone',
|
||||
'bootable': 'bootable'}
|
||||
|
||||
# Fake objects assert specs
|
||||
task = manage_existing.ManageCastTask(
|
||||
fake_volume_api.FakeSchedulerRpcAPI(spec, self),
|
||||
fake_volume_api.FakeDb())
|
||||
|
||||
create_what = spec.copy()
|
||||
create_what.update({'volume': volume})
|
||||
task.execute(self.ctxt, **create_what)
|
@ -44,6 +44,7 @@ from cinder import quota_utils
|
||||
from cinder.scheduler import rpcapi as scheduler_rpcapi
|
||||
from cinder import utils
|
||||
from cinder.volume.flows.api import create_volume
|
||||
from cinder.volume.flows.api import manage_existing
|
||||
from cinder.volume import qos_specs
|
||||
from cinder.volume import rpcapi as volume_rpcapi
|
||||
from cinder.volume import utils as volume_utils
|
||||
@ -1466,36 +1467,36 @@ class API(base.Base):
|
||||
LOG.error(_LE('Unable to find service for given host.'))
|
||||
availability_zone = service.get('availability_zone')
|
||||
|
||||
volume_type_id = volume_type['id'] if volume_type else None
|
||||
volume_properties = {
|
||||
'size': 0,
|
||||
'user_id': context.user_id,
|
||||
'project_id': context.project_id,
|
||||
'status': 'creating',
|
||||
'attach_status': 'detached',
|
||||
# Rename these to the internal name.
|
||||
'display_description': description,
|
||||
'display_name': name,
|
||||
manage_what = {
|
||||
'context': context,
|
||||
'name': name,
|
||||
'description': description,
|
||||
'host': host,
|
||||
'availability_zone': availability_zone,
|
||||
'volume_type_id': volume_type_id,
|
||||
'ref': ref,
|
||||
'volume_type': volume_type,
|
||||
'metadata': metadata,
|
||||
'bootable': bootable
|
||||
'availability_zone': availability_zone,
|
||||
'bootable': bootable,
|
||||
}
|
||||
|
||||
# Call the scheduler to ensure that the host exists and that it can
|
||||
# accept the volume
|
||||
volume = self.db.volume_create(context, volume_properties)
|
||||
request_spec = {'volume_properties': volume,
|
||||
'volume_type': volume_type,
|
||||
'volume_id': volume['id'],
|
||||
'ref': ref}
|
||||
self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic,
|
||||
volume['id'],
|
||||
request_spec=request_spec)
|
||||
try:
|
||||
flow_engine = manage_existing.get_flow(self.scheduler_rpcapi,
|
||||
self.db,
|
||||
manage_what)
|
||||
except Exception:
|
||||
msg = _('Failed to manage api volume flow.')
|
||||
LOG.exception(msg)
|
||||
raise exception.CinderException(msg)
|
||||
|
||||
# Attaching this listener will capture all of the notifications that
|
||||
# taskflow sends out and redirect them to a more useful log for
|
||||
# cinder's debugging (or error reporting) usage.
|
||||
with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
|
||||
flow_engine.run()
|
||||
vol_ref = flow_engine.storage.fetch('volume')
|
||||
LOG.info(_LI("Manage volume request issued successfully."),
|
||||
resource=volume)
|
||||
return volume
|
||||
resource=vol_ref)
|
||||
return vol_ref
|
||||
|
||||
|
||||
class HostAPI(base.Base):
|
||||
|
153
cinder/volume/flows/api/manage_existing.py
Normal file
153
cinder/volume/flows/api/manage_existing.py
Normal file
@ -0,0 +1,153 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import taskflow.engines
|
||||
from taskflow.patterns import linear_flow
|
||||
from taskflow.types import failure as ft
|
||||
|
||||
from cinder import exception
|
||||
from cinder import flow_utils
|
||||
from cinder.i18n import _LE
|
||||
from cinder.volume.flows import common
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
ACTION = 'volume:manage_existing'
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class EntryCreateTask(flow_utils.CinderTask):
|
||||
"""Creates an entry for the given volume creation in the database.
|
||||
|
||||
Reversion strategy: remove the volume_id created from the database.
|
||||
"""
|
||||
default_provides = set(['volume_properties', 'volume'])
|
||||
|
||||
def __init__(self, db):
|
||||
requires = ['availability_zone', 'description', 'metadata',
|
||||
'name', 'host', 'bootable', 'volume_type', 'ref']
|
||||
super(EntryCreateTask, self).__init__(addons=[ACTION],
|
||||
requires=requires)
|
||||
self.db = db
|
||||
|
||||
def execute(self, context, **kwargs):
|
||||
"""Creates a database entry for the given inputs and returns details.
|
||||
|
||||
Accesses the database and creates a new entry for the to be created
|
||||
volume using the given volume properties which are extracted from the
|
||||
input kwargs.
|
||||
"""
|
||||
volume_type = kwargs.pop('volume_type')
|
||||
volume_type_id = volume_type['id'] if volume_type else None
|
||||
|
||||
volume_properties = {
|
||||
'size': 0,
|
||||
'user_id': context.user_id,
|
||||
'project_id': context.project_id,
|
||||
'status': 'creating',
|
||||
'attach_status': 'detached',
|
||||
# Rename these to the internal name.
|
||||
'display_description': kwargs.pop('description'),
|
||||
'display_name': kwargs.pop('name'),
|
||||
'host': kwargs.pop('host'),
|
||||
'availability_zone': kwargs.pop('availability_zone'),
|
||||
'volume_type_id': volume_type_id,
|
||||
'metadata': kwargs.pop('metadata'),
|
||||
'bootable': kwargs.pop('bootable'),
|
||||
}
|
||||
|
||||
volume = self.db.volume_create(context, volume_properties)
|
||||
|
||||
return {
|
||||
'volume_properties': volume_properties,
|
||||
# NOTE(harlowja): it appears like further usage of this volume
|
||||
# result actually depend on it being a sqlalchemy object and not
|
||||
# just a plain dictionary so that's why we are storing this here.
|
||||
#
|
||||
# In the future where this task results can be serialized and
|
||||
# restored automatically for continued running we will need to
|
||||
# resolve the serialization & recreation of this object since raw
|
||||
# sqlalchemy objects can't be serialized.
|
||||
'volume': volume,
|
||||
}
|
||||
|
||||
def revert(self, context, result, optional_args, **kwargs):
|
||||
# We never produced a result and therefore can't destroy anything.
|
||||
if isinstance(result, ft.Failure):
|
||||
return
|
||||
|
||||
vol_id = result['volume_id']
|
||||
try:
|
||||
self.db.volume_destroy(context.elevated(), vol_id)
|
||||
except exception.CinderException:
|
||||
LOG.exception(_LE("Failed destroying volume entry: %s."), vol_id)
|
||||
|
||||
|
||||
class ManageCastTask(flow_utils.CinderTask):
|
||||
"""Performs a volume manage cast to the scheduler and to the volume manager.
|
||||
|
||||
This which will signal a transition of the api workflow to another child
|
||||
and/or related workflow.
|
||||
"""
|
||||
|
||||
def __init__(self, scheduler_rpcapi, db):
|
||||
requires = ['volume', 'volume_properties', 'volume_type', 'ref']
|
||||
super(ManageCastTask, self).__init__(addons=[ACTION],
|
||||
requires=requires)
|
||||
self.scheduler_rpcapi = scheduler_rpcapi
|
||||
self.db = db
|
||||
|
||||
def execute(self, context, **kwargs):
|
||||
volume = kwargs.pop('volume')
|
||||
request_spec = kwargs.copy()
|
||||
|
||||
# Call the scheduler to ensure that the host exists and that it can
|
||||
# accept the volume
|
||||
self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic,
|
||||
volume['id'],
|
||||
request_spec=request_spec)
|
||||
|
||||
def revert(self, context, result, flow_failures, **kwargs):
|
||||
# Restore the source volume status and set the volume to error status.
|
||||
volume_id = kwargs['volume_id']
|
||||
common.error_out_volume(context, self.db, volume_id)
|
||||
LOG.error(_LE("Volume %s: manage failed."), volume_id)
|
||||
exc_info = False
|
||||
if all(flow_failures[-1].exc_info):
|
||||
exc_info = flow_failures[-1].exc_info
|
||||
LOG.error(_LE('Unexpected build error:'), exc_info=exc_info)
|
||||
|
||||
|
||||
def get_flow(scheduler_rpcapi, db_api, create_what):
|
||||
"""Constructs and returns the api entrypoint flow.
|
||||
|
||||
This flow will do the following:
|
||||
|
||||
1. Inject keys & values for dependent tasks.
|
||||
2. Extracts and validates the input keys & values.
|
||||
3. Creates the database entry.
|
||||
4. Casts to volume manager and scheduler for further processing.
|
||||
"""
|
||||
|
||||
flow_name = ACTION.replace(":", "_") + "_api"
|
||||
api_flow = linear_flow.Flow(flow_name)
|
||||
|
||||
# This will cast it out to either the scheduler or volume manager via
|
||||
# the rpc apis provided.
|
||||
api_flow.add(EntryCreateTask(db_api),
|
||||
ManageCastTask(scheduler_rpcapi, db_api))
|
||||
|
||||
# Now load (but do not run) the flow using the provided initial data.
|
||||
return taskflow.engines.load(api_flow, store=create_what)
|
4
tox.ini
4
tox.ini
@ -49,7 +49,6 @@ commands =
|
||||
cinder.tests.unit.test_cmd \
|
||||
cinder.tests.unit.test_conf \
|
||||
cinder.tests.unit.test_context \
|
||||
cinder.tests.unit.test_create_volume_flow \
|
||||
cinder.tests.unit.test_db_api \
|
||||
cinder.tests.unit.test_dellfc \
|
||||
cinder.tests.unit.test_dellsc \
|
||||
@ -92,7 +91,8 @@ commands =
|
||||
cinder.tests.unit.test_volume_rpcapi \
|
||||
cinder.tests.unit.test_volume_types \
|
||||
cinder.tests.unit.test_volume_types_extra_specs \
|
||||
cinder.tests.unit.test_volume_utils
|
||||
cinder.tests.unit.test_volume_utils \
|
||||
cinder.tests.unit.volume.flows.test_create_volume_flow
|
||||
|
||||
[testenv:pep8]
|
||||
commands =
|
||||
|
Loading…
x
Reference in New Issue
Block a user