Add scheduler RPC API v2.0

This patch creates scheduler RPC API version 2.0, while retaining
compatibility in rpcapi and manager for 1.x, allowing for continuous
deployment scenarios.

This should be merged just before the Mitaka release.

UpgradeImpact - Deployments doing continous deployment should not
upgrade into Newton before doing an upgrade which includes all the
Mitaka's RPC API version bump commits (scheduler, volume, backup).

Change-Id: I9870462cf32be102a895f6e70ef843bfadf85a9d
Related-Blueprint: rpc-object-compatibility
This commit is contained in:
Michał Dulko 2016-02-17 11:15:57 +01:00
parent 4d093d4eca
commit cb4d320e02
3 changed files with 143 additions and 22 deletions
cinder
scheduler
tests/unit/scheduler

@ -66,6 +66,7 @@ class SchedulerManager(manager.Manager):
scheduler_driver = CONF.scheduler_driver
self.driver = importutils.import_object(scheduler_driver)
super(SchedulerManager, self).__init__(*args, **kwargs)
self.additional_endpoints.append(_SchedulerV2Proxy(self))
self._startup_delay = True
def init_host_with_rpc(self):
@ -315,3 +316,59 @@ class SchedulerManager(manager.Manager):
rpc.get_notifier("scheduler").error(context,
'scheduler.' + method,
payload)
# TODO(dulek): This goes away immediately in Newton and is just present in
# Mitaka so that we can receive v1.x and v2.0 messages.
class _SchedulerV2Proxy(object):
target = messaging.Target(version='2.0')
def __init__(self, manager):
self.manager = manager
def update_service_capabilities(self, context, service_name=None,
host=None, capabilities=None, **kwargs):
return self.manager.update_service_capabilities(
context, service_name=service_name, host=host,
capabilities=capabilities, **kwargs)
def create_consistencygroup(self, context, topic, group,
request_spec_list=None,
filter_properties_list=None):
return self.manager.create_consistencygroup(
context, topic, group, request_spec_list=request_spec_list,
filter_properties_list=None)
def create_volume(self, context, topic, volume_id, snapshot_id=None,
image_id=None, request_spec=None, filter_properties=None,
volume=None):
return self.manager.create_volume(
context, topic, volume_id, snapshot_id=snapshot_id,
image_id=image_id, request_spec=request_spec,
filter_properties=filter_properties, volume=volume)
def request_service_capabilities(self, context):
return self.manager.request_service_capabilities(context)
def migrate_volume_to_host(self, context, topic, volume_id, host,
force_host_copy, request_spec,
filter_properties=None, volume=None):
return self.manager.migrate_volume_to_host(
context, topic, volume_id, host, force_host_copy, request_spec,
filter_properties=filter_properties, volume=volume)
def retype(self, context, topic, volume_id, request_spec,
filter_properties=None, volume=None):
return self.manager.retype(context, topic, volume_id, request_spec,
filter_properties=filter_properties,
volume=volume)
def manage_existing(self, context, topic, volume_id, request_spec,
filter_properties=None):
return self.manager.manage_existing(
context, topic, volume_id, request_spec,
filter_properties=filter_properties)
def get_pools(self, context, filters=None):
return self.manager.get_pools(context, filters=filters)

