From 1c7b0a50da6e6361510953e7b4b600d940fa062a Mon Sep 17 00:00:00 2001 From: Szymon Wroblewski Date: Sat, 25 Jul 2015 22:05:23 +0200 Subject: [PATCH] Start/Stop coordinator with Volume service To make use of the coordinator we need to start and stop it together with the service that uses it. This patch adds start and stop method calls in Volume service and makes it easier to start it from other services. Restored from https://review.openstack.org/#/c/205839 Co-Authored-By: Gorka Eguileor Co-Authored-By: Szymon Borkowski Implements: blueprint cinder-volume-active-active-support Change-Id: Ie5c551f3c7441eb7dc55fa62bed3db8c5a97e7fd --- cinder/cmd/all.py | 6 ++++-- cinder/cmd/volume.py | 3 ++- cinder/coordination.py | 7 +++++-- cinder/service.py | 20 +++++++++++++++++--- cinder/tests/unit/test_cmd.py | 19 +++++++++++++------ cinder/tests/unit/test_coordination.py | 16 ++++++++++------ 6 files changed, 51 insertions(+), 20 deletions(-) diff --git a/cinder/cmd/all.py b/cinder/cmd/all.py index 33b035624ae..33ed5ac0391 100644 --- a/cinder/cmd/all.py +++ b/cinder/cmd/all.py @@ -95,7 +95,8 @@ def main(): host = "%s@%s" % (backend_host or CONF.host, backend) server = service.Service.create(host=host, service_name=backend, - binary='cinder-volume') + binary='cinder-volume', + coordination=True) # Dispose of the whole DB connection pool here before # starting another process. Otherwise we run into cases # where child processes share DB connections which results @@ -103,7 +104,8 @@ def main(): session.dispose_engine() launcher.launch_service(server) else: - server = service.Service.create(binary='cinder-volume') + server = service.Service.create(binary='cinder-volume', + coordination=True) launcher.launch_service(server) except (Exception, SystemExit): LOG.exception(_LE('Failed to load cinder-volume')) diff --git a/cinder/cmd/volume.py b/cinder/cmd/volume.py index 580c9265cd2..877dfe22b86 100644 --- a/cinder/cmd/volume.py +++ b/cinder/cmd/volume.py @@ -78,7 +78,8 @@ def main(): try: server = service.Service.create(host=host, service_name=backend, - binary='cinder-volume') + binary='cinder-volume', + coordination=True) except Exception: msg = _('Volume service %s failed to start.') % host LOG.exception(msg) diff --git a/cinder/coordination.py b/cinder/coordination.py index a4797c02706..075ceb4e39a 100644 --- a/cinder/coordination.py +++ b/cinder/coordination.py @@ -113,8 +113,10 @@ class Coordinator(object): :param str name: The lock name that is used to identify it across all nodes. """ + # NOTE(bluex): Tooz expects lock name as a byte string. + lock_name = (self.prefix + name).encode('ascii') if self.coordinator is not None: - return self.coordinator.get_lock(self.prefix + name) + return self.coordinator.get_lock(lock_name) else: raise exception.LockCreationFailed(_('Coordinator uninitialized.')) @@ -139,7 +141,8 @@ class Coordinator(object): self._dead.wait(cfg.CONF.coordination.heartbeat) def _start(self): - member_id = self.prefix + self.agent_id + # NOTE(bluex): Tooz expects member_id as a byte string. + member_id = (self.prefix + self.agent_id).encode('ascii') self.coordinator = coordination.get_coordinator( cfg.CONF.coordination.backend_url, member_id) self.coordinator.start() diff --git a/cinder/service.py b/cinder/service.py index b57db2e97b8..69500a2655d 100644 --- a/cinder/service.py +++ b/cinder/service.py @@ -37,6 +37,7 @@ osprofiler_web = importutils.try_import('osprofiler.web') profiler_opts = importutils.try_import('osprofiler.opts') from cinder import context +from cinder import coordination from cinder import exception from cinder.i18n import _, _LE, _LI, _LW from cinder import objects @@ -115,7 +116,7 @@ class Service(service.Service): def __init__(self, host, binary, topic, manager, report_interval=None, periodic_interval=None, periodic_fuzzy_delay=None, - service_name=None, *args, **kwargs): + service_name=None, coordination=False, *args, **kwargs): super(Service, self).__init__() if not rpc.initialized(): @@ -125,6 +126,7 @@ class Service(service.Service): self.binary = binary self.topic = topic self.manager_class_name = manager + self.coordination = coordination manager_class = importutils.import_class(self.manager_class_name) if CONF.profiler.enabled: manager_class = profiler.trace_cls("rpc")(manager_class) @@ -163,6 +165,10 @@ class Service(service.Service): LOG.info(_LI('Starting %(topic)s node (version %(version_string)s)'), {'topic': self.topic, 'version_string': version_string}) self.model_disconnected = False + + if self.coordination: + coordination.COORDINATOR.start() + self.manager.init_host() LOG.debug("Creating RPC server for service %s", self.topic) @@ -234,7 +240,8 @@ class Service(service.Service): @classmethod def create(cls, host=None, binary=None, topic=None, manager=None, report_interval=None, periodic_interval=None, - periodic_fuzzy_delay=None, service_name=None): + periodic_fuzzy_delay=None, service_name=None, + coordination=False): """Instantiates class and passes back application object. :param host: defaults to CONF.host @@ -265,7 +272,8 @@ class Service(service.Service): report_interval=report_interval, periodic_interval=periodic_interval, periodic_fuzzy_delay=periodic_fuzzy_delay, - service_name=service_name) + service_name=service_name, + coordination=coordination) return service_obj @@ -283,6 +291,12 @@ class Service(service.Service): x.stop() except Exception: self.timers_skip.append(x) + + if self.coordination: + try: + coordination.COORDINATOR.stop() + except Exception: + pass super(Service, self).stop(graceful=True) def wait(self): diff --git a/cinder/tests/unit/test_cmd.py b/cinder/tests/unit/test_cmd.py index a24385e3782..5e1aebc0c90 100644 --- a/cinder/tests/unit/test_cmd.py +++ b/cinder/tests/unit/test_cmd.py @@ -149,9 +149,11 @@ class TestCinderAllCmd(test.TestCase): wsgi_service.assert_called_once_with('osapi_volume') launcher.launch_service.assert_any_call(server, workers=server.workers) - service_create.assert_has_calls([mock.call(binary='cinder-scheduler'), - mock.call(binary='cinder-backup'), - mock.call(binary='cinder-volume')]) + service_create.assert_has_calls([ + mock.call(binary='cinder-scheduler'), + mock.call(binary='cinder-backup'), + mock.call(binary='cinder-volume', coordination=True)], + any_order=True) self.assertEqual(3, service_create.call_count) launcher.launch_service.assert_has_calls([mock.call(service)] * 3) self.assertEqual(4, launcher.launch_service.call_count) @@ -191,7 +193,9 @@ class TestCinderAllCmd(test.TestCase): mock.call(binary='cinder-backup'), mock.call(binary='cinder-volume', host='host@backend1', - service_name='backend1')]) + service_name='backend1', + coordination=True)], + any_order=True) self.assertEqual(3, service_create.call_count) launcher.launch_service.assert_has_calls([mock.call(service)] * 3) self.assertEqual(4, launcher.launch_service.call_count) @@ -274,8 +278,11 @@ class TestCinderAllCmd(test.TestCase): wsgi_service.assert_called_once_with('osapi_volume') launcher.launch_service.assert_any_call(server, workers=server.workers) - for binary in ['cinder-volume', 'cinder-scheduler', 'cinder-backup']: - service_create.assert_any_call(binary=binary) + services = (('cinder-volume', {'coordination': True}), + ('cinder-backup', {}), + ('cinder-scheduler', {})) + for binary, params in services: + service_create.assert_any_call(binary=binary, **params) launcher.launch_service.assert_called_with(service) rpc_init.assert_called_once_with(CONF) self.assertTrue(mock_log.exception.called) diff --git a/cinder/tests/unit/test_coordination.py b/cinder/tests/unit/test_coordination.py index 04b7ed10ead..5473f16e81e 100644 --- a/cinder/tests/unit/test_coordination.py +++ b/cinder/tests/unit/test_coordination.py @@ -75,12 +75,16 @@ class CoordinatorTestCase(test.TestCase): agent1.start() agent2 = coordination.Coordinator() agent2.start() - self.assertNotIn('lock', MockToozLock.active_locks) - with agent1.get_lock('lock'): - self.assertIn('lock', MockToozLock.active_locks) - self.assertRaises(Locked, agent1.get_lock('lock').acquire) - self.assertRaises(Locked, agent2.get_lock('lock').acquire) - self.assertNotIn('lock', MockToozLock.active_locks) + + lock_name = 'lock' + expected_name = lock_name.encode('ascii') + + self.assertNotIn(expected_name, MockToozLock.active_locks) + with agent1.get_lock(lock_name): + self.assertIn(expected_name, MockToozLock.active_locks) + self.assertRaises(Locked, agent1.get_lock(lock_name).acquire) + self.assertRaises(Locked, agent2.get_lock(lock_name).acquire) + self.assertNotIn(expected_name, MockToozLock.active_locks) def test_coordinator_offline(self, get_coordinator, heartbeat): crd = get_coordinator.return_value