Allow create_volume() to retry when an exception happens
Because certain volume back-ends cannot easily report a simple total_capacity_gb/free_capacity_gb (their internal implementations are too complex), the scheduler was updated to let back-ends that report unclear capacity pass the capacity filter. As a result, a create_volume() request may still fail on the chosen back-end. More generally, when a volume back-end fails to serve a create_volume() request for whatever reason, it is useful to have a mechanism to retry the request. The idea: when the volume manager catches an exception from the driver.create_volume() call, it checks whether the request is allowed to be rescheduled (requests that are not: clone volume, and create volume from snapshot while the 'snapshot_same_host' option is true). If so, it composes a new request back to the scheduler with additional information marking this specific back-end as already tried, so the scheduler may choose to skip it if needed. The scheduler (filter scheduler only; the simple and chance schedulers are not supported) is also updated so that it retries at most scheduler_max_attempts times. To rule out previously tried back-ends in the next scheduling pass, a new RetryFilter is added.

Changes:
1) The volume RPC API create_volume() is updated with new parameters to save the original request information in case rescheduling is needed. This bumps the volume RPC API to 1.4.
2) The implementation of create_volume() in the volume API is refactored to distinguish whether a request is allowed to be rescheduled (currently, create volume from a source volume bypasses the scheduler, so no rescheduling is allowed).
3) Reschedule functionality is added to create_volume() in the volume manager so that it can send the request back to the scheduler.
4) The scheduler_max_attempts config option is added in scheduler/driver.py.
5) A RetryFilter is added.
6) The scheduler_driver default option is changed to FilterScheduler.

Change-Id: Ia46b5eb4dc033d73734b6aea82ada34ba5731075
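To make the flow above concrete, here is a condensed, self-contained sketch of how the retry state travels through filter_properties. It mirrors the logic added below in filter_scheduler.py and retry_filter.py, but uses plain stand-ins (a module-level constant and RuntimeError instead of the FLAGS option and exception.NoValidHost), so it is an illustration rather than the patched code itself:

    # Illustrative sketch only; the authoritative logic is in the diff below.
    SCHEDULER_MAX_ATTEMPTS = 3  # stands in for the new scheduler_max_attempts option


    def populate_retry(filter_properties, volume_id):
        """First scheduling pass creates the retry dict; rescheduled passes bump it."""
        retry = filter_properties.pop('retry', {})
        if SCHEDULER_MAX_ATTEMPTS == 1:
            return  # retrying disabled, record nothing
        if retry:
            retry['num_attempts'] += 1
        else:
            retry = {'num_attempts': 1, 'hosts': []}  # hosts already tried
        filter_properties['retry'] = retry
        if retry['num_attempts'] > SCHEDULER_MAX_ATTEMPTS:
            # the real code raises exception.NoValidHost here
            raise RuntimeError('Exceeded max scheduling attempts for %s' % volume_id)


    def retry_filter_passes(host, filter_properties):
        """What the new RetryFilter does: skip hosts recorded from earlier attempts."""
        retry = filter_properties.get('retry')
        return True if not retry else host not in retry.get('hosts', [])


    # Second attempt after 'host1' failed driver.create_volume():
    props = {'retry': {'num_attempts': 1, 'hosts': ['host1']}}
    populate_retry(props, 'vol-1')                  # num_attempts -> 2
    assert not retry_filter_passes('host1', props)  # already tried, filtered out
    assert retry_filter_passes('host2', props)      # still eligible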
parent 275ce2d171
commit d17cc23c64

cinder/scheduler/driver.py

@@ -33,7 +33,11 @@ from cinder.volume import rpcapi as volume_rpcapi
 scheduler_driver_opts = [
     cfg.StrOpt('scheduler_host_manager',
                default='cinder.scheduler.host_manager.HostManager',
-               help='The scheduler host manager class to use'), ]
+               help='The scheduler host manager class to use'),
+    cfg.IntOpt('scheduler_max_attempts',
+               default=3,
+               help='Maximum number of attempts to schedule an volume'),
+]
 
 FLAGS = flags.FLAGS
 FLAGS.register_opts(scheduler_driver_opts)
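Operator-facing effect of this option (together with the scheduler_driver default change further down): both can be set in cinder.conf. The option names and defaults below are taken from this diff; placing them under [DEFAULT] is the usual cinder.conf convention rather than something this patch itself adds:

    [DEFAULT]
    # FilterScheduler becomes the default with this change; shown here explicitly.
    scheduler_driver = cinder.scheduler.filter_scheduler.FilterScheduler
    # Number of scheduling attempts per volume; 1 disables re-scheduling.
    scheduler_max_attempts = 3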

cinder/scheduler/filter_scheduler.py

@@ -40,6 +40,7 @@ class FilterScheduler(driver.Scheduler):
         super(FilterScheduler, self).__init__(*args, **kwargs)
         self.cost_function_cache = None
         self.options = scheduler_options.SchedulerOptions()
+        self.max_attempts = self._max_attempts()
 
     def schedule(self, context, topic, method, *args, **kwargs):
         """The schedule() contract requires we return the one
@@ -74,8 +75,91 @@ class FilterScheduler(driver.Scheduler):
         image_id = request_spec['image_id']
 
         updated_volume = driver.volume_update_db(context, volume_id, host)
+        self._post_select_populate_filter_properties(filter_properties,
+                                                     weighed_host.obj)
+
+        # context is not serializable
+        filter_properties.pop('context', None)
+
         self.volume_rpcapi.create_volume(context, updated_volume, host,
-                                         snapshot_id, image_id)
+                                         request_spec=request_spec,
+                                         filter_properties=filter_properties,
+                                         allow_reschedule=True,
+                                         snapshot_id=snapshot_id,
+                                         image_id=image_id)
+
+    def _post_select_populate_filter_properties(self, filter_properties,
+                                                host_state):
+        """Add additional information to the filter properties after a host has
+        been selected by the scheduling process.
+        """
+        # Add a retry entry for the selected volume backend:
+        self._add_retry_host(filter_properties, host_state.host)
+
+    def _add_retry_host(self, filter_properties, host):
+        """Add a retry entry for the selected volume backend. In the event that
+        the request gets re-scheduled, this entry will signal that the given
+        backend has already been tried.
+        """
+        retry = filter_properties.get('retry', None)
+        if not retry:
+            return
+        hosts = retry['hosts']
+        hosts.append(host)
+
+    def _max_attempts(self):
+        max_attempts = FLAGS.scheduler_max_attempts
+        if max_attempts < 1:
+            msg = _("Invalid value for 'scheduler_max_attempts', "
+                    "must be >=1")
+            raise exception.InvalidParameterValue(err=msg)
+        return max_attempts
+
+    def _log_volume_error(self, volume_id, retry):
+        """If the request contained an exception from a previous volume
+        create operation, log it to aid debugging
+        """
+        exc = retry.pop('exc', None)  # string-ified exception from volume
+        if not exc:
+            return  # no exception info from a previous attempt, skip
+
+        hosts = retry.get('hosts', None)
+        if not hosts:
+            return  # no previously attempted hosts, skip
+
+        last_host = hosts[-1]
+        msg = _("Error from last vol-service: %(last_host)s : "
+                "%(exc)s") % locals()
+        LOG.error(msg, volume_id=volume_id)
+
+    def _populate_retry(self, filter_properties, properties):
+        """Populate filter properties with history of retries for this
+        request. If maximum retries is exceeded, raise NoValidHost.
+        """
+        max_attempts = self.max_attempts
+        retry = filter_properties.pop('retry', {})
+
+        if max_attempts == 1:
+            # re-scheduling is disabled.
+            return
+
+        # retry is enabled, update attempt count:
+        if retry:
+            retry['num_attempts'] += 1
+        else:
+            retry = {
+                'num_attempts': 1,
+                'hosts': []  # list of volume service hosts tried
+            }
+        filter_properties['retry'] = retry
+
+        volume_id = properties.get('volume_id')
+        self._log_volume_error(volume_id, retry)
+
+        if retry['num_attempts'] > max_attempts:
+            msg = _("Exceeded max scheduling attempts %(max_attempts)d for "
+                    "volume %(volume_id)s") % locals()
+            raise exception.NoValidHost(reason=msg)
 
     def _schedule(self, context, request_spec, filter_properties=None):
         """Returns a list of hosts that meet the required specs,
@@ -84,9 +168,9 @@ class FilterScheduler(driver.Scheduler):
         elevated = context.elevated()
 
         volume_properties = request_spec['volume_properties']
-        # Since Nova is using mixed filters from Oslo and it's own, which
-        # takes 'resource_XX' and 'instance_XX' as input respectively, copying
-        # 'instance_XX' to 'resource_XX' will make both filters happy.
+        # Since Cinder is using mixed filters from Oslo and it's own, which
+        # takes 'resource_XX' and 'volume_XX' as input respectively, copying
+        # 'volume_XX' to 'resource_XX' will make both filters happy.
         resource_properties = volume_properties.copy()
         volume_type = request_spec.get("volume_type", None)
         resource_type = request_spec.get("volume_type", None)
@@ -96,6 +180,8 @@ class FilterScheduler(driver.Scheduler):
 
         if filter_properties is None:
             filter_properties = {}
+        self._populate_retry(filter_properties, resource_properties)
+
         filter_properties.update({'context': context,
                                   'request_spec': request_spec,
                                   'config_options': config_options,

cinder/scheduler/filters/retry_filter.py (new file, 45 lines)

@@ -0,0 +1,45 @@
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.scheduler import filters
+
+LOG = logging.getLogger(__name__)
+
+
+class RetryFilter(filters.BaseHostFilter):
+    """Filter out nodes that have already been attempted for scheduling
+    purposes
+    """
+
+    def host_passes(self, host_state, filter_properties):
+        """Skip nodes that have already been attempted."""
+        retry = filter_properties.get('retry', None)
+        if not retry:
+            # Re-scheduling is disabled
+            LOG.debug("Re-scheduling is disabled")
+            return True
+
+        hosts = retry.get('hosts', [])
+        host = host_state.host
+
+        passes = host not in hosts
+        pass_msg = "passes" if passes else "fails"
+
+        LOG.debug(_("Host %(host)s %(pass_msg)s. Previously tried hosts: "
+                    "%(hosts)s") % locals())
+
+        # Host passes if it's not in the list of previously attempted hosts:
+        return passes
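The filter only relies on the retry dict that the filter scheduler maintains in filter_properties. A small illustrative check of that contract (StubHostState is a hypothetical stand-in exposing only the .host attribute the filter reads; the unit tests added later in this diff do the same thing with fakes.FakeHostState):

    class StubHostState(object):
        """Hypothetical stand-in for the scheduler's host_state object."""
        def __init__(self, host):
            self.host = host

    filt = RetryFilter()
    props = {'retry': {'num_attempts': 2, 'hosts': ['host1']}}

    assert filt.host_passes(StubHostState('host2'), props)      # not tried yet
    assert not filt.host_passes(StubHostState('host1'), props)  # already tried
    assert filt.host_passes(StubHostState('host1'), {})         # retries disabled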

cinder/scheduler/manager.py

@@ -37,8 +37,8 @@ from cinder.volume import rpcapi as volume_rpcapi
 LOG = logging.getLogger(__name__)
 
 scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
-                                  default='cinder.scheduler.simple.'
-                                          'SimpleScheduler',
+                                  default='cinder.scheduler.filter_scheduler.'
+                                          'FilterScheduler',
                                   help='Default scheduler driver to use')
 
 FLAGS = flags.FLAGS

cinder/tests/scheduler/test_filter_scheduler.py

@@ -22,6 +22,7 @@ from cinder import test
 
 from cinder.openstack.common.scheduler import weights
 from cinder.scheduler import filter_scheduler
+from cinder.scheduler import host_manager
 from cinder.tests.scheduler import fakes
 from cinder.tests.scheduler import test_scheduler
 from cinder.tests import utils as test_utils
@@ -53,7 +54,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
                         'volume_type': {'name': 'LVM_iSCSI'},
                         'volume_id': ['fake-id1']}
         self.assertRaises(exception.NoValidHost, sched.schedule_create_volume,
-                          fake_context, request_spec, None)
+                          fake_context, request_spec, {})
 
     @test.skip_if(not test_utils.is_cinder_installed(),
                   'Test requires Cinder installed (try setup.py develop')
@@ -78,7 +79,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
                         'volume_type': {'name': 'LVM_iSCSI'},
                         'volume_id': ['fake-id1']}
         self.assertRaises(exception.NoValidHost, sched.schedule_create_volume,
-                          fake_context, request_spec, None)
+                          fake_context, request_spec, {})
         self.assertTrue(self.was_admin)
 
     @test.skip_if(not test_utils.is_cinder_installed(),
@@ -110,3 +111,108 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
         self.mox.ReplayAll()
         weighed_host = sched._schedule(fake_context, request_spec, {})
         self.assertTrue(weighed_host.obj is not None)
+
+    def test_max_attempts(self):
+        self.flags(scheduler_max_attempts=4)
+
+        sched = fakes.FakeFilterScheduler()
+        self.assertEqual(4, sched._max_attempts())
+
+    def test_invalid_max_attempts(self):
+        self.flags(scheduler_max_attempts=0)
+
+        self.assertRaises(exception.InvalidParameterValue,
+                          fakes.FakeFilterScheduler)
+
+    def test_retry_disabled(self):
+        # Retry info should not get populated when re-scheduling is off.
+        self.flags(scheduler_max_attempts=1)
+        sched = fakes.FakeFilterScheduler()
+
+        request_spec = {'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_properties': {'project_id': 1,
+                                              'size': 1}}
+        filter_properties = {}
+
+        sched._schedule(self.context, request_spec,
+                        filter_properties=filter_properties)
+
+        # should not have retry info in the populated filter properties:
+        self.assertFalse("retry" in filter_properties)
+
+    def test_retry_attempt_one(self):
+        # Test retry logic on initial scheduling attempt.
+        self.flags(scheduler_max_attempts=2)
+        sched = fakes.FakeFilterScheduler()
+
+        request_spec = {'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_properties': {'project_id': 1,
+                                              'size': 1}}
+        filter_properties = {}
+
+        sched._schedule(self.context, request_spec,
+                        filter_properties=filter_properties)
+
+        num_attempts = filter_properties['retry']['num_attempts']
+        self.assertEqual(1, num_attempts)
+
+    def test_retry_attempt_two(self):
+        # Test retry logic when re-scheduling.
+        self.flags(scheduler_max_attempts=2)
+        sched = fakes.FakeFilterScheduler()
+
+        request_spec = {'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_properties': {'project_id': 1,
+                                              'size': 1}}
+
+        retry = dict(num_attempts=1)
+        filter_properties = dict(retry=retry)
+
+        sched._schedule(self.context, request_spec,
+                        filter_properties=filter_properties)
+
+        num_attempts = filter_properties['retry']['num_attempts']
+        self.assertEqual(2, num_attempts)
+
+    def test_retry_exceeded_max_attempts(self):
+        # Test for necessary explosion when max retries is exceeded.
+        self.flags(scheduler_max_attempts=2)
+        sched = fakes.FakeFilterScheduler()
+
+        request_spec = {'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_properties': {'project_id': 1,
+                                              'size': 1}}
+
+        retry = dict(num_attempts=2)
+        filter_properties = dict(retry=retry)
+
+        self.assertRaises(exception.NoValidHost, sched._schedule, self.context,
+                          request_spec, filter_properties=filter_properties)
+
+    def test_add_retry_host(self):
+        retry = dict(num_attempts=1, hosts=[])
+        filter_properties = dict(retry=retry)
+        host = "fakehost"
+
+        sched = fakes.FakeFilterScheduler()
+        sched._add_retry_host(filter_properties, host)
+
+        hosts = filter_properties['retry']['hosts']
+        self.assertEqual(1, len(hosts))
+        self.assertEqual(host, hosts[0])
+
+    def test_post_select_populate(self):
+        # Test addition of certain filter props after a node is selected.
+        retry = {'hosts': [], 'num_attempts': 1}
+        filter_properties = {'retry': retry}
+        sched = fakes.FakeFilterScheduler()
+
+        host_state = host_manager.HostState('host')
+        host_state.total_capacity_gb = 1024
+        sched._post_select_populate_filter_properties(filter_properties,
+                                                      host_state)
+
+        self.assertEqual('host',
+                         filter_properties['retry']['hosts'][0])
+
+        self.assertEqual(1024, host_state.total_capacity_gb)

cinder/tests/scheduler/test_host_filters.py

@@ -128,3 +128,32 @@ class HostFiltersTestCase(test.TestCase):
                                     'updated_at': None,
                                     'service': service})
         self.assertTrue(filt_cls.host_passes(host, filter_properties))
+
+    @test.skip_if(not test_utils.is_cinder_installed(),
+                  'Test requires Cinder installed')
+    def test_retry_filter_disabled(self):
+        # Test case where retry/re-scheduling is disabled.
+        filt_cls = self.class_map['RetryFilter']()
+        host = fakes.FakeHostState('host1', {})
+        filter_properties = {}
+        self.assertTrue(filt_cls.host_passes(host, filter_properties))
+
+    @test.skip_if(not test_utils.is_cinder_installed(),
+                  'Test requires Cinder installed')
+    def test_retry_filter_pass(self):
+        # Node not previously tried.
+        filt_cls = self.class_map['RetryFilter']()
+        host = fakes.FakeHostState('host1', {})
+        retry = dict(num_attempts=2, hosts=['host2'])
+        filter_properties = dict(retry=retry)
+        self.assertTrue(filt_cls.host_passes(host, filter_properties))
+
+    @test.skip_if(not test_utils.is_cinder_installed(),
+                  'Test requires Cinder installed')
+    def test_retry_filter_fail(self):
+        # Node was already tried.
+        filt_cls = self.class_map['RetryFilter']()
+        host = fakes.FakeHostState('host1', {})
+        retry = dict(num_attempts=1, hosts=['host1'])
+        filter_properties = dict(retry=retry)
+        self.assertFalse(filt_cls.host_passes(host, filter_properties))

cinder/tests/test_volume.py

@@ -508,7 +508,8 @@ class VolumeTestCase(test.TestCase):
         def fake_local_path(volume):
             return dst_path
 
-        def fake_copy_image_to_volume(context, volume, image_id):
+        def fake_copy_image_to_volume(context, volume,
+                                      image_service, image_id):
             pass
 
         def fake_fetch_to_raw(context, image_service, image_id, vol_path):
@@ -545,11 +546,6 @@ class VolumeTestCase(test.TestCase):
             db.volume_destroy(self.context, volume_id)
             os.unlink(dst_path)
 
-    def test_create_volume_from_image_status_downloading(self):
-        """Verify that before copying image to volume, it is in downloading
-        state."""
-        self._create_volume_from_image('downloading', True)
-
     def test_create_volume_from_image_status_available(self):
         """Verify that before copying image to volume, it is in available
         state."""
@@ -577,7 +573,7 @@ class VolumeTestCase(test.TestCase):
         self.assertRaises(exception.ImageNotFound,
                           self.volume.create_volume,
                           self.context,
-                          volume_id,
+                          volume_id, None, None, None,
                           None,
                           image_id)
         volume = db.volume_get(self.context, volume_id)

cinder/tests/test_volume_rpcapi.py

@@ -112,10 +112,13 @@ class VolumeRpcAPITestCase(test.TestCase):
                               rpc_method='cast',
                               volume=self.fake_volume,
                               host='fake_host1',
+                              request_spec='fake_request_spec',
+                              filter_properties='fake_properties',
+                              allow_reschedule=True,
                               snapshot_id='fake_snapshot_id',
                               image_id='fake_image_id',
                               source_volid='fake_src_id',
-                              version='1.1')
+                              version='1.4')
 
     def test_delete_volume(self):
         self._test_volume_api('delete_volume',

cinder/volume/api.py

@@ -242,8 +242,12 @@ class API(base.Base):
             self.volume_rpcapi.create_volume(context,
                                              volume_ref,
                                              volume_ref['host'],
-                                             snapshot_id,
-                                             image_id)
+                                             request_spec=request_spec,
+                                             filter_properties=
+                                             filter_properties,
+                                             allow_reschedule=False,
+                                             snapshot_id=snapshot_id,
+                                             image_id=image_id)
         elif source_volid:
             source_volume_ref = self.db.volume_get(context,
                                                    source_volid)
@@ -255,18 +259,22 @@ class API(base.Base):
             self.volume_rpcapi.create_volume(context,
                                              volume_ref,
                                              volume_ref['host'],
-                                             snapshot_id,
-                                             image_id,
-                                             source_volid)
+                                             request_spec=request_spec,
+                                             filter_properties=
+                                             filter_properties,
+                                             allow_reschedule=False,
+                                             snapshot_id=snapshot_id,
+                                             image_id=image_id,
+                                             source_volid=source_volid)
         else:
-            self.scheduler_rpcapi.create_volume(
-                context,
-                FLAGS.volume_topic,
-                volume_id,
-                snapshot_id,
-                image_id,
-                request_spec=request_spec,
-                filter_properties=filter_properties)
+            self.scheduler_rpcapi.create_volume(context,
+                                                FLAGS.volume_topic,
+                                                volume_id,
+                                                snapshot_id,
+                                                image_id,
+                                                request_spec=request_spec,
+                                                filter_properties=
+                                                filter_properties)
 
     @wrap_check_policy
     def delete(self, context, volume, force=False):

cinder/volume/manager.py

@@ -37,6 +37,10 @@ intact.
 
 """
 
+
+import sys
+import traceback
+
 from cinder import context
 from cinder import exception
 from cinder import flags
@@ -103,7 +107,7 @@ MAPPING = {
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
 
-    RPC_API_VERSION = '1.3'
+    RPC_API_VERSION = '1.4'
 
     def __init__(self, volume_driver=None, *args, **kwargs):
         """Load the driver from the one specified in args, or from flags."""
@@ -146,10 +150,44 @@ class VolumeManager(manager.SchedulerDependentManager):
         # collect and publish service capabilities
        self.publish_service_capabilities(ctxt)
 
-    def create_volume(self, context, volume_id, snapshot_id=None,
-                      image_id=None, source_volid=None):
+    def _create_volume(self, context, volume_ref, snapshot_ref,
+                       srcvol_ref, image_service, image_id, image_location):
+        cloned = None
+        model_update = False
+
+        if all(x is None for x in(snapshot_ref, image_location, srcvol_ref)):
+            model_update = self.driver.create_volume(volume_ref)
+        elif snapshot_ref is not None:
+            model_update = self.driver.create_volume_from_snapshot(
+                volume_ref,
+                snapshot_ref)
+        elif srcvol_ref is not None:
+            model_update = self.driver.create_cloned_volume(volume_ref,
+                                                            srcvol_ref)
+        else:
+            # create the volume from an image
+            cloned = self.driver.clone_image(volume_ref, image_location)
+            if not cloned:
+                model_update = self.driver.create_volume(volume_ref)
+                #copy the image onto the volume.
+                status = 'downloading'
+                self.db.volume_update(context,
+                                      volume_ref['id'],
+                                      {'status': status})
+                self._copy_image_to_volume(context,
+                                           volume_ref,
+                                           image_service,
+                                           image_id)
+
+        return model_update, cloned
+
+    def create_volume(self, context, volume_id, request_spec=None,
+                      filter_properties=None, allow_reschedule=True,
+                      snapshot_id=None, image_id=None, source_volid=None):
         """Creates and exports the volume."""
         context = context.elevated()
+        if filter_properties is None:
+            filter_properties = {}
         volume_ref = self.db.volume_get(context, volume_id)
         self._notify_about_volume_usage(context, volume_ref, "create.start")
         LOG.info(_("volume %s: creating"), volume_ref['name'])
@@ -167,36 +205,52 @@ class VolumeManager(manager.SchedulerDependentManager):
         vol_size = volume_ref['size']
         LOG.debug(_("volume %(vol_name)s: creating lv of"
                     " size %(vol_size)sG") % locals())
-        if all(x is None for x in(snapshot_id, image_id, source_volid)):
-            model_update = self.driver.create_volume(volume_ref)
-        elif snapshot_id is not None:
+        snapshot_ref = None
+        sourcevol_ref = None
+        image_service = None
+        image_location = None
+        image_meta = None
+
+        if snapshot_id is not None:
             snapshot_ref = self.db.snapshot_get(context, snapshot_id)
-            model_update = self.driver.create_volume_from_snapshot(
-                volume_ref,
-                snapshot_ref)
         elif source_volid is not None:
-            src_vref = self.db.volume_get(context, source_volid)
-            model_update = self.driver.create_cloned_volume(volume_ref,
-                                                            src_vref)
-            self.db.volume_glance_metadata_copy_from_volume_to_volume(
-                context,
-                source_volid,
-                volume_id)
-        else:
+            sourcevol_ref = self.db.volume_get(context, source_volid)
+        elif image_id is not None:
             # create the volume from an image
             image_service, image_id = \
                 glance.get_remote_image_service(context,
                                                 image_id)
             image_location = image_service.get_location(context, image_id)
             image_meta = image_service.show(context, image_id)
-            cloned = self.driver.clone_image(volume_ref, image_location)
-            if not cloned:
-                model_update = self.driver.create_volume(volume_ref)
-                status = 'downloading'
 
+        try:
+            model_update, cloned = self._create_volume(context,
+                                                       volume_ref,
+                                                       snapshot_ref,
+                                                       sourcevol_ref,
+                                                       image_service,
+                                                       image_id,
+                                                       image_location)
+        except Exception:
+            # restore source volume status before reschedule
+            if sourcevol_ref is not None:
+                self.db.volume_update(context, sourcevol_ref['id'],
+                                      {'status': sourcevol_ref['status']})
+            exc_info = sys.exc_info()
+            # try to re-schedule volume:
+            self._reschedule_or_reraise(context, volume_id, exc_info,
+                                        snapshot_id, image_id,
+                                        request_spec, filter_properties,
+                                        allow_reschedule)
+
         if model_update:
             volume_ref = self.db.volume_update(
                 context, volume_ref['id'], model_update)
+        if sourcevol_ref is not None:
+            self.db.volume_glance_metadata_copy_from_volume_to_volume(
+                context,
+                source_volid,
+                volume_id)
 
         LOG.debug(_("volume %s: creating export"), volume_ref['name'])
         model_update = self.driver.create_export(context, volume_ref)
@@ -214,13 +268,6 @@ class VolumeManager(manager.SchedulerDependentManager):
                                             volume_ref['id'],
                                             snapshot_id)
 
-        now = timeutils.utcnow()
-        self.db.volume_update(context,
-                              volume_ref['id'], {'status': status,
-                                                 'launched_at': now})
-        LOG.debug(_("volume %s: created successfully"), volume_ref['name'])
-        self._reset_stats()
-
         if image_id and not cloned:
             if image_meta:
                 # Copy all of the Glance image properties to the
@@ -239,11 +286,91 @@ class VolumeManager(manager.SchedulerDependentManager):
                                                           volume_ref['id'],
                                                           key, value)
 
-            # Copy the image onto the volume.
-            self._copy_image_to_volume(context, volume_ref, image_id)
+        now = timeutils.utcnow()
+        self.db.volume_update(context,
+                              volume_ref['id'], {'status': status,
+                                                 'launched_at': now})
+        LOG.debug(_("volume %s: created successfully"), volume_ref['name'])
+        self._reset_stats()
 
         self._notify_about_volume_usage(context, volume_ref, "create.end")
         return volume_ref['id']
+
+    def _log_original_error(self, exc_info, volume_id):
+        type_, value, tb = exc_info
+        LOG.error(_('Error: %s') %
+                  traceback.format_exception(type_, value, tb),
+                  volume_id=volume_id)
+
+    def _reschedule_or_reraise(self, context, volume_id, exc_info,
+                               snapshot_id, image_id, request_spec,
+                               filter_properties, allow_reschedule):
+        """Try to re-schedule the create or re-raise the original error to
+        error out the volume.
+        """
+        if not allow_reschedule:
+            raise exc_info[0], exc_info[1], exc_info[2]
+
+        rescheduled = False
+
+        try:
+            method_args = (FLAGS.volume_topic, volume_id, snapshot_id,
+                           image_id, request_spec, filter_properties)
+
+            rescheduled = self._reschedule(context, request_spec,
+                                           filter_properties, volume_id,
+                                           self.scheduler_rpcapi.create_volume,
+                                           method_args,
+                                           exc_info)
+
+        except Exception:
+            rescheduled = False
+            LOG.exception(_("Error trying to reschedule"),
+                          volume_id=volume_id)
+
+        if rescheduled:
+            # log the original build error
+            self._log_original_error(exc_info, volume_id)
+        else:
+            # not re-scheduling
+            raise exc_info[0], exc_info[1], exc_info[2]
+
+    def _reschedule(self, context, request_spec, filter_properties,
+                    volume_id, scheduler_method, method_args,
+                    exc_info=None):
+        """Attempt to re-schedule a volume operation."""
+
+        retry = filter_properties.get('retry', None)
+        if not retry:
+            # no retry information, do not reschedule.
+            LOG.debug(_("Retry info not present, will not reschedule"),
+                      volume_id=volume_id)
+            return
+
+        if not request_spec:
+            LOG.debug(_("No request spec, will not reschedule"),
+                      volume_id=volume_id)
+            return
+
+        request_spec['volume_id'] = [volume_id]
+
+        LOG.debug(_("Re-scheduling %(method)s: attempt %(num)d") %
+                  {'method': scheduler_method.func_name,
+                   'num': retry['num_attempts']}, volume_id=volume_id)
+
+        # reset the volume state:
+        now = timeutils.utcnow()
+        self.db.volume_update(context, volume_id,
+                              {'status': 'creating',
+                               'scheduled_at': now})
+
+        if exc_info:
+            # stringify to avoid circular ref problem in json serialization:
+            retry['exc'] = traceback.format_exception(*exc_info)
+
+        scheduler_method(context, *method_args)
+        return True
 
     def delete_volume(self, context, volume_id):
         """Deletes and unexports volume."""
         context = context.elevated()
@@ -408,23 +535,14 @@ class VolumeManager(manager.SchedulerDependentManager):
                 volume_ref['name'] not in volume_ref['provider_location']):
             self.driver.ensure_export(context, volume_ref)
 
-    def _copy_image_to_volume(self, context, volume, image_id):
+    def _copy_image_to_volume(self, context, volume, image_service, image_id):
         """Downloads Glance image to the specified volume. """
         volume_id = volume['id']
-        payload = {'volume_id': volume_id, 'image_id': image_id}
-        try:
-            image_service, image_id = glance.get_remote_image_service(context,
-                                                                      image_id)
-            self.driver.copy_image_to_volume(context, volume, image_service,
-                                             image_id)
-            LOG.debug(_("Downloaded image %(image_id)s to %(volume_id)s "
-                        "successfully") % locals())
-            self.db.volume_update(context, volume_id,
-                                  {'status': 'available'})
-        except Exception, error:
-            with excutils.save_and_reraise_exception():
-                payload['message'] = unicode(error)
-                self.db.volume_update(context, volume_id, {'status': 'error'})
+        self.driver.copy_image_to_volume(context, volume,
+                                         image_service,
+                                         image_id)
+        LOG.debug(_("Downloaded image %(image_id)s to %(volume_id)s "
+                    "successfully") % locals())
 
     def copy_volume_to_image(self, context, volume_id, image_meta):
         """Uploads the specified volume to Glance.

cinder/volume/rpcapi.py

@@ -36,6 +36,8 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
         1.1 - Adds clone volume option to create_volume.
         1.2 - Add publish_service_capabilities() method.
         1.3 - Pass all image metadata (not just ID) in copy_volume_to_image
+        1.4 - Add request_spec, filter_properties and
+              allow_reschedule arguments to create_volume().
     '''
 
     BASE_RPC_API_VERSION = '1.0'
@@ -46,18 +48,23 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
                 default_version=self.BASE_RPC_API_VERSION)
 
     def create_volume(self, ctxt, volume, host,
+                      request_spec, filter_properties,
+                      allow_reschedule=True,
                       snapshot_id=None, image_id=None,
                       source_volid=None):
         self.cast(ctxt,
                   self.make_msg('create_volume',
                                 volume_id=volume['id'],
+                                request_spec=request_spec,
+                                filter_properties=filter_properties,
+                                allow_reschedule=allow_reschedule,
                                 snapshot_id=snapshot_id,
                                 image_id=image_id,
                                 source_volid=source_volid),
                   topic=rpc.queue_get_for(ctxt,
                                           self.topic,
                                           host),
-                  version='1.1')
+                  version='1.4')
 
     def delete_volume(self, ctxt, volume):
         self.cast(ctxt,

setup.py

@@ -34,6 +34,8 @@ filters = [
     "cinder.scheduler.filters.capacity_filter:CapacityFilter",
     "JsonFilter = "
     "cinder.openstack.common.scheduler.filters.json_filter:JsonFilter",
+    "RetryFilter = "
+    "cinder.scheduler.filters.retry_filter:RetryFilter",
 ]
 
 weights = [