diff --git a/cinder/backup/rpcapi.py b/cinder/backup/rpcapi.py index feb7ec80a62..dbce65a487d 100644 --- a/cinder/backup/rpcapi.py +++ b/cinder/backup/rpcapi.py @@ -48,32 +48,24 @@ class BackupAPI(rpc.RPCAPI): """ RPC_API_VERSION = '2.0' + RPC_DEFAULT_VERSION = '2.0' TOPIC = constants.BACKUP_TOPIC BINARY = 'cinder-backup' - def _compat_ver(self, current, legacy): - if self.client.can_send_version(current): - return current - else: - return legacy - def create_backup(self, ctxt, backup): LOG.debug("create_backup in rpcapi backup_id %s", backup.id) - version = '2.0' - cctxt = self.client.prepare(server=backup.host, version=version) + cctxt = self._get_cctxt(server=backup.host) cctxt.cast(ctxt, 'create_backup', backup=backup) def restore_backup(self, ctxt, volume_host, backup, volume_id): LOG.debug("restore_backup in rpcapi backup_id %s", backup.id) - version = '2.0' - cctxt = self.client.prepare(server=volume_host, version=version) + cctxt = self._get_cctxt(server=volume_host) cctxt.cast(ctxt, 'restore_backup', backup=backup, volume_id=volume_id) def delete_backup(self, ctxt, backup): LOG.debug("delete_backup rpcapi backup_id %s", backup.id) - version = '2.0' - cctxt = self.client.prepare(server=backup.host, version=version) + cctxt = self._get_cctxt(server=backup.host) cctxt.cast(ctxt, 'delete_backup', backup=backup) def export_record(self, ctxt, backup): @@ -81,24 +73,15 @@ class BackupAPI(rpc.RPCAPI): "on host %(host)s.", {'id': backup.id, 'host': backup.host}) - version = '2.0' - cctxt = self.client.prepare(server=backup.host, version=version) + cctxt = self._get_cctxt(server=backup.host) return cctxt.call(ctxt, 'export_record', backup=backup) - def import_record(self, - ctxt, - host, - backup, - backup_service, - backup_url, + def import_record(self, ctxt, host, backup, backup_service, backup_url, backup_hosts): LOG.debug("import_record rpcapi backup id %(id)s " "on host %(host)s for backup_url %(url)s.", - {'id': backup.id, - 'host': host, - 'url': backup_url}) - version = '2.0' - cctxt = self.client.prepare(server=host, version=version) + {'id': backup.id, 'host': host, 'url': backup_url}) + cctxt = self._get_cctxt(server=host) cctxt.cast(ctxt, 'import_record', backup=backup, backup_service=backup_service, @@ -108,15 +91,12 @@ class BackupAPI(rpc.RPCAPI): def reset_status(self, ctxt, backup, status): LOG.debug("reset_status in rpcapi backup_id %(id)s " "on host %(host)s.", - {'id': backup.id, - 'host': backup.host}) - version = '2.0' - cctxt = self.client.prepare(server=backup.host, version=version) + {'id': backup.id, 'host': backup.host}) + cctxt = self._get_cctxt(server=backup.host) return cctxt.cast(ctxt, 'reset_status', backup=backup, status=status) def check_support_to_force_delete(self, ctxt, host): LOG.debug("Check if backup driver supports force delete " "on host %(host)s.", {'host': host}) - version = '2.0' - cctxt = self.client.prepare(server=host, version=version) + cctxt = self._get_cctxt(server=host) return cctxt.call(ctxt, 'check_support_to_force_delete') diff --git a/cinder/rpc.py b/cinder/rpc.py index 4308a8819e9..4016e74535c 100644 --- a/cinder/rpc.py +++ b/cinder/rpc.py @@ -29,7 +29,6 @@ __all__ = [ from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging -from oslo_serialization import jsonutils from oslo_utils import importutils profiler = importutils.try_import('osprofiler.profiler') @@ -74,7 +73,7 @@ def init(conf): allowed_remote_exmods=exmods, aliases=TRANSPORT_ALIASES) - serializer = RequestContextSerializer(JsonPayloadSerializer()) + serializer = RequestContextSerializer(messaging.JsonPayloadSerializer()) NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT, serializer=serializer) @@ -108,12 +107,6 @@ def get_allowed_exmods(): return ALLOWED_EXMODS + EXTRA_EXMODS -class JsonPayloadSerializer(messaging.NoOpSerializer): - @staticmethod - def serialize_entity(context, entity): - return jsonutils.to_primitive(entity, convert_instances=True) - - class RequestContextSerializer(messaging.Serializer): def __init__(self, base): @@ -185,6 +178,7 @@ class RPCAPI(object): """Mixin class aggregating methods related to RPC API compatibility.""" RPC_API_VERSION = '1.0' + RPC_DEFAULT_VERSION = '1.0' TOPIC = '' BINARY = '' @@ -205,6 +199,20 @@ class RPCAPI(object): return version return versions[-1] + def _get_cctxt(self, host=None, version=None, **kwargs): + """Prepare client context + + Version parameter accepts single version string or tuple of strings. + Compatible version can be obtained later using: + cctxt = _get_cctxt(...) + version = cctxt.target.version + """ + if version is None: + version = self.RPC_DEFAULT_VERSION + if isinstance(version, tuple): + version = self._compat_ver(*version) + return self.client.prepare(version=version, **kwargs) + @classmethod def determine_rpc_version_cap(cls): global LAST_RPC_VERSIONS diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index 888c4c39d48..6dd00cba55f 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -61,18 +61,15 @@ class SchedulerAPI(rpc.RPCAPI): """ RPC_API_VERSION = '3.0' + RPC_DEFAULT_VERSION = '3.0' TOPIC = constants.SCHEDULER_TOPIC BINARY = 'cinder-scheduler' def create_consistencygroup(self, ctxt, group, request_spec_list=None, filter_properties_list=None): - version = '3.0' - cctxt = self.client.prepare(version=version) - request_spec_p_list = [] - for request_spec in request_spec_list: - request_spec_p = jsonutils.to_primitive(request_spec) - request_spec_p_list.append(request_spec_p) - + cctxt = self._get_cctxt() + request_spec_p_list = [jsonutils.to_primitive(rs) + for rs in request_spec_list] msg_args = { 'group': group, 'request_spec_list': request_spec_p_list, 'filter_properties_list': filter_properties_list, @@ -83,14 +80,10 @@ class SchedulerAPI(rpc.RPCAPI): def create_group(self, ctxt, group, group_spec=None, request_spec_list=None, group_filter_properties=None, filter_properties_list=None): - version = '3.0' - cctxt = self.client.prepare(version=version) - request_spec_p_list = [] - for request_spec in request_spec_list: - request_spec_p = jsonutils.to_primitive(request_spec) - request_spec_p_list.append(request_spec_p) + cctxt = self._get_cctxt() + request_spec_p_list = [jsonutils.to_primitive(rs) + for rs in request_spec_list] group_spec_p = jsonutils.to_primitive(group_spec) - msg_args = { 'group': group, 'group_spec': group_spec_p, 'request_spec_list': request_spec_p_list, @@ -102,52 +95,46 @@ class SchedulerAPI(rpc.RPCAPI): def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None, request_spec=None, filter_properties=None): + cctxt = self._get_cctxt() msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id, 'request_spec': request_spec, 'filter_properties': filter_properties, 'volume': volume} - version = '3.0' - cctxt = self.client.prepare(version=version) return cctxt.cast(ctxt, 'create_volume', **msg_args) def migrate_volume_to_host(self, ctxt, volume, host, force_host_copy=False, request_spec=None, filter_properties=None): + cctxt = self._get_cctxt() request_spec_p = jsonutils.to_primitive(request_spec) msg_args = {'host': host, 'force_host_copy': force_host_copy, 'request_spec': request_spec_p, 'filter_properties': filter_properties, 'volume': volume} - version = '3.0' - cctxt = self.client.prepare(version=version) + return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args) def retype(self, ctxt, volume, request_spec=None, filter_properties=None): + cctxt = self._get_cctxt() request_spec_p = jsonutils.to_primitive(request_spec) msg_args = {'request_spec': request_spec_p, 'filter_properties': filter_properties, 'volume': volume} - version = '3.0' - cctxt = self.client.prepare(version=version) return cctxt.cast(ctxt, 'retype', **msg_args) def manage_existing(self, ctxt, volume, request_spec=None, filter_properties=None): + cctxt = self._get_cctxt() request_spec_p = jsonutils.to_primitive(request_spec) msg_args = { 'request_spec': request_spec_p, 'filter_properties': filter_properties, 'volume': volume, } - version = '3.0' - cctxt = self.client.prepare(version=version) return cctxt.cast(ctxt, 'manage_existing', **msg_args) def get_pools(self, ctxt, filters=None): - version = '3.0' - cctxt = self.client.prepare(version=version) - return cctxt.call(ctxt, 'get_pools', - filters=filters) + cctxt = self._get_cctxt() + return cctxt.call(ctxt, 'get_pools', filters=filters) def update_service_capabilities(self, ctxt, service_name, host, capabilities): - version = '3.0' - cctxt = self.client.prepare(fanout=True, version=version) + cctxt = self._get_cctxt(fanout=True) cctxt.cast(ctxt, 'update_service_capabilities', service_name=service_name, host=host, capabilities=capabilities) diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index 0448fc7c4a4..7c55fd03894 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -113,29 +113,26 @@ class VolumeAPI(rpc.RPCAPI): """ RPC_API_VERSION = '3.1' + RPC_DEFAULT_VERSION = '3.0' TOPIC = constants.VOLUME_TOPIC BINARY = 'cinder-volume' - def _get_cctxt(self, host, version): - new_host = utils.get_volume_rpc_host(host) - return self.client.prepare(server=new_host, version=version) + def _get_cctxt(self, host=None, **kwargs): + if host is not None: + kwargs['server'] = utils.get_volume_rpc_host(host) + return super(VolumeAPI, self)._get_cctxt(**kwargs) def create_consistencygroup(self, ctxt, group, host): - version = '3.0' - cctxt = self._get_cctxt(host, version) - cctxt.cast(ctxt, 'create_consistencygroup', - group=group) + cctxt = self._get_cctxt(host) + cctxt.cast(ctxt, 'create_consistencygroup', group=group) def delete_consistencygroup(self, ctxt, group): - version = '3.0' - cctxt = self._get_cctxt(group.host, version) - cctxt.cast(ctxt, 'delete_consistencygroup', - group=group) + cctxt = self._get_cctxt(group.host) + cctxt.cast(ctxt, 'delete_consistencygroup', group=group) def update_consistencygroup(self, ctxt, group, add_volumes=None, remove_volumes=None): - version = '3.0' - cctxt = self._get_cctxt(group.host, version) + cctxt = self._get_cctxt(group.host) cctxt.cast(ctxt, 'update_consistencygroup', group=group, add_volumes=add_volumes, @@ -143,38 +140,31 @@ class VolumeAPI(rpc.RPCAPI): def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None, source_cg=None): - version = '3.0' - cctxt = self._get_cctxt(group.host, version) + cctxt = self._get_cctxt(group.host) cctxt.cast(ctxt, 'create_consistencygroup_from_src', group=group, cgsnapshot=cgsnapshot, source_cg=source_cg) def create_cgsnapshot(self, ctxt, cgsnapshot): - version = '3.0' - cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, version) + cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host) cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot) def delete_cgsnapshot(self, ctxt, cgsnapshot): - version = '3.0' - cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, version) + cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host) cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot) def create_volume(self, ctxt, volume, host, request_spec, filter_properties, allow_reschedule=True): - msg_args = {'request_spec': request_spec, - 'filter_properties': filter_properties, - 'allow_reschedule': allow_reschedule, - 'volume': volume, - } - version = '3.0' - - cctxt = self._get_cctxt(host, version) - cctxt.cast(ctxt, 'create_volume', **msg_args) + cctxt = self._get_cctxt(host) + cctxt.cast(ctxt, 'create_volume', + request_spec=request_spec, + filter_properties=filter_properties, + allow_reschedule=allow_reschedule, + volume=volume) def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False): - version = '3.0' - cctxt = self._get_cctxt(volume.host, version) + cctxt = self._get_cctxt(volume.host) msg_args = { 'volume': volume, 'unmanage_only': unmanage_only, 'cascade': cascade, @@ -183,20 +173,17 @@ class VolumeAPI(rpc.RPCAPI): cctxt.cast(ctxt, 'delete_volume', **msg_args) def create_snapshot(self, ctxt, volume, snapshot): - version = '3.0' - cctxt = self._get_cctxt(volume['host'], version) + cctxt = self._get_cctxt(volume['host']) cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot) def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False): - version = '3.0' - cctxt = self._get_cctxt(host, version) + cctxt = self._get_cctxt(host) cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot, unmanage_only=unmanage_only) def attach_volume(self, ctxt, volume, instance_uuid, host_name, mountpoint, mode): - version = '3.0' - cctxt = self._get_cctxt(volume['host'], version) + cctxt = self._get_cctxt(volume['host']) return cctxt.call(ctxt, 'attach_volume', volume_id=volume['id'], instance_uuid=instance_uuid, @@ -205,209 +192,153 @@ class VolumeAPI(rpc.RPCAPI): mode=mode) def detach_volume(self, ctxt, volume, attachment_id): - version = '3.0' - cctxt = self._get_cctxt(volume['host'], version) + cctxt = self._get_cctxt(volume['host']) return cctxt.call(ctxt, 'detach_volume', volume_id=volume['id'], attachment_id=attachment_id) def copy_volume_to_image(self, ctxt, volume, image_meta): - version = '3.0' - cctxt = self._get_cctxt(volume['host'], version) + cctxt = self._get_cctxt(volume['host']) cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'], image_meta=image_meta) def initialize_connection(self, ctxt, volume, connector): - version = '3.0' - msg_args = {'connector': connector, 'volume': volume} - - cctxt = self._get_cctxt(volume['host'], version=version) - return cctxt.call(ctxt, 'initialize_connection', **msg_args) + cctxt = self._get_cctxt(volume['host']) + return cctxt.call(ctxt, 'initialize_connection', connector=connector, + volume=volume) def terminate_connection(self, ctxt, volume, connector, force=False): - version = '3.0' - cctxt = self._get_cctxt(volume['host'], version) + cctxt = self._get_cctxt(volume['host']) return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'], connector=connector, force=force) def remove_export(self, ctxt, volume): - version = '3.0' - cctxt = self._get_cctxt(volume['host'], version) + cctxt = self._get_cctxt(volume['host']) cctxt.cast(ctxt, 'remove_export', volume_id=volume['id']) def publish_service_capabilities(self, ctxt): - version = '3.0' - cctxt = self.client.prepare(fanout=True, version=version) + cctxt = self._get_cctxt(fanout=True) cctxt.cast(ctxt, 'publish_service_capabilities') def accept_transfer(self, ctxt, volume, new_user, new_project): - version = '3.0' - cctxt = self._get_cctxt(volume['host'], version) + cctxt = self._get_cctxt(volume['host']) return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'], new_user=new_user, new_project=new_project) def extend_volume(self, ctxt, volume, new_size, reservations): - version = '3.0' - cctxt = self._get_cctxt(volume.host, version) - msg_args = { - 'volume': volume, 'new_size': new_size, - 'reservations': reservations, - } - cctxt.cast(ctxt, 'extend_volume', **msg_args) + cctxt = self._get_cctxt(volume.host) + cctxt.cast(ctxt, 'extend_volume', volume=volume, new_size=new_size, + reservations=reservations) def migrate_volume(self, ctxt, volume, dest_host, force_host_copy): host_p = {'host': dest_host.host, 'capabilities': dest_host.capabilities} - version = '3.0' - cctxt = self._get_cctxt(volume.host, version) - - msg_args = { - 'volume': volume, 'host': host_p, - 'force_host_copy': force_host_copy, - } - - cctxt.cast(ctxt, 'migrate_volume', **msg_args) + cctxt = self._get_cctxt(volume.host) + cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=host_p, + force_host_copy=force_host_copy) def migrate_volume_completion(self, ctxt, volume, new_volume, error): - version = '3.0' - cctxt = self._get_cctxt(volume.host, version) - - msg_args = { - 'volume': volume, 'new_volume': new_volume, 'error': error, - } - - return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args) + cctxt = self._get_cctxt(volume.host) + return cctxt.call(ctxt, 'migrate_volume_completion', volume=volume, + new_volume=new_volume, error=error,) def retype(self, ctxt, volume, new_type_id, dest_host, migration_policy='never', reservations=None, old_reservations=None): host_p = {'host': dest_host.host, 'capabilities': dest_host.capabilities} - version = '3.0' - cctxt = self._get_cctxt(volume.host, version) - - msg_args = { - 'volume': volume, 'new_type_id': new_type_id, 'host': host_p, - 'migration_policy': migration_policy, 'reservations': reservations, - 'old_reservations': old_reservations, - } - - cctxt.cast(ctxt, 'retype', **msg_args) + cctxt = self._get_cctxt(volume.host) + cctxt.cast(ctxt, 'retype', volume=volume, new_type_id=new_type_id, + host=host_p, migration_policy=migration_policy, + reservations=reservations, + old_reservations=old_reservations) def manage_existing(self, ctxt, volume, ref): - msg_args = { - 'ref': ref, 'volume': volume, - } - version = '3.0' - cctxt = self._get_cctxt(volume.host, version) - cctxt.cast(ctxt, 'manage_existing', **msg_args) + cctxt = self._get_cctxt(volume.host) + cctxt.cast(ctxt, 'manage_existing', ref=ref, volume=volume) def update_migrated_volume(self, ctxt, volume, new_volume, original_volume_status): - version = '3.0' - cctxt = self._get_cctxt(new_volume['host'], version) - cctxt.call(ctxt, - 'update_migrated_volume', + cctxt = self._get_cctxt(new_volume['host']) + cctxt.call(ctxt, 'update_migrated_volume', volume=volume, new_volume=new_volume, volume_status=original_volume_status) def freeze_host(self, ctxt, host): """Set backend host to frozen.""" - version = '3.0' - cctxt = self._get_cctxt(host, version) + cctxt = self._get_cctxt(host) return cctxt.call(ctxt, 'freeze_host') def thaw_host(self, ctxt, host): """Clear the frozen setting on a backend host.""" - version = '3.0' - cctxt = self._get_cctxt(host, version) + cctxt = self._get_cctxt(host) return cctxt.call(ctxt, 'thaw_host') def failover_host(self, ctxt, host, secondary_backend_id=None): - """Failover host to the specified backend_id (secondary). """ - version = '3.0' - cctxt = self._get_cctxt(host, version) + """Failover host to the specified backend_id (secondary).""" + cctxt = self._get_cctxt(host) cctxt.cast(ctxt, 'failover_host', secondary_backend_id=secondary_backend_id) def manage_existing_snapshot(self, ctxt, snapshot, ref, host): - version = '3.0' - cctxt = self._get_cctxt(host, version) + cctxt = self._get_cctxt(host) cctxt.cast(ctxt, 'manage_existing_snapshot', snapshot=snapshot, ref=ref) def get_capabilities(self, ctxt, host, discover): - version = '3.0' - cctxt = self._get_cctxt(host, version) + cctxt = self._get_cctxt(host) return cctxt.call(ctxt, 'get_capabilities', discover=discover) def get_backup_device(self, ctxt, backup, volume): - version = '3.0' - cctxt = self._get_cctxt(volume.host, version) + cctxt = self._get_cctxt(volume.host) backup_dict = cctxt.call(ctxt, 'get_backup_device', backup=backup) return backup_dict def secure_file_operations_enabled(self, ctxt, volume): - version = '3.0' - cctxt = self._get_cctxt(volume.host, version) + cctxt = self._get_cctxt(volume.host) return cctxt.call(ctxt, 'secure_file_operations_enabled', volume=volume) def get_manageable_volumes(self, ctxt, host, marker, limit, offset, sort_keys, sort_dirs): - version = '3.0' - cctxt = self._get_cctxt(host, version) + cctxt = self._get_cctxt(host) return cctxt.call(ctxt, 'get_manageable_volumes', marker=marker, limit=limit, offset=offset, sort_keys=sort_keys, sort_dirs=sort_dirs) def get_manageable_snapshots(self, ctxt, host, marker, limit, offset, sort_keys, sort_dirs): - version = '3.0' - cctxt = self._get_cctxt(host, version) + cctxt = self._get_cctxt(host) return cctxt.call(ctxt, 'get_manageable_snapshots', marker=marker, limit=limit, offset=offset, sort_keys=sort_keys, sort_dirs=sort_dirs) def create_group(self, ctxt, group, host): - version = '3.0' - cctxt = self._get_cctxt(host, version) - cctxt.cast(ctxt, 'create_group', - group=group) + cctxt = self._get_cctxt(host) + cctxt.cast(ctxt, 'create_group', group=group) def delete_group(self, ctxt, group): - version = '3.0' - cctxt = self._get_cctxt(group.host, version) - cctxt.cast(ctxt, 'delete_group', - group=group) + cctxt = self._get_cctxt(group.host) + cctxt.cast(ctxt, 'delete_group', group=group) - def update_group(self, ctxt, group, add_volumes=None, - remove_volumes=None): - version = '3.0' - cctxt = self._get_cctxt(group.host, version) - cctxt.cast(ctxt, 'update_group', - group=group, - add_volumes=add_volumes, + def update_group(self, ctxt, group, add_volumes=None, remove_volumes=None): + cctxt = self._get_cctxt(group.host) + cctxt.cast(ctxt, 'update_group', group=group, add_volumes=add_volumes, remove_volumes=remove_volumes) def create_group_from_src(self, ctxt, group, group_snapshot=None, source_group=None): - version = '3.0' - cctxt = self._get_cctxt(group.host, version) - cctxt.cast(ctxt, 'create_group_from_src', - group=group, - group_snapshot=group_snapshot, - source_group=source_group) + cctxt = self._get_cctxt(group.host) + cctxt.cast(ctxt, 'create_group_from_src', group=group, + group_snapshot=group_snapshot, source_group=source_group) def create_group_snapshot(self, ctxt, group_snapshot): - version = '3.0' - cctxt = self._get_cctxt(group_snapshot.group.host, version) + cctxt = self._get_cctxt(group_snapshot.group.host) cctxt.cast(ctxt, 'create_group_snapshot', group_snapshot=group_snapshot) def delete_group_snapshot(self, ctxt, group_snapshot): - version = '3.0' - cctxt = self._get_cctxt(group_snapshot.group.host, version) + cctxt = self._get_cctxt(group_snapshot.group.host) cctxt.cast(ctxt, 'delete_group_snapshot', group_snapshot=group_snapshot)