From ee451e548a5c611bc9872d89fef3e5d9f1f0365e Mon Sep 17 00:00:00 2001 From: Gorka Eguileor Date: Wed, 23 Mar 2016 16:27:00 +0100 Subject: [PATCH] Add worker's DB operations This patch adds DB sqlalchemy operations for the new workers table that will be used for the new cleanup mechanism implemented to support Active-Active configurations. They will be used for non Active-Active configurations as well. Specs: https://review.openstack.org/236977 Implements: blueprint cinder-volume-active-active-support Change-Id: Id1f09edaa4fb803d522dd1859f6f3ca8b18771ab --- cinder/db/api.py | 35 ++++ cinder/db/sqlalchemy/api.py | 106 +++++++++- cinder/db/sqlalchemy/models.py | 1 + cinder/exception.py | 14 ++ cinder/test.py | 41 ++++ cinder/tests/unit/test_db_api.py | 43 +--- cinder/tests/unit/test_db_worker_api.py | 249 ++++++++++++++++++++++++ 7 files changed, 446 insertions(+), 43 deletions(-) create mode 100644 cinder/tests/unit/test_db_worker_api.py diff --git a/cinder/db/api.py b/cinder/db/api.py index 4b99a1e2539..53c9a0e8db6 100644 --- a/cinder/db/api.py +++ b/cinder/db/api.py @@ -1305,6 +1305,41 @@ def message_destroy(context, message_id): ################### +def worker_create(context, **values): + """Create a worker entry from optional arguments.""" + return IMPL.worker_create(context, **values) + + +def worker_get(context, **filters): + """Get a worker or raise exception if it does not exist.""" + return IMPL.worker_get(context, **filters) + + +def worker_get_all(context, until=None, db_filters=None, **filters): + """Get all workers that match given criteria.""" + return IMPL.worker_get_all(context, until=until, db_filters=db_filters, + **filters) + + +def worker_update(context, id, filters=None, orm_worker=None, **values): + """Update a worker with given values.""" + return IMPL.worker_update(context, id, filters=filters, + orm_worker=orm_worker, **values) + + +def worker_claim_for_cleanup(context, claimer_id, orm_worker): + """Soft delete a worker, change the service_id and update the worker.""" + return IMPL.worker_claim_for_cleanup(context, claimer_id, orm_worker) + + +def worker_destroy(context, **filters): + """Delete a worker (no soft delete).""" + return IMPL.worker_destroy(context, **filters) + + +################### + + def resource_exists(context, model, resource_id): return IMPL.resource_exists(context, model, resource_id) diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index f345a2d8863..d977b49d754 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -4989,6 +4989,107 @@ def image_volume_cache_get_all_for_host(context, host): all() +################### + + +def _worker_query(context, session=None, until=None, db_filters=None, + **filters): + # Remove all filters based on the workers table that are set to None + filters = _clean_filters(filters) + + if filters and not is_valid_model_filters(models.Worker, filters): + return None + + query = model_query(context, models.Worker, session=session) + + if until: + db_filters = list(db_filters) if db_filters else [] + # Since we set updated_at at creation time we don't need to check + # created_at field. + db_filters.append(models.Worker.updated_at <= until) + + if db_filters: + query = query.filter(and_(*db_filters)) + + if filters: + query = query.filter_by(**filters) + + return query + + +def worker_create(context, **values): + """Create a worker entry from optional arguments.""" + worker = models.Worker(**values) + session = get_session() + try: + with session.begin(): + worker.save(session) + except db_exc.DBDuplicateEntry: + raise exception.WorkerExists(type=values.get('resource_type'), + id=values.get('resource_id')) + return worker + + +def worker_get(context, **filters): + """Get a worker or raise exception if it does not exist.""" + query = _worker_query(context, **filters) + worker = query.first() if query else None + if not worker: + raise exception.WorkerNotFound(**filters) + return worker + + +def worker_get_all(context, **filters): + """Get all workers that match given criteria.""" + query = _worker_query(context, **filters) + return query.all() if query else [] + + +def _orm_worker_update(worker, values): + if not worker: + return + for key, value in values.items(): + setattr(worker, key, value) + + +def worker_update(context, id, filters=None, orm_worker=None, **values): + """Update a worker with given values.""" + filters = filters or {} + query = _worker_query(context, id=id, **filters) + result = query.update(values) + if not result: + raise exception.WorkerNotFound(id=id, **filters) + _orm_worker_update(orm_worker, values) + return result + + +def worker_claim_for_cleanup(context, claimer_id, orm_worker): + """Claim a worker entry for cleanup.""" + # We set updated_at value so we are sure we update the DB entry even if the + # service_id is the same in the DB, thus flagging the claim. + values = {'service_id': claimer_id, + 'updated_at': timeutils.utcnow()} + + # We only update the worker entry if it hasn't been claimed by other host + # or thread + query = _worker_query(context, + status=orm_worker.status, + service_id=orm_worker.service_id, + until=orm_worker.updated_at, + id=orm_worker.id) + + result = query.update(values, synchronize_session=False) + if result: + _orm_worker_update(orm_worker, values) + return result + + +def worker_destroy(context, **filters): + """Delete a worker (no soft delete).""" + query = _worker_query(context, **filters) + return query.delete() + + ############################### @@ -5012,7 +5113,10 @@ def get_model_for_versioned_object(versioned_object): 'CGSnapshot': models.Cgsnapshot, } - model_name = versioned_object.obj_name() + if isinstance(versioned_object, six.string_types): + model_name = versioned_object + else: + model_name = versioned_object.obj_name() return (VO_TO_MODEL_EXCEPTIONS.get(model_name) or getattr(models, model_name)) diff --git a/cinder/db/sqlalchemy/models.py b/cinder/db/sqlalchemy/models.py index 8e0a2cec376..bb28b971d59 100644 --- a/cinder/db/sqlalchemy/models.py +++ b/cinder/db/sqlalchemy/models.py @@ -759,6 +759,7 @@ def register_models(): ConsistencyGroup, Cgsnapshot, Cluster, + Worker, ) engine = create_engine(CONF.database.connection, echo=False) for model in models: diff --git a/cinder/exception.py b/cinder/exception.py index d95c3300c99..0987c901c67 100644 --- a/cinder/exception.py +++ b/cinder/exception.py @@ -402,6 +402,20 @@ class ServiceTooOld(Invalid): message = _("Service is too old to fulfil this request.") +class WorkerNotFound(NotFound): + message = _("Worker with %s could not be found.") + + def __init__(self, message=None, **kwargs): + keys_list = ('{0}=%({0})s'.format(key) for key in kwargs) + placeholder = ', '.join(keys_list) + self.message = self.message % placeholder + super(WorkerNotFound, self).__init__(message, **kwargs) + + +class WorkerExists(Duplicate): + message = _("Worker for %(type)s %(id)s already exists.") + + class ClusterNotFound(NotFound): message = _('Cluster %(id)s could not be found.') diff --git a/cinder/test.py b/cinder/test.py index a17c792cdde..5139ecff54f 100644 --- a/cinder/test.py +++ b/cinder/test.py @@ -37,6 +37,7 @@ from oslo_messaging import conffixture as messaging_conffixture from oslo_utils import strutils from oslo_utils import timeutils from oslotest import moxstubout +import six import testtools from cinder.common import config # noqa Need to register global_opts @@ -395,3 +396,43 @@ class TestCase(testtools.TestCase): self.assertEqual(call[0], posargs[0]) self.assertEqual(call[1], posargs[2]) + + +class ModelsObjectComparatorMixin(object): + def _dict_from_object(self, obj, ignored_keys): + if ignored_keys is None: + ignored_keys = [] + if isinstance(obj, dict): + items = obj.items() + else: + items = obj.iteritems() + return {k: v for k, v in items + if k not in ignored_keys} + + def _assertEqualObjects(self, obj1, obj2, ignored_keys=None): + obj1 = self._dict_from_object(obj1, ignored_keys) + obj2 = self._dict_from_object(obj2, ignored_keys) + + self.assertEqual( + len(obj1), len(obj2), + "Keys mismatch: %s" % six.text_type( + set(obj1.keys()) ^ set(obj2.keys()))) + for key, value in obj1.items(): + self.assertEqual(value, obj2[key]) + + def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None, + msg=None): + obj_to_dict = lambda o: self._dict_from_object(o, ignored_keys) + sort_key = lambda d: [d[k] for k in sorted(d)] + conv_and_sort = lambda obj: sorted(map(obj_to_dict, obj), key=sort_key) + + self.assertListEqual(conv_and_sort(objs1), conv_and_sort(objs2), + msg=msg) + + def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2): + self.assertEqual(len(primitives1), len(primitives2)) + for primitive in primitives1: + self.assertIn(primitive, primitives2) + + for primitive in primitives2: + self.assertIn(primitive, primitives1) diff --git a/cinder/tests/unit/test_db_api.py b/cinder/tests/unit/test_db_api.py index 3c922a883bf..592fbcb8784 100644 --- a/cinder/tests/unit/test_db_api.py +++ b/cinder/tests/unit/test_db_api.py @@ -22,7 +22,6 @@ import mock from oslo_config import cfg from oslo_utils import timeutils from oslo_utils import uuidutils -import six from cinder.api import common from cinder import context @@ -72,47 +71,7 @@ def _quota_reserve(context, project_id): ) -class ModelsObjectComparatorMixin(object): - def _dict_from_object(self, obj, ignored_keys): - if ignored_keys is None: - ignored_keys = [] - if isinstance(obj, dict): - items = obj.items() - else: - items = obj.iteritems() - return {k: v for k, v in items - if k not in ignored_keys} - - def _assertEqualObjects(self, obj1, obj2, ignored_keys=None): - obj1 = self._dict_from_object(obj1, ignored_keys) - obj2 = self._dict_from_object(obj2, ignored_keys) - - self.assertEqual( - len(obj1), len(obj2), - "Keys mismatch: %s" % six.text_type( - set(obj1.keys()) ^ set(obj2.keys()))) - for key, value in obj1.items(): - self.assertEqual(value, obj2[key]) - - def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None, - msg=None): - obj_to_dict = lambda o: self._dict_from_object(o, ignored_keys) - sort_key = lambda d: [d[k] for k in sorted(d)] - conv_and_sort = lambda obj: sorted(map(obj_to_dict, obj), key=sort_key) - - self.assertListEqual(conv_and_sort(objs1), conv_and_sort(objs2), - msg=msg) - - def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2): - self.assertEqual(len(primitives1), len(primitives2)) - for primitive in primitives1: - self.assertIn(primitive, primitives2) - - for primitive in primitives2: - self.assertIn(primitive, primitives1) - - -class BaseTest(test.TestCase, ModelsObjectComparatorMixin): +class BaseTest(test.TestCase, test.ModelsObjectComparatorMixin): def setUp(self): super(BaseTest, self).setUp() self.ctxt = context.get_admin_context() diff --git a/cinder/tests/unit/test_db_worker_api.py b/cinder/tests/unit/test_db_worker_api.py new file mode 100644 index 00000000000..be15640e6c9 --- /dev/null +++ b/cinder/tests/unit/test_db_worker_api.py @@ -0,0 +1,249 @@ +# Copyright (c) 2016 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Unit tests for cinder.db.api.Worker""" + +import time +import uuid + +from oslo_db import exception as db_exception +import six + +from cinder import context +from cinder import db +from cinder import exception +from cinder import test +from cinder.tests.unit import fake_constants as fake + + +class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin): + worker_fields = {'resource_type': 'Volume', + 'resource_id': fake.VOLUME_ID, + 'status': 'creating'} + + def _uuid(self): + return six.text_type(uuid.uuid4()) + + def setUp(self): + super(DBAPIWorkerTestCase, self).setUp() + self.ctxt = context.get_admin_context() + + def test_worker_create_and_get(self): + """Test basic creation of a worker record.""" + worker = db.worker_create(self.ctxt, **self.worker_fields) + db_worker = db.worker_get(self.ctxt, id=worker.id) + self._assertEqualObjects(worker, db_worker) + + def test_worker_create_unique_constrains(self): + """Test when we use an already existing resource type and id.""" + db.worker_create(self.ctxt, **self.worker_fields) + self.assertRaises(exception.WorkerExists, db.worker_create, + self.ctxt, + resource_type=self.worker_fields['resource_type'], + resource_id=self.worker_fields['resource_id'], + status='not_' + self.worker_fields['status']) + + def test_worker_create_missing_required_field(self): + """Try creating a worker with a missing required field.""" + for field in self.worker_fields: + params = self.worker_fields.copy() + del params[field] + self.assertRaises(db_exception.DBError, db.worker_create, + self.ctxt, **params) + + def test_worker_create_invalid_field(self): + """Try creating a worker with a non existent db field.""" + self.assertRaises(TypeError, db.worker_create, self.ctxt, + myfield='123', **self.worker_fields) + + def test_worker_get_non_existent(self): + """Check basic non existent worker record get method.""" + db.worker_create(self.ctxt, **self.worker_fields) + self.assertRaises(exception.WorkerNotFound, db.worker_get, + self.ctxt, service_id='1', **self.worker_fields) + + def _create_workers(self, num, read_back=False, **fields): + workers = [] + base_params = self.worker_fields.copy() + base_params.update(fields) + + for i in range(num): + params = base_params.copy() + params['resource_id'] = self._uuid() + workers.append(db.worker_create(self.ctxt, **params)) + + if read_back: + for i in range(len(workers)): + workers[i] = db.worker_get(self.ctxt, id=workers[i].id) + + return workers + + def test_worker_get_all(self): + """Test basic get_all method.""" + self._create_workers(1) + service = db.service_create(self.ctxt, {}) + workers = self._create_workers(3, service_id=service.id) + + db_workers = db.worker_get_all(self.ctxt, service_id=service.id) + self._assertEqualListsOfObjects(workers, db_workers) + + def test_worker_get_all_until(self): + """Test get_all until a specific time.""" + workers = self._create_workers(3, read_back=True) + timestamp = workers[-1].updated_at + time.sleep(0.1) + self._create_workers(3) + + db_workers = db.worker_get_all(self.ctxt, until=timestamp) + self._assertEqualListsOfObjects(workers, db_workers) + + def test_worker_get_all_returns_empty(self): + """Test that get_all returns an empty list when there's no results.""" + self._create_workers(3, deleted=True) + db_workers = db.worker_get_all(self.ctxt) + self.assertListEqual([], db_workers) + + def test_worker_update_not_exists(self): + """Test worker update when the worker doesn't exist.""" + self.assertRaises(exception.WorkerNotFound, db.worker_update, + self.ctxt, 1) + + def test_worker_update(self): + """Test basic worker update.""" + worker = self._create_workers(1)[0] + worker = db.worker_get(self.ctxt, id=worker.id) + res = db.worker_update(self.ctxt, worker.id, service_id=1) + self.assertEqual(1, res) + worker.service_id = 1 + + db_worker = db.worker_get(self.ctxt, id=worker.id) + self._assertEqualObjects(worker, db_worker, ['updated_at']) + + def test_worker_update_update_orm(self): + """Test worker update updating the worker orm object.""" + worker = self._create_workers(1)[0] + res = db.worker_update(self.ctxt, worker.id, orm_worker=worker, + service_id=1) + self.assertEqual(1, res) + + db_worker = db.worker_get(self.ctxt, id=worker.id) + self._assertEqualObjects(worker, db_worker, ['updated_at']) + + def test_worker_destroy(self): + """Test that worker destroy really deletes the DB entry.""" + worker = self._create_workers(1)[0] + res = db.worker_destroy(self.ctxt, id=worker.id) + self.assertEqual(1, res) + + db_workers = db.worker_get_all(self.ctxt, read_deleted='yes') + self.assertListEqual([], db_workers) + + def test_worker_destroy_non_existent(self): + """Test that worker destroy returns 0 when entry doesn't exist.""" + res = db.worker_destroy(self.ctxt, id=1) + self.assertEqual(0, res) + + def test_worker_claim(self): + """Test worker claim of normal DB entry.""" + service_id = 1 + worker = db.worker_create(self.ctxt, resource_type='Volume', + resource_id=fake.VOLUME_ID, + status='deleting') + + res = db.worker_claim_for_cleanup(self.ctxt, service_id, worker) + self.assertEqual(1, res) + + db_worker = db.worker_get(self.ctxt, id=worker.id) + + self._assertEqualObjects(worker, db_worker, ['updated_at']) + self.assertEqual(service_id, db_worker.service_id) + self.assertEqual(worker.service_id, db_worker.service_id) + + def test_worker_claim_fails_status_change(self): + """Test that claim fails if the work entry has changed its status.""" + worker = db.worker_create(self.ctxt, resource_type='Volume', + resource_id=fake.VOLUME_ID, + status='deleting') + worker.status = 'creating' + + res = db.worker_claim_for_cleanup(self.ctxt, 1, worker) + self.assertEqual(0, res) + + db_worker = db.worker_get(self.ctxt, id=worker.id) + self._assertEqualObjects(worker, db_worker, ['status']) + self.assertIsNone(db_worker.service_id) + + def test_worker_claim_fails_service_change(self): + """Test that claim fails on worker service change.""" + failed_service = 1 + working_service = 2 + this_service = 3 + worker = db.worker_create(self.ctxt, resource_type='Volume', + resource_id=fake.VOLUME_ID, + status='deleting', + service_id=working_service) + + worker.service_id = failed_service + res = db.worker_claim_for_cleanup(self.ctxt, this_service, worker) + self.assertEqual(0, res) + db_worker = db.worker_get(self.ctxt, id=worker.id) + self.assertEqual(working_service, db_worker.service_id) + + def test_worker_claim_same_service(self): + """Test worker claim of a DB entry that has our service_id.""" + service_id = 1 + worker = db.worker_create(self.ctxt, resource_type='Volume', + resource_id=fake.VOLUME_ID, + status='deleting', service_id=service_id) + # Read from DB to get updated_at field + worker = db.worker_get(self.ctxt, id=worker.id) + claimed_worker = db.worker_get(self.ctxt, id=worker.id) + + res = db.worker_claim_for_cleanup(self.ctxt, + service_id, + claimed_worker) + self.assertEqual(1, res) + + db_worker = db.worker_get(self.ctxt, id=worker.id) + + self._assertEqualObjects(claimed_worker, db_worker) + self._assertEqualObjects(worker, db_worker, ['updated_at']) + self.assertNotEqual(worker.updated_at, db_worker.updated_at) + + def test_worker_claim_fails_this_service_claimed(self): + """Test claim fails when worker was already claimed by this service.""" + service_id = 1 + worker = db.worker_create(self.ctxt, resource_type='Volume', + resource_id=fake.VOLUME_ID, + status='creating', + service_id=service_id) + + # Read it back to have the updated_at value + worker = db.worker_get(self.ctxt, id=worker.id) + claimed_worker = db.worker_get(self.ctxt, id=worker.id) + + time.sleep(0.1) + # Simulate that this service starts processing this entry + res = db.worker_claim_for_cleanup(self.ctxt, + service_id, + claimed_worker) + self.assertEqual(1, res) + + res = db.worker_claim_for_cleanup(self.ctxt, service_id, worker) + self.assertEqual(0, res) + db_worker = db.worker_get(self.ctxt, id=worker.id) + self._assertEqualObjects(claimed_worker, db_worker) + self._assertEqualObjects(worker, db_worker, ['updated_at']) + self.assertNotEqual(worker.updated_at, db_worker.updated_at)