Merge "Sheepdog:make full use of all sheepdog nodes"
This commit is contained in:
commit
c1c4533757
@ -111,6 +111,10 @@ class SheepdogDriverTestDataGenerator(object):
|
||||
return ('env', 'LC_ALL=C', 'LANG=C', 'dog', 'node', 'info',
|
||||
'-a', SHEEP_ADDR, '-p', SHEEP_PORT, '-r')
|
||||
|
||||
def cmd_dog_node_list(self):
|
||||
return ('env', 'LC_ALL=C', 'LANG=C', 'dog', 'node', 'list',
|
||||
'-a', SHEEP_ADDR, '-p', SHEEP_PORT, '-r')
|
||||
|
||||
CMD_DOG_CLUSTER_INFO = ('env', 'LC_ALL=C', 'LANG=C', 'dog', 'cluster',
|
||||
'info', '-a', SHEEP_ADDR, '-p', SHEEP_PORT)
|
||||
|
||||
@ -151,6 +155,10 @@ class SheepdogDriverTestDataGenerator(object):
|
||||
COLLIE_NODE_INFO = """
|
||||
0 107287605248 3623897354 3%
|
||||
Total 107287605248 3623897354 3% 54760833024
|
||||
"""
|
||||
|
||||
COLLIE_NODE_LIST = """
|
||||
0 127.0.0.1:7000 128 1
|
||||
"""
|
||||
|
||||
COLLIE_CLUSTER_INFO_0_5 = """\
|
||||
@ -390,7 +398,8 @@ class SheepdogClientTestCase(test.TestCase):
|
||||
self.driver.db = self.db
|
||||
self.driver.do_setup(None)
|
||||
self.test_data = SheepdogDriverTestDataGenerator()
|
||||
self.client = self.driver.client
|
||||
node_list = [SHEEP_ADDR]
|
||||
self.client = sheepdog.SheepdogClient(node_list, SHEEP_PORT)
|
||||
self._addr = SHEEP_ADDR
|
||||
self._port = SHEEP_PORT
|
||||
self._vdiname = self.test_data.TEST_VOLUME.name
|
||||
@ -466,7 +475,6 @@ class SheepdogClientTestCase(test.TestCase):
|
||||
def test_run_dog_unknown_error(self, fake_logger, fake_execute):
|
||||
args = ('cluster', 'info')
|
||||
cmd = self.test_data.CMD_DOG_CLUSTER_INFO
|
||||
cmd = self.test_data.CMD_DOG_CLUSTER_INFO
|
||||
exit_code = 1
|
||||
stdout = 'stdout dummy'
|
||||
stderr = 'stderr dummy'
|
||||
@ -1055,6 +1063,32 @@ class SheepdogClientTestCase(test.TestCase):
|
||||
self.assertTrue(fake_logger.error.called)
|
||||
self.assertEqual(expected_msg, ex.msg)
|
||||
|
||||
@mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
|
||||
def test_update_node_list_success(self, fake_execute):
|
||||
expected_cmd = ('node', 'list', '-r')
|
||||
fake_execute.return_value = (self.test_data.COLLIE_NODE_LIST, '')
|
||||
self.client.update_node_list()
|
||||
fake_execute.assert_called_once_with(*expected_cmd)
|
||||
|
||||
@mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
|
||||
@mock.patch.object(sheepdog, 'LOG')
|
||||
def test_update_node_list_unknown_error(self, fake_logger, fake_execute):
|
||||
cmd = self.test_data.cmd_dog_node_list()
|
||||
exit_code = 2
|
||||
stdout = 'stdout_dummy'
|
||||
stderr = 'stderr_dummy'
|
||||
expected_msg = self.test_data.sheepdog_cmd_error(cmd=cmd,
|
||||
exit_code=exit_code,
|
||||
stdout=stdout,
|
||||
stderr=stderr)
|
||||
fake_execute.side_effect = exception.SheepdogCmdError(
|
||||
cmd=cmd, exit_code=exit_code, stdout=stdout.replace('\n', '\\n'),
|
||||
stderr=stderr.replace('\n', '\\n'))
|
||||
ex = self.assertRaises(exception.SheepdogCmdError,
|
||||
self.client.update_node_list)
|
||||
self.assertTrue(fake_logger.error.called)
|
||||
self.assertEqual(expected_msg, ex.msg)
|
||||
|
||||
|
||||
class SheepdogDriverTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
@ -1078,10 +1112,12 @@ class SheepdogDriverTestCase(test.TestCase):
|
||||
self._dst_vdiname = self.test_data.TEST_CLONED_VOLUME.name
|
||||
self._dst_vdisize = self.test_data.TEST_CLONED_VOLUME.size
|
||||
|
||||
@mock.patch.object(sheepdog.SheepdogClient, 'update_node_list')
|
||||
@mock.patch.object(sheepdog.SheepdogClient, 'check_cluster_status')
|
||||
def test_check_for_setup_error(self, fake_execute):
|
||||
def test_check_for_setup_error(self, fake_check, fake_update):
|
||||
self.driver.check_for_setup_error()
|
||||
fake_execute.assert_called_once_with()
|
||||
fake_check.assert_called_once_with()
|
||||
fake_update.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(sheepdog.SheepdogClient, 'create')
|
||||
def test_create_volume(self, fake_execute):
|
||||
|
@ -22,6 +22,7 @@ SheepDog Volume Driver.
|
||||
import errno
|
||||
import eventlet
|
||||
import io
|
||||
import random
|
||||
import re
|
||||
|
||||
from oslo_concurrency import processutils
|
||||
@ -55,6 +56,7 @@ CONF.register_opts(sheepdog_opts)
|
||||
|
||||
class SheepdogClient(object):
|
||||
"""Sheepdog command executor."""
|
||||
|
||||
QEMU_SHEEPDOG_PREFIX = 'sheepdog:'
|
||||
DOG_RESP_CONNECTION_ERROR = 'failed to connect to'
|
||||
DOG_RESP_CLUSTER_RUNNING = 'Cluster status: running'
|
||||
@ -76,13 +78,26 @@ class SheepdogClient(object):
|
||||
QEMU_IMG_RESP_VDI_NOT_FOUND = 'No vdi found'
|
||||
QEMU_IMG_RESP_SIZE_TOO_LARGE = 'An image is too large.'
|
||||
|
||||
def __init__(self, addr, port):
|
||||
self.addr = addr
|
||||
def __init__(self, node_list, port):
|
||||
self.node_list = node_list
|
||||
self.port = port
|
||||
|
||||
def get_addr(self):
|
||||
"""Get a random node in sheepdog cluster."""
|
||||
return self.node_list[random.randint(0, len(self.node_list) - 1)]
|
||||
|
||||
def local_path(self, volume):
|
||||
"""Return a sheepdog location path."""
|
||||
return "sheepdog:%(addr)s:%(port)s:%(name)s" % {
|
||||
'addr': self.get_addr(),
|
||||
'port': self.port,
|
||||
'name': volume['name']}
|
||||
|
||||
def _run_dog(self, command, subcommand, *params):
|
||||
"""Execute dog command wrapper."""
|
||||
addr = self.get_addr()
|
||||
cmd = ('env', 'LC_ALL=C', 'LANG=C', 'dog', command, subcommand,
|
||||
'-a', self.addr, '-p', self.port) + params
|
||||
'-a', addr, '-p', self.port) + params
|
||||
try:
|
||||
(_stdout, _stderr) = utils.execute(*cmd)
|
||||
if _stderr.startswith(self.DOG_RESP_CONNECTION_ERROR):
|
||||
@ -95,7 +110,7 @@ class SheepdogClient(object):
|
||||
# by old Sheepdog users.
|
||||
reason = (_('Failed to connect to sheep daemon. '
|
||||
'addr: %(addr)s, port: %(port)s'),
|
||||
{'addr': self.addr, 'port': self.port})
|
||||
{'addr': addr, 'port': self.port})
|
||||
raise exception.SheepdogError(reason=reason)
|
||||
return (_stdout, _stderr)
|
||||
except OSError as e:
|
||||
@ -111,7 +126,7 @@ class SheepdogClient(object):
|
||||
if _stderr.startswith(self.DOG_RESP_CONNECTION_ERROR):
|
||||
reason = (_('Failed to connect to sheep daemon. '
|
||||
'addr: %(addr)s, port: %(port)s'),
|
||||
{'addr': self.addr, 'port': self.port})
|
||||
{'addr': addr, 'port': self.port})
|
||||
raise exception.SheepdogError(reason=reason)
|
||||
raise exception.SheepdogCmdError(
|
||||
cmd=e.cmd,
|
||||
@ -120,7 +135,8 @@ class SheepdogClient(object):
|
||||
stderr=e.stderr.replace('\n', '\\n'))
|
||||
|
||||
def _run_qemu_img(self, command, *params):
|
||||
"""Executes qemu-img command wrapper"""
|
||||
"""Executes qemu-img command wrapper."""
|
||||
addr = self.get_addr()
|
||||
cmd = ['env', 'LC_ALL=C', 'LANG=C', 'qemu-img', command]
|
||||
for param in params:
|
||||
if param.startswith(self.QEMU_SHEEPDOG_PREFIX):
|
||||
@ -129,7 +145,7 @@ class SheepdogClient(object):
|
||||
param = param.replace(self.QEMU_SHEEPDOG_PREFIX,
|
||||
'%(prefix)s%(addr)s:%(port)s:' %
|
||||
{'prefix': self.QEMU_SHEEPDOG_PREFIX,
|
||||
'addr': self.addr, 'port': self.port},
|
||||
'addr': addr, 'port': self.port},
|
||||
1)
|
||||
cmd.append(param)
|
||||
try:
|
||||
@ -147,7 +163,7 @@ class SheepdogClient(object):
|
||||
if self.QEMU_IMG_RESP_CONNECTION_ERROR in _stderr:
|
||||
reason = (_('Failed to connect to sheep daemon. '
|
||||
'addr: %(addr)s, port: %(port)s'),
|
||||
{'addr': self.addr, 'port': self.port})
|
||||
{'addr': addr, 'port': self.port})
|
||||
raise exception.SheepdogError(reason=reason)
|
||||
raise exception.SheepdogCmdError(
|
||||
cmd=e.cmd,
|
||||
@ -296,6 +312,18 @@ class SheepdogClient(object):
|
||||
LOG.error(_LE('Failed to get volume status. %s'), e)
|
||||
return _stdout
|
||||
|
||||
def update_node_list(self):
|
||||
try:
|
||||
(_stdout, _stderr) = self._run_dog('node', 'list', '-r')
|
||||
except exception.SheepdogCmdError as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE('Failed to get node list. %s'), e)
|
||||
node_list = []
|
||||
stdout = _stdout.strip('\n')
|
||||
for line in stdout.split('\n'):
|
||||
node_list.append(line.split()[1].split(':')[0])
|
||||
self.node_list = node_list
|
||||
|
||||
|
||||
class SheepdogIOWrapper(io.RawIOBase):
|
||||
"""File-like object with Sheepdog backend."""
|
||||
@ -403,18 +431,20 @@ class SheepdogDriver(driver.VolumeDriver):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(SheepdogDriver, self).__init__(*args, **kwargs)
|
||||
self.configuration.append_config_values(sheepdog_opts)
|
||||
self.addr = self.configuration.sheepdog_store_address
|
||||
addr = self.configuration.sheepdog_store_address
|
||||
self.port = self.configuration.sheepdog_store_port
|
||||
self.client = SheepdogClient(self.addr, self.port)
|
||||
self.stats_pattern = re.compile(r'[\w\s%]*Total\s(\d+)\s(\d+)*')
|
||||
self._stats = {}
|
||||
self.node_list = [addr]
|
||||
self.client = SheepdogClient(self.node_list, self.port)
|
||||
|
||||
def check_for_setup_error(self):
|
||||
"""Check cluster status and update node list."""
|
||||
self.client.check_cluster_status()
|
||||
self.client.update_node_list()
|
||||
|
||||
def _is_cloneable(self, image_location, image_meta):
|
||||
"""Check the image can be clone or not."""
|
||||
|
||||
if image_location is None:
|
||||
return False
|
||||
|
||||
@ -459,12 +489,11 @@ class SheepdogDriver(driver.VolumeDriver):
|
||||
self.create_cloned_volume(volume, volume_ref)
|
||||
self.client.resize(volume.name, volume.size)
|
||||
|
||||
vol_path = self.local_path(volume)
|
||||
vol_path = self.client.local_path(volume)
|
||||
return {'provider_location': vol_path}, True
|
||||
|
||||
def create_cloned_volume(self, volume, src_vref):
|
||||
"""Clone a sheepdog volume from another volume."""
|
||||
|
||||
snapshot_name = src_vref['name'] + '-temp-snapshot'
|
||||
snapshot = {
|
||||
'name': snapshot_name,
|
||||
@ -508,7 +537,8 @@ class SheepdogDriver(driver.VolumeDriver):
|
||||
# see volume/drivers/manager.py:_create_volume
|
||||
self.client.delete(volume.name)
|
||||
# convert and store into sheepdog
|
||||
image_utils.convert_image(tmp, self.local_path(volume), 'raw')
|
||||
image_utils.convert_image(tmp, self.client.local_path(volume),
|
||||
'raw')
|
||||
self.client.resize(volume.name, volume.size)
|
||||
|
||||
def copy_volume_to_image(self, context, volume, image_service, image_meta):
|
||||
@ -523,7 +553,7 @@ class SheepdogDriver(driver.VolumeDriver):
|
||||
'-f', 'raw',
|
||||
'-t', 'none',
|
||||
'-O', 'raw',
|
||||
self.local_path(volume),
|
||||
self.client.local_path(volume),
|
||||
tmp)
|
||||
self._try_execute(*cmd)
|
||||
|
||||
@ -538,12 +568,6 @@ class SheepdogDriver(driver.VolumeDriver):
|
||||
"""Delete a sheepdog snapshot."""
|
||||
self.client.delete_snapshot(snapshot.volume_name, snapshot.name)
|
||||
|
||||
def local_path(self, volume):
|
||||
return "sheepdog:%(addr)s:%(port)s:%(name)s" % {
|
||||
'addr': self.addr,
|
||||
'port': self.port,
|
||||
'name': volume['name']}
|
||||
|
||||
def ensure_export(self, context, volume):
|
||||
"""Safely and synchronously recreate an export for a logical volume."""
|
||||
pass
|
||||
@ -561,7 +585,7 @@ class SheepdogDriver(driver.VolumeDriver):
|
||||
'driver_volume_type': 'sheepdog',
|
||||
'data': {
|
||||
'name': volume['name'],
|
||||
'hosts': [self.addr],
|
||||
'hosts': [self.client.get_addr()],
|
||||
'ports': ["%d" % self.port],
|
||||
}
|
||||
}
|
||||
@ -631,7 +655,7 @@ class SheepdogDriver(driver.VolumeDriver):
|
||||
raise exception.SheepdogError(reason=msg)
|
||||
|
||||
try:
|
||||
sheepdog_fd = SheepdogIOWrapper(self.addr, self.port,
|
||||
sheepdog_fd = SheepdogIOWrapper(self.client.get_addr(), self.port,
|
||||
src_volume, temp_snapshot_name)
|
||||
backup_service.backup(backup, sheepdog_fd)
|
||||
finally:
|
||||
@ -639,5 +663,6 @@ class SheepdogDriver(driver.VolumeDriver):
|
||||
|
||||
def restore_backup(self, context, backup, volume, backup_service):
|
||||
"""Restore an existing backup to a new or existing volume."""
|
||||
sheepdog_fd = SheepdogIOWrapper(self.addr, self.port, volume)
|
||||
sheepdog_fd = SheepdogIOWrapper(self.client.get_addr(),
|
||||
self.port, volume)
|
||||
backup_service.restore(backup, volume['id'], sheepdog_fd)
|
||||
|
Loading…
x
Reference in New Issue
Block a user