@ -44,17 +44,29 @@ class SchedulerAPI(rpc.RPCAPI):
1.10 - Adds support for sending objects over RPC in retype()
1.11 - Adds support for sending objects over RPC in
migrate_volume_to_host()
... Mitaka supports messaging 1.11. Any changes to existing methods in
1.x after this point should be done so that they can handle version cap
set to 1.11.
2.0 - Remove 1.x compatibility
"""
RPC_API_VERSION = '1.11'
RPC_API_VERSION = '2.0'
TOPIC = CONF.scheduler_topic
BINARY = 'cinder-scheduler'
def _compat_ver(self, current, legacy):
if self.client.can_send_version(current):
return current
else:
return legacy
def create_consistencygroup(self, ctxt, topic, group,
request_spec_list=None,
filter_properties_list=None):
cctxt = self.client.prepare(version='1.8')
version = self._compat_ver('2.0', '1.8')
cctxt = self.client.prepare(version=version)
request_spec_p_list = []
for request_spec in request_spec_list:
request_spec_p = jsonutils.to_primitive(request_spec)
@ -69,13 +81,15 @@ class SchedulerAPI(rpc.RPCAPI):
def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
image_id=None, request_spec=None,
filter_properties=None, volume=None):
request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {'topic': topic, 'volume_id': volume_id,
'snapshot_id': snapshot_id, 'image_id': image_id,
'request_spec': request_spec_p,
'filter_properties': filter_properties}
if self.client.can_send_version('1.9'):
if self.client.can_send_version('2.0'):
version = '2.0'
msg_args['volume'] = volume
elif self.client.can_send_version('1.9'):
version = '1.9'
msg_args['volume'] = volume
else:
@ -92,7 +106,10 @@ class SchedulerAPI(rpc.RPCAPI):
'host': host, 'force_host_copy': force_host_copy,
'request_spec': request_spec_p,
'filter_properties': filter_properties}
if self.client.can_send_version('1.11'):
if self.client.can_send_version('2.0'):
version = '2.0'
msg_args['volume'] = volume
elif self.client.can_send_version('1.11'):
version = '1.11'
msg_args['volume'] = volume
else:
@ -108,7 +125,10 @@ class SchedulerAPI(rpc.RPCAPI):
msg_args = {'topic': topic, 'volume_id': volume_id,
'request_spec': request_spec_p,
'filter_properties': filter_properties}
if self.client.can_send_version('1.10'):
if self.client.can_send_version('2.0'):
version = '2.0'
msg_args['volume'] = volume
elif self.client.can_send_version('1.10'):
version = '1.10'
msg_args['volume'] = volume
else:
@ -119,7 +139,8 @@ class SchedulerAPI(rpc.RPCAPI):
def manage_existing(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None):
cctxt = self.client.prepare(version='1.5')
version = self._compat_ver('2.0', '1.5')
cctxt = self.client.prepare(version=version)
request_spec_p = jsonutils.to_primitive(request_spec)
return cctxt.cast(ctxt, 'manage_existing',
topic=topic,
@ -128,7 +149,8 @@ class SchedulerAPI(rpc.RPCAPI):
filter_properties=filter_properties)
def get_pools(self, ctxt, filters=None):
cctxt = self.client.prepare(version='1.7')
version = self._compat_ver('2.0', '1.7')
cctxt = self.client.prepare(version=version)
return cctxt.call(ctxt, 'get_pools',
filters=filters)
@ -136,7 +158,8 @@ class SchedulerAPI(rpc.RPCAPI):
service_name, host,
capabilities):
# FIXME(flaper87): What to do with fanout?
cctxt = self.client.prepare(fanout=True, version='1.0')
version = self._compat_ver('2.0', '1.0')
cctxt = self.client.prepare(fanout=True, version=version)
cctxt.cast(ctxt, 'update_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities)

@ -75,7 +75,20 @@ class SchedulerRpcAPITestCase(test.TestCase):
for kwarg, value in self.fake_kwargs.items():
self.assertEqual(expected_msg[kwarg], value)
def test_update_service_capabilities(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_update_service_capabilities(self, can_send_version):
self._test_scheduler_api('update_service_capabilities',
rpc_method='cast',
service_name='fake_name',
host='fake_host',
capabilities='fake_capabilities',
fanout=True,
version='2.0')
can_send_version.assert_called_once_with('2.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_update_service_capabilities_old(self, can_send_version):
self._test_scheduler_api('update_service_capabilities',
rpc_method='cast',
service_name='fake_name',
@ -83,6 +96,7 @@ class SchedulerRpcAPITestCase(test.TestCase):
capabilities='fake_capabilities',
fanout=True,
version='1.0')
can_send_version.assert_called_once_with('2.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
@ -96,8 +110,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='1.9')
can_send_version.assert_called_once_with('1.9')
version='2.0')
can_send_version.assert_called_once_with('2.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
@ -112,7 +126,7 @@ class SchedulerRpcAPITestCase(test.TestCase):
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.2')
can_send_version.assert_called_once_with('1.9')
can_send_version.assert_has_calls([mock.call('2.0'), mock.call('1.9')])
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
@ -126,8 +140,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='1.11')
can_send_version.assert_called_once_with('1.11')
version='2.0')
can_send_version.assert_called_once_with('2.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
@ -142,7 +156,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
filter_properties='filter_properties',
volume='volume',
version='1.3')
can_send_version.assert_called_once_with('1.11')
can_send_version.assert_has_calls([mock.call('2.0'),
mock.call('1.11')])
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
@ -154,8 +169,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='1.10')
can_send_version.assert_called_with('1.10')
version='2.0')
can_send_version.assert_called_with('2.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
@ -168,9 +183,23 @@ class SchedulerRpcAPITestCase(test.TestCase):
filter_properties='filter_properties',
volume='volume',
version='1.4')
can_send_version.assert_called_with('1.10')
can_send_version.assert_has_calls([mock.call('2.0'),
mock.call('1.10')])
def test_manage_existing(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_manage_existing(self, can_send_version):
self._test_scheduler_api('manage_existing',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='2.0')
can_send_version.assert_called_with('2.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_manage_existing_old(self, can_send_version):
self._test_scheduler_api('manage_existing',
rpc_method='cast',
topic='topic',
@ -178,9 +207,21 @@ class SchedulerRpcAPITestCase(test.TestCase):
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.5')
can_send_version.assert_called_with('2.0')
def test_get_pools(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_get_pools(self, can_send_version):
self._test_scheduler_api('get_pools',
rpc_method='call',
filters=None,
version='2.0')
can_send_version.assert_called_with('2.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_get_pools_old(self, can_send_version):
self._test_scheduler_api('get_pools',
rpc_method='call',
filters=None,
version='1.7')
can_send_version.assert_called_with('2.0')