Merge "Add scheduler RPC API v2.0"
This commit is contained in:
commit
72b1b5a7c2
@ -66,6 +66,7 @@ class SchedulerManager(manager.Manager):
|
||||
scheduler_driver = CONF.scheduler_driver
|
||||
self.driver = importutils.import_object(scheduler_driver)
|
||||
super(SchedulerManager, self).__init__(*args, **kwargs)
|
||||
self.additional_endpoints.append(_SchedulerV2Proxy(self))
|
||||
self._startup_delay = True
|
||||
|
||||
def init_host_with_rpc(self):
|
||||
@ -315,3 +316,59 @@ class SchedulerManager(manager.Manager):
|
||||
rpc.get_notifier("scheduler").error(context,
|
||||
'scheduler.' + method,
|
||||
payload)
|
||||
|
||||
|
||||
# TODO(dulek): This goes away immediately in Newton and is just present in
|
||||
# Mitaka so that we can receive v1.x and v2.0 messages.
|
||||
class _SchedulerV2Proxy(object):
|
||||
|
||||
target = messaging.Target(version='2.0')
|
||||
|
||||
def __init__(self, manager):
|
||||
self.manager = manager
|
||||
|
||||
def update_service_capabilities(self, context, service_name=None,
|
||||
host=None, capabilities=None, **kwargs):
|
||||
return self.manager.update_service_capabilities(
|
||||
context, service_name=service_name, host=host,
|
||||
capabilities=capabilities, **kwargs)
|
||||
|
||||
def create_consistencygroup(self, context, topic, group,
|
||||
request_spec_list=None,
|
||||
filter_properties_list=None):
|
||||
return self.manager.create_consistencygroup(
|
||||
context, topic, group, request_spec_list=request_spec_list,
|
||||
filter_properties_list=None)
|
||||
|
||||
def create_volume(self, context, topic, volume_id, snapshot_id=None,
|
||||
image_id=None, request_spec=None, filter_properties=None,
|
||||
volume=None):
|
||||
return self.manager.create_volume(
|
||||
context, topic, volume_id, snapshot_id=snapshot_id,
|
||||
image_id=image_id, request_spec=request_spec,
|
||||
filter_properties=filter_properties, volume=volume)
|
||||
|
||||
def request_service_capabilities(self, context):
|
||||
return self.manager.request_service_capabilities(context)
|
||||
|
||||
def migrate_volume_to_host(self, context, topic, volume_id, host,
|
||||
force_host_copy, request_spec,
|
||||
filter_properties=None, volume=None):
|
||||
return self.manager.migrate_volume_to_host(
|
||||
context, topic, volume_id, host, force_host_copy, request_spec,
|
||||
filter_properties=filter_properties, volume=volume)
|
||||
|
||||
def retype(self, context, topic, volume_id, request_spec,
|
||||
filter_properties=None, volume=None):
|
||||
return self.manager.retype(context, topic, volume_id, request_spec,
|
||||
filter_properties=filter_properties,
|
||||
volume=volume)
|
||||
|
||||
def manage_existing(self, context, topic, volume_id, request_spec,
|
||||
filter_properties=None):
|
||||
return self.manager.manage_existing(
|
||||
context, topic, volume_id, request_spec,
|
||||
filter_properties=filter_properties)
|
||||
|
||||
def get_pools(self, context, filters=None):
|
||||
return self.manager.get_pools(context, filters=filters)
|
||||
|
@ -44,17 +44,29 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
1.10 - Adds support for sending objects over RPC in retype()
|
||||
1.11 - Adds support for sending objects over RPC in
|
||||
migrate_volume_to_host()
|
||||
|
||||
... Mitaka supports messaging 1.11. Any changes to existing methods in
|
||||
1.x after this point should be done so that they can handle version cap
|
||||
set to 1.11.
|
||||
|
||||
2.0 - Remove 1.x compatibility
|
||||
"""
|
||||
|
||||
RPC_API_VERSION = '1.11'
|
||||
RPC_API_VERSION = '2.0'
|
||||
TOPIC = CONF.scheduler_topic
|
||||
BINARY = 'cinder-scheduler'
|
||||
|
||||
def _compat_ver(self, current, legacy):
|
||||
if self.client.can_send_version(current):
|
||||
return current
|
||||
else:
|
||||
return legacy
|
||||
|
||||
def create_consistencygroup(self, ctxt, topic, group,
|
||||
request_spec_list=None,
|
||||
filter_properties_list=None):
|
||||
|
||||
cctxt = self.client.prepare(version='1.8')
|
||||
version = self._compat_ver('2.0', '1.8')
|
||||
cctxt = self.client.prepare(version=version)
|
||||
request_spec_p_list = []
|
||||
for request_spec in request_spec_list:
|
||||
request_spec_p = jsonutils.to_primitive(request_spec)
|
||||
@ -69,13 +81,15 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
|
||||
image_id=None, request_spec=None,
|
||||
filter_properties=None, volume=None):
|
||||
|
||||
request_spec_p = jsonutils.to_primitive(request_spec)
|
||||
msg_args = {'topic': topic, 'volume_id': volume_id,
|
||||
'snapshot_id': snapshot_id, 'image_id': image_id,
|
||||
'request_spec': request_spec_p,
|
||||
'filter_properties': filter_properties}
|
||||
if self.client.can_send_version('1.9'):
|
||||
if self.client.can_send_version('2.0'):
|
||||
version = '2.0'
|
||||
msg_args['volume'] = volume
|
||||
elif self.client.can_send_version('1.9'):
|
||||
version = '1.9'
|
||||
msg_args['volume'] = volume
|
||||
else:
|
||||
@ -92,7 +106,10 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
'host': host, 'force_host_copy': force_host_copy,
|
||||
'request_spec': request_spec_p,
|
||||
'filter_properties': filter_properties}
|
||||
if self.client.can_send_version('1.11'):
|
||||
if self.client.can_send_version('2.0'):
|
||||
version = '2.0'
|
||||
msg_args['volume'] = volume
|
||||
elif self.client.can_send_version('1.11'):
|
||||
version = '1.11'
|
||||
msg_args['volume'] = volume
|
||||
else:
|
||||
@ -108,7 +125,10 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
msg_args = {'topic': topic, 'volume_id': volume_id,
|
||||
'request_spec': request_spec_p,
|
||||
'filter_properties': filter_properties}
|
||||
if self.client.can_send_version('1.10'):
|
||||
if self.client.can_send_version('2.0'):
|
||||
version = '2.0'
|
||||
msg_args['volume'] = volume
|
||||
elif self.client.can_send_version('1.10'):
|
||||
version = '1.10'
|
||||
msg_args['volume'] = volume
|
||||
else:
|
||||
@ -119,7 +139,8 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
|
||||
def manage_existing(self, ctxt, topic, volume_id,
|
||||
request_spec=None, filter_properties=None):
|
||||
cctxt = self.client.prepare(version='1.5')
|
||||
version = self._compat_ver('2.0', '1.5')
|
||||
cctxt = self.client.prepare(version=version)
|
||||
request_spec_p = jsonutils.to_primitive(request_spec)
|
||||
return cctxt.cast(ctxt, 'manage_existing',
|
||||
topic=topic,
|
||||
@ -128,7 +149,8 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
filter_properties=filter_properties)
|
||||
|
||||
def get_pools(self, ctxt, filters=None):
|
||||
cctxt = self.client.prepare(version='1.7')
|
||||
version = self._compat_ver('2.0', '1.7')
|
||||
cctxt = self.client.prepare(version=version)
|
||||
return cctxt.call(ctxt, 'get_pools',
|
||||
filters=filters)
|
||||
|
||||
@ -136,7 +158,8 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
service_name, host,
|
||||
capabilities):
|
||||
# FIXME(flaper87): What to do with fanout?
|
||||
cctxt = self.client.prepare(fanout=True, version='1.0')
|
||||
version = self._compat_ver('2.0', '1.0')
|
||||
cctxt = self.client.prepare(fanout=True, version=version)
|
||||
cctxt.cast(ctxt, 'update_service_capabilities',
|
||||
service_name=service_name, host=host,
|
||||
capabilities=capabilities)
|
||||
|
@ -75,7 +75,20 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
for kwarg, value in self.fake_kwargs.items():
|
||||
self.assertEqual(expected_msg[kwarg], value)
|
||||
|
||||
def test_update_service_capabilities(self):
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
|
||||
def test_update_service_capabilities(self, can_send_version):
|
||||
self._test_scheduler_api('update_service_capabilities',
|
||||
rpc_method='cast',
|
||||
service_name='fake_name',
|
||||
host='fake_host',
|
||||
capabilities='fake_capabilities',
|
||||
fanout=True,
|
||||
version='2.0')
|
||||
can_send_version.assert_called_once_with('2.0')
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=False)
|
||||
def test_update_service_capabilities_old(self, can_send_version):
|
||||
self._test_scheduler_api('update_service_capabilities',
|
||||
rpc_method='cast',
|
||||
service_name='fake_name',
|
||||
@ -83,6 +96,7 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
capabilities='fake_capabilities',
|
||||
fanout=True,
|
||||
version='1.0')
|
||||
can_send_version.assert_called_once_with('2.0')
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=True)
|
||||
@ -96,8 +110,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='filter_properties',
|
||||
volume='volume',
|
||||
version='1.9')
|
||||
can_send_version.assert_called_once_with('1.9')
|
||||
version='2.0')
|
||||
can_send_version.assert_called_once_with('2.0')
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=False)
|
||||
@ -112,7 +126,7 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='filter_properties',
|
||||
version='1.2')
|
||||
can_send_version.assert_called_once_with('1.9')
|
||||
can_send_version.assert_has_calls([mock.call('2.0'), mock.call('1.9')])
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=True)
|
||||
@ -126,8 +140,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='filter_properties',
|
||||
volume='volume',
|
||||
version='1.11')
|
||||
can_send_version.assert_called_once_with('1.11')
|
||||
version='2.0')
|
||||
can_send_version.assert_called_once_with('2.0')
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=False)
|
||||
@ -142,7 +156,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
filter_properties='filter_properties',
|
||||
volume='volume',
|
||||
version='1.3')
|
||||
can_send_version.assert_called_once_with('1.11')
|
||||
can_send_version.assert_has_calls([mock.call('2.0'),
|
||||
mock.call('1.11')])
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=True)
|
||||
@ -154,8 +169,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='filter_properties',
|
||||
volume='volume',
|
||||
version='1.10')
|
||||
can_send_version.assert_called_with('1.10')
|
||||
version='2.0')
|
||||
can_send_version.assert_called_with('2.0')
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=False)
|
||||
@ -168,9 +183,23 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
filter_properties='filter_properties',
|
||||
volume='volume',
|
||||
version='1.4')
|
||||
can_send_version.assert_called_with('1.10')
|
||||
can_send_version.assert_has_calls([mock.call('2.0'),
|
||||
mock.call('1.10')])
|
||||
|
||||
def test_manage_existing(self):
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
|
||||
def test_manage_existing(self, can_send_version):
|
||||
self._test_scheduler_api('manage_existing',
|
||||
rpc_method='cast',
|
||||
topic='topic',
|
||||
volume_id='volume_id',
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='filter_properties',
|
||||
version='2.0')
|
||||
can_send_version.assert_called_with('2.0')
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=False)
|
||||
def test_manage_existing_old(self, can_send_version):
|
||||
self._test_scheduler_api('manage_existing',
|
||||
rpc_method='cast',
|
||||
topic='topic',
|
||||
@ -178,9 +207,21 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='filter_properties',
|
||||
version='1.5')
|
||||
can_send_version.assert_called_with('2.0')
|
||||
|
||||
def test_get_pools(self):
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
|
||||
def test_get_pools(self, can_send_version):
|
||||
self._test_scheduler_api('get_pools',
|
||||
rpc_method='call',
|
||||
filters=None,
|
||||
version='2.0')
|
||||
can_send_version.assert_called_with('2.0')
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=False)
|
||||
def test_get_pools_old(self, can_send_version):
|
||||
self._test_scheduler_api('get_pools',
|
||||
rpc_method='call',
|
||||
filters=None,
|
||||
version='1.7')
|
||||
can_send_version.assert_called_with('2.0')
|
||||
|
Loading…
x
Reference in New Issue
Block a user