diff --git a/cinder/api/openstack/api_version_request.py b/cinder/api/openstack/api_version_request.py
index 16a2d1623ca..e2e99fb8599 100644
--- a/cinder/api/openstack/api_version_request.py
+++ b/cinder/api/openstack/api_version_request.py
@@ -73,6 +73,7 @@ REST_API_VERSION_HISTORY = """
     * 3.21 - Show provider_id in detailed view of a volume for admin.
     * 3.22 - Add filtering based on metadata for snapshot listing.
     * 3.23 - Allow passing force parameter to volume delete.
+    * 3.24 - Add workers/cleanup endpoint.
 """

 # The minimum and maximum versions of the API supported
@@ -80,7 +81,7 @@ REST_API_VERSION_HISTORY = """
 # minimum version of the API supported.
 # Explicitly using /v1 or /v2 enpoints will still work
 _MIN_API_VERSION = "3.0"
-_MAX_API_VERSION = "3.23"
+_MAX_API_VERSION = "3.24"
 _LEGACY_API_VERSION1 = "1.0"
 _LEGACY_API_VERSION2 = "2.0"

diff --git a/cinder/api/openstack/rest_api_version_history.rst b/cinder/api/openstack/rest_api_version_history.rst
index 431b5e26190..6cd29eeaf53 100644
--- a/cinder/api/openstack/rest_api_version_history.rst
+++ b/cinder/api/openstack/rest_api_version_history.rst
@@ -227,3 +227,36 @@ user documentation.
 ----
   Added support to filter snapshot list based on metadata of snapshot.

+3.24
+----
+  New API endpoint /workers/cleanup allows triggering cleanup for cinder-volume
+  services; it is meant for cleaning up ongoing operations from failed nodes.
+
+  The cleanup will be performed by other services belonging to the same
+  cluster, so at least one of them must be up for the cleanup to proceed.
+
+  Cleanup cannot be triggered during a cloud upgrade.
+
+  If no arguments are provided, cleanup will try to issue a clean message for
+  all nodes that are down, but we can restrict which nodes we want cleaned
+  using the ``service_id``, ``cluster_name``, ``host``, ``binary``, and
+  ``disabled`` parameters.
+
+  Cleaning specific resources is also possible using the ``resource_type``
+  and ``resource_id`` parameters.
+
+  We can even force cleanup on nodes that are up with ``is_up``, but that is
+  not recommended and should only be used if you know what you are doing: for
+  example, if you know a specific cinder-volume is down even though it is not
+  yet being reported as down when listing the services, and you know the
+  cluster has at least one other service to do the cleanup.
+
+  The API will return a dictionary with two lists: one with the services that
+  have been issued a cleanup request (``cleaning`` key), and another with the
+  services that cannot be cleaned right now because there is no alternative
+  service to do the cleanup in that cluster (``unavailable`` key).
+
+  The data returned for each service element in these two lists consists of
+  the ``id``, ``host``, ``binary``, and ``cluster_name``. These are not the
+  services that will perform the cleanup, but the services that will be
+  cleaned up or could not be cleaned up.
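For illustration only (not part of the patch), a raw client call against the new endpoint could look roughly like the following Python sketch; the endpoint URL, project ID, and token are placeholders, and the request body uses the filter parameters described above:

    # Hypothetical values; substitute a real endpoint, project and token.
    import json
    import requests

    CINDER_ENDPOINT = 'http://controller:8776/v3/<project_id>'
    TOKEN = '<keystone-token>'

    body = {'cluster_name': 'mycluster', 'binary': 'cinder-volume'}
    resp = requests.post(
        CINDER_ENDPOINT + '/workers/cleanup',
        headers={'X-Auth-Token': TOKEN,
                 'Content-Type': 'application/json',
                 # The endpoint only exists from microversion 3.24 on.
                 'OpenStack-API-Version': 'volume 3.24'},
        data=json.dumps(body))

    # A 202 response carries the two lists described above, e.g.:
    # {"cleaning": [{"id": 1, "host": "host1", "binary": "cinder-volume",
    #                "cluster_name": "mycluster"}],
    #  "unavailable": []}
    print(resp.status_code, resp.json())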
diff --git a/cinder/api/v3/router.py b/cinder/api/v3/router.py index f9d1e14422e..2866f52b658 100644 --- a/cinder/api/v3/router.py +++ b/cinder/api/v3/router.py @@ -37,6 +37,7 @@ from cinder.api.v3 import snapshots from cinder.api.v3 import volume_manage from cinder.api.v3 import volume_metadata from cinder.api.v3 import volumes +from cinder.api.v3 import workers from cinder.api import versions @@ -173,3 +174,8 @@ class APIRouter(cinder.api.openstack.APIRouter): mapper.resource("backup", "backups", controller=self.resources['backups'], collection={'detail': 'GET'}) + + self.resources['workers'] = workers.create_resource() + mapper.resource('worker', 'workers', + controller=self.resources['workers'], + collection={'cleanup': 'POST'}) diff --git a/cinder/api/v3/views/workers.py b/cinder/api/v3/views/workers.py new file mode 100644 index 00000000000..c4b20b2dda6 --- /dev/null +++ b/cinder/api/v3/views/workers.py @@ -0,0 +1,25 @@ +# Copyright (c) 2016 Red Hat Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class ViewBuilder(object): + """Map Cluster into dicts for API responses.""" + + _collection_name = 'workers' + + @classmethod + def service_list(cls, services): + return [{'id': s.id, 'host': s.host, 'binary': s.binary, + 'cluster_name': s.cluster_name} for s in services] diff --git a/cinder/api/v3/workers.py b/cinder/api/v3/workers.py new file mode 100644 index 00000000000..f38a5baa9cc --- /dev/null +++ b/cinder/api/v3/workers.py @@ -0,0 +1,124 @@ +# Copyright (c) 2016 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
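As an illustrative aside (not part of the patch), the ``collection={'cleanup': 'POST'}`` argument in the router registration above is what exposes the cleanup action as a POST on the collection URL. The same idea in a standalone Routes sketch, with a placeholder controller name:

    # Standalone sketch using the `routes` library directly; the controller
    # name is a placeholder, not Cinder code.
    import routes

    mapper = routes.Mapper()
    mapper.resource('worker', 'workers', controller='workers_controller',
                    collection={'cleanup': 'POST'})

    # Matches POST /workers/cleanup and dispatches to action='cleanup'.
    print(mapper.match('/workers/cleanup',
                       environ={'REQUEST_METHOD': 'POST'}))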
+ +from oslo_utils import timeutils +from oslo_utils import uuidutils + +from cinder.api.openstack import wsgi +from cinder.api.v3.views import workers as workers_view +from cinder import db +from cinder import exception +from cinder.i18n import _ +from cinder import objects +from cinder.objects import cleanable +from cinder.scheduler import rpcapi as sch_rpc +from cinder import utils + + +class WorkerController(wsgi.Controller): + allowed_clean_keys = {'service_id', 'cluster_name', 'host', 'binary', + 'is_up', 'disabled', 'resource_id', 'resource_type', + 'until'} + + policy_checker = wsgi.Controller.get_policy_checker('workers') + + def __init__(self, *args, **kwargs): + self.sch_api = sch_rpc.SchedulerAPI() + + def _prepare_params(self, ctxt, params, allowed): + if not allowed.issuperset(params): + invalid_keys = set(params).difference(allowed) + msg = _('Invalid filter keys: %s') % ', '.join(invalid_keys) + raise exception.InvalidInput(reason=msg) + + if params.get('binary') not in (None, 'cinder-volume', + 'cinder-scheduler'): + msg = _('binary must be empty or set to cinder-volume or ' + 'cinder-scheduler') + raise exception.InvalidInput(reason=msg) + + for boolean in ('disabled', 'is_up'): + if params.get(boolean) is not None: + params[boolean] = utils.get_bool_param(boolean, params) + + resource_type = params.get('resource_type') + + if resource_type: + resource_type = resource_type.title() + types = cleanable.CinderCleanableObject.cleanable_resource_types + if resource_type not in types: + msg = (_('Resource type %s not valid, must be ') % + resource_type) + msg = utils.build_or_str(types, msg + '%s.') + raise exception.InvalidInput(reason=msg) + params['resource_type'] = resource_type + + resource_id = params.get('resource_id') + if resource_id: + if not uuidutils.is_uuid_like(resource_id): + msg = (_('Resource ID must be a UUID, and %s is not.') % + resource_id) + raise exception.InvalidInput(reason=msg) + + # If we have the resource type but we don't have where it is + # located, we get it from the DB to limit the distribution of the + # request by the scheduler, otherwise it will be distributed to all + # the services. + location_keys = {'service_id', 'cluster_name', 'host'} + if not location_keys.intersection(params): + workers = db.worker_get_all(ctxt, resource_id=resource_id, + binary=params.get('binary'), + resource_type=resource_type) + + if len(workers) == 0: + msg = (_('There is no resource with UUID %s pending ' + 'cleanup.'), resource_id) + raise exception.InvalidInput(reason=msg) + if len(workers) > 1: + msg = (_('There are multiple resources with UUID %s ' + 'pending cleanup. 
Please be more specific.'), + resource_id) + raise exception.InvalidInput(reason=msg) + + worker = workers[0] + params.update(service_id=worker.service_id, + resource_type=worker.resource_type) + + return params + + @wsgi.Controller.api_version('3.24') + @wsgi.response(202) + def cleanup(self, req, body=None): + """Do the cleanup on resources from a specific service/host/node.""" + # Let the wsgi middleware convert NotAuthorized exceptions + ctxt = self.policy_checker(req, 'cleanup') + body = body or {} + + params = self._prepare_params(ctxt, body, self.allowed_clean_keys) + params['until'] = timeutils.utcnow() + + # NOTE(geguileo): If is_up is not specified in the request + # CleanupRequest's default will be used (False) + cleanup_request = objects.CleanupRequest(**params) + cleaning, unavailable = self.sch_api.work_cleanup(ctxt, + cleanup_request) + return { + 'cleaning': workers_view.ViewBuilder.service_list(cleaning), + 'unavailable': workers_view.ViewBuilder.service_list(unavailable), + } + + +def create_resource(): + return wsgi.Resource(WorkerController()) diff --git a/cinder/exception.py b/cinder/exception.py index 33b557e1ce5..59656b96bcf 100644 --- a/cinder/exception.py +++ b/cinder/exception.py @@ -239,6 +239,10 @@ class ServiceUnavailable(Invalid): message = _("Service is unavailable at this time.") +class UnavailableDuringUpgrade(Invalid): + message = _('Cannot perform %(action)s during system upgrade.') + + class ImageUnacceptable(Invalid): message = _("Image %(image_id)s is unacceptable: %(reason)s") diff --git a/cinder/objects/cleanable.py b/cinder/objects/cleanable.py index a8e1b3804ef..184e14f33fa 100644 --- a/cinder/objects/cleanable.py +++ b/cinder/objects/cleanable.py @@ -26,14 +26,28 @@ from cinder.volume import rpcapi as vol_rpcapi class CinderCleanableObject(base.CinderPersistentObject): - """Base class for cleanable OVO resources.""" + """Base class for cleanable OVO resources. + + All cleanable objects must have a host property/attribute. 
+ """ worker = None + cleanable_resource_types = set() + @classmethod def get_rpc_api(cls): # By default assume all resources are handled by c-vol services return vol_rpcapi.VolumeAPI + @classmethod + def cinder_ovo_cls_init(cls): + """Called on OVO registration, sets set of cleanable resources.""" + # First call persistent object method to store the DB model + super(CinderCleanableObject, cls).cinder_ovo_cls_init() + + # Add this class to the set of resources + cls.cleanable_resource_types.add(cls.obj_name()) + @classmethod def get_pinned_version(cls): # We pin the version by the last service that gets updated, which is diff --git a/cinder/objects/snapshot.py b/cinder/objects/snapshot.py index 374677407b4..91347ade162 100644 --- a/cinder/objects/snapshot.py +++ b/cinder/objects/snapshot.py @@ -263,6 +263,11 @@ class Snapshot(cleanable.CinderCleanableObject, base.CinderObject, return False return status == 'creating' + @property + def host(self): + """All cleanable VO must have a host property/attribute.""" + return self.volume.host + @base.CinderObjectRegistry.register class SnapshotList(base.ObjectListBase, base.CinderObject): diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index c10799813d4..c21639e1046 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -19,6 +19,7 @@ Scheduler Service """ +import collections from datetime import datetime import eventlet @@ -28,13 +29,14 @@ import oslo_messaging as messaging from oslo_utils import excutils from oslo_utils import importutils from oslo_utils import timeutils +from oslo_utils import versionutils import six from cinder import context from cinder import db from cinder import exception from cinder import flow_utils -from cinder.i18n import _, _LE +from cinder.i18n import _, _LE, _LI from cinder import manager from cinder import objects from cinder import quota @@ -71,6 +73,10 @@ class SchedulerManager(manager.CleanableManager, manager.Manager): self.driver = importutils.import_object(scheduler_driver) super(SchedulerManager, self).__init__(*args, **kwargs) self._startup_delay = True + self.volume_api = volume_rpcapi.VolumeAPI() + self.sch_api = scheduler_rpcapi.SchedulerAPI() + self.rpc_api_version = versionutils.convert_version_to_int( + self.RPC_API_VERSION) def init_host_with_rpc(self): ctxt = context.get_admin_context() @@ -81,6 +87,8 @@ class SchedulerManager(manager.CleanableManager, manager.Manager): def reset(self): super(SchedulerManager, self).reset() + self.volume_api = volume_rpcapi.VolumeAPI() + self.sch_api = scheduler_rpcapi.SchedulerAPI() self.driver.reset() def update_service_capabilities(self, context, service_name=None, @@ -373,3 +381,99 @@ class SchedulerManager(manager.CleanableManager, manager.Manager): rpc.get_notifier("scheduler").error(context, 'scheduler.' + method, payload) + + @property + def upgrading_cloud(self): + min_version_str = self.sch_api.determine_rpc_version_cap() + min_version = versionutils.convert_version_to_int(min_version_str) + return min_version < self.rpc_api_version + + def _cleanup_destination(self, clusters, service): + """Determines the RPC method, destination service and name. + + The name is only used for logging, and it is the topic queue. + """ + # For the scheduler we don't have a specific destination, as any + # scheduler will do and we know we are up, since we are running this + # code. 
+ if service.binary == 'cinder-scheduler': + cleanup_rpc = self.sch_api.do_cleanup + dest = None + dest_name = service.host + else: + cleanup_rpc = self.volume_api.do_cleanup + + # For clustered volume services we try to get info from the cache. + if service.is_clustered: + # Get cluster info from cache + dest = clusters[service.binary].get(service.cluster_name) + # Cache miss forces us to get the cluster from the DB via OVO + if not dest: + dest = service.cluster + clusters[service.binary][service.cluster_name] = dest + dest_name = dest.name + # Non clustered volume services + else: + dest = service + dest_name = service.host + return cleanup_rpc, dest, dest_name + + def work_cleanup(self, context, cleanup_request): + """Process request from API to do cleanup on services. + + Here we retrieve from the DB which services we want to clean up based + on the request from the user. + + Then send individual cleanup requests to each of the services that are + up, and we finally return a tuple with services that we have sent a + cleanup request and those that were not up and we couldn't send it. + """ + if self.upgrading_cloud: + raise exception.UnavailableDuringUpgrade(action='workers cleanup') + + LOG.info(_LI('Workers cleanup request started.')) + + filters = dict(service_id=cleanup_request.service_id, + cluster_name=cleanup_request.cluster_name, + host=cleanup_request.host, + binary=cleanup_request.binary, + is_up=cleanup_request.is_up, + disabled=cleanup_request.disabled) + # Get the list of all the services that match the request + services = objects.ServiceList.get_all(context, filters) + + until = cleanup_request.until or timeutils.utcnow() + requested = [] + not_requested = [] + + # To reduce DB queries we'll cache the clusters data + clusters = collections.defaultdict(dict) + + for service in services: + cleanup_request.cluster_name = service.cluster_name + cleanup_request.service_id = service.id + cleanup_request.host = service.host + cleanup_request.binary = service.binary + cleanup_request.until = until + + cleanup_rpc, dest, dest_name = self._cleanup_destination(clusters, + service) + + # If it's a scheduler or the service is up, send the request. + if not dest or dest.is_up: + LOG.info(_LI('Sending cleanup for %(binary)s %(dest_name)s.'), + {'binary': service.binary, + 'dest_name': dest_name}) + cleanup_rpc(context, cleanup_request) + requested.append(service) + # We don't send cleanup requests when there are no services alive + # to do the cleanup. + else: + LOG.info(_LI('No service available to cleanup %(binary)s ' + '%(dest_name)s.'), + {'binary': service.binary, + 'dest_name': dest_name}) + not_requested.append(service) + + LOG.info(_LI('Cleanup requests completed.')) + return requested, not_requested diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index a5124a80b86..6a48ff96ad8 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -66,9 +66,10 @@ class SchedulerAPI(rpc.RPCAPI): 3.3 - Add cluster support to migrate_volume, and to update_service_capabilities and send the timestamp from the capabilities. + 3.4 - Adds work_cleanup and do_cleanup methods. 
""" - RPC_API_VERSION = '3.3' + RPC_API_VERSION = '3.4' RPC_DEFAULT_VERSION = '3.0' TOPIC = constants.SCHEDULER_TOPIC BINARY = 'cinder-scheduler' @@ -199,3 +200,28 @@ class SchedulerAPI(rpc.RPCAPI): cctxt.cast(ctxt, 'notify_service_capabilities', service_name=service_name, host=host, capabilities=capabilities) + + def work_cleanup(self, ctxt, cleanup_request): + """Generate individual service cleanup requests from user request.""" + if not self.client.can_send_version('3.4'): + msg = _('One of cinder-scheduler services is too old to accept ' + 'such request. Are you running mixed Newton-Ocata' + 'cinder-schedulers?') + raise exception.ServiceTooOld(msg) + + cctxt = self.client.prepare(version='3.4') + # Response will have services that are receiving the cleanup request + # and services that couldn't receive it since they are down. + return cctxt.call(ctxt, 'work_cleanup', + cleanup_request=cleanup_request) + + def do_cleanup(self, ctxt, cleanup_request): + """Perform this scheduler's resource cleanup as per cleanup_request.""" + if not self.client.can_send_version('3.4'): + msg = _('One of cinder-scheduler services is too old to accept ' + 'such request. Are you running mixed Newton-Ocata' + 'cinder-schedulers?') + raise exception.ServiceTooOld(msg) + + cctxt = self.client.prepare(version='3.4') + cctxt.cast(ctxt, 'do_cleanup', cleanup_request=cleanup_request) diff --git a/cinder/tests/unit/api/v3/test_workers.py b/cinder/tests/unit/api/v3/test_workers.py new file mode 100644 index 00000000000..9dceb522768 --- /dev/null +++ b/cinder/tests/unit/api/v3/test_workers.py @@ -0,0 +1,158 @@ +# Copyright (c) 2016 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import ddt +import mock +from oslo_serialization import jsonutils +import webob + +from cinder.api.v3 import router as router_v3 +from cinder.api.v3 import workers +from cinder import context +from cinder import objects +from cinder import test +from cinder.tests.unit.api import fakes +from cinder.tests.unit import fake_constants as fake + + +SERVICES = ( + [objects.Service(id=1, host='host1', binary='cinder-volume', + cluster_name='mycluster'), + objects.Service(id=2, host='host2', binary='cinder-volume', + cluster_name='mycluster')], + [objects.Service(id=3, host='host3', binary='cinder-volume', + cluster_name='mycluster'), + objects.Service(id=4, host='host4', binary='cinder-volume', + cluster_name='mycluster')], +) + + +def app(): + # no auth, just let environ['cinder.context'] pass through + api = router_v3.APIRouter() + mapper = fakes.urlmap.URLMap() + mapper['/v3'] = api + return mapper + + +@ddt.ddt +class WorkersTestCase(test.TestCase): + """Tes Case for the cleanup of Workers entries.""" + def setUp(self): + super(WorkersTestCase, self).setUp() + + self.context = context.RequestContext(user_id=None, + project_id=fake.PROJECT_ID, + is_admin=True, + read_deleted='no', + overwrite=False) + self.controller = workers.create_resource() + + def _get_resp_post(self, body, version='3.24', ctxt=None): + """Helper to execute a POST workers API call.""" + req = webob.Request.blank('/v3/%s/workers/cleanup' % fake.PROJECT_ID) + req.method = 'POST' + req.headers['Content-Type'] = 'application/json' + req.headers['OpenStack-API-Version'] = 'volume ' + version + req.environ['cinder.context'] = ctxt or self.context + req.body = jsonutils.dump_as_bytes(body) + res = req.get_response(app()) + return res + + @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup') + def test_cleanup_old_api_version(self, rpc_mock): + res = self._get_resp_post({}, '3.19') + self.assertEqual(404, res.status_code) + rpc_mock.assert_not_called() + + @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup') + def test_cleanup_not_authorized(self, rpc_mock): + ctxt = context.RequestContext(user_id=None, + project_id=fake.PROJECT_ID, + is_admin=False, + read_deleted='no', + overwrite=False) + res = self._get_resp_post({}, ctxt=ctxt) + self.assertEqual(403, res.status_code) + rpc_mock.assert_not_called() + + @ddt.data({'fake_key': 'value'}, {'binary': 'nova-scheduler'}, + {'disabled': 'sure'}, {'is_up': 'nop'}, + {'resource_type': 'service'}, {'resource_id': 'non UUID'}) + @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup') + def test_cleanup_wrong_param(self, body, rpc_mock): + res = self._get_resp_post(body) + self.assertEqual(400, res.status_code) + if 'disabled' in body or 'is_up' in body: + expected = 'is not a boolean' + else: + expected = 'Invalid input' + self.assertIn(expected, res.json['badRequest']['message']) + rpc_mock.assert_not_called() + + def _expected_services(self, cleaning, unavailable): + def service_view(service): + return {'id': service.id, 'host': service.host, + 'binary': service.binary, + 'cluster_name': service.cluster_name} + return {'cleaning': [service_view(s) for s in cleaning], + 'unavailable': [service_view(s) for s in unavailable]} + + @ddt.data({'service_id': 10}, {'cluster_name': 'cluster_name'}, + {'host': 'hostname'}, {'binary': 'cinder-volume'}, + {'binary': 'cinder-scheduler'}, {'disabled': 'true'}, + {'is_up': 'no'}, {'resource_type': 'Volume'}, + {'resource_id': fake.VOLUME_ID, 'host': 'hostname'}) + 
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup', + return_value=SERVICES) + def test_cleanup_params(self, body, rpc_mock): + res = self._get_resp_post(body) + self.assertEqual(202, res.status_code) + rpc_mock.assert_called_once_with(self.context, mock.ANY) + cleanup_request = rpc_mock.call_args[0][1] + for key, value in body.items(): + if key in ('disabled', 'is_up'): + value = value == 'true' + self.assertEqual(value, getattr(cleanup_request, key)) + self.assertEqual(self._expected_services(*SERVICES), res.json) + + @mock.patch('cinder.db.worker_get_all', + return_value=[mock.Mock(service_id=1, resource_type='Volume')]) + @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup', + return_value=SERVICES) + def test_cleanup_missing_location_ok(self, rpc_mock, worker_mock): + res = self._get_resp_post({'resource_id': fake.VOLUME_ID}) + self.assertEqual(202, res.status_code) + rpc_mock.assert_called_once_with(self.context, mock.ANY) + cleanup_request = rpc_mock.call_args[0][1] + self.assertEqual(fake.VOLUME_ID, cleanup_request.resource_id) + self.assertEqual(1, cleanup_request.service_id) + self.assertEqual('Volume', cleanup_request.resource_type) + self.assertEqual(self._expected_services(*SERVICES), res.json) + + @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup') + def test_cleanup_missing_location_fail_none(self, rpc_mock): + res = self._get_resp_post({'resource_id': fake.VOLUME_ID}) + self.assertEqual(400, res.status_code) + self.assertIn('Invalid input', res.json['badRequest']['message']) + rpc_mock.assert_not_called() + + @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.work_cleanup', + return_value=[1, 2]) + def test_cleanup_missing_location_fail_multiple(self, rpc_mock): + res = self._get_resp_post({'resource_id': fake.VOLUME_ID}) + self.assertEqual(400, res.status_code) + self.assertIn('Invalid input', res.json['badRequest']['message']) + rpc_mock.assert_not_called() diff --git a/cinder/tests/unit/policy.json b/cinder/tests/unit/policy.json index ad8766d5fd3..84f0841ea7e 100644 --- a/cinder/tests/unit/policy.json +++ b/cinder/tests/unit/policy.json @@ -143,5 +143,7 @@ "clusters:get": "rule:admin_api", "clusters:get_all": "rule:admin_api", - "clusters:update": "rule:admin_api" + "clusters:update": "rule:admin_api", + + "workers:cleanup": "rule:admin_api" } diff --git a/cinder/tests/unit/scheduler/test_rpcapi.py b/cinder/tests/unit/scheduler/test_rpcapi.py index 3ffbe2b7b2a..7f6197611c5 100644 --- a/cinder/tests/unit/scheduler/test_rpcapi.py +++ b/cinder/tests/unit/scheduler/test_rpcapi.py @@ -22,6 +22,7 @@ import mock from cinder import context from cinder import exception +from cinder import objects from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder import test from cinder.tests.unit import fake_constants @@ -227,3 +228,36 @@ class SchedulerRpcAPITestCase(test.TestCase): filter_properties_list= ['fake_filter_properties_list'], version='3.0') + + @ddt.data(('work_cleanup', 'myhost', None), + ('work_cleanup', 'myhost', 'mycluster'), + ('do_cleanup', 'myhost', None), + ('do_cleanup', 'myhost', 'mycluster')) + @ddt.unpack + @mock.patch('cinder.rpc.get_client') + def test_cleanup(self, method, host, cluster, get_client): + cleanup_request = objects.CleanupRequest(self.context, + host=host, + cluster_name=cluster) + rpcapi = scheduler_rpcapi.SchedulerAPI() + getattr(rpcapi, method)(self.context, cleanup_request) + + prepare = get_client.return_value.prepare + + prepare.assert_called_once_with( + version='3.4') + rpc_call = 'cast' if 
method == 'do_cleanup' else 'call' + getattr(prepare.return_value, rpc_call).assert_called_once_with( + self.context, method, cleanup_request=cleanup_request) + + @ddt.data('do_cleanup', 'work_cleanup') + def test_cleanup_too_old(self, method): + cleanup_request = objects.CleanupRequest(self.context) + rpcapi = scheduler_rpcapi.SchedulerAPI() + with mock.patch.object(rpcapi.client, 'can_send_version', + return_value=False) as can_send_mock: + self.assertRaises(exception.ServiceTooOld, + getattr(rpcapi, method), + self.context, + cleanup_request) + can_send_mock.assert_called_once_with('3.4') diff --git a/cinder/tests/unit/scheduler/test_scheduler.py b/cinder/tests/unit/scheduler/test_scheduler.py index 12f86823995..76b477a8c20 100644 --- a/cinder/tests/unit/scheduler/test_scheduler.py +++ b/cinder/tests/unit/scheduler/test_scheduler.py @@ -17,6 +17,8 @@ Tests For Scheduler """ +import collections + import mock from oslo_config import cfg @@ -75,7 +77,8 @@ class SchedulerManagerTestCase(test.TestCase): @mock.patch('cinder.objects.service.Service.get_minimum_rpc_version') @mock.patch('cinder.objects.service.Service.get_minimum_obj_version') @mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'cinder-volume': '1.3'}) - @mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'cinder-volume': '1.4'}) + @mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'cinder-volume': '1.4', + 'cinder-scheduler': '1.4'}) def test_reset(self, get_min_obj, get_min_rpc): mgr = self.manager_cls() @@ -347,6 +350,98 @@ class SchedulerManagerTestCase(test.TestCase): vol.refresh() self.assertEqual('error', vol.status) + @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI' + '.determine_rpc_version_cap', mock.Mock(return_value='2.0')) + def test_upgrading_cloud(self): + self.assertTrue(self.manager.upgrading_cloud) + + @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI' + '.determine_rpc_version_cap') + def test_upgrading_cloud_not(self, cap_mock): + cap_mock.return_value = self.manager.RPC_API_VERSION + self.assertFalse(self.manager.upgrading_cloud) + + def test_cleanup_destination_scheduler(self): + service = objects.Service(id=1, host='hostname', + binary='cinder-scheduler') + result = self.manager._cleanup_destination(None, service) + expected = self.manager.sch_api.do_cleanup, None, service.host + self.assertEqual(expected, result) + + def test_cleanup_destination_volume(self): + service = objects.Service(id=1, host='hostname', cluster_name=None, + binary='cinder-volume') + result = self.manager._cleanup_destination(None, service) + expected = self.manager.volume_api.do_cleanup, service, service.host + self.assertEqual(expected, result) + + def test_cleanup_destination_volume_cluster_cache_hit(self): + cluster = objects.Cluster(id=1, name='mycluster', + binary='cinder-volume') + service = objects.Service(id=2, host='hostname', + cluster_name=cluster.name, + binary='cinder-volume') + cluster_cache = {'cinder-volume': {'mycluster': cluster}} + result = self.manager._cleanup_destination(cluster_cache, service) + expected = self.manager.volume_api.do_cleanup, cluster, cluster.name + self.assertEqual(expected, result) + + @mock.patch('cinder.objects.Cluster.get_by_id') + def test_cleanup_destination_volume_cluster_cache_miss(self, get_mock): + cluster = objects.Cluster(id=1, name='mycluster', + binary='cinder-volume') + service = objects.Service(self.context, + id=2, host='hostname', + cluster_name=cluster.name, + binary='cinder-volume') + get_mock.return_value = cluster + cluster_cache = collections.defaultdict(dict) + result = 
self.manager._cleanup_destination(cluster_cache, service) + expected = self.manager.volume_api.do_cleanup, cluster, cluster.name + self.assertEqual(expected, result) + + @mock.patch('cinder.scheduler.manager.SchedulerManager.upgrading_cloud') + def test_work_cleanup_upgrading(self, upgrading_mock): + cleanup_request = objects.CleanupRequest(host='myhost') + upgrading_mock.return_value = True + self.assertRaises(exception.UnavailableDuringUpgrade, + self.manager.work_cleanup, + self.context, + cleanup_request) + + @mock.patch('cinder.objects.Cluster.is_up', True) + @mock.patch('cinder.objects.Service.is_up', False) + @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.do_cleanup') + @mock.patch('cinder.volume.rpcapi.VolumeAPI.do_cleanup') + @mock.patch('cinder.objects.ServiceList.get_all') + def test_work_cleanup(self, get_mock, vol_clean_mock, sch_clean_mock): + args = dict(service_id=1, cluster_name='cluster_name', host='host', + binary='cinder-volume', is_up=False, disabled=True, + resource_id=fake.VOLUME_ID, resource_type='Volume') + + cluster = objects.Cluster(id=1, name=args['cluster_name'], + binary='cinder-volume') + services = [objects.Service(self.context, + id=2, host='hostname', + cluster_name=cluster.name, + binary='cinder-volume', + cluster=cluster), + objects.Service(self.context, + id=3, host='hostname', + cluster_name=None, + binary='cinder-scheduler'), + objects.Service(self.context, + id=4, host='hostname', + cluster_name=None, + binary='cinder-volume')] + get_mock.return_value = services + + cleanup_request = objects.CleanupRequest(self.context, **args) + res = self.manager.work_cleanup(self.context, cleanup_request) + self.assertEqual((services[:2], services[2:]), res) + self.assertEqual(1, vol_clean_mock.call_count) + self.assertEqual(1, sch_clean_mock.call_count) + class SchedulerTestCase(test.TestCase): """Test case for base scheduler driver class.""" diff --git a/cinder/tests/unit/test_volume_rpcapi.py b/cinder/tests/unit/test_volume_rpcapi.py index dec49d0adec..a48cdea7c78 100644 --- a/cinder/tests/unit/test_volume_rpcapi.py +++ b/cinder/tests/unit/test_volume_rpcapi.py @@ -25,6 +25,7 @@ from oslo_serialization import jsonutils from cinder.common import constants from cinder import context from cinder import db +from cinder import exception from cinder import objects from cinder.objects import base as ovo_base from cinder.objects import fields @@ -731,3 +732,28 @@ class VolumeRpcAPITestCase(test.TestCase): self._test_group_api('delete_group_snapshot', rpc_method='cast', group_snapshot=self.fake_group_snapshot, version='3.0') + + @ddt.data(('myhost', None), ('myhost', 'mycluster')) + @ddt.unpack + @mock.patch('cinder.volume.rpcapi.VolumeAPI._get_cctxt') + def test_do_cleanup(self, host, cluster, get_cctxt_mock): + cleanup_request = objects.CleanupRequest(self.context, + host=host, + cluster_name=cluster) + rpcapi = volume_rpcapi.VolumeAPI() + rpcapi.do_cleanup(self.context, cleanup_request) + get_cctxt_mock.assert_called_once_with( + cleanup_request.service_topic_queue, '3.7') + get_cctxt_mock.return_value.cast.assert_called_once_with( + self.context, 'do_cleanup', cleanup_request=cleanup_request) + + def test_do_cleanup_too_old(self): + cleanup_request = objects.CleanupRequest(self.context) + rpcapi = volume_rpcapi.VolumeAPI() + with mock.patch.object(rpcapi.client, 'can_send_version', + return_value=False) as can_send_mock: + self.assertRaises(exception.ServiceTooOld, + rpcapi.do_cleanup, + self.context, + cleanup_request) + 
can_send_mock.assert_called_once_with('3.7') diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index 358354e597a..3bb0dd7a692 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -14,6 +14,8 @@ from cinder.common import constants +from cinder import exception +from cinder.i18n import _ from cinder import objects from cinder import quota from cinder import rpc @@ -118,9 +120,11 @@ class VolumeAPI(rpc.RPCAPI): 3.5 - Adds support for cluster in retype and migrate_volume 3.6 - Switch to use oslo.messaging topics to indicate backends instead of @backend suffixes in server names. + 3.7 - Adds do_cleanup method to do volume cleanups from other nodes + that we were doing in init_host. """ - RPC_API_VERSION = '3.6' + RPC_API_VERSION = '3.7' RPC_DEFAULT_VERSION = '3.0' TOPIC = constants.VOLUME_TOPIC BINARY = 'cinder-volume' @@ -390,3 +394,17 @@ class VolumeAPI(rpc.RPCAPI): cctxt = self._get_cctxt(group_snapshot.service_topic_queue) cctxt.cast(ctxt, 'delete_group_snapshot', group_snapshot=group_snapshot) + + def do_cleanup(self, ctxt, cleanup_request): + """Perform this service/cluster resource cleanup as requested.""" + if not self.client.can_send_version('3.7'): + msg = _('One of cinder-volume services is too old to accept such ' + 'a request. Are you running mixed Newton-Ocata services?') + raise exception.ServiceTooOld(msg) + + destination = cleanup_request.service_topic_queue + cctxt = self._get_cctxt(destination, '3.7') + # NOTE(geguileo): This call goes to do_cleanup code in + # cinder.manager.CleanableManager unless in the future we overwrite it + # in cinder.volume.manager + cctxt.cast(ctxt, 'do_cleanup', cleanup_request=cleanup_request) diff --git a/etc/cinder/policy.json b/etc/cinder/policy.json index 6a6cb4bed81..225257ff979 100644 --- a/etc/cinder/policy.json +++ b/etc/cinder/policy.json @@ -137,5 +137,7 @@ "clusters:get": "rule:admin_api", "clusters:get_all": "rule:admin_api", - "clusters:update": "rule:admin_api" + "clusters:update": "rule:admin_api", + + "workers:cleanup": "rule:admin_api" }
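Finally, the new policy rule keeps the endpoint admin-only by default. A minimal oslo.policy sketch of how the rule behaves, assuming the stock ``admin_api`` rule of ``is_admin:True`` (values here are illustrative, not Cinder's enforcement code):

    # Illustrative only; mirrors the default rules shown in policy.json.
    from oslo_config import cfg
    from oslo_policy import policy

    rules = policy.Rules.from_dict({
        'admin_api': 'is_admin:True',
        'workers:cleanup': 'rule:admin_api',
    })
    enforcer = policy.Enforcer(cfg.CONF, rules=rules, use_conf=False)

    print(enforcer.enforce('workers:cleanup', {}, {'is_admin': True}))   # True
    print(enforcer.enforce('workers:cleanup', {}, {'is_admin': False}))  # False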