Start/Stop coordinator with Volume service
To make use of the coordinator we need to start and stop it together with the service that uses it. This patch adds start and stop method calls in Volume service and makes it easier to start it from other services. Restored from https://review.openstack.org/#/c/205839 Co-Authored-By: Gorka Eguileor <geguileo@redhat.com> Co-Authored-By: Szymon Borkowski <szymon.borkowski@intel.com> Implements: blueprint cinder-volume-active-active-support Change-Id: Ie5c551f3c7441eb7dc55fa62bed3db8c5a97e7fd
This commit is contained in:
parent
1c9b6f116f
commit
1c7b0a50da
cinder
@ -95,7 +95,8 @@ def main():
|
||||
host = "%s@%s" % (backend_host or CONF.host, backend)
|
||||
server = service.Service.create(host=host,
|
||||
service_name=backend,
|
||||
binary='cinder-volume')
|
||||
binary='cinder-volume',
|
||||
coordination=True)
|
||||
# Dispose of the whole DB connection pool here before
|
||||
# starting another process. Otherwise we run into cases
|
||||
# where child processes share DB connections which results
|
||||
@ -103,7 +104,8 @@ def main():
|
||||
session.dispose_engine()
|
||||
launcher.launch_service(server)
|
||||
else:
|
||||
server = service.Service.create(binary='cinder-volume')
|
||||
server = service.Service.create(binary='cinder-volume',
|
||||
coordination=True)
|
||||
launcher.launch_service(server)
|
||||
except (Exception, SystemExit):
|
||||
LOG.exception(_LE('Failed to load cinder-volume'))
|
||||
|
@ -78,7 +78,8 @@ def main():
|
||||
try:
|
||||
server = service.Service.create(host=host,
|
||||
service_name=backend,
|
||||
binary='cinder-volume')
|
||||
binary='cinder-volume',
|
||||
coordination=True)
|
||||
except Exception:
|
||||
msg = _('Volume service %s failed to start.') % host
|
||||
LOG.exception(msg)
|
||||
|
@ -113,8 +113,10 @@ class Coordinator(object):
|
||||
:param str name: The lock name that is used to identify it
|
||||
across all nodes.
|
||||
"""
|
||||
# NOTE(bluex): Tooz expects lock name as a byte string.
|
||||
lock_name = (self.prefix + name).encode('ascii')
|
||||
if self.coordinator is not None:
|
||||
return self.coordinator.get_lock(self.prefix + name)
|
||||
return self.coordinator.get_lock(lock_name)
|
||||
else:
|
||||
raise exception.LockCreationFailed(_('Coordinator uninitialized.'))
|
||||
|
||||
@ -139,7 +141,8 @@ class Coordinator(object):
|
||||
self._dead.wait(cfg.CONF.coordination.heartbeat)
|
||||
|
||||
def _start(self):
|
||||
member_id = self.prefix + self.agent_id
|
||||
# NOTE(bluex): Tooz expects member_id as a byte string.
|
||||
member_id = (self.prefix + self.agent_id).encode('ascii')
|
||||
self.coordinator = coordination.get_coordinator(
|
||||
cfg.CONF.coordination.backend_url, member_id)
|
||||
self.coordinator.start()
|
||||
|
@ -37,6 +37,7 @@ osprofiler_web = importutils.try_import('osprofiler.web')
|
||||
profiler_opts = importutils.try_import('osprofiler.opts')
|
||||
|
||||
from cinder import context
|
||||
from cinder import coordination
|
||||
from cinder import exception
|
||||
from cinder.i18n import _, _LE, _LI, _LW
|
||||
from cinder import objects
|
||||
@ -115,7 +116,7 @@ class Service(service.Service):
|
||||
|
||||
def __init__(self, host, binary, topic, manager, report_interval=None,
|
||||
periodic_interval=None, periodic_fuzzy_delay=None,
|
||||
service_name=None, *args, **kwargs):
|
||||
service_name=None, coordination=False, *args, **kwargs):
|
||||
super(Service, self).__init__()
|
||||
|
||||
if not rpc.initialized():
|
||||
@ -125,6 +126,7 @@ class Service(service.Service):
|
||||
self.binary = binary
|
||||
self.topic = topic
|
||||
self.manager_class_name = manager
|
||||
self.coordination = coordination
|
||||
manager_class = importutils.import_class(self.manager_class_name)
|
||||
if CONF.profiler.enabled:
|
||||
manager_class = profiler.trace_cls("rpc")(manager_class)
|
||||
@ -163,6 +165,10 @@ class Service(service.Service):
|
||||
LOG.info(_LI('Starting %(topic)s node (version %(version_string)s)'),
|
||||
{'topic': self.topic, 'version_string': version_string})
|
||||
self.model_disconnected = False
|
||||
|
||||
if self.coordination:
|
||||
coordination.COORDINATOR.start()
|
||||
|
||||
self.manager.init_host()
|
||||
|
||||
LOG.debug("Creating RPC server for service %s", self.topic)
|
||||
@ -234,7 +240,8 @@ class Service(service.Service):
|
||||
@classmethod
|
||||
def create(cls, host=None, binary=None, topic=None, manager=None,
|
||||
report_interval=None, periodic_interval=None,
|
||||
periodic_fuzzy_delay=None, service_name=None):
|
||||
periodic_fuzzy_delay=None, service_name=None,
|
||||
coordination=False):
|
||||
"""Instantiates class and passes back application object.
|
||||
|
||||
:param host: defaults to CONF.host
|
||||
@ -265,7 +272,8 @@ class Service(service.Service):
|
||||
report_interval=report_interval,
|
||||
periodic_interval=periodic_interval,
|
||||
periodic_fuzzy_delay=periodic_fuzzy_delay,
|
||||
service_name=service_name)
|
||||
service_name=service_name,
|
||||
coordination=coordination)
|
||||
|
||||
return service_obj
|
||||
|
||||
@ -283,6 +291,12 @@ class Service(service.Service):
|
||||
x.stop()
|
||||
except Exception:
|
||||
self.timers_skip.append(x)
|
||||
|
||||
if self.coordination:
|
||||
try:
|
||||
coordination.COORDINATOR.stop()
|
||||
except Exception:
|
||||
pass
|
||||
super(Service, self).stop(graceful=True)
|
||||
|
||||
def wait(self):
|
||||
|
@ -149,9 +149,11 @@ class TestCinderAllCmd(test.TestCase):
|
||||
wsgi_service.assert_called_once_with('osapi_volume')
|
||||
launcher.launch_service.assert_any_call(server, workers=server.workers)
|
||||
|
||||
service_create.assert_has_calls([mock.call(binary='cinder-scheduler'),
|
||||
mock.call(binary='cinder-backup'),
|
||||
mock.call(binary='cinder-volume')])
|
||||
service_create.assert_has_calls([
|
||||
mock.call(binary='cinder-scheduler'),
|
||||
mock.call(binary='cinder-backup'),
|
||||
mock.call(binary='cinder-volume', coordination=True)],
|
||||
any_order=True)
|
||||
self.assertEqual(3, service_create.call_count)
|
||||
launcher.launch_service.assert_has_calls([mock.call(service)] * 3)
|
||||
self.assertEqual(4, launcher.launch_service.call_count)
|
||||
@ -191,7 +193,9 @@ class TestCinderAllCmd(test.TestCase):
|
||||
mock.call(binary='cinder-backup'),
|
||||
mock.call(binary='cinder-volume',
|
||||
host='host@backend1',
|
||||
service_name='backend1')])
|
||||
service_name='backend1',
|
||||
coordination=True)],
|
||||
any_order=True)
|
||||
self.assertEqual(3, service_create.call_count)
|
||||
launcher.launch_service.assert_has_calls([mock.call(service)] * 3)
|
||||
self.assertEqual(4, launcher.launch_service.call_count)
|
||||
@ -274,8 +278,11 @@ class TestCinderAllCmd(test.TestCase):
|
||||
wsgi_service.assert_called_once_with('osapi_volume')
|
||||
launcher.launch_service.assert_any_call(server,
|
||||
workers=server.workers)
|
||||
for binary in ['cinder-volume', 'cinder-scheduler', 'cinder-backup']:
|
||||
service_create.assert_any_call(binary=binary)
|
||||
services = (('cinder-volume', {'coordination': True}),
|
||||
('cinder-backup', {}),
|
||||
('cinder-scheduler', {}))
|
||||
for binary, params in services:
|
||||
service_create.assert_any_call(binary=binary, **params)
|
||||
launcher.launch_service.assert_called_with(service)
|
||||
rpc_init.assert_called_once_with(CONF)
|
||||
self.assertTrue(mock_log.exception.called)
|
||||
|
@ -75,12 +75,16 @@ class CoordinatorTestCase(test.TestCase):
|
||||
agent1.start()
|
||||
agent2 = coordination.Coordinator()
|
||||
agent2.start()
|
||||
self.assertNotIn('lock', MockToozLock.active_locks)
|
||||
with agent1.get_lock('lock'):
|
||||
self.assertIn('lock', MockToozLock.active_locks)
|
||||
self.assertRaises(Locked, agent1.get_lock('lock').acquire)
|
||||
self.assertRaises(Locked, agent2.get_lock('lock').acquire)
|
||||
self.assertNotIn('lock', MockToozLock.active_locks)
|
||||
|
||||
lock_name = 'lock'
|
||||
expected_name = lock_name.encode('ascii')
|
||||
|
||||
self.assertNotIn(expected_name, MockToozLock.active_locks)
|
||||
with agent1.get_lock(lock_name):
|
||||
self.assertIn(expected_name, MockToozLock.active_locks)
|
||||
self.assertRaises(Locked, agent1.get_lock(lock_name).acquire)
|
||||
self.assertRaises(Locked, agent2.get_lock(lock_name).acquire)
|
||||
self.assertNotIn(expected_name, MockToozLock.active_locks)
|
||||
|
||||
def test_coordinator_offline(self, get_coordinator, heartbeat):
|
||||
crd = get_coordinator.return_value
|
||||
|
Loading…
x
Reference in New Issue
Block a user