Merge "Support A/A on Scheduler operations"
This commit is contained in:
commit
4a556303e2
@ -349,6 +349,13 @@ cgsnapshot_id:
|
||||
in: body
|
||||
required: false
|
||||
type: string
|
||||
cluster_mutex:
|
||||
description: |
|
||||
The OpenStack Block Storage cluster where the resource resides. Optional
|
||||
only if host field is provided.
|
||||
in: body
|
||||
required: false
|
||||
type: string
|
||||
connector:
|
||||
description: |
|
||||
The ``connector`` object.
|
||||
@ -638,6 +645,13 @@ host:
|
||||
in: body
|
||||
required: true
|
||||
type: string
|
||||
host_mutex:
|
||||
description: |
|
||||
The OpenStack Block Storage host where the existing resource resides.
|
||||
Optional only if cluster field is provided.
|
||||
in: body
|
||||
required: false
|
||||
type: string
|
||||
host_name:
|
||||
description: |
|
||||
The name of the attaching host.
|
||||
|
19
api-ref/source/v3/samples/volume-manage-request-cluster.json
Normal file
19
api-ref/source/v3/samples/volume-manage-request-cluster.json
Normal file
@ -0,0 +1,19 @@
|
||||
{
|
||||
"volume": {
|
||||
"host": null,
|
||||
"cluster": "cluster@backend",
|
||||
"ref": {
|
||||
"source-name": "existingLV",
|
||||
"source-id": "1234"
|
||||
},
|
||||
"name": "New Volume",
|
||||
"availability_zone": "az2",
|
||||
"description": "Volume imported from existingLV",
|
||||
"volume_type": null,
|
||||
"bootable": true,
|
||||
"metadata": {
|
||||
"key1": "value1",
|
||||
"key2": "value2"
|
||||
}
|
||||
}
|
||||
}
|
@ -24,6 +24,10 @@ or source-name element, if possible.
|
||||
The API chooses the size of the volume by rounding up the size of
|
||||
the existing storage volume to the next gibibyte (GiB).
|
||||
|
||||
Prior to microversion 3.16 host field was required, with the possibility of
|
||||
defining the cluster it is no longer required, but we must have either a host
|
||||
or a cluster field but we cannot have them both with values.
|
||||
|
||||
Error response codes:202,
|
||||
|
||||
|
||||
@ -38,7 +42,8 @@ Request
|
||||
- volume_type: volume_type
|
||||
- name: name
|
||||
- volume: volume
|
||||
- host: host
|
||||
- host: host_mutex
|
||||
- cluster: cluster_mutex
|
||||
- ref: ref
|
||||
- metadata: metadata
|
||||
- tenant_id: tenant_id
|
||||
@ -48,3 +53,6 @@ Request Example
|
||||
|
||||
.. literalinclude:: ./samples/volume-manage-request.json
|
||||
:language: javascript
|
||||
|
||||
.. literalinclude:: ./samples/volume-manage-request-cluster.json
|
||||
:language: javascript
|
||||
|
@ -370,3 +370,17 @@ class ViewBuilder(object):
|
||||
url_parts[2] = prefix_parts[2] + url_parts[2]
|
||||
|
||||
return urllib.parse.urlunsplit(url_parts).rstrip('/')
|
||||
|
||||
|
||||
def get_cluster_host(req, params, cluster_version):
|
||||
if req.api_version_request.matches(cluster_version):
|
||||
cluster_name = params.get('cluster')
|
||||
msg = _('One and only one of cluster and host must be set.')
|
||||
else:
|
||||
cluster_name = None
|
||||
msg = _('Host field is missing.')
|
||||
|
||||
host = params.get('host')
|
||||
if bool(cluster_name) == bool(host):
|
||||
raise exception.InvalidInput(reason=msg)
|
||||
return cluster_name, host
|
||||
|
@ -17,6 +17,7 @@ import oslo_messaging as messaging
|
||||
import webob
|
||||
from webob import exc
|
||||
|
||||
from cinder.api import common
|
||||
from cinder.api import extensions
|
||||
from cinder.api.openstack import wsgi
|
||||
from cinder import backup
|
||||
@ -241,14 +242,12 @@ class VolumeAdminController(AdminController):
|
||||
# Not found exception will be handled at the wsgi level
|
||||
volume = self._get(context, id)
|
||||
params = body['os-migrate_volume']
|
||||
try:
|
||||
host = params['host']
|
||||
except KeyError:
|
||||
raise exc.HTTPBadRequest(explanation=_("Must specify 'host'."))
|
||||
|
||||
cluster_name, host = common.get_cluster_host(req, params, '3.16')
|
||||
force_host_copy = utils.get_bool_param('force_host_copy', params)
|
||||
lock_volume = utils.get_bool_param('lock_volume', params)
|
||||
self.volume_api.migrate_volume(context, volume, host, force_host_copy,
|
||||
lock_volume)
|
||||
self.volume_api.migrate_volume(context, volume, host, cluster_name,
|
||||
force_host_copy, lock_volume)
|
||||
return webob.Response(status_int=202)
|
||||
|
||||
@wsgi.action('os-migrate_volume_completion')
|
||||
|
@ -13,8 +13,8 @@
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
from webob import exc
|
||||
|
||||
from cinder.api import common
|
||||
from cinder.api.contrib import resource_common_manage
|
||||
from cinder.api import extensions
|
||||
from cinder.api.openstack import wsgi
|
||||
@ -64,6 +64,7 @@ class VolumeManageController(wsgi.Controller):
|
||||
'volume':
|
||||
{
|
||||
'host': <Cinder host on which the existing storage resides>,
|
||||
'cluster': <Cinder cluster on which the storage resides>,
|
||||
'ref': <Driver-specific reference to existing storage object>,
|
||||
}
|
||||
}
|
||||
@ -106,13 +107,10 @@ class VolumeManageController(wsgi.Controller):
|
||||
|
||||
# Check that the required keys are present, return an error if they
|
||||
# are not.
|
||||
required_keys = set(['ref', 'host'])
|
||||
missing_keys = list(required_keys - set(volume.keys()))
|
||||
if 'ref' not in volume:
|
||||
raise exception.MissingRequired(element='ref')
|
||||
|
||||
if missing_keys:
|
||||
msg = _("The following elements are required: %s") % \
|
||||
', '.join(missing_keys)
|
||||
raise exc.HTTPBadRequest(explanation=msg)
|
||||
cluster_name, host = common.get_cluster_host(req, volume, '3.16')
|
||||
|
||||
LOG.debug('Manage volume request body: %s', body)
|
||||
|
||||
@ -139,7 +137,8 @@ class VolumeManageController(wsgi.Controller):
|
||||
|
||||
try:
|
||||
new_volume = self.volume_api.manage_existing(context,
|
||||
volume['host'],
|
||||
host,
|
||||
cluster_name,
|
||||
volume['ref'],
|
||||
**kwargs)
|
||||
except exception.ServiceNotFound:
|
||||
|
@ -64,6 +64,7 @@ REST_API_VERSION_HISTORY = """
|
||||
* 3.14 - Add group snapshot and create group from src APIs.
|
||||
* 3.15 - Inject the response's `Etag` header to avoid the lost update
|
||||
problem with volume metadata.
|
||||
* 3.16 - Migrate volume now supports cluster
|
||||
"""
|
||||
|
||||
# The minimum and maximum versions of the API supported
|
||||
@ -71,7 +72,7 @@ REST_API_VERSION_HISTORY = """
|
||||
# minimum version of the API supported.
|
||||
# Explicitly using /v1 or /v2 enpoints will still work
|
||||
_MIN_API_VERSION = "3.0"
|
||||
_MAX_API_VERSION = "3.15"
|
||||
_MAX_API_VERSION = "3.16"
|
||||
_LEGACY_API_VERSION1 = "1.0"
|
||||
_LEGACY_API_VERSION2 = "2.0"
|
||||
|
||||
|
@ -191,3 +191,12 @@ user documentation.
|
||||
------------------------
|
||||
Added injecting the response's `Etag` header to avoid the lost update
|
||||
problem with volume metadata.
|
||||
|
||||
3.16
|
||||
----
|
||||
os-migrate_volume now accepts ``cluster`` parameter when we want to migrate a
|
||||
volume to a cluster. If we pass the ``host`` parameter for a volume that is
|
||||
in a cluster, the request will be sent to the cluster as if we had requested
|
||||
that specific cluster. Only ``host`` or ``cluster`` can be provided.
|
||||
|
||||
Creating a managed volume also supports the cluster parameter.
|
||||
|
@ -424,7 +424,7 @@ def _filter_host(field, value, match_level=None):
|
||||
|
||||
def _service_query(context, session=None, read_deleted='no', host=None,
|
||||
cluster_name=None, is_up=None, backend_match_level=None,
|
||||
**filters):
|
||||
disabled=None, **filters):
|
||||
filters = _clean_filters(filters)
|
||||
if filters and not is_valid_model_filters(models.Service, filters):
|
||||
return None
|
||||
@ -442,6 +442,22 @@ def _service_query(context, session=None, read_deleted='no', host=None,
|
||||
query = query.filter(_filter_host(models.Service.cluster_name,
|
||||
cluster_name, backend_match_level))
|
||||
|
||||
# Now that we have clusters, a service is disabled if the service doesn't
|
||||
# belong to a cluster or if it belongs to a cluster and the cluster itself
|
||||
# is disabled.
|
||||
if disabled is not None:
|
||||
disabled_filter = or_(
|
||||
and_(models.Service.cluster_name.is_(None),
|
||||
models.Service.disabled),
|
||||
and_(models.Service.cluster_name.isnot(None),
|
||||
sql.exists().where(and_(
|
||||
models.Cluster.name == models.Service.cluster_name,
|
||||
models.Cluster.binary == models.Service.binary,
|
||||
~models.Cluster.deleted,
|
||||
models.Cluster.disabled))))
|
||||
if not disabled:
|
||||
disabled_filter = ~disabled_filter
|
||||
query = query.filter(disabled_filter)
|
||||
if filters:
|
||||
query = query.filter_by(**filters)
|
||||
|
||||
@ -5074,16 +5090,14 @@ def consistencygroup_create(context, values, cg_snap_id=None, cg_id=None):
|
||||
|
||||
if conditions:
|
||||
# We don't want duplicated field values
|
||||
values.pop('volume_type_id', None)
|
||||
values.pop('availability_zone', None)
|
||||
values.pop('host', None)
|
||||
names = ['volume_type_id', 'availability_zone', 'host',
|
||||
'cluster_name']
|
||||
for name in names:
|
||||
values.pop(name, None)
|
||||
|
||||
sel = session.query(cg_model.volume_type_id,
|
||||
cg_model.availability_zone,
|
||||
cg_model.host,
|
||||
*(bindparam(k, v) for k, v in values.items())
|
||||
).filter(*conditions)
|
||||
names = ['volume_type_id', 'availability_zone', 'host']
|
||||
fields = [getattr(cg_model, name) for name in names]
|
||||
fields.extend(bindparam(k, v) for k, v in values.items())
|
||||
sel = session.query(*fields).filter(*conditions)
|
||||
names.extend(values.keys())
|
||||
insert_stmt = cg_model.__table__.insert().from_select(names, sel)
|
||||
result = session.execute(insert_stmt)
|
||||
|
@ -177,7 +177,8 @@ class SchedulerDependentManager(Manager):
|
||||
context,
|
||||
self.service_name,
|
||||
self.host,
|
||||
self.last_capabilities)
|
||||
self.last_capabilities,
|
||||
self.cluster)
|
||||
try:
|
||||
self.scheduler_rpcapi.notify_service_capabilities(
|
||||
context,
|
||||
|
@ -457,6 +457,10 @@ class ClusteredObject(object):
|
||||
def service_topic_queue(self):
|
||||
return self.cluster_name or self.host
|
||||
|
||||
@property
|
||||
def is_clustered(self):
|
||||
return bool(self.cluster_name)
|
||||
|
||||
|
||||
class CinderObjectSerializer(base.VersionedObjectSerializer):
|
||||
OBJ_BASE_CLASS = CinderObject
|
||||
|
@ -41,13 +41,14 @@ CONF = cfg.CONF
|
||||
CONF.register_opts(scheduler_driver_opts)
|
||||
|
||||
|
||||
def volume_update_db(context, volume_id, host):
|
||||
"""Set the host and set the scheduled_at field of a volume.
|
||||
def volume_update_db(context, volume_id, host, cluster_name):
|
||||
"""Set the host, cluster_name, and set the scheduled_at field of a volume.
|
||||
|
||||
:returns: A Volume with the updated fields set properly.
|
||||
"""
|
||||
volume = objects.Volume.get_by_id(context, volume_id)
|
||||
volume.host = host
|
||||
volume.cluster_name = cluster_name
|
||||
volume.scheduled_at = timeutils.utcnow()
|
||||
volume.save()
|
||||
|
||||
@ -56,22 +57,24 @@ def volume_update_db(context, volume_id, host):
|
||||
return volume
|
||||
|
||||
|
||||
def group_update_db(context, group, host):
|
||||
def group_update_db(context, group, host, cluster_name):
|
||||
"""Set the host and the scheduled_at field of a consistencygroup.
|
||||
|
||||
:returns: A Consistencygroup with the updated fields set properly.
|
||||
"""
|
||||
group.update({'host': host, 'updated_at': timeutils.utcnow()})
|
||||
group.update({'host': host, 'updated_at': timeutils.utcnow(),
|
||||
'cluster_name': cluster_name})
|
||||
group.save()
|
||||
return group
|
||||
|
||||
|
||||
def generic_group_update_db(context, group, host):
|
||||
def generic_group_update_db(context, group, host, cluster_name):
|
||||
"""Set the host and the scheduled_at field of a group.
|
||||
|
||||
:returns: A Group with the updated fields set properly.
|
||||
"""
|
||||
group.update({'host': host, 'updated_at': timeutils.utcnow()})
|
||||
group.update({'host': host, 'updated_at': timeutils.utcnow(),
|
||||
'cluster_name': cluster_name})
|
||||
group.save()
|
||||
return group
|
||||
|
||||
@ -97,11 +100,14 @@ class Scheduler(object):
|
||||
|
||||
return self.host_manager.has_all_capabilities()
|
||||
|
||||
def update_service_capabilities(self, service_name, host, capabilities):
|
||||
def update_service_capabilities(self, service_name, host, capabilities,
|
||||
cluster_name, timestamp):
|
||||
"""Process a capability update from a service node."""
|
||||
self.host_manager.update_service_capabilities(service_name,
|
||||
host,
|
||||
capabilities)
|
||||
capabilities,
|
||||
cluster_name,
|
||||
timestamp)
|
||||
|
||||
def notify_service_capabilities(self, service_name, host, capabilities):
|
||||
"""Notify capability update from a service node."""
|
||||
|
@ -74,12 +74,11 @@ class FilterScheduler(driver.Scheduler):
|
||||
if not weighed_host:
|
||||
raise exception.NoValidHost(reason=_("No weighed hosts available"))
|
||||
|
||||
host = weighed_host.obj.host
|
||||
backend = weighed_host.obj
|
||||
updated_group = driver.group_update_db(context, group, backend.host,
|
||||
backend.cluster_name)
|
||||
|
||||
updated_group = driver.group_update_db(context, group, host)
|
||||
|
||||
self.volume_rpcapi.create_consistencygroup(context,
|
||||
updated_group, host)
|
||||
self.volume_rpcapi.create_consistencygroup(context, updated_group)
|
||||
|
||||
def schedule_create_group(self, context, group,
|
||||
group_spec,
|
||||
@ -96,12 +95,13 @@ class FilterScheduler(driver.Scheduler):
|
||||
if not weighed_host:
|
||||
raise exception.NoValidHost(reason=_("No weighed hosts available"))
|
||||
|
||||
host = weighed_host.obj.host
|
||||
backend = weighed_host.obj
|
||||
|
||||
updated_group = driver.generic_group_update_db(context, group, host)
|
||||
updated_group = driver.generic_group_update_db(context, group,
|
||||
backend.host,
|
||||
backend.cluster_name)
|
||||
|
||||
self.volume_rpcapi.create_group(context,
|
||||
updated_group, host)
|
||||
self.volume_rpcapi.create_group(context, updated_group)
|
||||
|
||||
def schedule_create_volume(self, context, request_spec, filter_properties):
|
||||
weighed_host = self._schedule(context, request_spec,
|
||||
@ -110,18 +110,20 @@ class FilterScheduler(driver.Scheduler):
|
||||
if not weighed_host:
|
||||
raise exception.NoValidHost(reason=_("No weighed hosts available"))
|
||||
|
||||
host = weighed_host.obj.host
|
||||
backend = weighed_host.obj
|
||||
volume_id = request_spec['volume_id']
|
||||
|
||||
updated_volume = driver.volume_update_db(context, volume_id, host)
|
||||
updated_volume = driver.volume_update_db(context, volume_id,
|
||||
backend.host,
|
||||
backend.cluster_name)
|
||||
self._post_select_populate_filter_properties(filter_properties,
|
||||
weighed_host.obj)
|
||||
backend)
|
||||
|
||||
# context is not serializable
|
||||
filter_properties.pop('context', None)
|
||||
|
||||
self.volume_rpcapi.create_volume(context, updated_volume, host,
|
||||
request_spec, filter_properties,
|
||||
self.volume_rpcapi.create_volume(context, updated_volume, request_spec,
|
||||
filter_properties,
|
||||
allow_reschedule=True)
|
||||
|
||||
def host_passes_filters(self, context, host, request_spec,
|
||||
@ -131,7 +133,7 @@ class FilterScheduler(driver.Scheduler):
|
||||
filter_properties)
|
||||
for weighed_host in weighed_hosts:
|
||||
host_state = weighed_host.obj
|
||||
if host_state.host == host:
|
||||
if host_state.backend_id == host:
|
||||
return host_state
|
||||
|
||||
volume_id = request_spec.get('volume_id', '??volume_id missing??')
|
||||
@ -144,26 +146,27 @@ class FilterScheduler(driver.Scheduler):
|
||||
migration_policy='never'):
|
||||
"""Find a host that can accept the volume with its new type."""
|
||||
filter_properties = filter_properties or {}
|
||||
current_host = request_spec['volume_properties']['host']
|
||||
backend = (request_spec['volume_properties'].get('cluster_name')
|
||||
or request_spec['volume_properties']['host'])
|
||||
|
||||
# The volume already exists on this host, and so we shouldn't check if
|
||||
# it can accept the volume again in the CapacityFilter.
|
||||
filter_properties['vol_exists_on'] = current_host
|
||||
# The volume already exists on this backend, and so we shouldn't check
|
||||
# if it can accept the volume again in the CapacityFilter.
|
||||
filter_properties['vol_exists_on'] = backend
|
||||
|
||||
weighed_hosts = self._get_weighted_candidates(context, request_spec,
|
||||
filter_properties)
|
||||
if not weighed_hosts:
|
||||
weighed_backends = self._get_weighted_candidates(context, request_spec,
|
||||
filter_properties)
|
||||
if not weighed_backends:
|
||||
raise exception.NoValidHost(reason=_('No valid hosts for volume '
|
||||
'%(id)s with type %(type)s') %
|
||||
{'id': request_spec['volume_id'],
|
||||
'type': request_spec['volume_type']})
|
||||
|
||||
for weighed_host in weighed_hosts:
|
||||
host_state = weighed_host.obj
|
||||
if host_state.host == current_host:
|
||||
return host_state
|
||||
for weighed_backend in weighed_backends:
|
||||
backend_state = weighed_backend.obj
|
||||
if backend_state.backend_id == backend:
|
||||
return backend_state
|
||||
|
||||
if utils.extract_host(current_host, 'pool') is None:
|
||||
if utils.extract_host(backend, 'pool') is None:
|
||||
# legacy volumes created before pool is introduced has no pool
|
||||
# info in host. But host_state.host always include pool level
|
||||
# info. In this case if above exact match didn't work out, we
|
||||
@ -172,11 +175,12 @@ class FilterScheduler(driver.Scheduler):
|
||||
# cause migration between pools on same host, which we consider
|
||||
# it is different from migration between hosts thus allow that
|
||||
# to happen even migration policy is 'never'.
|
||||
for weighed_host in weighed_hosts:
|
||||
host_state = weighed_host.obj
|
||||
backend = utils.extract_host(host_state.host, 'backend')
|
||||
if backend == current_host:
|
||||
return host_state
|
||||
for weighed_backend in weighed_backends:
|
||||
backend_state = weighed_backend.obj
|
||||
new_backend = utils.extract_host(backend_state.backend_id,
|
||||
'backend')
|
||||
if new_backend == backend:
|
||||
return backend_state
|
||||
|
||||
if migration_policy == 'never':
|
||||
raise exception.NoValidHost(reason=_('Current host not valid for '
|
||||
@ -186,7 +190,7 @@ class FilterScheduler(driver.Scheduler):
|
||||
{'id': request_spec['volume_id'],
|
||||
'type': request_spec['volume_type']})
|
||||
|
||||
top_host = self._choose_top_host(weighed_hosts, request_spec)
|
||||
top_host = self._choose_top_host(weighed_backends, request_spec)
|
||||
return top_host.obj
|
||||
|
||||
def get_pools(self, context, filters):
|
||||
@ -201,7 +205,7 @@ class FilterScheduler(driver.Scheduler):
|
||||
been selected by the scheduling process.
|
||||
"""
|
||||
# Add a retry entry for the selected volume backend:
|
||||
self._add_retry_host(filter_properties, host_state.host)
|
||||
self._add_retry_host(filter_properties, host_state.backend_id)
|
||||
|
||||
def _add_retry_host(self, filter_properties, host):
|
||||
"""Add a retry entry for the selected volume backend.
|
||||
@ -418,8 +422,8 @@ class FilterScheduler(driver.Scheduler):
|
||||
for host2 in temp_weighed_hosts:
|
||||
# Should schedule creation of CG on backend level,
|
||||
# not pool level.
|
||||
if (utils.extract_host(host1.obj.host) ==
|
||||
utils.extract_host(host2.obj.host)):
|
||||
if (utils.extract_host(host1.obj.backend_id) ==
|
||||
utils.extract_host(host2.obj.backend_id)):
|
||||
new_weighed_hosts.append(host1)
|
||||
weighed_hosts = new_weighed_hosts
|
||||
if not weighed_hosts:
|
||||
@ -530,8 +534,8 @@ class FilterScheduler(driver.Scheduler):
|
||||
for host2 in host_list2:
|
||||
# Should schedule creation of group on backend level,
|
||||
# not pool level.
|
||||
if (utils.extract_host(host1.obj.host) ==
|
||||
utils.extract_host(host2.obj.host)):
|
||||
if (utils.extract_host(host1.obj.backend_id) ==
|
||||
utils.extract_host(host2.obj.backend_id)):
|
||||
new_hosts.append(host1)
|
||||
if not new_hosts:
|
||||
return []
|
||||
@ -615,7 +619,7 @@ class FilterScheduler(driver.Scheduler):
|
||||
# Get host name including host@backend#pool info from
|
||||
# weighed_hosts.
|
||||
for host in weighed_hosts[::-1]:
|
||||
backend = utils.extract_host(host.obj.host)
|
||||
backend = utils.extract_host(host.obj.backend_id)
|
||||
if backend != group_backend:
|
||||
weighed_hosts.remove(host)
|
||||
if not weighed_hosts:
|
||||
@ -651,7 +655,7 @@ class FilterScheduler(driver.Scheduler):
|
||||
def _choose_top_host(self, weighed_hosts, request_spec):
|
||||
top_host = weighed_hosts[0]
|
||||
host_state = top_host.obj
|
||||
LOG.debug("Choosing %s", host_state.host)
|
||||
LOG.debug("Choosing %s", host_state.backend_id)
|
||||
volume_properties = request_spec['volume_properties']
|
||||
host_state.consume_from_volume(volume_properties)
|
||||
return top_host
|
||||
@ -659,11 +663,11 @@ class FilterScheduler(driver.Scheduler):
|
||||
def _choose_top_host_group(self, weighed_hosts, request_spec_list):
|
||||
top_host = weighed_hosts[0]
|
||||
host_state = top_host.obj
|
||||
LOG.debug("Choosing %s", host_state.host)
|
||||
LOG.debug("Choosing %s", host_state.backend_id)
|
||||
return top_host
|
||||
|
||||
def _choose_top_host_generic_group(self, weighed_hosts):
|
||||
top_host = weighed_hosts[0]
|
||||
host_state = top_host.obj
|
||||
LOG.debug("Choosing %s", host_state.host)
|
||||
LOG.debug("Choosing %s", host_state.backend_id)
|
||||
return top_host
|
||||
|
@ -24,6 +24,14 @@ class AffinityFilter(filters.BaseHostFilter):
|
||||
def __init__(self):
|
||||
self.volume_api = volume.API()
|
||||
|
||||
def _get_volumes(self, context, affinity_uuids, backend_state):
|
||||
filters = {'id': affinity_uuids, 'deleted': False}
|
||||
if backend_state.cluster_name:
|
||||
filters['cluster_name'] = backend_state.cluster_name
|
||||
else:
|
||||
filters['host'] = backend_state.host
|
||||
return self.volume_api.get_all(context, filters=filters)
|
||||
|
||||
|
||||
class DifferentBackendFilter(AffinityFilter):
|
||||
"""Schedule volume on a different back-end from a set of volumes."""
|
||||
@ -53,11 +61,8 @@ class DifferentBackendFilter(AffinityFilter):
|
||||
return False
|
||||
|
||||
if affinity_uuids:
|
||||
return not self.volume_api.get_all(
|
||||
context, filters={'host': host_state.host,
|
||||
'id': affinity_uuids,
|
||||
'deleted': False})
|
||||
|
||||
return not self._get_volumes(context, affinity_uuids,
|
||||
host_state)
|
||||
# With no different_host key
|
||||
return True
|
||||
|
||||
@ -90,10 +95,7 @@ class SameBackendFilter(AffinityFilter):
|
||||
return False
|
||||
|
||||
if affinity_uuids:
|
||||
return self.volume_api.get_all(
|
||||
context, filters={'host': host_state.host,
|
||||
'id': affinity_uuids,
|
||||
'deleted': False})
|
||||
return self._get_volumes(context, affinity_uuids, host_state)
|
||||
|
||||
# With no same_host key
|
||||
return True
|
||||
|
@ -36,25 +36,29 @@ class CapacityFilter(filters.BaseHostFilter):
|
||||
|
||||
# If the volume already exists on this host, don't fail it for
|
||||
# insufficient capacity (e.g., if we are retyping)
|
||||
if host_state.host == filter_properties.get('vol_exists_on'):
|
||||
if host_state.backend_id == filter_properties.get('vol_exists_on'):
|
||||
return True
|
||||
|
||||
spec = filter_properties.get('request_spec')
|
||||
if spec:
|
||||
volid = spec.get('volume_id')
|
||||
|
||||
grouping = 'cluster' if host_state.cluster_name else 'host'
|
||||
if filter_properties.get('new_size'):
|
||||
# If new_size is passed, we are allocating space to extend a volume
|
||||
requested_size = (int(filter_properties.get('new_size')) -
|
||||
int(filter_properties.get('size')))
|
||||
LOG.debug('Checking if host %(host)s can extend the volume %(id)s'
|
||||
'in %(size)s GB', {'host': host_state.host, 'id': volid,
|
||||
'size': requested_size})
|
||||
LOG.debug('Checking if %(grouping)s %(grouping_name)s can extend '
|
||||
'the volume %(id)s in %(size)s GB',
|
||||
{'grouping': grouping,
|
||||
'grouping_name': host_state.backend_id, 'id': volid,
|
||||
'size': requested_size})
|
||||
else:
|
||||
requested_size = filter_properties.get('size')
|
||||
LOG.debug('Checking if host %(host)s can create a %(size)s GB '
|
||||
'volume (%(id)s)',
|
||||
{'host': host_state.host, 'id': volid,
|
||||
LOG.debug('Checking if %(grouping)s %(grouping_name)s can create '
|
||||
'a %(size)s GB volume (%(id)s)',
|
||||
{'grouping': grouping,
|
||||
'grouping_name': host_state.backend_id, 'id': volid,
|
||||
'size': requested_size})
|
||||
|
||||
if host_state.free_capacity_gb is None:
|
||||
@ -85,18 +89,16 @@ class CapacityFilter(filters.BaseHostFilter):
|
||||
total = float(total_space)
|
||||
if total <= 0:
|
||||
LOG.warning(_LW("Insufficient free space for volume creation. "
|
||||
"Total capacity is %(total).2f on host %(host)s."),
|
||||
"Total capacity is %(total).2f on %(grouping)s "
|
||||
"%(grouping_name)s."),
|
||||
{"total": total,
|
||||
"host": host_state.host})
|
||||
"grouping": grouping,
|
||||
"grouping_name": host_state.backend_id})
|
||||
return False
|
||||
# Calculate how much free space is left after taking into account
|
||||
# the reserved space.
|
||||
free = free_space - math.floor(total * reserved)
|
||||
|
||||
msg_args = {"host": host_state.host,
|
||||
"requested": requested_size,
|
||||
"available": free}
|
||||
|
||||
# NOTE(xyang): If 'provisioning:type' is 'thick' in extra_specs,
|
||||
# we will not use max_over_subscription_ratio and
|
||||
# provisioned_capacity_gb to determine whether a volume can be
|
||||
@ -117,15 +119,18 @@ class CapacityFilter(filters.BaseHostFilter):
|
||||
provisioned_ratio = ((host_state.provisioned_capacity_gb +
|
||||
requested_size) / total)
|
||||
if provisioned_ratio > host_state.max_over_subscription_ratio:
|
||||
msg_args = {
|
||||
"provisioned_ratio": provisioned_ratio,
|
||||
"oversub_ratio": host_state.max_over_subscription_ratio,
|
||||
"grouping": grouping,
|
||||
"grouping_name": host_state.backend_id,
|
||||
}
|
||||
LOG.warning(_LW(
|
||||
"Insufficient free space for thin provisioning. "
|
||||
"The ratio of provisioned capacity over total capacity "
|
||||
"%(provisioned_ratio).2f has exceeded the maximum over "
|
||||
"subscription ratio %(oversub_ratio).2f on host "
|
||||
"%(host)s."),
|
||||
{"provisioned_ratio": provisioned_ratio,
|
||||
"oversub_ratio": host_state.max_over_subscription_ratio,
|
||||
"host": host_state.host})
|
||||
"subscription ratio %(oversub_ratio).2f on %(grouping)s "
|
||||
"%(grouping_name)s."), msg_args)
|
||||
return False
|
||||
else:
|
||||
# Thin provisioning is enabled and projected over-subscription
|
||||
@ -138,23 +143,30 @@ class CapacityFilter(filters.BaseHostFilter):
|
||||
free * host_state.max_over_subscription_ratio)
|
||||
return adjusted_free_virtual >= requested_size
|
||||
elif thin and host_state.thin_provisioning_support:
|
||||
LOG.warning(_LW("Filtering out host %(host)s with an invalid "
|
||||
"maximum over subscription ratio of "
|
||||
"%(oversub_ratio).2f. The ratio should be a "
|
||||
LOG.warning(_LW("Filtering out %(grouping)s %(grouping_name)s "
|
||||
"with an invalid maximum over subscription ratio "
|
||||
"of %(oversub_ratio).2f. The ratio should be a "
|
||||
"minimum of 1.0."),
|
||||
{"oversub_ratio":
|
||||
host_state.max_over_subscription_ratio,
|
||||
"host": host_state.host})
|
||||
"grouping": grouping,
|
||||
"grouping_name": host_state.backend_id})
|
||||
return False
|
||||
|
||||
msg_args = {"grouping_name": host_state.backend_id,
|
||||
"grouping": grouping,
|
||||
"requested": requested_size,
|
||||
"available": free}
|
||||
|
||||
if free < requested_size:
|
||||
LOG.warning(_LW("Insufficient free space for volume creation "
|
||||
"on host %(host)s (requested / avail): "
|
||||
"%(requested)s/%(available)s"), msg_args)
|
||||
"on %(grouping)s %(grouping_name)s (requested / "
|
||||
"avail): %(requested)s/%(available)s"),
|
||||
msg_args)
|
||||
return False
|
||||
|
||||
LOG.debug("Space information for volume creation "
|
||||
"on host %(host)s (requested / avail): "
|
||||
"on %(grouping)s %(grouping_name)s (requested / avail): "
|
||||
"%(requested)s/%(available)s", msg_args)
|
||||
|
||||
return True
|
||||
|
@ -35,10 +35,11 @@ class DriverFilter(filters.BaseHostFilter):
|
||||
"""Determines whether a host has a passing filter_function or not."""
|
||||
stats = self._generate_stats(host_state, filter_properties)
|
||||
|
||||
LOG.debug("Checking host '%s'", stats['host_stats']['host'])
|
||||
LOG.debug("Checking backend '%s'", stats['host_stats']['backend_id'])
|
||||
result = self._check_filter_function(stats)
|
||||
LOG.debug("Result: %s", result)
|
||||
LOG.debug("Done checking host '%s'", stats['host_stats']['host'])
|
||||
LOG.debug("Done checking backend '%s'",
|
||||
stats['host_stats']['backend_id'])
|
||||
|
||||
return result
|
||||
|
||||
@ -89,6 +90,8 @@ class DriverFilter(filters.BaseHostFilter):
|
||||
|
||||
host_stats = {
|
||||
'host': host_state.host,
|
||||
'cluster_name': host_state.cluster_name,
|
||||
'backend_id': host_state.backend_id,
|
||||
'volume_backend_name': host_state.volume_backend_name,
|
||||
'vendor_name': host_state.vendor_name,
|
||||
'driver_version': host_state.driver_version,
|
||||
|
@ -45,7 +45,7 @@ class IgnoreAttemptedHostsFilter(filters.BaseHostFilter):
|
||||
return True
|
||||
|
||||
hosts = attempted.get('hosts', [])
|
||||
host = host_state.host
|
||||
host = host_state.backend_id
|
||||
|
||||
passes = host not in hosts
|
||||
pass_msg = "passes" if passes else "fails"
|
||||
|
@ -69,9 +69,9 @@ class InstanceLocalityFilter(filters.BaseHostFilter):
|
||||
|
||||
return self._nova_ext_srv_attr
|
||||
|
||||
def host_passes(self, host_state, filter_properties):
|
||||
def host_passes(self, backend_state, filter_properties):
|
||||
context = filter_properties['context']
|
||||
host = volume_utils.extract_host(host_state.host, 'host')
|
||||
host = volume_utils.extract_host(backend_state.backend_id, 'host')
|
||||
|
||||
scheduler_hints = filter_properties.get('scheduler_hints') or {}
|
||||
instance_uuid = scheduler_hints.get(HINT_KEYWORD, None)
|
||||
|
@ -86,10 +86,11 @@ class ReadOnlyDict(collections.Mapping):
|
||||
class HostState(object):
|
||||
"""Mutable and immutable information tracked for a volume backend."""
|
||||
|
||||
def __init__(self, host, capabilities=None, service=None):
|
||||
def __init__(self, host, cluster_name, capabilities=None, service=None):
|
||||
self.capabilities = None
|
||||
self.service = None
|
||||
self.host = host
|
||||
self.cluster_name = cluster_name
|
||||
self.update_capabilities(capabilities, service)
|
||||
|
||||
self.volume_backend_name = None
|
||||
@ -122,6 +123,10 @@ class HostState(object):
|
||||
|
||||
self.updated = None
|
||||
|
||||
@property
|
||||
def backend_id(self):
|
||||
return self.cluster_name or self.host
|
||||
|
||||
def update_capabilities(self, capabilities=None, service=None):
|
||||
# Read-only capability dicts
|
||||
|
||||
@ -210,7 +215,8 @@ class HostState(object):
|
||||
cur_pool = self.pools.get(pool_name, None)
|
||||
if not cur_pool:
|
||||
# Add new pool
|
||||
cur_pool = PoolState(self.host, pool_cap, pool_name)
|
||||
cur_pool = PoolState(self.host, self.cluster_name,
|
||||
pool_cap, pool_name)
|
||||
self.pools[pool_name] = cur_pool
|
||||
cur_pool.update_from_volume_capability(pool_cap, service)
|
||||
|
||||
@ -227,7 +233,8 @@ class HostState(object):
|
||||
|
||||
if len(self.pools) == 0:
|
||||
# No pool was there
|
||||
single_pool = PoolState(self.host, capability, pool_name)
|
||||
single_pool = PoolState(self.host, self.cluster_name,
|
||||
capability, pool_name)
|
||||
self._append_backend_info(capability)
|
||||
self.pools[pool_name] = single_pool
|
||||
else:
|
||||
@ -235,7 +242,8 @@ class HostState(object):
|
||||
try:
|
||||
single_pool = self.pools[pool_name]
|
||||
except KeyError:
|
||||
single_pool = PoolState(self.host, capability, pool_name)
|
||||
single_pool = PoolState(self.host, self.cluster_name,
|
||||
capability, pool_name)
|
||||
self._append_backend_info(capability)
|
||||
self.pools[pool_name] = single_pool
|
||||
|
||||
@ -293,14 +301,18 @@ class HostState(object):
|
||||
# FIXME(zhiteng) backend level free_capacity_gb isn't as
|
||||
# meaningful as it used to be before pool is introduced, we'd
|
||||
# come up with better representation of HostState.
|
||||
return ("host '%s': free_capacity_gb: %s, pools: %s" %
|
||||
(self.host, self.free_capacity_gb, self.pools))
|
||||
grouping = 'cluster' if self.cluster_name else 'host'
|
||||
grouping_name = self.backend_id
|
||||
|
||||
return ("%s '%s': free_capacity_gb: %s, pools: %s" %
|
||||
(grouping, grouping_name, self.free_capacity_gb, self.pools))
|
||||
|
||||
|
||||
class PoolState(HostState):
|
||||
def __init__(self, host, capabilities, pool_name):
|
||||
def __init__(self, host, cluster_name, capabilities, pool_name):
|
||||
new_host = vol_utils.append_host(host, pool_name)
|
||||
super(PoolState, self).__init__(new_host, capabilities)
|
||||
new_cluster = vol_utils.append_host(cluster_name, pool_name)
|
||||
super(PoolState, self).__init__(new_host, new_cluster, capabilities)
|
||||
self.pool_name = pool_name
|
||||
# No pools in pool
|
||||
self.pools = None
|
||||
@ -443,7 +455,8 @@ class HostManager(object):
|
||||
hosts,
|
||||
weight_properties)
|
||||
|
||||
def update_service_capabilities(self, service_name, host, capabilities):
|
||||
def update_service_capabilities(self, service_name, host, capabilities,
|
||||
cluster_name, timestamp):
|
||||
"""Update the per-service capabilities based on this notification."""
|
||||
if service_name != 'volume':
|
||||
LOG.debug('Ignoring %(service_name)s service update '
|
||||
@ -451,9 +464,12 @@ class HostManager(object):
|
||||
{'service_name': service_name, 'host': host})
|
||||
return
|
||||
|
||||
# TODO(geguileo): In P - Remove the next line since we receive the
|
||||
# timestamp
|
||||
timestamp = timestamp or timeutils.utcnow()
|
||||
# Copy the capabilities, so we don't modify the original dict
|
||||
capab_copy = dict(capabilities)
|
||||
capab_copy["timestamp"] = timeutils.utcnow() # Reported time
|
||||
capab_copy["timestamp"] = timestamp
|
||||
|
||||
# Set the default capabilities in case None is set.
|
||||
capab_old = self.service_states.get(host, {"timestamp": 0})
|
||||
@ -474,15 +490,19 @@ class HostManager(object):
|
||||
|
||||
self.service_states[host] = capab_copy
|
||||
|
||||
LOG.debug("Received %(service_name)s service update from "
|
||||
"%(host)s: %(cap)s",
|
||||
cluster_msg = (('Cluster: %s - Host: ' % cluster_name) if cluster_name
|
||||
else '')
|
||||
LOG.debug("Received %(service_name)s service update from %(cluster)s"
|
||||
"%(host)s: %(cap)s%(cluster)s",
|
||||
{'service_name': service_name, 'host': host,
|
||||
'cap': capabilities})
|
||||
'cap': capabilities,
|
||||
'cluster': cluster_msg})
|
||||
|
||||
self._no_capabilities_hosts.discard(host)
|
||||
|
||||
def notify_service_capabilities(self, service_name, host, capabilities):
|
||||
"""Notify the ceilometer with updated volume stats"""
|
||||
# TODO(geguileo): Make this work with Active/Active
|
||||
if service_name != 'volume':
|
||||
return
|
||||
|
||||
@ -519,6 +539,7 @@ class HostManager(object):
|
||||
volume_services = objects.ServiceList.get_all_by_topic(context,
|
||||
topic,
|
||||
disabled=False)
|
||||
active_backends = set()
|
||||
active_hosts = set()
|
||||
no_capabilities_hosts = set()
|
||||
for service in volume_services.objects:
|
||||
@ -526,32 +547,46 @@ class HostManager(object):
|
||||
if not service.is_up:
|
||||
LOG.warning(_LW("volume service is down. (host: %s)"), host)
|
||||
continue
|
||||
|
||||
capabilities = self.service_states.get(host, None)
|
||||
if capabilities is None:
|
||||
no_capabilities_hosts.add(host)
|
||||
continue
|
||||
|
||||
host_state = self.host_state_map.get(host)
|
||||
if not host_state:
|
||||
host_state = self.host_state_cls(host,
|
||||
capabilities=capabilities,
|
||||
service=
|
||||
dict(service))
|
||||
self.host_state_map[host] = host_state
|
||||
# update capabilities and attributes in host_state
|
||||
host_state.update_from_volume_capability(capabilities,
|
||||
service=
|
||||
dict(service))
|
||||
# Since the service could have been added or remove from a cluster
|
||||
backend_key = service.service_topic_queue
|
||||
backend_state = self.host_state_map.get(backend_key, None)
|
||||
if not backend_state:
|
||||
backend_state = self.host_state_cls(
|
||||
host,
|
||||
service.cluster_name,
|
||||
capabilities=capabilities,
|
||||
service=dict(service))
|
||||
self.host_state_map[backend_key] = backend_state
|
||||
|
||||
# We may be receiving capability reports out of order from
|
||||
# different volume services in a cluster, so we drop older updates
|
||||
# and only update for newer capability reports.
|
||||
if (backend_state.capabilities['timestamp'] <=
|
||||
capabilities['timestamp']):
|
||||
# update capabilities and attributes in backend_state
|
||||
backend_state.update_from_volume_capability(
|
||||
capabilities, service=dict(service))
|
||||
active_backends.add(backend_key)
|
||||
active_hosts.add(host)
|
||||
|
||||
self._no_capabilities_hosts = no_capabilities_hosts
|
||||
|
||||
# remove non-active hosts from host_state_map
|
||||
nonactive_hosts = set(self.host_state_map.keys()) - active_hosts
|
||||
for host in nonactive_hosts:
|
||||
LOG.info(_LI("Removing non-active host: %(host)s from "
|
||||
"scheduler cache."), {'host': host})
|
||||
del self.host_state_map[host]
|
||||
# remove non-active keys from host_state_map
|
||||
inactive_backend_keys = set(self.host_state_map) - active_backends
|
||||
for backend_key in inactive_backend_keys:
|
||||
# NOTE(geguileo): We don't want to log the removal of a host from
|
||||
# the map when we are removing it because it has been added to a
|
||||
# cluster.
|
||||
if backend_key not in active_hosts:
|
||||
LOG.info(_LI("Removing non-active backend: %(backend)s from "
|
||||
"scheduler cache."), {'backend': backend_key})
|
||||
del self.host_state_map[backend_key]
|
||||
|
||||
def get_all_host_states(self, context):
|
||||
"""Returns a dict of all the hosts the HostManager knows about.
|
||||
|
@ -19,12 +19,15 @@
|
||||
Scheduler Service
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
|
||||
from cinder import context
|
||||
@ -33,6 +36,7 @@ from cinder import exception
|
||||
from cinder import flow_utils
|
||||
from cinder.i18n import _, _LE
|
||||
from cinder import manager
|
||||
from cinder import objects
|
||||
from cinder import quota
|
||||
from cinder import rpc
|
||||
from cinder.scheduler.flows import create_volume
|
||||
@ -53,7 +57,7 @@ QUOTAS = quota.QUOTAS
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SchedulerManager(manager.Manager):
|
||||
class SchedulerManager(manager.CleanableManager, manager.Manager):
|
||||
"""Chooses a host to create volumes."""
|
||||
|
||||
RPC_API_VERSION = scheduler_rpcapi.SchedulerAPI.RPC_API_VERSION
|
||||
@ -80,13 +84,22 @@ class SchedulerManager(manager.Manager):
|
||||
self.driver.reset()
|
||||
|
||||
def update_service_capabilities(self, context, service_name=None,
|
||||
host=None, capabilities=None, **kwargs):
|
||||
host=None, capabilities=None,
|
||||
cluster_name=None, timestamp=None,
|
||||
**kwargs):
|
||||
"""Process a capability update from a service node."""
|
||||
if capabilities is None:
|
||||
capabilities = {}
|
||||
# If we received the timestamp we have to deserialize it
|
||||
elif timestamp:
|
||||
timestamp = datetime.strptime(timestamp,
|
||||
timeutils.PERFECT_TIME_FORMAT)
|
||||
|
||||
self.driver.update_service_capabilities(service_name,
|
||||
host,
|
||||
capabilities)
|
||||
capabilities,
|
||||
cluster_name,
|
||||
timestamp)
|
||||
|
||||
def notify_service_capabilities(self, context, service_name,
|
||||
host, capabilities):
|
||||
@ -150,9 +163,9 @@ class SchedulerManager(manager.Manager):
|
||||
group.status = 'error'
|
||||
group.save()
|
||||
|
||||
@objects.Volume.set_workers
|
||||
def create_volume(self, context, volume, snapshot_id=None, image_id=None,
|
||||
request_spec=None, filter_properties=None):
|
||||
|
||||
self._wait_for_scheduler()
|
||||
|
||||
try:
|
||||
@ -171,13 +184,21 @@ class SchedulerManager(manager.Manager):
|
||||
with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
|
||||
flow_engine.run()
|
||||
|
||||
def _do_cleanup(self, ctxt, vo_resource):
|
||||
# We can only receive cleanup requests for volumes, but we check anyway
|
||||
# We need to cleanup the volume status for cases where the scheduler
|
||||
# died while scheduling the volume creation.
|
||||
if (isinstance(vo_resource, objects.Volume) and
|
||||
vo_resource.status == 'creating'):
|
||||
vo_resource.status = 'error'
|
||||
vo_resource.save()
|
||||
|
||||
def request_service_capabilities(self, context):
|
||||
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
|
||||
|
||||
def migrate_volume_to_host(self, context, volume, host, force_host_copy,
|
||||
request_spec, filter_properties=None):
|
||||
def migrate_volume(self, context, volume, backend, force_copy,
|
||||
request_spec, filter_properties):
|
||||
"""Ensure that the host exists and can accept the volume."""
|
||||
|
||||
self._wait_for_scheduler()
|
||||
|
||||
def _migrate_volume_set_error(self, context, ex, request_spec):
|
||||
@ -193,9 +214,9 @@ class SchedulerManager(manager.Manager):
|
||||
context, ex, request_spec)
|
||||
|
||||
try:
|
||||
tgt_host = self.driver.host_passes_filters(context, host,
|
||||
request_spec,
|
||||
filter_properties)
|
||||
tgt_backend = self.driver.host_passes_filters(context, backend,
|
||||
request_spec,
|
||||
filter_properties)
|
||||
except exception.NoValidHost as ex:
|
||||
_migrate_volume_set_error(self, context, ex, request_spec)
|
||||
except Exception as ex:
|
||||
@ -203,8 +224,14 @@ class SchedulerManager(manager.Manager):
|
||||
_migrate_volume_set_error(self, context, ex, request_spec)
|
||||
else:
|
||||
volume_rpcapi.VolumeAPI().migrate_volume(context, volume,
|
||||
tgt_host,
|
||||
force_host_copy)
|
||||
tgt_backend,
|
||||
force_copy)
|
||||
|
||||
# FIXME(geguileo): Remove this in v4.0 of RPC API.
|
||||
def migrate_volume_to_host(self, context, volume, host, force_host_copy,
|
||||
request_spec, filter_properties=None):
|
||||
return self.migrate_volume(context, volume, host, force_host_copy,
|
||||
request_spec, filter_properties)
|
||||
|
||||
def retype(self, context, volume, request_spec, filter_properties=None):
|
||||
"""Schedule the modification of a volume's type.
|
||||
@ -272,7 +299,7 @@ class SchedulerManager(manager.Manager):
|
||||
|
||||
try:
|
||||
self.driver.host_passes_filters(context,
|
||||
volume.host,
|
||||
volume.service_topic_queue,
|
||||
request_spec,
|
||||
filter_properties)
|
||||
except exception.NoValidHost as ex:
|
||||
@ -306,7 +333,8 @@ class SchedulerManager(manager.Manager):
|
||||
|
||||
filter_properties['new_size'] = new_size
|
||||
try:
|
||||
self.driver.host_passes_filters(context, volume.host,
|
||||
self.driver.host_passes_filters(context,
|
||||
volume.service_topic_queue,
|
||||
request_spec, filter_properties)
|
||||
volume_rpcapi.VolumeAPI().extend_volume(context, volume, new_size,
|
||||
reservations)
|
||||
|
@ -17,6 +17,7 @@ Client side of the scheduler manager RPC API.
|
||||
"""
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from cinder.common import constants
|
||||
from cinder import exception
|
||||
@ -62,9 +63,12 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
3.0 - Remove 2.x compatibility
|
||||
3.1 - Adds notify_service_capabilities()
|
||||
3.2 - Adds extend_volume()
|
||||
3.3 - Add cluster support to migrate_volume, and to
|
||||
update_service_capabilities and send the timestamp from the
|
||||
capabilities.
|
||||
"""
|
||||
|
||||
RPC_API_VERSION = '3.2'
|
||||
RPC_API_VERSION = '3.3'
|
||||
RPC_DEFAULT_VERSION = '3.0'
|
||||
TOPIC = constants.SCHEDULER_TOPIC
|
||||
BINARY = 'cinder-scheduler'
|
||||
@ -106,15 +110,24 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
'filter_properties': filter_properties, 'volume': volume}
|
||||
return cctxt.cast(ctxt, 'create_volume', **msg_args)
|
||||
|
||||
def migrate_volume_to_host(self, ctxt, volume, host, force_host_copy=False,
|
||||
request_spec=None, filter_properties=None):
|
||||
cctxt = self._get_cctxt()
|
||||
def migrate_volume(self, ctxt, volume, backend, force_copy=False,
|
||||
request_spec=None, filter_properties=None):
|
||||
request_spec_p = jsonutils.to_primitive(request_spec)
|
||||
msg_args = {'host': host, 'force_host_copy': force_host_copy,
|
||||
'request_spec': request_spec_p,
|
||||
msg_args = {'request_spec': request_spec_p,
|
||||
'filter_properties': filter_properties, 'volume': volume}
|
||||
version = '3.3'
|
||||
if self.client.can_send_version(version):
|
||||
msg_args['backend'] = backend
|
||||
msg_args['force_copy'] = force_copy
|
||||
method = 'migrate_volume'
|
||||
else:
|
||||
version = '3.0'
|
||||
msg_args['host'] = backend
|
||||
msg_args['force_host_copy'] = force_copy
|
||||
method = 'migrate_volume_to_host'
|
||||
|
||||
return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args)
|
||||
cctxt = self._get_cctxt(version=version)
|
||||
return cctxt.cast(ctxt, method, **msg_args)
|
||||
|
||||
def retype(self, ctxt, volume, request_spec=None, filter_properties=None):
|
||||
cctxt = self._get_cctxt()
|
||||
@ -157,14 +170,27 @@ class SchedulerAPI(rpc.RPCAPI):
|
||||
return cctxt.call(ctxt, 'get_pools', filters=filters)
|
||||
|
||||
def update_service_capabilities(self, ctxt, service_name, host,
|
||||
capabilities):
|
||||
cctxt = self._get_cctxt(fanout=True)
|
||||
cctxt.cast(ctxt, 'update_service_capabilities',
|
||||
service_name=service_name, host=host,
|
||||
capabilities=capabilities)
|
||||
capabilities, cluster_name,
|
||||
timestamp=None):
|
||||
msg_args = dict(service_name=service_name, host=host,
|
||||
capabilities=capabilities)
|
||||
|
||||
version = '3.3'
|
||||
# If server accepts timestamping the capabilities and the cluster name
|
||||
if self.client.can_send_version(version):
|
||||
# Serialize the timestamp
|
||||
timestamp = timestamp or timeutils.utcnow()
|
||||
msg_args.update(cluster_name=cluster_name,
|
||||
timestamp=jsonutils.to_primitive(timestamp))
|
||||
else:
|
||||
version = '3.0'
|
||||
|
||||
cctxt = self._get_cctxt(fanout=True, version=version)
|
||||
cctxt.cast(ctxt, 'update_service_capabilities', **msg_args)
|
||||
|
||||
def notify_service_capabilities(self, ctxt, service_name,
|
||||
host, capabilities):
|
||||
# TODO(geguileo): Make this work with Active/Active
|
||||
cctxt = self._get_cctxt(version='3.1')
|
||||
if not cctxt.can_send_version('3.1'):
|
||||
msg = _('notify_service_capabilities requires cinder-scheduler '
|
||||
|
@ -10,6 +10,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import ddt
|
||||
import fixtures
|
||||
import mock
|
||||
from oslo_concurrency import lockutils
|
||||
@ -21,6 +22,7 @@ import webob
|
||||
from webob import exc
|
||||
|
||||
from cinder.api.contrib import admin_actions
|
||||
from cinder.api.openstack import api_version_request as api_version
|
||||
from cinder.backup import api as backup_api
|
||||
from cinder.backup import rpcapi as backup_rpcapi
|
||||
from cinder.common import constants
|
||||
@ -70,6 +72,7 @@ class BaseAdminTest(test.TestCase):
|
||||
return volume
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class AdminActionsTest(BaseAdminTest):
|
||||
def setUp(self):
|
||||
super(AdminActionsTest, self).setUp()
|
||||
@ -101,6 +104,7 @@ class AdminActionsTest(BaseAdminTest):
|
||||
|
||||
self.patch('cinder.objects.Service.get_minimum_rpc_version',
|
||||
side_effect=_get_minimum_rpc_version_mock)
|
||||
self.controller = admin_actions.VolumeAdminController()
|
||||
|
||||
def tearDown(self):
|
||||
self.svc.stop()
|
||||
@ -138,7 +142,7 @@ class AdminActionsTest(BaseAdminTest):
|
||||
updated_status)
|
||||
|
||||
def test_valid_updates(self):
|
||||
vac = admin_actions.VolumeAdminController()
|
||||
vac = self.controller
|
||||
|
||||
vac.validate_update({'status': 'creating'})
|
||||
vac.validate_update({'status': 'available'})
|
||||
@ -503,10 +507,74 @@ class AdminActionsTest(BaseAdminTest):
|
||||
{'host': 'test2',
|
||||
'topic': constants.VOLUME_TOPIC,
|
||||
'created_at': timeutils.utcnow()})
|
||||
db.service_create(self.ctx,
|
||||
{'host': 'clustered_host',
|
||||
'topic': constants.VOLUME_TOPIC,
|
||||
'binary': constants.VOLUME_BINARY,
|
||||
'cluster_name': 'cluster',
|
||||
'created_at': timeutils.utcnow()})
|
||||
db.cluster_create(self.ctx,
|
||||
{'name': 'cluster',
|
||||
'binary': constants.VOLUME_BINARY})
|
||||
# current status is available
|
||||
volume = self._create_volume(self.ctx)
|
||||
return volume
|
||||
|
||||
def _migrate_volume_3_exec(self, ctx, volume, host, expected_status,
|
||||
force_host_copy=False, version=None,
|
||||
cluster=None):
|
||||
# build request to migrate to host
|
||||
# req = fakes.HTTPRequest.blank('/v3/%s/volumes/%s/action' % (
|
||||
# fake.PROJECT_ID, volume['id']))
|
||||
req = webob.Request.blank('/v3/%s/volumes/%s/action' % (
|
||||
fake.PROJECT_ID, volume['id']))
|
||||
req.method = 'POST'
|
||||
req.headers['content-type'] = 'application/json'
|
||||
body = {'os-migrate_volume': {'host': host,
|
||||
'force_host_copy': force_host_copy}}
|
||||
version = version or '3.0'
|
||||
req.headers = {'OpenStack-API-Version': 'volume %s' % version}
|
||||
req.api_version_request = api_version.APIVersionRequest(version)
|
||||
if version == '3.16':
|
||||
body['os-migrate_volume']['cluster'] = cluster
|
||||
req.body = jsonutils.dump_as_bytes(body)
|
||||
req.environ['cinder.context'] = ctx
|
||||
resp = self.controller._migrate_volume(req, volume.id, body)
|
||||
|
||||
# verify status
|
||||
self.assertEqual(expected_status, resp.status_int)
|
||||
volume = db.volume_get(self.ctx, volume['id'])
|
||||
return volume
|
||||
|
||||
@ddt.data('3.0', '3.15', '3.16')
|
||||
def test_migrate_volume_success_3(self, version):
|
||||
expected_status = 202
|
||||
host = 'test2'
|
||||
volume = self._migrate_volume_prep()
|
||||
volume = self._migrate_volume_3_exec(self.ctx, volume, host,
|
||||
expected_status, version=version)
|
||||
self.assertEqual('starting', volume['migration_status'])
|
||||
|
||||
def test_migrate_volume_success_cluster(self):
|
||||
expected_status = 202
|
||||
# We cannot provide host and cluster, so send host to None
|
||||
host = None
|
||||
cluster = 'cluster'
|
||||
volume = self._migrate_volume_prep()
|
||||
volume = self._migrate_volume_3_exec(self.ctx, volume, host,
|
||||
expected_status, version='3.16',
|
||||
cluster=cluster)
|
||||
self.assertEqual('starting', volume['migration_status'])
|
||||
|
||||
def test_migrate_volume_fail_host_and_cluster(self):
|
||||
# We cannot send host and cluster in the request
|
||||
host = 'test2'
|
||||
cluster = 'cluster'
|
||||
volume = self._migrate_volume_prep()
|
||||
self.assertRaises(exception.InvalidInput,
|
||||
self._migrate_volume_3_exec, self.ctx, volume, host,
|
||||
None, version='3.16', cluster=cluster)
|
||||
|
||||
def _migrate_volume_exec(self, ctx, volume, host, expected_status,
|
||||
force_host_copy=False):
|
||||
# build request to migrate to host
|
||||
|
@ -45,7 +45,7 @@ def volume_get(self, context, volume_id, viewable_admin_meta=False):
|
||||
if volume_id == fake.VOLUME_ID:
|
||||
return objects.Volume(context, id=fake.VOLUME_ID,
|
||||
_name_id=fake.VOLUME2_ID,
|
||||
host='fake_host')
|
||||
host='fake_host', cluster_name=None)
|
||||
raise exception.VolumeNotFound(volume_id=volume_id)
|
||||
|
||||
|
||||
@ -109,7 +109,7 @@ class SnapshotManageTest(test.TestCase):
|
||||
|
||||
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
|
||||
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
|
||||
@mock.patch('cinder.db.service_get')
|
||||
@mock.patch('cinder.db.sqlalchemy.api.service_get')
|
||||
def test_manage_snapshot_ok(self, mock_db,
|
||||
mock_create_snapshot, mock_rpcapi):
|
||||
"""Test successful manage snapshot execution.
|
||||
@ -128,7 +128,8 @@ class SnapshotManageTest(test.TestCase):
|
||||
|
||||
# Check the db.service_get was called with correct arguments.
|
||||
mock_db.assert_called_once_with(
|
||||
mock.ANY, host='fake_host', binary='cinder-volume')
|
||||
mock.ANY, None, host='fake_host', binary='cinder-volume',
|
||||
cluster_name=None)
|
||||
|
||||
# Check the create_snapshot_in_db was called with correct arguments.
|
||||
self.assertEqual(1, mock_create_snapshot.call_count)
|
||||
@ -149,7 +150,7 @@ class SnapshotManageTest(test.TestCase):
|
||||
new_callable=mock.PropertyMock)
|
||||
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
|
||||
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
|
||||
@mock.patch('cinder.db.service_get')
|
||||
@mock.patch('cinder.db.sqlalchemy.api.service_get')
|
||||
def test_manage_snapshot_disabled(self, mock_db, mock_create_snapshot,
|
||||
mock_rpcapi, mock_is_up):
|
||||
"""Test manage snapshot failure due to disabled service."""
|
||||
@ -168,7 +169,7 @@ class SnapshotManageTest(test.TestCase):
|
||||
new_callable=mock.PropertyMock)
|
||||
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
|
||||
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
|
||||
@mock.patch('cinder.db.service_get')
|
||||
@mock.patch('cinder.db.sqlalchemy.api.service_get')
|
||||
def test_manage_snapshot_is_down(self, mock_db, mock_create_snapshot,
|
||||
mock_rpcapi, mock_is_up):
|
||||
"""Test manage snapshot failure due to down service."""
|
||||
@ -280,7 +281,7 @@ class SnapshotManageTest(test.TestCase):
|
||||
sort_dirs=['asc'], sort_keys=['reference'])
|
||||
|
||||
@mock.patch('cinder.objects.service.Service.is_up', return_value=True)
|
||||
@mock.patch('cinder.db.service_get')
|
||||
@mock.patch('cinder.db.sqlalchemy.api.service_get')
|
||||
def test_get_manageable_snapshots_disabled(self, mock_db, mock_is_up):
|
||||
mock_db.return_value = fake_service.fake_service_obj(self._admin_ctxt,
|
||||
disabled=True)
|
||||
@ -292,7 +293,7 @@ class SnapshotManageTest(test.TestCase):
|
||||
|
||||
@mock.patch('cinder.objects.service.Service.is_up', return_value=False,
|
||||
new_callable=mock.PropertyMock)
|
||||
@mock.patch('cinder.db.service_get')
|
||||
@mock.patch('cinder.db.sqlalchemy.api.service_get')
|
||||
def test_get_manageable_snapshots_is_down(self, mock_db, mock_is_up):
|
||||
mock_db.return_value = fake_service.fake_service_obj(self._admin_ctxt)
|
||||
res = self._get_resp_get('host_ok', False, True)
|
||||
|
@ -23,6 +23,7 @@ except ImportError:
|
||||
from urllib.parse import urlencode
|
||||
import webob
|
||||
|
||||
from cinder.api.contrib import volume_manage
|
||||
from cinder.api.openstack import api_version_request as api_version
|
||||
from cinder import context
|
||||
from cinder import exception
|
||||
@ -51,7 +52,7 @@ def app_v3():
|
||||
return mapper
|
||||
|
||||
|
||||
def service_get(context, host, binary):
|
||||
def service_get(context, id, host=None, binary=None, *args, **kwargs):
|
||||
"""Replacement for Service.service_get_by_host_and_topic.
|
||||
|
||||
We mock the Service.service_get_by_host_and_topic method to return
|
||||
@ -146,7 +147,7 @@ def api_get_manageable_volumes(*args, **kwargs):
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@mock.patch('cinder.db.service_get', service_get)
|
||||
@mock.patch('cinder.db.sqlalchemy.api.service_get', service_get)
|
||||
@mock.patch('cinder.volume.volume_types.get_volume_type_by_name',
|
||||
vt_get_volume_type_by_name)
|
||||
@mock.patch('cinder.volume.volume_types.get_volume_type',
|
||||
@ -173,6 +174,7 @@ class VolumeManageTest(test.TestCase):
|
||||
self._non_admin_ctxt = context.RequestContext(fake.USER_ID,
|
||||
fake.PROJECT_ID,
|
||||
is_admin=False)
|
||||
self.controller = volume_manage.VolumeManageController()
|
||||
|
||||
def _get_resp_post(self, body):
|
||||
"""Helper to execute a POST os-volume-manage API call."""
|
||||
@ -196,10 +198,11 @@ class VolumeManageTest(test.TestCase):
|
||||
res = req.get_response(app_v3())
|
||||
return res
|
||||
|
||||
@ddt.data(False, True)
|
||||
@mock.patch('cinder.volume.api.API.manage_existing', wraps=api_manage)
|
||||
@mock.patch(
|
||||
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
|
||||
def test_manage_volume_ok(self, mock_validate, mock_api_manage):
|
||||
def test_manage_volume_ok(self, cluster, mock_validate, mock_api_manage):
|
||||
"""Test successful manage volume execution.
|
||||
|
||||
Tests for correct operation when valid arguments are passed in the
|
||||
@ -209,6 +212,9 @@ class VolumeManageTest(test.TestCase):
|
||||
"""
|
||||
body = {'volume': {'host': 'host_ok',
|
||||
'ref': 'fake_ref'}}
|
||||
# This will be ignored
|
||||
if cluster:
|
||||
body['volume']['cluster'] = 'cluster'
|
||||
res = self._get_resp_post(body)
|
||||
self.assertEqual(202, res.status_int)
|
||||
|
||||
@ -216,9 +222,48 @@ class VolumeManageTest(test.TestCase):
|
||||
self.assertEqual(1, mock_api_manage.call_count)
|
||||
args = mock_api_manage.call_args[0]
|
||||
self.assertEqual(body['volume']['host'], args[1])
|
||||
self.assertEqual(body['volume']['ref'], args[2])
|
||||
self.assertIsNone(args[2]) # Cluster argument
|
||||
self.assertEqual(body['volume']['ref'], args[3])
|
||||
self.assertTrue(mock_validate.called)
|
||||
|
||||
def _get_resp_create(self, body, version='3.0'):
|
||||
url = '/v3/%s/os-volume-manage' % fake.PROJECT_ID
|
||||
req = webob.Request.blank(url, base_url='http://localhost.com' + url)
|
||||
req.method = 'POST'
|
||||
req.headers['Content-Type'] = 'application/json'
|
||||
req.environ['cinder.context'] = self._admin_ctxt
|
||||
req.body = jsonutils.dump_as_bytes(body)
|
||||
req.headers = {'OpenStack-API-Version': 'volume %s' % version}
|
||||
req.api_version_request = api_version.APIVersionRequest(version)
|
||||
res = self.controller.create(req, body)
|
||||
return res
|
||||
|
||||
@mock.patch('cinder.volume.api.API.manage_existing', wraps=api_manage)
|
||||
@mock.patch(
|
||||
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
|
||||
def test_manage_volume_ok_cluster(self, mock_validate, mock_api_manage):
|
||||
body = {'volume': {'cluster': 'cluster',
|
||||
'ref': 'fake_ref'}}
|
||||
res = self._get_resp_create(body, '3.16')
|
||||
self.assertEqual(['volume'], list(res.keys()))
|
||||
|
||||
# Check that the manage API was called with the correct arguments.
|
||||
self.assertEqual(1, mock_api_manage.call_count)
|
||||
args = mock_api_manage.call_args[0]
|
||||
self.assertIsNone(args[1])
|
||||
self.assertEqual(body['volume']['cluster'], args[2])
|
||||
self.assertEqual(body['volume']['ref'], args[3])
|
||||
self.assertTrue(mock_validate.called)
|
||||
|
||||
@mock.patch(
|
||||
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
|
||||
def test_manage_volume_fail_host_cluster(self, mock_validate):
|
||||
body = {'volume': {'host': 'host_ok',
|
||||
'cluster': 'cluster',
|
||||
'ref': 'fake_ref'}}
|
||||
self.assertRaises(exception.InvalidInput,
|
||||
self._get_resp_create, body, '3.16')
|
||||
|
||||
def test_manage_volume_missing_host(self):
|
||||
"""Test correct failure when host is not specified."""
|
||||
body = {'volume': {'ref': 'fake_ref'}}
|
||||
|
@ -60,7 +60,7 @@ class SnapshotManageTest(test.TestCase):
|
||||
|
||||
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
|
||||
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
|
||||
@mock.patch('cinder.objects.service.Service.get_by_args')
|
||||
@mock.patch('cinder.objects.service.Service.get_by_id')
|
||||
def test_manage_snapshot_route(self, mock_service_get,
|
||||
mock_create_snapshot, mock_rpcapi):
|
||||
"""Test call to manage snapshot.
|
||||
|
@ -23,6 +23,9 @@ from cinder.scheduler import filter_scheduler
|
||||
from cinder.scheduler import host_manager
|
||||
|
||||
|
||||
UTC_NOW = timeutils.utcnow()
|
||||
|
||||
|
||||
class FakeFilterScheduler(filter_scheduler.FilterScheduler):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(FakeFilterScheduler, self).__init__(*args, **kwargs)
|
||||
@ -43,7 +46,7 @@ class FakeHostManager(host_manager.HostManager):
|
||||
'thick_provisioning_support': True,
|
||||
'reserved_percentage': 10,
|
||||
'volume_backend_name': 'lvm1',
|
||||
'timestamp': None},
|
||||
'timestamp': UTC_NOW},
|
||||
'host2': {'total_capacity_gb': 2048,
|
||||
'free_capacity_gb': 300,
|
||||
'allocated_capacity_gb': 1748,
|
||||
@ -53,7 +56,7 @@ class FakeHostManager(host_manager.HostManager):
|
||||
'thick_provisioning_support': False,
|
||||
'reserved_percentage': 10,
|
||||
'volume_backend_name': 'lvm2',
|
||||
'timestamp': None},
|
||||
'timestamp': UTC_NOW},
|
||||
'host3': {'total_capacity_gb': 512,
|
||||
'free_capacity_gb': 256,
|
||||
'allocated_capacity_gb': 256,
|
||||
@ -63,7 +66,7 @@ class FakeHostManager(host_manager.HostManager):
|
||||
'thick_provisioning_support': True,
|
||||
'reserved_percentage': 0,
|
||||
'volume_backend_name': 'lvm3',
|
||||
'timestamp': None},
|
||||
'timestamp': UTC_NOW},
|
||||
'host4': {'total_capacity_gb': 2048,
|
||||
'free_capacity_gb': 200,
|
||||
'allocated_capacity_gb': 1848,
|
||||
@ -73,7 +76,7 @@ class FakeHostManager(host_manager.HostManager):
|
||||
'thick_provisioning_support': False,
|
||||
'reserved_percentage': 5,
|
||||
'volume_backend_name': 'lvm4',
|
||||
'timestamp': None,
|
||||
'timestamp': UTC_NOW,
|
||||
'consistencygroup_support': True},
|
||||
'host5': {'total_capacity_gb': 'infinite',
|
||||
'free_capacity_gb': 'unknown',
|
||||
@ -83,13 +86,13 @@ class FakeHostManager(host_manager.HostManager):
|
||||
'thin_provisioning_support': True,
|
||||
'thick_provisioning_support': False,
|
||||
'reserved_percentage': 5,
|
||||
'timestamp': None},
|
||||
'timestamp': UTC_NOW},
|
||||
}
|
||||
|
||||
|
||||
class FakeHostState(host_manager.HostState):
|
||||
def __init__(self, host, attribute_dict):
|
||||
super(FakeHostState, self).__init__(host)
|
||||
super(FakeHostState, self).__init__(host, None)
|
||||
for (key, val) in attribute_dict.items():
|
||||
setattr(self, key, val)
|
||||
|
||||
|
@ -178,7 +178,7 @@ class TestBaseFilterHandler(test.TestCase):
|
||||
def test_get_filtered_objects_info_and_debug_log_none_returned(self):
|
||||
|
||||
all_filters = [FilterA, FilterA, FilterB]
|
||||
fake_hosts = [host_manager.HostState('fake_host%s' % x)
|
||||
fake_hosts = [host_manager.HostState('fake_host%s' % x, None)
|
||||
for x in range(1, 4)]
|
||||
|
||||
filt_props = {"request_spec": {'volume_id': fake.VOLUME_ID,
|
||||
|
@ -15,6 +15,7 @@
|
||||
"""
|
||||
Tests For Capacity Weigher.
|
||||
"""
|
||||
from datetime import datetime
|
||||
|
||||
import ddt
|
||||
import mock
|
||||
@ -248,7 +249,7 @@ class CapacityWeigherTestCase(test.TestCase):
|
||||
'thin_provisioning_support': True,
|
||||
'thick_provisioning_support': False,
|
||||
'reserved_percentage': 5,
|
||||
'timestamp': None}
|
||||
'timestamp': datetime.utcnow()}
|
||||
hostinfo_list = self._get_all_hosts()
|
||||
|
||||
# host1: thin_provisioning_support = False
|
||||
@ -290,7 +291,7 @@ class CapacityWeigherTestCase(test.TestCase):
|
||||
'thin_provisioning_support': True,
|
||||
'thick_provisioning_support': False,
|
||||
'reserved_percentage': 5,
|
||||
'timestamp': None}
|
||||
'timestamp': datetime.utcnow()}
|
||||
hostinfo_list = self._get_all_hosts()
|
||||
|
||||
# host1: thin_provisioning_support = False
|
||||
@ -332,7 +333,7 @@ class CapacityWeigherTestCase(test.TestCase):
|
||||
'thin_provisioning_support': True,
|
||||
'thick_provisioning_support': False,
|
||||
'reserved_percentage': 5,
|
||||
'timestamp': None}
|
||||
'timestamp': datetime.utcnow()}
|
||||
hostinfo_list = self._get_all_hosts()
|
||||
|
||||
# host1: thin_provisioning_support = False
|
||||
@ -374,7 +375,7 @@ class CapacityWeigherTestCase(test.TestCase):
|
||||
'thin_provisioning_support': True,
|
||||
'thick_provisioning_support': False,
|
||||
'reserved_percentage': 5,
|
||||
'timestamp': None}
|
||||
'timestamp': datetime.utcnow()}
|
||||
hostinfo_list = self._get_all_hosts()
|
||||
|
||||
# host1: thin_provisioning_support = False
|
||||
|
@ -58,7 +58,7 @@ class ChanceWeigherTestCase(test.TestCase):
|
||||
# ensure we don't lose any hosts when weighing with
|
||||
# the ChanceWeigher
|
||||
hm = host_manager.HostManager()
|
||||
fake_hosts = [host_manager.HostState('fake_host%s' % x)
|
||||
fake_hosts = [host_manager.HostState('fake_host%s' % x, None)
|
||||
for x in range(1, 5)]
|
||||
weighed_hosts = hm.get_weighed_hosts(fake_hosts, {}, 'ChanceWeigher')
|
||||
self.assertEqual(4, len(weighed_hosts))
|
||||
|
@ -414,7 +414,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
|
||||
filter_properties = {'retry': retry}
|
||||
sched = fakes.FakeFilterScheduler()
|
||||
|
||||
host_state = host_manager.HostState('host')
|
||||
host_state = host_manager.HostState('host', None)
|
||||
host_state.total_capacity_gb = 1024
|
||||
sched._post_select_populate_filter_properties(filter_properties,
|
||||
host_state)
|
||||
|
@ -19,14 +19,18 @@ Tests For HostManager
|
||||
from datetime import datetime
|
||||
|
||||
import mock
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from cinder.common import constants
|
||||
from cinder import context
|
||||
from cinder import db
|
||||
from cinder import exception
|
||||
from cinder import objects
|
||||
from cinder.scheduler import filters
|
||||
from cinder.scheduler import host_manager
|
||||
from cinder import test
|
||||
from cinder.tests.unit import fake_constants as fake
|
||||
from cinder.tests.unit.objects import test_service
|
||||
|
||||
|
||||
@ -46,7 +50,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(HostManagerTestCase, self).setUp()
|
||||
self.host_manager = host_manager.HostManager()
|
||||
self.fake_hosts = [host_manager.HostState('fake_host%s' % x)
|
||||
self.fake_hosts = [host_manager.HostState('fake_host%s' % x, None)
|
||||
for x in range(1, 5)]
|
||||
# For a second scheduler service.
|
||||
self.host_manager_1 = host_manager.HostManager()
|
||||
@ -93,27 +97,29 @@ class HostManagerTestCase(test.TestCase):
|
||||
_mock_get_updated_pools):
|
||||
service_states = self.host_manager.service_states
|
||||
self.assertDictMatch({}, service_states)
|
||||
_mock_utcnow.side_effect = [31337, 31338, 31339]
|
||||
_mock_utcnow.side_effect = [31338, 31339]
|
||||
|
||||
_mock_get_updated_pools.return_value = []
|
||||
host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1)
|
||||
host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1)
|
||||
host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1)
|
||||
timestamp = jsonutils.to_primitive(datetime.utcnow())
|
||||
host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=timestamp)
|
||||
host2_volume_capabs = dict(free_capacity_gb=5432)
|
||||
host3_volume_capabs = dict(free_capacity_gb=6543)
|
||||
|
||||
service_name = 'volume'
|
||||
self.host_manager.update_service_capabilities(service_name, 'host1',
|
||||
host1_volume_capabs)
|
||||
host1_volume_capabs,
|
||||
None, timestamp)
|
||||
self.host_manager.update_service_capabilities(service_name, 'host2',
|
||||
host2_volume_capabs)
|
||||
host2_volume_capabs,
|
||||
None, None)
|
||||
self.host_manager.update_service_capabilities(service_name, 'host3',
|
||||
host3_volume_capabs)
|
||||
host3_volume_capabs,
|
||||
None, None)
|
||||
|
||||
# Make sure dictionary isn't re-assigned
|
||||
self.assertEqual(service_states, self.host_manager.service_states)
|
||||
# Make sure original dictionary wasn't copied
|
||||
self.assertEqual(1, host1_volume_capabs['timestamp'])
|
||||
|
||||
host1_volume_capabs['timestamp'] = 31337
|
||||
host1_volume_capabs['timestamp'] = timestamp
|
||||
host2_volume_capabs['timestamp'] = 31338
|
||||
host3_volume_capabs['timestamp'] = 31339
|
||||
|
||||
@ -150,7 +156,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
|
||||
# S0: update_service_capabilities()
|
||||
self.host_manager.update_service_capabilities(service_name, 'host1',
|
||||
capab1)
|
||||
capab1, None, None)
|
||||
self.assertDictMatch(dict(dict(timestamp=31337), **capab1),
|
||||
self.host_manager.service_states['host1'])
|
||||
|
||||
@ -168,7 +174,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
|
||||
# S1: update_service_capabilities()
|
||||
self.host_manager_1.update_service_capabilities(service_name, 'host1',
|
||||
capab1)
|
||||
capab1, None, None)
|
||||
|
||||
self.assertDictMatch(dict(dict(timestamp=31339), **capab1),
|
||||
self.host_manager_1.service_states['host1'])
|
||||
@ -208,7 +214,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
|
||||
# S0: update_service_capabilities()
|
||||
self.host_manager.update_service_capabilities(service_name, 'host1',
|
||||
capab1)
|
||||
capab1, None, None)
|
||||
|
||||
self.assertDictMatch(dict(dict(timestamp=31340), **capab1),
|
||||
self.host_manager.service_states['host1'])
|
||||
@ -219,7 +225,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
|
||||
# S1: update_service_capabilities()
|
||||
self.host_manager_1.update_service_capabilities(service_name, 'host1',
|
||||
capab1)
|
||||
capab1, None, None)
|
||||
|
||||
self.assertDictMatch(dict(dict(timestamp=31341), **capab1),
|
||||
self.host_manager_1.service_states['host1'])
|
||||
@ -292,7 +298,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
|
||||
# S0: update_service_capabilities()
|
||||
self.host_manager.update_service_capabilities(service_name, 'host1',
|
||||
capab1)
|
||||
capab1, None, None)
|
||||
|
||||
self.assertDictMatch(
|
||||
dict(dict(timestamp=31340), **capab1),
|
||||
@ -303,7 +309,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
|
||||
# S1: update_service_capabilities()
|
||||
self.host_manager_1.update_service_capabilities(service_name, 'host1',
|
||||
capab1)
|
||||
capab1, None, None)
|
||||
self.assertDictMatch(dict(dict(timestamp=31345), **capab1),
|
||||
self.host_manager_1.service_states['host1'])
|
||||
|
||||
@ -355,7 +361,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
|
||||
# S0: update_service_capabilities()
|
||||
self.host_manager.update_service_capabilities(service_name, 'host1',
|
||||
capab2)
|
||||
capab2, None, None)
|
||||
self.assertDictMatch(
|
||||
dict(dict(timestamp=31340), **capab1),
|
||||
self.host_manager.service_states_last_update['host1'])
|
||||
@ -378,7 +384,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
|
||||
# S1: update_service_capabilities()
|
||||
self.host_manager_1.update_service_capabilities(service_name, 'host1',
|
||||
capab2)
|
||||
capab2, None, None)
|
||||
self.assertDictMatch(dict(dict(timestamp=31348), **capab2),
|
||||
self.host_manager_1.service_states['host1'])
|
||||
|
||||
@ -452,7 +458,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
|
||||
# S0: update_service_capabilities()
|
||||
self.host_manager.update_service_capabilities(service_name, 'host1',
|
||||
capab2)
|
||||
capab2, None, None)
|
||||
self.assertDictMatch(
|
||||
dict(dict(timestamp=31349), **capab2),
|
||||
self.host_manager.service_states_last_update['host1'])
|
||||
@ -462,7 +468,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
|
||||
# S1: update_service_capabilities()
|
||||
self.host_manager_1.update_service_capabilities(service_name, 'host1',
|
||||
capab2)
|
||||
capab2, None, None)
|
||||
|
||||
self.assertDictMatch(
|
||||
dict(dict(timestamp=31348), **capab2),
|
||||
@ -490,19 +496,23 @@ class HostManagerTestCase(test.TestCase):
|
||||
self.host_manager = host_manager.HostManager()
|
||||
self.assertFalse(self.host_manager.has_all_capabilities())
|
||||
|
||||
host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1)
|
||||
host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1)
|
||||
host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1)
|
||||
timestamp = jsonutils.to_primitive(datetime.utcnow())
|
||||
host1_volume_capabs = dict(free_capacity_gb=4321)
|
||||
host2_volume_capabs = dict(free_capacity_gb=5432)
|
||||
host3_volume_capabs = dict(free_capacity_gb=6543)
|
||||
|
||||
service_name = 'volume'
|
||||
self.host_manager.update_service_capabilities(service_name, 'host1',
|
||||
host1_volume_capabs)
|
||||
host1_volume_capabs,
|
||||
None, timestamp)
|
||||
self.assertFalse(self.host_manager.has_all_capabilities())
|
||||
self.host_manager.update_service_capabilities(service_name, 'host2',
|
||||
host2_volume_capabs)
|
||||
host2_volume_capabs,
|
||||
None, timestamp)
|
||||
self.assertFalse(self.host_manager.has_all_capabilities())
|
||||
self.host_manager.update_service_capabilities(service_name, 'host3',
|
||||
host3_volume_capabs)
|
||||
host3_volume_capabs,
|
||||
None, timestamp)
|
||||
self.assertTrue(self.host_manager.has_all_capabilities())
|
||||
|
||||
@mock.patch('cinder.db.service_get_all')
|
||||
@ -532,7 +542,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
mocked_service_states = {
|
||||
'host1': dict(volume_backend_name='AAA',
|
||||
total_capacity_gb=512, free_capacity_gb=200,
|
||||
timestamp=None, reserved_percentage=0),
|
||||
timestamp=dates[1], reserved_percentage=0),
|
||||
}
|
||||
|
||||
_mock_service_get_all.return_value = services
|
||||
@ -547,24 +557,93 @@ class HostManagerTestCase(test.TestCase):
|
||||
mocked_service_states):
|
||||
self.host_manager.update_service_capabilities(service_name,
|
||||
'host1',
|
||||
host_volume_capabs)
|
||||
host_volume_capabs,
|
||||
None, None)
|
||||
res = self.host_manager.get_pools(context)
|
||||
self.assertEqual(1, len(res))
|
||||
self.assertEqual(dates[1], res[0]['capabilities']['timestamp'])
|
||||
|
||||
self.host_manager.update_service_capabilities(service_name,
|
||||
'host1',
|
||||
host_volume_capabs)
|
||||
host_volume_capabs,
|
||||
None, None)
|
||||
res = self.host_manager.get_pools(context)
|
||||
self.assertEqual(1, len(res))
|
||||
self.assertEqual(dates[2], res[0]['capabilities']['timestamp'])
|
||||
|
||||
@mock.patch('cinder.objects.Service.is_up', True)
|
||||
def test_get_all_host_states_cluster(self):
|
||||
"""Test get_all_host_states when we have clustered services.
|
||||
|
||||
Confirm that clustered services are grouped and that only the latest
|
||||
of the capability reports is relevant.
|
||||
"""
|
||||
ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
|
||||
|
||||
cluster_name = 'cluster'
|
||||
db.cluster_create(ctxt, {'name': cluster_name,
|
||||
'binary': constants.VOLUME_BINARY})
|
||||
|
||||
services = (
|
||||
db.service_create(ctxt,
|
||||
{'host': 'clustered_host_1',
|
||||
'topic': constants.VOLUME_TOPIC,
|
||||
'binary': constants.VOLUME_BINARY,
|
||||
'cluster_name': cluster_name,
|
||||
'created_at': timeutils.utcnow()}),
|
||||
# Even if this service is disabled, since it belongs to an enabled
|
||||
# cluster, it's not really disabled.
|
||||
db.service_create(ctxt,
|
||||
{'host': 'clustered_host_2',
|
||||
'topic': constants.VOLUME_TOPIC,
|
||||
'binary': constants.VOLUME_BINARY,
|
||||
'disabled': True,
|
||||
'cluster_name': cluster_name,
|
||||
'created_at': timeutils.utcnow()}),
|
||||
db.service_create(ctxt,
|
||||
{'host': 'clustered_host_3',
|
||||
'topic': constants.VOLUME_TOPIC,
|
||||
'binary': constants.VOLUME_BINARY,
|
||||
'cluster_name': cluster_name,
|
||||
'created_at': timeutils.utcnow()}),
|
||||
db.service_create(ctxt,
|
||||
{'host': 'non_clustered_host',
|
||||
'topic': constants.VOLUME_TOPIC,
|
||||
'binary': constants.VOLUME_BINARY,
|
||||
'created_at': timeutils.utcnow()}),
|
||||
# This service has no capabilities
|
||||
db.service_create(ctxt,
|
||||
{'host': 'no_capabilities_host',
|
||||
'topic': constants.VOLUME_TOPIC,
|
||||
'binary': constants.VOLUME_BINARY,
|
||||
'created_at': timeutils.utcnow()}),
|
||||
)
|
||||
|
||||
capabilities = ((1, {'free_capacity_gb': 1000}),
|
||||
# This is the capacity that will be selected for the
|
||||
# cluster because is the one with the latest timestamp.
|
||||
(3, {'free_capacity_gb': 2000}),
|
||||
(2, {'free_capacity_gb': 3000}),
|
||||
(1, {'free_capacity_gb': 4000}))
|
||||
|
||||
for i in range(len(capabilities)):
|
||||
self.host_manager.update_service_capabilities(
|
||||
'volume', services[i].host, capabilities[i][1],
|
||||
services[i].cluster_name, capabilities[i][0])
|
||||
|
||||
res = self.host_manager.get_all_host_states(ctxt)
|
||||
result = {(s.cluster_name or s.host, s.free_capacity_gb) for s in res}
|
||||
expected = {(cluster_name + '#_pool0', 2000),
|
||||
('non_clustered_host#_pool0', 4000)}
|
||||
self.assertSetEqual(expected, result)
|
||||
|
||||
@mock.patch('cinder.db.service_get_all')
|
||||
@mock.patch('cinder.objects.service.Service.is_up',
|
||||
new_callable=mock.PropertyMock)
|
||||
def test_get_all_host_states(self, _mock_service_is_up,
|
||||
_mock_service_get_all):
|
||||
context = 'fake_context'
|
||||
timestamp = datetime.utcnow()
|
||||
topic = constants.VOLUME_TOPIC
|
||||
|
||||
services = [
|
||||
@ -596,15 +675,15 @@ class HostManagerTestCase(test.TestCase):
|
||||
service_states = {
|
||||
'host1': dict(volume_backend_name='AAA',
|
||||
total_capacity_gb=512, free_capacity_gb=200,
|
||||
timestamp=None, reserved_percentage=0,
|
||||
timestamp=timestamp, reserved_percentage=0,
|
||||
provisioned_capacity_gb=312),
|
||||
'host2': dict(volume_backend_name='BBB',
|
||||
total_capacity_gb=256, free_capacity_gb=100,
|
||||
timestamp=None, reserved_percentage=0,
|
||||
timestamp=timestamp, reserved_percentage=0,
|
||||
provisioned_capacity_gb=156),
|
||||
'host3': dict(volume_backend_name='CCC',
|
||||
total_capacity_gb=10000, free_capacity_gb=700,
|
||||
timestamp=None, reserved_percentage=0,
|
||||
timestamp=timestamp, reserved_percentage=0,
|
||||
provisioned_capacity_gb=9300),
|
||||
}
|
||||
# First test: service.is_up is always True, host5 is disabled,
|
||||
@ -665,6 +744,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
def test_get_pools(self, _mock_service_is_up,
|
||||
_mock_service_get_all):
|
||||
context = 'fake_context'
|
||||
timestamp = datetime.utcnow()
|
||||
|
||||
services = [
|
||||
dict(id=1, host='host1', topic='volume', disabled=False,
|
||||
@ -678,15 +758,15 @@ class HostManagerTestCase(test.TestCase):
|
||||
mocked_service_states = {
|
||||
'host1': dict(volume_backend_name='AAA',
|
||||
total_capacity_gb=512, free_capacity_gb=200,
|
||||
timestamp=None, reserved_percentage=0,
|
||||
timestamp=timestamp, reserved_percentage=0,
|
||||
provisioned_capacity_gb=312),
|
||||
'host2@back1': dict(volume_backend_name='BBB',
|
||||
total_capacity_gb=256, free_capacity_gb=100,
|
||||
timestamp=None, reserved_percentage=0,
|
||||
timestamp=timestamp, reserved_percentage=0,
|
||||
provisioned_capacity_gb=156),
|
||||
'host2@back2': dict(volume_backend_name='CCC',
|
||||
total_capacity_gb=10000, free_capacity_gb=700,
|
||||
timestamp=None, reserved_percentage=0,
|
||||
timestamp=timestamp, reserved_percentage=0,
|
||||
provisioned_capacity_gb=9300),
|
||||
}
|
||||
|
||||
@ -706,7 +786,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
{
|
||||
'name': 'host1#AAA',
|
||||
'capabilities': {
|
||||
'timestamp': None,
|
||||
'timestamp': timestamp,
|
||||
'volume_backend_name': 'AAA',
|
||||
'free_capacity_gb': 200,
|
||||
'driver_version': None,
|
||||
@ -719,7 +799,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
{
|
||||
'name': 'host2@back1#BBB',
|
||||
'capabilities': {
|
||||
'timestamp': None,
|
||||
'timestamp': timestamp,
|
||||
'volume_backend_name': 'BBB',
|
||||
'free_capacity_gb': 100,
|
||||
'driver_version': None,
|
||||
@ -732,7 +812,7 @@ class HostManagerTestCase(test.TestCase):
|
||||
{
|
||||
'name': 'host2@back2#CCC',
|
||||
'capabilities': {
|
||||
'timestamp': None,
|
||||
'timestamp': timestamp,
|
||||
'volume_backend_name': 'CCC',
|
||||
'free_capacity_gb': 700,
|
||||
'driver_version': None,
|
||||
@ -887,7 +967,7 @@ class HostStateTestCase(test.TestCase):
|
||||
"""Test case for HostState class."""
|
||||
|
||||
def test_update_from_volume_capability_nopool(self):
|
||||
fake_host = host_manager.HostState('host1')
|
||||
fake_host = host_manager.HostState('host1', None)
|
||||
self.assertIsNone(fake_host.free_capacity_gb)
|
||||
|
||||
volume_capability = {'total_capacity_gb': 1024,
|
||||
@ -922,7 +1002,7 @@ class HostStateTestCase(test.TestCase):
|
||||
self.assertRaises(KeyError, lambda: fake_host.pools['pool0'])
|
||||
|
||||
def test_update_from_volume_capability_with_pools(self):
|
||||
fake_host = host_manager.HostState('host1')
|
||||
fake_host = host_manager.HostState('host1', None)
|
||||
self.assertIsNone(fake_host.free_capacity_gb)
|
||||
capability = {
|
||||
'volume_backend_name': 'Local iSCSI',
|
||||
@ -1014,7 +1094,7 @@ class HostStateTestCase(test.TestCase):
|
||||
fake_host.pools['3rd pool'].provisioned_capacity_gb)
|
||||
|
||||
def test_update_from_volume_infinite_capability(self):
|
||||
fake_host = host_manager.HostState('host1')
|
||||
fake_host = host_manager.HostState('host1', None)
|
||||
self.assertIsNone(fake_host.free_capacity_gb)
|
||||
|
||||
volume_capability = {'total_capacity_gb': 'infinite',
|
||||
@ -1035,7 +1115,7 @@ class HostStateTestCase(test.TestCase):
|
||||
fake_host.pools['_pool0'].free_capacity_gb)
|
||||
|
||||
def test_update_from_volume_unknown_capability(self):
|
||||
fake_host = host_manager.HostState('host1')
|
||||
fake_host = host_manager.HostState('host1', None)
|
||||
self.assertIsNone(fake_host.free_capacity_gb)
|
||||
|
||||
volume_capability = {'total_capacity_gb': 'infinite',
|
||||
@ -1056,7 +1136,7 @@ class HostStateTestCase(test.TestCase):
|
||||
fake_host.pools['_pool0'].free_capacity_gb)
|
||||
|
||||
def test_update_from_empty_volume_capability(self):
|
||||
fake_host = host_manager.HostState('host1')
|
||||
fake_host = host_manager.HostState('host1', None)
|
||||
|
||||
vol_cap = {'timestamp': None}
|
||||
|
||||
@ -1076,7 +1156,7 @@ class PoolStateTestCase(test.TestCase):
|
||||
"""Test case for HostState class."""
|
||||
|
||||
def test_update_from_volume_capability(self):
|
||||
fake_pool = host_manager.PoolState('host1', None, 'pool0')
|
||||
fake_pool = host_manager.PoolState('host1', None, None, 'pool0')
|
||||
self.assertIsNone(fake_pool.free_capacity_gb)
|
||||
|
||||
volume_capability = {'total_capacity_gb': 1024,
|
||||
|
@ -17,6 +17,7 @@
|
||||
Unit Tests for cinder.scheduler.rpcapi
|
||||
"""
|
||||
|
||||
import ddt
|
||||
import mock
|
||||
|
||||
from cinder import context
|
||||
@ -27,6 +28,7 @@ from cinder.tests.unit import fake_constants
|
||||
from cinder.tests.unit import fake_volume
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class SchedulerRpcAPITestCase(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -75,14 +77,20 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
for kwarg, value in self.fake_kwargs.items():
|
||||
self.assertEqual(expected_msg[kwarg], value)
|
||||
|
||||
def test_update_service_capabilities(self):
|
||||
@ddt.data('3.0', '3.3')
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version')
|
||||
def test_update_service_capabilities(self, version, can_send_version):
|
||||
can_send_version.side_effect = lambda x: x == version
|
||||
self._test_scheduler_api('update_service_capabilities',
|
||||
rpc_method='cast',
|
||||
service_name='fake_name',
|
||||
host='fake_host',
|
||||
capabilities='fake_capabilities',
|
||||
cluster_name='cluster_name',
|
||||
capabilities={},
|
||||
fanout=True,
|
||||
version='3.0')
|
||||
version=version,
|
||||
timestamp='123')
|
||||
can_send_version.assert_called_once_with('3.3')
|
||||
|
||||
def test_create_volume(self):
|
||||
volume = fake_volume.fake_volume_obj(self.context)
|
||||
@ -135,17 +143,18 @@ class SchedulerRpcAPITestCase(test.TestCase):
|
||||
version='3.0')
|
||||
create_worker_mock.assert_called_once()
|
||||
|
||||
def test_migrate_volume_to_host(self):
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version')
|
||||
def test_migrate_volume(self, can_send_version):
|
||||
volume = fake_volume.fake_volume_obj(self.context)
|
||||
create_worker_mock = self.mock_object(volume, 'create_worker')
|
||||
self._test_scheduler_api('migrate_volume_to_host',
|
||||
self._test_scheduler_api('migrate_volume',
|
||||
rpc_method='cast',
|
||||
host='host',
|
||||
force_host_copy=True,
|
||||
backend='host',
|
||||
force_copy=True,
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='filter_properties',
|
||||
volume=volume,
|
||||
version='3.0')
|
||||
version='3.3')
|
||||
create_worker_mock.assert_not_called()
|
||||
|
||||
def test_retype(self):
|
||||
|
@ -103,7 +103,7 @@ class SchedulerManagerTestCase(test.TestCase):
|
||||
self.manager.update_service_capabilities(self.context,
|
||||
service_name=service,
|
||||
host=host)
|
||||
_mock_update_cap.assert_called_once_with(service, host, {})
|
||||
_mock_update_cap.assert_called_once_with(service, host, {}, None, None)
|
||||
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.'
|
||||
'update_service_capabilities')
|
||||
@ -117,7 +117,8 @@ class SchedulerManagerTestCase(test.TestCase):
|
||||
service_name=service,
|
||||
host=host,
|
||||
capabilities=capabilities)
|
||||
_mock_update_cap.assert_called_once_with(service, host, capabilities)
|
||||
_mock_update_cap.assert_called_once_with(service, host, capabilities,
|
||||
None, None)
|
||||
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
|
||||
@mock.patch('cinder.message.api.API.create')
|
||||
@ -164,6 +165,19 @@ class SchedulerManagerTestCase(test.TestCase):
|
||||
request_spec_obj, {})
|
||||
self.assertFalse(_mock_sleep.called)
|
||||
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
|
||||
@mock.patch('eventlet.sleep')
|
||||
def test_create_volume_set_worker(self, _mock_sleep, _mock_sched_create):
|
||||
"""Make sure that the worker is created when creating a volume."""
|
||||
volume = tests_utils.create_volume(self.context, status='creating')
|
||||
|
||||
request_spec = {'volume_id': volume.id}
|
||||
|
||||
self.manager.create_volume(self.context, volume,
|
||||
request_spec=request_spec,
|
||||
filter_properties={})
|
||||
volume.set_worker.assert_called_once_with()
|
||||
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.is_ready')
|
||||
@mock.patch('eventlet.sleep')
|
||||
@ -326,6 +340,13 @@ class SchedulerManagerTestCase(test.TestCase):
|
||||
|
||||
self.manager.driver = original_driver
|
||||
|
||||
def test_do_cleanup(self):
|
||||
vol = tests_utils.create_volume(self.context, status='creating')
|
||||
self.manager._do_cleanup(self.context, vol)
|
||||
|
||||
vol.refresh()
|
||||
self.assertEqual('error', vol.status)
|
||||
|
||||
|
||||
class SchedulerTestCase(test.TestCase):
|
||||
"""Test case for base scheduler driver class."""
|
||||
@ -346,9 +367,9 @@ class SchedulerTestCase(test.TestCase):
|
||||
host = 'fake_host'
|
||||
capabilities = {'fake_capability': 'fake_value'}
|
||||
self.driver.update_service_capabilities(service_name, host,
|
||||
capabilities)
|
||||
capabilities, None)
|
||||
_mock_update_cap.assert_called_once_with(service_name, host,
|
||||
capabilities)
|
||||
capabilities, None)
|
||||
|
||||
@mock.patch('cinder.scheduler.host_manager.HostManager.'
|
||||
'has_all_capabilities', return_value=False)
|
||||
@ -387,8 +408,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
|
||||
volume = fake_volume.fake_volume_obj(self.context)
|
||||
_mock_volume_get.return_value = volume
|
||||
|
||||
driver.volume_update_db(self.context, volume.id, 'fake_host')
|
||||
driver.volume_update_db(self.context, volume.id, 'fake_host',
|
||||
'fake_cluster')
|
||||
scheduled_at = volume.scheduled_at.replace(tzinfo=None)
|
||||
_mock_vol_update.assert_called_once_with(
|
||||
self.context, volume.id, {'host': 'fake_host',
|
||||
'cluster_name': 'fake_cluster',
|
||||
'scheduled_at': scheduled_at})
|
||||
|
@ -167,6 +167,39 @@ class DBAPIServiceTestCase(BaseTest):
|
||||
real_service1 = db.service_get(self.ctxt, host='host1', topic='topic1')
|
||||
self._assertEqualObjects(service1, real_service1)
|
||||
|
||||
def test_service_get_all_disabled_by_cluster(self):
|
||||
values = [
|
||||
# Enabled services
|
||||
{'host': 'host1', 'binary': 'b1', 'disabled': False},
|
||||
{'host': 'host2', 'binary': 'b1', 'disabled': False,
|
||||
'cluster_name': 'enabled_cluster'},
|
||||
{'host': 'host3', 'binary': 'b1', 'disabled': True,
|
||||
'cluster_name': 'enabled_cluster'},
|
||||
|
||||
# Disabled services
|
||||
{'host': 'host4', 'binary': 'b1', 'disabled': True},
|
||||
{'host': 'host5', 'binary': 'b1', 'disabled': False,
|
||||
'cluster_name': 'disabled_cluster'},
|
||||
{'host': 'host6', 'binary': 'b1', 'disabled': True,
|
||||
'cluster_name': 'disabled_cluster'},
|
||||
]
|
||||
|
||||
db.cluster_create(self.ctxt, {'name': 'enabled_cluster',
|
||||
'binary': 'b1',
|
||||
'disabled': False}),
|
||||
db.cluster_create(self.ctxt, {'name': 'disabled_cluster',
|
||||
'binary': 'b1',
|
||||
'disabled': True}),
|
||||
services = [self._create_service(vals) for vals in values]
|
||||
|
||||
enabled = db.service_get_all(self.ctxt, disabled=False)
|
||||
disabled = db.service_get_all(self.ctxt, disabled=True)
|
||||
|
||||
self.assertSetEqual({s.host for s in services[:3]},
|
||||
{s.host for s in enabled})
|
||||
self.assertSetEqual({s.host for s in services[3:]},
|
||||
{s.host for s in disabled})
|
||||
|
||||
def test_service_get_all(self):
|
||||
expired = (datetime.datetime.utcnow()
|
||||
- datetime.timedelta(seconds=CONF.service_down_time + 1))
|
||||
|
@ -344,20 +344,19 @@ class VolumeTestCase(base.BaseVolumeTestCase):
|
||||
def test_init_host_added_to_cluster(self, cg_include_mock,
|
||||
vol_include_mock, vol_get_all_mock,
|
||||
snap_get_all_mock):
|
||||
self.mock_object(self.volume, 'cluster', mock.sentinel.cluster)
|
||||
cluster = str(mock.sentinel.cluster)
|
||||
self.mock_object(self.volume, 'cluster', cluster)
|
||||
self.volume.init_host(added_to_cluster=True,
|
||||
service_id=self.service_id)
|
||||
|
||||
vol_include_mock.assert_called_once_with(mock.ANY,
|
||||
mock.sentinel.cluster,
|
||||
vol_include_mock.assert_called_once_with(mock.ANY, cluster,
|
||||
host=self.volume.host)
|
||||
cg_include_mock.assert_called_once_with(mock.ANY,
|
||||
mock.sentinel.cluster,
|
||||
cg_include_mock.assert_called_once_with(mock.ANY, cluster,
|
||||
host=self.volume.host)
|
||||
vol_get_all_mock.assert_called_once_with(
|
||||
mock.ANY, filters={'cluster_name': mock.sentinel.cluster})
|
||||
mock.ANY, filters={'cluster_name': cluster})
|
||||
snap_get_all_mock.assert_called_once_with(
|
||||
mock.ANY, search_opts={'cluster_name': mock.sentinel.cluster})
|
||||
mock.ANY, search_opts={'cluster_name': cluster})
|
||||
|
||||
@mock.patch('cinder.objects.service.Service.get_minimum_rpc_version')
|
||||
@mock.patch('cinder.objects.service.Service.get_minimum_obj_version')
|
||||
@ -4785,7 +4784,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase):
|
||||
self.assertEqual('newhost', volume.host)
|
||||
self.assertEqual('success', volume.migration_status)
|
||||
|
||||
def _fake_create_volume(self, ctxt, volume, host, req_spec, filters,
|
||||
def _fake_create_volume(self, ctxt, volume, req_spec, filters,
|
||||
allow_reschedule=True):
|
||||
return db.volume_update(ctxt, volume['id'],
|
||||
{'status': self.expected_status})
|
||||
@ -4880,7 +4879,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase):
|
||||
nova_api, create_volume,
|
||||
save):
|
||||
def fake_create_volume(*args, **kwargs):
|
||||
context, volume, host, request_spec, filter_properties = args
|
||||
context, volume, request_spec, filter_properties = args
|
||||
fake_db = mock.Mock()
|
||||
task = create_volume_manager.ExtractVolumeSpecTask(fake_db)
|
||||
specs = task.execute(context, volume, {})
|
||||
@ -4916,7 +4915,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase):
|
||||
migrate_volume_completion,
|
||||
nova_api, create_volume, save):
|
||||
def fake_create_volume(*args, **kwargs):
|
||||
context, volume, host, request_spec, filter_properties = args
|
||||
context, volume, request_spec, filter_properties = args
|
||||
fake_db = mock.Mock()
|
||||
task = create_volume_manager.ExtractVolumeSpecTask(fake_db)
|
||||
specs = task.execute(context, volume, {})
|
||||
|
@ -26,6 +26,7 @@ from cinder.common import constants
|
||||
from cinder import context
|
||||
from cinder import db
|
||||
from cinder import objects
|
||||
from cinder.objects import base as ovo_base
|
||||
from cinder.objects import fields
|
||||
from cinder import test
|
||||
from cinder.tests.unit.backup import fake_backup
|
||||
@ -158,8 +159,12 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
if 'dest_host' in expected_msg:
|
||||
dest_host = expected_msg.pop('dest_host')
|
||||
dest_host_dict = {'host': dest_host.host,
|
||||
'cluster_name': dest_host.cluster_name,
|
||||
'capabilities': dest_host.capabilities}
|
||||
expected_msg['host'] = dest_host_dict
|
||||
if 'force_copy' in expected_msg:
|
||||
expected_msg['force_host_copy'] = expected_msg.pop('force_copy')
|
||||
|
||||
if 'new_volume' in expected_msg:
|
||||
volume = expected_msg['new_volume']
|
||||
expected_msg['new_volume_id'] = volume['id']
|
||||
@ -229,26 +234,11 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
self.assertEqual(expected_arg, arg)
|
||||
|
||||
for kwarg, value in self.fake_kwargs.items():
|
||||
if isinstance(value, objects.Snapshot):
|
||||
expected_snapshot = expected_msg[kwarg].obj_to_primitive()
|
||||
snapshot = value.obj_to_primitive()
|
||||
self.assertEqual(expected_snapshot, snapshot)
|
||||
elif isinstance(value, objects.ConsistencyGroup):
|
||||
expected_cg = expected_msg[kwarg].obj_to_primitive()
|
||||
cg = value.obj_to_primitive()
|
||||
self.assertEqual(expected_cg, cg)
|
||||
elif isinstance(value, objects.CGSnapshot):
|
||||
expected_cgsnapshot = expected_msg[kwarg].obj_to_primitive()
|
||||
cgsnapshot = value.obj_to_primitive()
|
||||
self.assertEqual(expected_cgsnapshot, cgsnapshot)
|
||||
elif isinstance(value, objects.Volume):
|
||||
expected_volume = expected_msg[kwarg].obj_to_primitive()
|
||||
volume = value.obj_to_primitive()
|
||||
self.assertDictEqual(expected_volume, volume)
|
||||
elif isinstance(value, objects.Backup):
|
||||
expected_backup = expected_msg[kwarg].obj_to_primitive()
|
||||
backup = value.obj_to_primitive()
|
||||
self.assertEqual(expected_backup, backup)
|
||||
if isinstance(value, ovo_base.CinderObject):
|
||||
expected = expected_msg[kwarg].obj_to_primitive()
|
||||
primitive = value.obj_to_primitive()
|
||||
self.assertEqual(expected, primitive)
|
||||
|
||||
else:
|
||||
self.assertEqual(expected_msg[kwarg], value)
|
||||
|
||||
@ -328,8 +318,7 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
|
||||
def test_create_consistencygroup(self):
|
||||
self._test_volume_api('create_consistencygroup', rpc_method='cast',
|
||||
group=self.fake_cg, host='fake_host1',
|
||||
version='3.0')
|
||||
group=self.fake_cg, version='3.0')
|
||||
|
||||
def test_delete_consistencygroup(self):
|
||||
self._test_volume_api('delete_consistencygroup', rpc_method='cast',
|
||||
@ -358,7 +347,6 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
self._test_volume_api('create_volume',
|
||||
rpc_method='cast',
|
||||
volume=self.fake_volume_obj,
|
||||
host='fake_host1',
|
||||
request_spec='fake_request_spec',
|
||||
filter_properties='fake_properties',
|
||||
allow_reschedule=True,
|
||||
@ -540,7 +528,13 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
'-8ffd-0800200c9a66',
|
||||
version='3.0')
|
||||
|
||||
def test_extend_volume(self):
|
||||
def _change_cluster_name(self, resource, cluster_name):
|
||||
resource.cluster_name = cluster_name
|
||||
resource.obj_reset_changes()
|
||||
|
||||
@ddt.data(None, 'mycluster')
|
||||
def test_extend_volume(self, cluster_name):
|
||||
self._change_cluster_name(self.fake_volume_obj, cluster_name)
|
||||
self._test_volume_api('extend_volume',
|
||||
rpc_method='cast',
|
||||
volume=self.fake_volume_obj,
|
||||
@ -548,10 +542,12 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
reservations=self.fake_reservations,
|
||||
version='3.0')
|
||||
|
||||
def test_migrate_volume(self):
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
|
||||
def test_migrate_volume(self, can_send_version):
|
||||
class FakeHost(object):
|
||||
def __init__(self):
|
||||
self.host = 'host'
|
||||
self.cluster_name = 'cluster_name'
|
||||
self.capabilities = {}
|
||||
dest_host = FakeHost()
|
||||
self._test_volume_api('migrate_volume',
|
||||
@ -559,7 +555,7 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
volume=self.fake_volume_obj,
|
||||
dest_host=dest_host,
|
||||
force_host_copy=True,
|
||||
version='3.0')
|
||||
version='3.5')
|
||||
|
||||
def test_migrate_volume_completion(self):
|
||||
self._test_volume_api('migrate_volume_completion',
|
||||
@ -569,10 +565,12 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
error=False,
|
||||
version='3.0')
|
||||
|
||||
def test_retype(self):
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
|
||||
def test_retype(self, can_send_version):
|
||||
class FakeHost(object):
|
||||
def __init__(self):
|
||||
self.host = 'host'
|
||||
self.cluster_name = 'cluster_name'
|
||||
self.capabilities = {}
|
||||
dest_host = FakeHost()
|
||||
self._test_volume_api('retype',
|
||||
@ -583,7 +581,7 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
migration_policy='never',
|
||||
reservations=self.fake_reservations,
|
||||
old_reservations=self.fake_reservations,
|
||||
version='3.0')
|
||||
version='3.5')
|
||||
|
||||
def test_manage_existing(self):
|
||||
self._test_volume_api('manage_existing',
|
||||
@ -685,8 +683,7 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
|
||||
def test_create_group(self):
|
||||
self._test_group_api('create_group', rpc_method='cast',
|
||||
group=self.fake_group, host='fake_host1',
|
||||
version='3.0')
|
||||
group=self.fake_group, version='3.0')
|
||||
|
||||
def test_delete_group(self):
|
||||
self._test_group_api('delete_group', rpc_method='cast',
|
||||
|
@ -1344,32 +1344,47 @@ class API(base.Base):
|
||||
resource=volume)
|
||||
|
||||
@wrap_check_policy
|
||||
def migrate_volume(self, context, volume, host, force_host_copy,
|
||||
def migrate_volume(self, context, volume, host, cluster_name, force_copy,
|
||||
lock_volume):
|
||||
"""Migrate the volume to the specified host."""
|
||||
# Make sure the host is in the list of available hosts
|
||||
"""Migrate the volume to the specified host or cluster."""
|
||||
elevated = context.elevated()
|
||||
topic = constants.VOLUME_TOPIC
|
||||
services = objects.ServiceList.get_all_by_topic(
|
||||
elevated, topic, disabled=False)
|
||||
found = False
|
||||
svc_host = volume_utils.extract_host(host, 'backend')
|
||||
for service in services:
|
||||
if service.is_up and service.host == svc_host:
|
||||
found = True
|
||||
break
|
||||
if not found:
|
||||
msg = _('No available service named %s') % host
|
||||
|
||||
# If we received a request to migrate to a host
|
||||
# Look for the service - must be up and enabled
|
||||
svc_host = host and volume_utils.extract_host(host, 'backend')
|
||||
svc_cluster = cluster_name and volume_utils.extract_host(cluster_name,
|
||||
'backend')
|
||||
# NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get
|
||||
# a service from the DB we are getting either one specific service from
|
||||
# a host or any service from a cluster that is up, which means that the
|
||||
# cluster itself is also up.
|
||||
try:
|
||||
svc = objects.Service.get_by_id(elevated, None, is_up=True,
|
||||
topic=constants.VOLUME_TOPIC,
|
||||
host=svc_host, disabled=False,
|
||||
cluster_name=svc_cluster,
|
||||
backend_match_level='pool')
|
||||
except exception.ServiceNotFound:
|
||||
msg = _('No available service named %s') % cluster_name or host
|
||||
LOG.error(msg)
|
||||
raise exception.InvalidHost(reason=msg)
|
||||
# Even if we were requested to do a migration to a host, if the host is
|
||||
# in a cluster we will do a cluster migration.
|
||||
cluster_name = svc.cluster_name
|
||||
|
||||
# Build required conditions for conditional update
|
||||
expected = {'status': ('available', 'in-use'),
|
||||
'migration_status': self.AVAILABLE_MIGRATION_STATUS,
|
||||
'replication_status': (None, 'disabled'),
|
||||
'consistencygroup_id': (None, ''),
|
||||
'group_id': (None, ''),
|
||||
'host': db.Not(host)}
|
||||
'group_id': (None, '')}
|
||||
|
||||
# We want to make sure that the migration is to another host or
|
||||
# another cluster.
|
||||
if cluster_name:
|
||||
expected['cluster_name'] = db.Not(cluster_name)
|
||||
else:
|
||||
expected['host'] = db.Not(host)
|
||||
|
||||
filters = [~db.volume_has_snapshots_filter()]
|
||||
|
||||
@ -1392,8 +1407,8 @@ class API(base.Base):
|
||||
if not result:
|
||||
msg = _('Volume %s status must be available or in-use, must not '
|
||||
'be migrating, have snapshots, be replicated, be part of '
|
||||
'a group and destination host must be different than the '
|
||||
'current host') % {'vol_id': volume.id}
|
||||
'a group and destination host/cluster must be different '
|
||||
'than the current one') % {'vol_id': volume.id}
|
||||
LOG.error(msg)
|
||||
raise exception.InvalidVolume(reason=msg)
|
||||
|
||||
@ -1406,11 +1421,11 @@ class API(base.Base):
|
||||
request_spec = {'volume_properties': volume,
|
||||
'volume_type': volume_type,
|
||||
'volume_id': volume.id}
|
||||
self.scheduler_rpcapi.migrate_volume_to_host(context,
|
||||
volume,
|
||||
host,
|
||||
force_host_copy,
|
||||
request_spec)
|
||||
self.scheduler_rpcapi.migrate_volume(context,
|
||||
volume,
|
||||
cluster_name or host,
|
||||
force_copy,
|
||||
request_spec)
|
||||
LOG.info(_LI("Migrate volume request issued successfully."),
|
||||
resource=volume)
|
||||
|
||||
@ -1556,19 +1571,31 @@ class API(base.Base):
|
||||
LOG.info(_LI("Retype volume request issued successfully."),
|
||||
resource=volume)
|
||||
|
||||
def _get_service_by_host(self, context, host, resource='volume'):
|
||||
def _get_service_by_host_cluster(self, context, host, cluster_name,
|
||||
resource='volume'):
|
||||
elevated = context.elevated()
|
||||
|
||||
svc_cluster = cluster_name and volume_utils.extract_host(cluster_name,
|
||||
'backend')
|
||||
svc_host = host and volume_utils.extract_host(host, 'backend')
|
||||
|
||||
# NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get
|
||||
# a service from the DB we are getting either one specific service from
|
||||
# a host or any service that is up from a cluster, which means that the
|
||||
# cluster itself is also up.
|
||||
try:
|
||||
svc_host = volume_utils.extract_host(host, 'backend')
|
||||
service = objects.Service.get_by_args(
|
||||
elevated, svc_host, 'cinder-volume')
|
||||
service = objects.Service.get_by_id(elevated, None, host=svc_host,
|
||||
binary='cinder-volume',
|
||||
cluster_name=svc_cluster)
|
||||
except exception.ServiceNotFound:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE('Unable to find service: %(service)s for '
|
||||
'given host: %(host)s.'),
|
||||
{'service': constants.VOLUME_BINARY, 'host': host})
|
||||
'given host: %(host)s and cluster %(cluster)s.'),
|
||||
{'service': constants.VOLUME_BINARY, 'host': host,
|
||||
'cluster': cluster_name})
|
||||
|
||||
if service.disabled:
|
||||
if service.disabled and (not service.cluster_name or
|
||||
service.cluster.disabled):
|
||||
LOG.error(_LE('Unable to manage existing %s on a disabled '
|
||||
'service.'), resource)
|
||||
raise exception.ServiceUnavailable()
|
||||
@ -1580,15 +1607,16 @@ class API(base.Base):
|
||||
|
||||
return service
|
||||
|
||||
def manage_existing(self, context, host, ref, name=None, description=None,
|
||||
volume_type=None, metadata=None,
|
||||
def manage_existing(self, context, host, cluster_name, ref, name=None,
|
||||
description=None, volume_type=None, metadata=None,
|
||||
availability_zone=None, bootable=False):
|
||||
if volume_type and 'extra_specs' not in volume_type:
|
||||
extra_specs = volume_types.get_volume_type_extra_specs(
|
||||
volume_type['id'])
|
||||
volume_type['extra_specs'] = extra_specs
|
||||
|
||||
service = self._get_service_by_host(context, host)
|
||||
service = self._get_service_by_host_cluster(context, host,
|
||||
cluster_name)
|
||||
|
||||
if availability_zone is None:
|
||||
availability_zone = service.availability_zone
|
||||
@ -1597,7 +1625,8 @@ class API(base.Base):
|
||||
'context': context,
|
||||
'name': name,
|
||||
'description': description,
|
||||
'host': host,
|
||||
'host': service.host,
|
||||
'cluster_name': service.cluster_name,
|
||||
'ref': ref,
|
||||
'volume_type': volume_type,
|
||||
'metadata': metadata,
|
||||
@ -1626,7 +1655,7 @@ class API(base.Base):
|
||||
|
||||
def get_manageable_volumes(self, context, host, marker=None, limit=None,
|
||||
offset=None, sort_keys=None, sort_dirs=None):
|
||||
self._get_service_by_host(context, host)
|
||||
self._get_service_by_host_cluster(context, host, None)
|
||||
return self.volume_rpcapi.get_manageable_volumes(context, host,
|
||||
marker, limit,
|
||||
offset, sort_keys,
|
||||
@ -1635,18 +1664,21 @@ class API(base.Base):
|
||||
def manage_existing_snapshot(self, context, ref, volume,
|
||||
name=None, description=None,
|
||||
metadata=None):
|
||||
service = self._get_service_by_host(context, volume.host, 'snapshot')
|
||||
service = self._get_service_by_host_cluster(context, volume.host,
|
||||
volume.cluster_name,
|
||||
'snapshot')
|
||||
|
||||
snapshot_object = self.create_snapshot_in_db(context, volume, name,
|
||||
description, True,
|
||||
metadata, None,
|
||||
commit_quota=False)
|
||||
self.volume_rpcapi.manage_existing_snapshot(context, snapshot_object,
|
||||
ref, service.host)
|
||||
self.volume_rpcapi.manage_existing_snapshot(
|
||||
context, snapshot_object, ref, service.service_topic_queue)
|
||||
return snapshot_object
|
||||
|
||||
def get_manageable_snapshots(self, context, host, marker=None, limit=None,
|
||||
offset=None, sort_keys=None, sort_dirs=None):
|
||||
self._get_service_by_host(context, host, resource='snapshot')
|
||||
self._get_service_by_host_cluster(context, host, None, 'snapshot')
|
||||
return self.volume_rpcapi.get_manageable_snapshots(context, host,
|
||||
marker, limit,
|
||||
offset, sort_keys,
|
||||
|
@ -339,6 +339,7 @@ class BaseVD(object):
|
||||
# NOTE(vish): db is set by Manager
|
||||
self.db = kwargs.get('db')
|
||||
self.host = kwargs.get('host')
|
||||
self.cluster_name = kwargs.get('cluster_name')
|
||||
self.configuration = kwargs.get('configuration', None)
|
||||
|
||||
if self.configuration:
|
||||
|
@ -703,13 +703,13 @@ class VolumeCastTask(flow_utils.CinderTask):
|
||||
self.db = db
|
||||
|
||||
def _cast_create_volume(self, context, request_spec, filter_properties):
|
||||
source_volid = request_spec['source_volid']
|
||||
source_replicaid = request_spec['source_replicaid']
|
||||
source_volume_ref = None
|
||||
source_volid = (request_spec['source_volid'] or
|
||||
request_spec['source_replicaid'])
|
||||
volume = request_spec['volume']
|
||||
snapshot_id = request_spec['snapshot_id']
|
||||
image_id = request_spec['image_id']
|
||||
cgroup_id = request_spec['consistencygroup_id']
|
||||
host = None
|
||||
cgsnapshot_id = request_spec['cgsnapshot_id']
|
||||
group_id = request_spec['group_id']
|
||||
if cgroup_id:
|
||||
@ -734,19 +734,11 @@ class VolumeCastTask(flow_utils.CinderTask):
|
||||
# snapshot resides instead of passing it through the scheduler, so
|
||||
# snapshot can be copied to the new volume.
|
||||
snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
|
||||
source_volume_ref = objects.Volume.get_by_id(context,
|
||||
snapshot.volume_id)
|
||||
host = source_volume_ref.host
|
||||
source_volume_ref = snapshot.volume
|
||||
elif source_volid:
|
||||
source_volume_ref = objects.Volume.get_by_id(context,
|
||||
source_volid)
|
||||
host = source_volume_ref.host
|
||||
elif source_replicaid:
|
||||
source_volume_ref = objects.Volume.get_by_id(context,
|
||||
source_replicaid)
|
||||
host = source_volume_ref.host
|
||||
source_volume_ref = objects.Volume.get_by_id(context, source_volid)
|
||||
|
||||
if not host:
|
||||
if not source_volume_ref:
|
||||
# Cast to the scheduler and let it handle whatever is needed
|
||||
# to select the target host for this volume.
|
||||
self.scheduler_rpcapi.create_volume(
|
||||
@ -759,14 +751,14 @@ class VolumeCastTask(flow_utils.CinderTask):
|
||||
else:
|
||||
# Bypass the scheduler and send the request directly to the volume
|
||||
# manager.
|
||||
volume.host = host
|
||||
volume.host = source_volume_ref.host
|
||||
volume.cluster_name = source_volume_ref.cluster_name
|
||||
volume.scheduled_at = timeutils.utcnow()
|
||||
volume.save()
|
||||
if not cgsnapshot_id:
|
||||
self.volume_rpcapi.create_volume(
|
||||
context,
|
||||
volume,
|
||||
volume.host,
|
||||
request_spec,
|
||||
filter_properties,
|
||||
allow_reschedule=False)
|
||||
|
@ -37,7 +37,8 @@ class EntryCreateTask(flow_utils.CinderTask):
|
||||
|
||||
def __init__(self, db):
|
||||
requires = ['availability_zone', 'description', 'metadata',
|
||||
'name', 'host', 'bootable', 'volume_type', 'ref']
|
||||
'name', 'host', 'cluster_name', 'bootable', 'volume_type',
|
||||
'ref']
|
||||
super(EntryCreateTask, self).__init__(addons=[ACTION],
|
||||
requires=requires)
|
||||
self.db = db
|
||||
@ -62,6 +63,7 @@ class EntryCreateTask(flow_utils.CinderTask):
|
||||
'display_description': kwargs.pop('description'),
|
||||
'display_name': kwargs.pop('name'),
|
||||
'host': kwargs.pop('host'),
|
||||
'cluster_name': kwargs.pop('cluster_name'),
|
||||
'availability_zone': kwargs.pop('availability_zone'),
|
||||
'volume_type_id': volume_type_id,
|
||||
'metadata': kwargs.pop('metadata') or {},
|
||||
|
@ -222,6 +222,7 @@ class VolumeManager(manager.CleanableManager,
|
||||
configuration=self.configuration,
|
||||
db=self.db,
|
||||
host=self.host,
|
||||
cluster_name=self.cluster,
|
||||
is_vol_db_empty=vol_db_empty,
|
||||
active_backend_id=curr_active_backend_id)
|
||||
|
||||
@ -548,6 +549,12 @@ class VolumeManager(manager.CleanableManager,
|
||||
"""
|
||||
return self.driver.initialized
|
||||
|
||||
def _set_resource_host(self, resource):
|
||||
"""Set the host field on the DB to our own when we are clustered."""
|
||||
if resource.is_clustered and resource.host != self.host:
|
||||
resource.host = self.host
|
||||
resource.save()
|
||||
|
||||
@objects.Volume.set_workers
|
||||
def create_volume(self, context, volume, request_spec=None,
|
||||
filter_properties=None, allow_reschedule=True):
|
||||
@ -555,6 +562,9 @@ class VolumeManager(manager.CleanableManager,
|
||||
# Log about unsupported drivers
|
||||
utils.log_unsupported_driver_warning(self.driver)
|
||||
|
||||
# Make sure the host in the DB matches our own when clustered
|
||||
self._set_resource_host(volume)
|
||||
|
||||
context_elevated = context.elevated()
|
||||
if filter_properties is None:
|
||||
filter_properties = {}
|
||||
@ -1683,12 +1693,13 @@ class VolumeManager(manager.CleanableManager,
|
||||
remote=src_remote,
|
||||
attach_encryptor=attach_encryptor)
|
||||
|
||||
def _migrate_volume_generic(self, ctxt, volume, host, new_type_id):
|
||||
def _migrate_volume_generic(self, ctxt, volume, backend, new_type_id):
|
||||
rpcapi = volume_rpcapi.VolumeAPI()
|
||||
|
||||
# Create new volume on remote host
|
||||
tmp_skip = {'snapshot_id', 'source_volid'}
|
||||
skip = self._VOLUME_CLONE_SKIP_PROPERTIES | tmp_skip | {'host'}
|
||||
skip = self._VOLUME_CLONE_SKIP_PROPERTIES | tmp_skip | {'host',
|
||||
'cluster_name'}
|
||||
new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip}
|
||||
if new_type_id:
|
||||
new_vol_values['volume_type_id'] = new_type_id
|
||||
@ -1700,15 +1711,16 @@ class VolumeManager(manager.CleanableManager,
|
||||
|
||||
new_volume = objects.Volume(
|
||||
context=ctxt,
|
||||
host=host['host'],
|
||||
host=backend['host'],
|
||||
cluster_name=backend.get('cluster_name'),
|
||||
status='creating',
|
||||
attach_status=fields.VolumeAttachStatus.DETACHED,
|
||||
migration_status='target:%s' % volume['id'],
|
||||
**new_vol_values
|
||||
)
|
||||
new_volume.create()
|
||||
rpcapi.create_volume(ctxt, new_volume, host['host'],
|
||||
None, None, allow_reschedule=False)
|
||||
rpcapi.create_volume(ctxt, new_volume, None, None,
|
||||
allow_reschedule=False)
|
||||
|
||||
# Wait for new_volume to become ready
|
||||
starttime = time.time()
|
||||
@ -1720,13 +1732,13 @@ class VolumeManager(manager.CleanableManager,
|
||||
tries += 1
|
||||
now = time.time()
|
||||
if new_volume.status == 'error':
|
||||
msg = _("failed to create new_volume on destination host")
|
||||
msg = _("failed to create new_volume on destination")
|
||||
self._clean_temporary_volume(ctxt, volume,
|
||||
new_volume,
|
||||
clean_db_only=True)
|
||||
raise exception.VolumeMigrationFailed(reason=msg)
|
||||
elif now > deadline:
|
||||
msg = _("timeout creating new_volume on destination host")
|
||||
msg = _("timeout creating new_volume on destination")
|
||||
self._clean_temporary_volume(ctxt, volume,
|
||||
new_volume,
|
||||
clean_db_only=True)
|
||||
@ -1931,6 +1943,7 @@ class VolumeManager(manager.CleanableManager,
|
||||
host)
|
||||
if moved:
|
||||
updates = {'host': host['host'],
|
||||
'cluster_name': host.get('cluster_name'),
|
||||
'migration_status': 'success',
|
||||
'previous_status': volume.status}
|
||||
if status_update:
|
||||
@ -1948,8 +1961,7 @@ class VolumeManager(manager.CleanableManager,
|
||||
volume.save()
|
||||
if not moved:
|
||||
try:
|
||||
self._migrate_volume_generic(ctxt, volume, host,
|
||||
new_type_id)
|
||||
self._migrate_volume_generic(ctxt, volume, host, new_type_id)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
updates = {'migration_status': 'error'}
|
||||
@ -2238,17 +2250,22 @@ class VolumeManager(manager.CleanableManager,
|
||||
# Call driver to try and change the type
|
||||
retype_model_update = None
|
||||
|
||||
# NOTE(jdg): Check to see if the destination host is the same
|
||||
# as the current. If it's not don't call the driver.retype
|
||||
# method, otherwise drivers that implement retype may report
|
||||
# success, but it's invalid in the case of a migrate.
|
||||
# NOTE(jdg): Check to see if the destination host or cluster (depending
|
||||
# if it's the volume is in a clustered backend or not) is the same as
|
||||
# the current. If it's not don't call the driver.retype method,
|
||||
# otherwise drivers that implement retype may report success, but it's
|
||||
# invalid in the case of a migrate.
|
||||
|
||||
# We assume that those that support pools do this internally
|
||||
# so we strip off the pools designation
|
||||
|
||||
if (not retyped and
|
||||
not diff.get('encryption') and
|
||||
vol_utils.hosts_are_equivalent(self.driver.host,
|
||||
host['host'])):
|
||||
((not host.get('cluster_name') and
|
||||
vol_utils.hosts_are_equivalent(self.driver.host,
|
||||
host['host'])) or
|
||||
(vol_utils.hosts_are_equivalent(self.driver.cluster_name,
|
||||
host.get('cluster_name'))))):
|
||||
try:
|
||||
new_type = volume_types.get_volume_type(context, new_type_id)
|
||||
ret = self.driver.retype(context,
|
||||
@ -2311,6 +2328,7 @@ class VolumeManager(manager.CleanableManager,
|
||||
else:
|
||||
model_update = {'volume_type_id': new_type_id,
|
||||
'host': host['host'],
|
||||
'cluster_name': host.get('cluster_name'),
|
||||
'status': status_update['status']}
|
||||
if retype_model_update:
|
||||
model_update.update(retype_model_update)
|
||||
@ -2407,6 +2425,9 @@ class VolumeManager(manager.CleanableManager,
|
||||
def _create_group(self, context, group, is_generic_group=True):
|
||||
context = context.elevated()
|
||||
|
||||
# Make sure the host in the DB matches our own when clustered
|
||||
self._set_resource_host(group)
|
||||
|
||||
status = fields.GroupStatus.AVAILABLE
|
||||
model_update = None
|
||||
|
||||
|
@ -115,9 +115,10 @@ class VolumeAPI(rpc.RPCAPI):
|
||||
get_backup_device().
|
||||
3.3 - Adds support for sending objects over RPC in attach_volume().
|
||||
3.4 - Adds support for sending objects over RPC in detach_volume().
|
||||
3.5 - Adds support for cluster in retype and migrate_volume
|
||||
"""
|
||||
|
||||
RPC_API_VERSION = '3.4'
|
||||
RPC_API_VERSION = '3.5'
|
||||
RPC_DEFAULT_VERSION = '3.0'
|
||||
TOPIC = constants.VOLUME_TOPIC
|
||||
BINARY = 'cinder-volume'
|
||||
@ -125,10 +126,10 @@ class VolumeAPI(rpc.RPCAPI):
|
||||
def _get_cctxt(self, host=None, version=None, **kwargs):
|
||||
if host is not None:
|
||||
kwargs['server'] = utils.get_volume_rpc_host(host)
|
||||
return super(VolumeAPI, self)._get_cctxt(version, **kwargs)
|
||||
return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs)
|
||||
|
||||
def create_consistencygroup(self, ctxt, group, host):
|
||||
cctxt = self._get_cctxt(host)
|
||||
def create_consistencygroup(self, ctxt, group):
|
||||
cctxt = self._get_cctxt(group.service_topic_queue)
|
||||
cctxt.cast(ctxt, 'create_consistencygroup', group=group)
|
||||
|
||||
def delete_consistencygroup(self, ctxt, group):
|
||||
@ -145,7 +146,7 @@ class VolumeAPI(rpc.RPCAPI):
|
||||
|
||||
def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None,
|
||||
source_cg=None):
|
||||
cctxt = self._get_cctxt(group.host)
|
||||
cctxt = self._get_cctxt(group.service_topic_queue)
|
||||
cctxt.cast(ctxt, 'create_consistencygroup_from_src',
|
||||
group=group,
|
||||
cgsnapshot=cgsnapshot,
|
||||
@ -159,9 +160,9 @@ class VolumeAPI(rpc.RPCAPI):
|
||||
cctxt = self._get_cctxt(cgsnapshot.service_topic_queue)
|
||||
cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot)
|
||||
|
||||
def create_volume(self, ctxt, volume, host, request_spec,
|
||||
filter_properties, allow_reschedule=True):
|
||||
cctxt = self._get_cctxt(host)
|
||||
def create_volume(self, ctxt, volume, request_spec, filter_properties,
|
||||
allow_reschedule=True):
|
||||
cctxt = self._get_cctxt(volume.service_topic_queue)
|
||||
cctxt.cast(ctxt, 'create_volume',
|
||||
request_spec=request_spec,
|
||||
filter_properties=filter_properties,
|
||||
@ -239,35 +240,48 @@ class VolumeAPI(rpc.RPCAPI):
|
||||
new_user=new_user, new_project=new_project)
|
||||
|
||||
def extend_volume(self, ctxt, volume, new_size, reservations):
|
||||
cctxt = self._get_cctxt(volume.host)
|
||||
cctxt = self._get_cctxt(volume.service_topic_queue)
|
||||
cctxt.cast(ctxt, 'extend_volume', volume=volume, new_size=new_size,
|
||||
reservations=reservations)
|
||||
|
||||
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
|
||||
host_p = {'host': dest_host.host,
|
||||
'capabilities': dest_host.capabilities}
|
||||
cctxt = self._get_cctxt(volume.host)
|
||||
cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=host_p,
|
||||
backend_p = {'host': dest_host.host,
|
||||
'cluster_name': dest_host.cluster_name,
|
||||
'capabilities': dest_host.capabilities}
|
||||
|
||||
version = '3.5'
|
||||
if not self.client.can_send_version(version):
|
||||
version = '3.0'
|
||||
del backend_p['cluster_name']
|
||||
|
||||
cctxt = self._get_cctxt(volume.service_topic_queue, version)
|
||||
cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=backend_p,
|
||||
force_host_copy=force_host_copy)
|
||||
|
||||
def migrate_volume_completion(self, ctxt, volume, new_volume, error):
|
||||
cctxt = self._get_cctxt(volume.host)
|
||||
cctxt = self._get_cctxt(volume.service_topic_queue)
|
||||
return cctxt.call(ctxt, 'migrate_volume_completion', volume=volume,
|
||||
new_volume=new_volume, error=error,)
|
||||
|
||||
def retype(self, ctxt, volume, new_type_id, dest_host,
|
||||
migration_policy='never', reservations=None,
|
||||
old_reservations=None):
|
||||
host_p = {'host': dest_host.host,
|
||||
'capabilities': dest_host.capabilities}
|
||||
cctxt = self._get_cctxt(volume.host)
|
||||
backend_p = {'host': dest_host.host,
|
||||
'cluster_name': dest_host.cluster_name,
|
||||
'capabilities': dest_host.capabilities}
|
||||
version = '3.5'
|
||||
if not self.client.can_send_version(version):
|
||||
version = '3.0'
|
||||
del backend_p['cluster_name']
|
||||
|
||||
cctxt = self._get_cctxt(volume.service_topic_queue, version)
|
||||
cctxt.cast(ctxt, 'retype', volume=volume, new_type_id=new_type_id,
|
||||
host=host_p, migration_policy=migration_policy,
|
||||
host=backend_p, migration_policy=migration_policy,
|
||||
reservations=reservations,
|
||||
old_reservations=old_reservations)
|
||||
|
||||
def manage_existing(self, ctxt, volume, ref):
|
||||
cctxt = self._get_cctxt(volume.host)
|
||||
cctxt = self._get_cctxt(volume.service_topic_queue)
|
||||
cctxt.cast(ctxt, 'manage_existing', ref=ref, volume=volume)
|
||||
|
||||
def update_migrated_volume(self, ctxt, volume, new_volume,
|
||||
@ -334,8 +348,8 @@ class VolumeAPI(rpc.RPCAPI):
|
||||
limit=limit, offset=offset, sort_keys=sort_keys,
|
||||
sort_dirs=sort_dirs)
|
||||
|
||||
def create_group(self, ctxt, group, host):
|
||||
cctxt = self._get_cctxt(host)
|
||||
def create_group(self, ctxt, group):
|
||||
cctxt = self._get_cctxt(group.service_topic_queue)
|
||||
cctxt.cast(ctxt, 'create_group', group=group)
|
||||
|
||||
def delete_group(self, ctxt, group):
|
||||
@ -349,7 +363,7 @@ class VolumeAPI(rpc.RPCAPI):
|
||||
|
||||
def create_group_from_src(self, ctxt, group, group_snapshot=None,
|
||||
source_group=None):
|
||||
cctxt = self._get_cctxt(group.host)
|
||||
cctxt = self._get_cctxt(group.service_topic_queue)
|
||||
cctxt.cast(ctxt, 'create_group_from_src', group=group,
|
||||
group_snapshot=group_snapshot, source_group=source_group)
|
||||
|
||||
|
@ -752,6 +752,9 @@ def matching_backend_name(src_volume_type, volume_type):
|
||||
|
||||
|
||||
def hosts_are_equivalent(host_1, host_2):
|
||||
# In case host_1 or host_2 are None
|
||||
if not (host_1 and host_2):
|
||||
return host_1 == host_2
|
||||
return extract_host(host_1) == extract_host(host_2)
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user