Merge "Huawei: Implement v2 replication (managed)"

This commit is contained in:
Jenkins 2016-02-15 17:59:28 +00:00 committed by Gerrit Code Review
commit f84a98dad8
7 changed files with 1560 additions and 37 deletions

View File

@ -31,6 +31,7 @@ from cinder.volume.drivers.huawei import fc_zone_helper
from cinder.volume.drivers.huawei import huawei_conf
from cinder.volume.drivers.huawei import huawei_driver
from cinder.volume.drivers.huawei import hypermetro
from cinder.volume.drivers.huawei import replication
from cinder.volume.drivers.huawei import rest_client
from cinder.volume.drivers.huawei import smartx
@ -98,6 +99,31 @@ hyper_volume = {'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
'value': '11'}],
}
sync_replica_specs = {'replication_enabled': '<is> True',
'replication_type': '<in> sync'}
async_replica_specs = {'replication_enabled': '<is> True',
'replication_type': '<in> async'}
TEST_PAIR_ID = "3400a30d844d0004"
replication_volume = {
'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
'size': 2,
'volume_name': 'vol1',
'id': '21ec7341-9256-497b-97d9-ef48edcf0635',
'volume_id': '21ec7341-9256-497b-97d9-ef48edcf0635',
'provider_auth': None,
'project_id': 'project',
'display_name': 'vol1',
'display_description': 'test volume',
'volume_type_id': None,
'host': 'ubuntu@huawei#OpenStack_Pool',
'provider_location': '11',
'metadata': {'lun_wwn': '6643e8c1004c5f6723e9f454003'},
'replication_status': 'disabled',
'replication_driver_data':
'{"pair_id": "%s", "rmt_lun_id": "1"}' % TEST_PAIR_ID,
}
test_snap = {'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
'size': 1,
'volume_name': 'vol1',
@ -157,6 +183,22 @@ test_new_type = {
'description': None,
}
test_new_replication_type = {
'name': u'new_type',
'qos_specs_id': None,
'deleted': False,
'created_at': None,
'updated_at': None,
'extra_specs': {
'replication_enabled': '<is> True',
'replication_type': '<in> sync',
},
'is_public': True,
'deleted_at': None,
'id': u'530a56e1-a1a4-49f3-ab6c-779a6e5d999f',
'description': None,
}
hypermetro_devices = """
{
"remote_device": {
@ -279,7 +321,13 @@ FAKE_LUN_INFO_RESPONSE = """
},
"data": {
"ID": "1",
"NAME": "5mFHcBv4RkCcD+JyrWc0SA"
"NAME": "5mFHcBv4RkCcD+JyrWc0SA",
"WWN": "6643e8c1004c5f6723e9f454003",
"DESCRIPTION": "21ec7341-9256-497b-97d9-ef48edcf0635",
"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "27",
"ALLOCTYPE": "1",
"CAPACITY": "2097152"
}
}
"""
@ -294,7 +342,7 @@ FAKE_LUN_GET_SUCCESS_RESPONSE = """
"IOCLASSID": "11",
"NAME": "5mFHcBv4RkCcD+JyrWc0SA",
"DESCRIPTION": "21ec7341-9256-497b-97d9-ef48edcf0635",
"RUNNINGSTATUS": "2",
"RUNNINGSTATUS": "10",
"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "27",
"LUNLIST": "",
@ -309,7 +357,9 @@ FAKE_LUN_GET_SUCCESS_RESPONSE = """
"WRITECACHEPOLICY": "5",
"OWNINGCONTROLLER": "0B",
"SMARTCACHEPARTITIONID": "",
"CACHEPARTITIONID": ""
"CACHEPARTITIONID": "",
"WWN": "6643e8c1004c5f6723e9f454003",
"PARENTNAME": "OpenStack_Pool"
}
}
"""
@ -938,7 +988,8 @@ FAKE_SYSTEM_VERSION_RESPONSE = """
"code": 0
},
"data":{
"PRODUCTVERSION": "V100R001C10"
"PRODUCTVERSION": "V100R001C10",
"wwn": "21003400a30d844d"
}
}
"""
@ -1603,6 +1654,148 @@ MAP_COMMAND_TO_FAKE_RESPONSE['/fc_port/associate?TYPE=213&ASSOCIATEOBJTYPE='
FAKE_PORTS_IN_PG_RESPONSE)
# Replication response
FAKE_GET_REMOTEDEV_RESPONSE = """
{
"data":[{
"ARRAYTYPE":"1",
"HEALTHSTATUS":"1",
"ID":"0",
"NAME":"Huawei.Storage",
"RUNNINGSTATUS":"1",
"WWN":"21003400a30d844d"
}],
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/remote_device/GET'] = (
FAKE_GET_REMOTEDEV_RESPONSE)
FAKE_CREATE_PAIR_RESPONSE = """
{
"data":{
"ID":"%s"
},
"error":{
"code":0,
"description":"0"
}
}
""" % TEST_PAIR_ID
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/POST'] = (
FAKE_CREATE_PAIR_RESPONSE)
FAKE_DELETE_PAIR_RESPONSE = """
{
"data":{},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/DELETE' % TEST_PAIR_ID] = (
FAKE_DELETE_PAIR_RESPONSE)
FAKE_SET_PAIR_ACCESS_RESPONSE = """
{
"data":{},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/PUT' % TEST_PAIR_ID] = (
FAKE_SET_PAIR_ACCESS_RESPONSE)
FAKE_GET_PAIR_NORMAL_RESPONSE = """
{
"data":{
"REPLICATIONMODEL": "1",
"RUNNINGSTATUS": "1",
"SECRESACCESS": "2",
"HEALTHSTATUS": "1",
"ISPRIMARY": "true"
},
"error":{
"code":0,
"description":"0"
}
}
"""
FAKE_GET_PAIR_SPLIT_RESPONSE = """
{
"data":{
"REPLICATIONMODEL": "1",
"RUNNINGSTATUS": "26",
"SECRESACCESS": "2",
"ISPRIMARY": "true"
},
"error":{
"code":0,
"description":"0"
}
}
"""
FAKE_GET_PAIR_SYNC_RESPONSE = """
{
"data":{
"REPLICATIONMODEL": "1",
"RUNNINGSTATUS": "23",
"SECRESACCESS": "2"
},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/GET' % TEST_PAIR_ID] = (
FAKE_GET_PAIR_NORMAL_RESPONSE)
FAKE_SYNC_PAIR_RESPONSE = """
{
"data":{},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/sync/PUT'] = (
FAKE_SYNC_PAIR_RESPONSE)
FAKE_SPLIT_PAIR_RESPONSE = """
{
"data":{},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/split/PUT'] = (
FAKE_SPLIT_PAIR_RESPONSE)
FAKE_SWITCH_PAIR_RESPONSE = """
{
"data":{},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/switch/PUT'] = (
FAKE_SWITCH_PAIR_RESPONSE)
def Fake_sleep(time):
pass
@ -1612,7 +1805,14 @@ class FakeHuaweiConf(object):
self.conf = conf
self.protocol = protocol
def safe_get(self, key):
try:
return getattr(self.conf, key)
except Exception:
return
def update_config_value(self):
setattr(self.conf, 'volume_backend_name', 'huawei_storage')
setattr(self.conf, 'san_address',
['http://100.115.10.69:8082/deviceManager/rest/'])
setattr(self.conf, 'san_user', 'admin')
@ -1646,6 +1846,16 @@ class FakeHuaweiConf(object):
'TargetPortGroup': 'portgroup-test', }
setattr(self.conf, 'iscsi_info', [iscsi_info])
targets = [{'target_device_id': 'huawei-replica-1',
'managed_backend_name': 'ubuntu@huawei2#OpenStack_Pool',
'san_address':
'https://100.97.10.69:8088/deviceManager/rest/',
'san_user': 'admin',
'san_password': 'Admin@storage1'}]
setattr(self.conf, 'replication_device', targets)
setattr(self.conf, 'safe_get', self.safe_get)
class FakeClient(rest_client.RestClient):
@ -1717,6 +1927,11 @@ class FakeDB(object):
return volumes
class FakeReplicaPairManager(replication.ReplicaPairManager):
def _init_rmt_client(self):
self.rmt_client = FakeClient(self.conf)
class FakeISCSIStorage(huawei_driver.HuaweiISCSIDriver):
"""Fake Huawei Storage, Rewrite some methods of HuaweiISCSIDriver."""
@ -1734,6 +1949,7 @@ class FakeISCSIStorage(huawei_driver.HuaweiISCSIDriver):
self.rmt_client,
self.configuration,
self.db)
self.replica = FakeReplicaPairManager(self.client, self.configuration)
class FakeFCStorage(huawei_driver.HuaweiFCDriver):
@ -1754,6 +1970,7 @@ class FakeFCStorage(huawei_driver.HuaweiFCDriver):
self.rmt_client,
self.configuration,
self.db)
self.replica = FakeReplicaPairManager(self.client, self.configuration)
@ddt.ddt
@ -1836,9 +2053,20 @@ class HuaweiISCSIDriverTestCase(test.TestCase):
self.assertTrue(delete_flag)
def test_create_volume_from_snapsuccess(self):
lun_info = self.driver.create_volume_from_snapshot(test_volume,
test_volume)
self.assertEqual('1', lun_info['provider_location'])
self.mock_object(
huawei_driver.HuaweiBaseDriver,
'_get_volume_type',
mock.Mock(return_value={'extra_specs': sync_replica_specs}))
self.mock_object(replication.ReplicaCommonDriver, 'sync')
model_update = self.driver.create_volume_from_snapshot(test_volume,
test_volume)
self.assertEqual('1', model_update['provider_location'])
driver_data = {'pair_id': TEST_PAIR_ID,
'rmt_lun_id': '1'}
driver_data = replication.to_string(driver_data)
self.assertEqual(driver_data, model_update['replication_driver_data'])
self.assertEqual('enabled', model_update['replication_status'])
def test_initialize_connection_success(self):
iscsi_properties = self.driver.initialize_connection(test_volume,
@ -1850,7 +2078,7 @@ class HuaweiISCSIDriverTestCase(test.TestCase):
def test_get_volume_status(self):
data = self.driver.get_volume_stats()
self.assertEqual('2.0.3', data['driver_version'])
self.assertEqual('2.0.5', data['driver_version'])
def test_extend_volume(self):
@ -2501,6 +2729,311 @@ class HuaweiISCSIDriverTestCase(test.TestCase):
self.driver.unmanage_snapshot(test_snapshot)
self.assertEqual(1, mock_rename.call_count)
def test_init_rmt_client(self):
self.mock_object(rest_client, 'RestClient',
mock.Mock(return_value=None))
replica = replication.ReplicaPairManager(self.driver.client,
self.configuration)
self.assertEqual(replica.rmt_pool, 'OpenStack_Pool')
self.assertEqual(replica.target_dev_id, 'huawei-replica-1')
@ddt.data(sync_replica_specs, async_replica_specs)
def test_create_replication_success(self, mock_type):
self.mock_object(replication.ReplicaCommonDriver, 'sync')
self.mock_object(
huawei_driver.HuaweiBaseDriver,
'_get_volume_type',
mock.Mock(return_value={'extra_specs': mock_type}))
model_update = self.driver.create_volume(replication_volume)
driver_data = {'pair_id': TEST_PAIR_ID,
'rmt_lun_id': '1'}
driver_data = replication.to_string(driver_data)
self.assertEqual(driver_data, model_update['replication_driver_data'])
self.assertEqual('enabled', model_update['replication_status'])
@ddt.data(
[
rest_client.RestClient,
'get_array_info',
mock.Mock(
side_effect=exception.VolumeBackendAPIException(data='err'))
],
[
rest_client.RestClient,
'get_remote_devices',
mock.Mock(
side_effect=exception.VolumeBackendAPIException(data='err'))
],
[
rest_client.RestClient,
'get_remote_devices',
mock.Mock(return_value={})
],
[
replication.ReplicaPairManager,
'wait_volume_online',
mock.Mock(side_effect=[
None,
exception.VolumeBackendAPIException(data='err')])
],
[
rest_client.RestClient,
'create_pair',
mock.Mock(
side_effect=exception.VolumeBackendAPIException(data='err'))
],
[
replication.ReplicaCommonDriver,
'sync',
mock.Mock(
side_effect=exception.VolumeBackendAPIException(data='err'))
],
)
@ddt.unpack
def test_create_replication_fail(self, mock_module, mock_func, mock_value):
self.mock_object(
huawei_driver.HuaweiBaseDriver,
'_get_volume_type',
mock.Mock(return_value={'extra_specs': sync_replica_specs}))
self.mock_object(replication.ReplicaPairManager, '_delete_pair')
self.mock_object(mock_module, mock_func, mock_value)
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.create_volume, replication_volume)
def test_delete_replication_success(self):
self.mock_object(replication.ReplicaCommonDriver, 'split')
self.mock_object(
huawei_driver.HuaweiBaseDriver,
'_get_volume_type',
mock.Mock(return_value={'extra_specs': sync_replica_specs}))
self.driver.delete_volume(replication_volume)
self.mock_object(rest_client.RestClient, 'check_lun_exist',
mock.Mock(return_value=False))
self.driver.delete_volume(replication_volume)
def test_wait_volume_online(self):
replica = FakeReplicaPairManager(self.driver.client,
self.configuration)
lun_info = {'ID': '11'}
replica.wait_volume_online(self.driver.client, lun_info)
offline_status = {'RUNNINGSTATUS': '28'}
replica.wait_volume_online(self.driver.client, lun_info)
with mock.patch.object(rest_client.RestClient, 'get_lun_info',
offline_status):
self.assertRaises(exception.VolumeBackendAPIException,
replica.wait_volume_online,
self.driver.client,
lun_info)
def test_wait_second_access(self):
pair_id = '1'
access_ro = constants.REPLICA_SECOND_RO
access_rw = constants.REPLICA_SECOND_RW
op = replication.PairOp(self.driver.client)
common_driver = replication.ReplicaCommonDriver(self.configuration, op)
self.mock_object(replication.PairOp, 'get_replica_info',
mock.Mock(return_value={'SECRESACCESS': access_ro}))
common_driver.wait_second_access(pair_id, access_ro)
self.assertRaises(exception.VolumeBackendAPIException,
common_driver.wait_second_access, pair_id, access_rw)
def test_wait_replica_ready(self):
normal_status = {
'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_NORMAL,
'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
}
split_status = {
'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_SPLIT,
'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
}
sync_status = {
'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_SYNC,
'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
}
pair_id = '1'
op = replication.PairOp(self.driver.client)
common_driver = replication.ReplicaCommonDriver(self.configuration, op)
with mock.patch.object(replication.PairOp, 'get_replica_info',
mock.Mock(return_value=normal_status)):
common_driver.wait_replica_ready(pair_id)
with mock.patch.object(
replication.PairOp,
'get_replica_info',
mock.Mock(side_effect=[sync_status, normal_status])):
common_driver.wait_replica_ready(pair_id)
with mock.patch.object(replication.PairOp, 'get_replica_info',
mock.Mock(return_value=split_status)):
self.assertRaises(exception.VolumeBackendAPIException,
common_driver.wait_replica_ready, pair_id)
def test_replication_enable_success(self):
self.mock_object(replication.ReplicaCommonDriver, 'unprotect_second')
self.mock_object(replication.ReplicaCommonDriver, 'split')
self.mock_object(replication.PairOp, 'is_primary',
mock.Mock(side_effect=[False, True]))
self.driver.replication_enable(None, replication_volume)
@ddt.data(
[
replication.AbsReplicaOp,
'is_running_status',
mock.Mock(return_value=False)
],
[
replication,
'get_replication_driver_data',
mock.Mock(return_value={})
],
[
replication.PairOp,
'get_replica_info',
mock.Mock(return_value={})
],
)
@ddt.unpack
def test_replication_enable_fail(self, mock_module, mock_func, mock_value):
self.mock_object(mock_module, mock_func, mock_value)
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.replication_enable, None, replication_volume)
def test_replication_disable_fail(self):
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.replication_disable, None, replication_volume)
def test_replication_disable_success(self):
self.mock_object(replication.ReplicaCommonDriver, 'split')
self.driver.replication_disable(None, replication_volume)
self.mock_object(replication, 'get_replication_driver_data',
mock.Mock(return_value={}))
self.driver.replication_disable(None, replication_volume)
def test_replication_failover_success(self):
self.mock_object(replication.ReplicaCommonDriver, 'split')
self.mock_object(replication.PairOp, 'is_primary',
mock.Mock(return_value=False))
model_update = self.driver.replication_failover(
None, replication_volume, None)
self.assertEqual('ubuntu@huawei2#OpenStack_Pool', model_update['host'])
self.assertEqual('1', model_update['provider_location'])
driver_data = {'pair_id': TEST_PAIR_ID,
'rmt_lun_id': '11'}
driver_data = replication.to_string(driver_data)
self.assertEqual(driver_data, model_update['replication_driver_data'])
@ddt.data(
[
replication.PairOp,
'is_primary',
mock.Mock(return_value=True)
],
[
replication.PairOp,
'is_primary',
mock.Mock(return_value=False)
],
[
replication,
'get_replication_driver_data',
mock.Mock(return_value={})
],
[
replication,
'get_replication_driver_data',
mock.Mock(return_value={'pair_id': '1'})
],
)
@ddt.unpack
def test_replication_failover_fail(self,
mock_module, mock_func, mock_value):
self.mock_object(
replication.ReplicaCommonDriver,
'wait_second_access',
mock.Mock(
side_effect=exception.VolumeBackendAPIException(data="error")))
self.mock_object(mock_module, mock_func, mock_value)
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.replication_failover,
None,
replication_volume, None)
def test_list_replication_targets(self):
info = self.driver.list_replication_targets(None, replication_volume)
targets = [{'target_device_id': 'huawei-replica-1'}]
self.assertEqual(targets, info['targets'])
self.mock_object(replication, 'get_replication_driver_data',
mock.Mock(return_value={}))
info = self.driver.list_replication_targets(None, replication_volume)
self.assertEqual(targets, info['targets'])
@ddt.data(constants.REPLICA_SECOND_RW, constants.REPLICA_SECOND_RO)
def test_replication_protect_second(self, mock_access):
replica_id = TEST_PAIR_ID
op = replication.PairOp(self.driver.client)
common_driver = replication.ReplicaCommonDriver(self.configuration, op)
self.mock_object(replication.ReplicaCommonDriver, 'wait_second_access')
self.mock_object(
replication.PairOp,
'get_replica_info',
mock.Mock(return_value={'SECRESACCESS': mock_access}))
common_driver.protect_second(replica_id)
common_driver.unprotect_second(replica_id)
def test_replication_sync(self):
replica_id = TEST_PAIR_ID
op = replication.PairOp(self.driver.client)
common_driver = replication.ReplicaCommonDriver(self.configuration, op)
async_normal_status = {
'REPLICATIONMODEL': constants.REPLICA_ASYNC_MODEL,
'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_NORMAL,
'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
}
self.mock_object(replication.ReplicaCommonDriver, 'protect_second')
self.mock_object(replication.PairOp, 'get_replica_info',
mock.Mock(return_value=async_normal_status))
common_driver.sync(replica_id, True)
common_driver.sync(replica_id, False)
def test_replication_split(self):
replica_id = TEST_PAIR_ID
op = replication.PairOp(self.driver.client)
common_driver = replication.ReplicaCommonDriver(self.configuration, op)
self.mock_object(replication.ReplicaCommonDriver, 'wait_expect_state')
self.mock_object(replication.PairOp, 'split', mock.Mock(
side_effect=exception.VolumeBackendAPIException(data='err')))
common_driver.split(replica_id)
def test_replication_base_op(self):
replica_id = '1'
op = replication.AbsReplicaOp(None)
op.create()
op.delete(replica_id)
op.protect_second(replica_id)
op.unprotect_second(replica_id)
op.sync(replica_id)
op.split(replica_id)
op.switch(replica_id)
op.is_primary({})
op.get_replica_info(replica_id)
op._is_status(None, {'key': 'volue'}, None)
class FCSanLookupService(object):
@ -2565,9 +3098,33 @@ class HuaweiFCDriverTestCase(test.TestCase):
self.assertTrue(self.driver.client.terminateFlag)
def test_get_volume_status(self):
remote_device_info = {"ARRAYTYPE": "1",
"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "10"}
self.mock_object(
replication.ReplicaPairManager,
'get_remote_device_by_wwn',
mock.Mock(return_value=remote_device_info))
data = self.driver.get_volume_stats()
self.assertEqual('2.0.4', data['driver_version'])
self.assertEqual('2.0.5', data['driver_version'])
self.assertTrue(data['pools'][0]['replication_enabled'])
self.assertListEqual(['sync', 'async'],
data['pools'][0]['replication_type'])
self.mock_object(
replication.ReplicaPairManager,
'get_remote_device_by_wwn',
mock.Mock(return_value={}))
data = self.driver.get_volume_stats()
self.assertNotIn('replication_enabled', data['pools'][0])
self.mock_object(
replication.ReplicaPairManager,
'try_get_remote_wwn',
mock.Mock(return_value={}))
data = self.driver.get_volume_stats()
self.assertEqual('2.0.5', data['driver_version'])
self.assertNotIn('replication_enabled', data['pools'][0])
def test_extend_volume(self):
self.driver.extend_volume(test_volume, 3)
@ -2755,6 +3312,17 @@ class HuaweiFCDriverTestCase(test.TestCase):
test_new_type, None, test_host)
self.assertTrue(retype)
@mock.patch.object(rest_client.RestClient, 'add_lun_to_partition')
@mock.patch.object(
huawei_driver.HuaweiBaseDriver,
'_get_volume_type',
return_value={'extra_specs': sync_replica_specs})
def test_retype_replication_volume_success(self, mock_get_type,
mock_add_lun_to_partition):
retype = self.driver.retype(None, test_volume,
test_new_replication_type, None, test_host)
self.assertTrue(retype)
def test_retype_volume_cache_fail(self):
self.driver.client.cache_not_exist = True

View File

@ -75,3 +75,31 @@ QOS_KEYS = ['MAXIOPS', 'MINIOPS', 'MINBANDWidth',
'MAXBANDWidth', 'LATENCY', 'IOTYPE']
MAX_LUN_NUM_IN_QOS = 64
HYPERMETRO_CLASS = "cinder.volume.drivers.huawei.hypermetro.HuaweiHyperMetro"
DEFAULT_REPLICA_WAIT_INTERVAL = 1
DEFAULT_REPLICA_WAIT_TIMEOUT = 10
REPLICA_SYNC_MODEL = '1'
REPLICA_ASYNC_MODEL = '2'
REPLICA_SPEED = '2'
REPLICA_PERIOD = '3600'
REPLICA_SECOND_RO = '2'
REPLICA_SECOND_RW = '3'
REPLICA_RUNNING_STATUS_KEY = 'RUNNINGSTATUS'
REPLICA_RUNNING_STATUS_INITIAL_SYNC = '21'
REPLICA_RUNNING_STATUS_SYNC = '23'
REPLICA_RUNNING_STATUS_SYNCED = '24'
REPLICA_RUNNING_STATUS_NORMAL = '1'
REPLICA_RUNNING_STATUS_SPLIT = '26'
REPLICA_RUNNING_STATUS_INVALID = '35'
REPLICA_HEALTH_STATUS_KEY = 'HEALTHSTATUS'
REPLICA_HEALTH_STATUS_NORMAL = '1'
REPLICA_LOCAL_DATA_STATUS_KEY = 'PRIRESDATASTATUS'
REPLICA_REMOTE_DATA_STATUS_KEY = 'SECRESDATASTATUS'
REPLICA_DATA_SYNC_KEY = 'ISDATASYNC'
REPLICA_DATA_STATUS_SYNCED = '1'
REPLICA_DATA_STATUS_COMPLETE = '2'
REPLICA_DATA_STATUS_INCOMPLETE = '3'

View File

@ -151,7 +151,6 @@ class HuaweiConf(object):
else:
msg = (_("Invalid lun type %s is configured.") % lun_type)
LOG.exception(msg)
raise exception.InvalidInput(reason=msg)
setattr(self.conf, 'lun_type', lun_type)

View File

@ -33,6 +33,7 @@ from cinder.volume.drivers.huawei import fc_zone_helper
from cinder.volume.drivers.huawei import huawei_conf
from cinder.volume.drivers.huawei import huawei_utils
from cinder.volume.drivers.huawei import hypermetro
from cinder.volume.drivers.huawei import replication
from cinder.volume.drivers.huawei import rest_client
from cinder.volume.drivers.huawei import smartx
from cinder.volume import utils as volume_utils
@ -90,6 +91,10 @@ class HuaweiBaseDriver(driver.VolumeDriver):
metro_san_password)
self.rmt_client.login()
# init replication manager
self.replica = replication.ReplicaPairManager(self.client,
self.configuration)
def check_for_setup_error(self):
pass
@ -98,7 +103,9 @@ class HuaweiBaseDriver(driver.VolumeDriver):
self.huawei_conf.update_config_value()
if self.metro_flag:
self.rmt_client.get_all_pools()
return self.client.update_volume_stats()
stats = self.client.update_volume_stats()
stats = self.replica.update_replica_capability(stats)
return stats
def _get_volume_type(self, volume):
volume_type = None
@ -127,6 +134,8 @@ class HuaweiBaseDriver(driver.VolumeDriver):
'thin_provisioning_support': False,
'thick_provisioning_support': False,
'hypermetro': False,
'replication_enabled': False,
'replication_type': 'async',
}
opts_value = {
@ -146,6 +155,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
opts_associate,
specs)
opts = smartx.SmartX().get_smartx_specs_opts(opts)
opts = replication.get_replication_opts(opts)
LOG.debug('volume opts %(opts)s.', {'opts': opts})
return opts
@ -172,12 +182,15 @@ class HuaweiBaseDriver(driver.VolumeDriver):
if ((not scope or scope == 'capabilities')
and key in opts_capability):
words = value.split()
if not (words and len(words) == 2 and words[0] == '<is>'):
LOG.error(_LE("Extra specs must be specified as "
"capabilities:%s='<is> True' or "
"'<is> true'."), key)
else:
if words and len(words) == 2 and words[0] in ('<is>', '<in>'):
opts[key] = words[1].lower()
elif key == 'replication_type':
LOG.error(_LE("Extra specs must be specified as "
"replication_type='<in> sync' or "
"'<in> async'."))
else:
LOG.error(_LE("Extra specs must be specified as "
"capabilities:%s='<is> True'."), key)
if ((scope in opts_capability)
and (key in opts_value)
@ -193,7 +206,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
'TYPE': '11',
'NAME': huawei_utils.encode_name(volume['id']),
'PARENTTYPE': '216',
'PARENTID': self.client.get_pool_id(volume, pool_name),
'PARENTID': self.client.get_pool_id(pool_name),
'DESCRIPTION': volume['name'],
'ALLOCTYPE': opts.get('LUNType', self.configuration.lun_type),
'CAPACITY': huawei_utils.get_volume_size(volume),
@ -220,10 +233,11 @@ class HuaweiBaseDriver(driver.VolumeDriver):
model_update['metadata'] = metadata
return lun_info, model_update
def create_volume(self, volume):
"""Create a volume."""
volume_type = self._get_volume_type(volume)
opts = self._get_volume_params(volume_type)
def _create_base_type_volume(self, opts, volume, volume_type):
"""Create volume and add some base type.
Base type is the services won't conflict with the other service.
"""
lun_params = self._get_lun_params(volume, opts)
lun_info, model_update = self._create_volume(volume, lun_params)
lun_id = lun_info['ID']
@ -244,7 +258,17 @@ class HuaweiBaseDriver(driver.VolumeDriver):
msg = _('Create volume error. Because %s.') % six.text_type(err)
raise exception.VolumeBackendAPIException(data=msg)
if (opts.get('hypermetro') and opts.get('hypermetro') == 'true'):
return lun_params, lun_info, model_update
def _add_extend_type_to_volume(self, opts, lun_params, lun_info,
model_update):
"""Add the extend type.
Extend type is the services may conflict with LUNCopy.
So add it after the those services.
"""
lun_id = lun_info['ID']
if opts.get('hypermetro') == 'true':
metro = hypermetro.HuaweiHyperMetro(self.client,
self.rmt_client,
self.configuration,
@ -257,6 +281,35 @@ class HuaweiBaseDriver(driver.VolumeDriver):
self._delete_lun_with_check(lun_id)
raise
if opts.get('replication_enabled') == 'true':
replica_model = opts.get('replication_type')
try:
replica_info = self.replica.create_replica(lun_info,
replica_model)
model_update.update(replica_info)
except Exception as err:
LOG.exception(_LE('Create replication volume error.'))
self._delete_lun_with_check(lun_id)
raise
return model_update
def create_volume(self, volume):
"""Create a volume."""
volume_type = self._get_volume_type(volume)
opts = self._get_volume_params(volume_type)
if (opts.get('hypermetro') == 'true'
and opts.get('replication_enabled') == 'true'):
err_msg = _("Hypermetro and Replication can not be "
"used in the same volume_type.")
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
lun_params, lun_info, model_update = (
self._create_base_type_volume(opts, volume, volume_type))
model_update = self._add_extend_type_to_volume(opts, lun_params,
lun_info, model_update)
return model_update
def _delete_volume(self, volume):
@ -281,7 +334,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
lun_id = volume.get('provider_location')
if not lun_id or not self.client.check_lun_exist(lun_id):
LOG.warning(_LW("Can't find lun %s on the array."), lun_id)
return False
return
qos_id = self.client.get_qosid_by_lunid(lun_id)
if qos_id:
@ -301,6 +354,16 @@ class HuaweiBaseDriver(driver.VolumeDriver):
self._delete_volume(volume)
raise
# Delete a replication volume
replica_data = volume.get('replication_driver_data')
if replica_data:
try:
self.replica.delete_replica(volume)
except exception.VolumeBackendAPIException as err:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Delete replication error."))
self._delete_volume(volume)
self._delete_volume(volume)
def _delete_lun_with_check(self, lun_id):
@ -414,6 +477,15 @@ class HuaweiBaseDriver(driver.VolumeDriver):
def migrate_volume(self, ctxt, volume, host, new_type=None):
"""Migrate a volume within the same array."""
# NOTE(jlc): Replication volume can't migrate. But retype
# can remove replication relationship first then do migrate.
# So don't add this judgement into _check_migration_valid().
volume_type = self._get_volume_type(volume)
opts = self._get_volume_params(volume_type)
if opts.get('replication_enabled') == 'true':
return (False, None)
return self._migrate_volume(volume, host, new_type)
def _check_migration_valid(self, host, volume):
@ -516,6 +588,15 @@ class HuaweiBaseDriver(driver.VolumeDriver):
We use LUNcopy to copy a new volume from snapshot.
The time needed increases as volume size does.
"""
volume_type = self._get_volume_type(volume)
opts = self._get_volume_params(volume_type)
if (opts.get('hypermetro') == 'true'
and opts.get('replication_enabled') == 'true'):
err_msg = _("Hypermetro and Replication can not be "
"used in the same volume_type.")
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
snapshotname = huawei_utils.encode_name(snapshot['id'])
snapshot_id = snapshot.get('provider_location')
if snapshot_id is None:
@ -528,7 +609,9 @@ class HuaweiBaseDriver(driver.VolumeDriver):
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
model_update = self.create_volume(volume)
lun_params, lun_info, model_update = (
self._create_base_type_volume(opts, volume, volume_type))
tgt_lun_id = model_update['provider_location']
luncopy_name = huawei_utils.encode_name(volume['id'])
LOG.info(_LI(
@ -555,6 +638,10 @@ class HuaweiBaseDriver(driver.VolumeDriver):
self._copy_volume(volume, luncopy_name,
snapshot_id, tgt_lun_id)
# NOTE(jlc): Actually, we just only support replication here right
# now, not hypermetro.
model_update = self._add_extend_type_to_volume(opts, lun_params,
lun_info, model_update)
return model_update
def create_cloned_volume(self, volume, src_vref):
@ -598,6 +685,14 @@ class HuaweiBaseDriver(driver.VolumeDriver):
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
volume_type = self._get_volume_type(volume)
opts = self._get_volume_params(volume_type)
if opts.get('replication_enabled') == 'true':
msg = (_("Can't extend replication volume, volume: %(id)s") %
{"id": volume['id']})
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
old_size = huawei_utils.get_volume_size(volume)
new_size = int(new_size) * units.Gi / 512
volume_name = huawei_utils.encode_name(volume['id'])
@ -670,25 +765,51 @@ class HuaweiBaseDriver(driver.VolumeDriver):
migration, change_opts, lun_id = self.determine_changes_when_retype(
volume, new_type, host)
model_update = {}
replica_enabled_change = change_opts.get('replication_enabled')
replica_type_change = change_opts.get('replication_type')
if replica_enabled_change and replica_enabled_change[0] == 'true':
try:
self.replica.delete_replica(volume)
model_update.update({'replication_status': 'disabled',
'replication_driver_data': None})
except exception.VolumeBackendAPIException:
LOG.exception(_LE('Retype volume error. '
'Delete replication failed.'))
return False
try:
if migration:
LOG.debug("Begin to migrate LUN(id: %(lun_id)s) with "
"change %(change_opts)s.",
{"lun_id": lun_id, "change_opts": change_opts})
if self._migrate_volume(volume, host, new_type):
return True
else:
if not self._migrate_volume(volume, host, new_type):
LOG.warning(_LW("Storage-assisted migration failed during "
"retype."))
return False
else:
# Modify lun to change policy
self.modify_lun(lun_id, change_opts)
return True
except exception.VolumeBackendAPIException:
LOG.exception(_LE('Retype volume error.'))
return False
if replica_enabled_change and replica_enabled_change[1] == 'true':
try:
# If replica_enabled_change is not None, the
# replica_type_change won't be None. See function
# determine_changes_when_retype.
lun_info = self.client.get_lun_info(lun_id)
replica_info = self.replica.create_replica(
lun_info, replica_type_change[1])
model_update.update(replica_info)
except exception.VolumeBackendAPIException:
LOG.exception(_LE('Retype volume error. '
'Create replication failed.'))
return False
return (True, model_update)
def modify_lun(self, lun_id, change_opts):
if change_opts.get('partitionid'):
old, new = change_opts['partitionid']
@ -858,6 +979,19 @@ class HuaweiBaseDriver(driver.VolumeDriver):
migration = True
change_opts['LUNType'] = (old_opts['LUNType'], new_opts['LUNType'])
volume_type = self._get_volume_type(volume)
volume_opts = self._get_volume_params(volume_type)
if (volume_opts['replication_enabled'] == 'true'
or new_opts['replication_enabled'] == 'true'):
# If replication_enabled changes,
# then replication_type in change_opts will be set.
change_opts['replication_enabled'] = (
volume_opts['replication_enabled'],
new_opts['replication_enabled'])
change_opts['replication_type'] = (volume_opts['replication_type'],
new_opts['replication_type'])
change_opts = self._check_needed_changes(lun_id, old_opts, new_opts,
change_opts, new_type)
@ -1060,6 +1194,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
# Check other stuffs to determine whether this LUN can be imported.
self._check_lun_valid_for_manage(lun_info, external_ref)
type_id = volume.get('volume_type_id')
new_opts = None
if type_id:
# Handle volume type if specified.
old_opts = self.get_lun_specs(lun_id)
@ -1087,7 +1222,21 @@ class HuaweiBaseDriver(driver.VolumeDriver):
self.client.rename_lun(lun_id, new_name, # pylint: disable=E1121
description)
return {'provider_location': lun_id}
model_update = {}
model_update.update({'provider_location': lun_id})
if new_opts and new_opts.get('replication_enabled'):
LOG.debug("Manage volume need to create replication.")
try:
lun_info = self.client.get_lun_info(lun_id)
replica_info = self.replica.create_replica(
lun_info, new_opts.get('replication_type'))
model_update.update(replica_info)
except exception.VolumeBackendAPIException:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Manage exist volume failed."))
return model_update
def _get_lun_info_by_ref(self, external_ref):
LOG.debug("Get external_ref: %s", external_ref)
@ -1236,6 +1385,27 @@ class HuaweiBaseDriver(driver.VolumeDriver):
{'snapshot_id': snapshot['id'],
'snapshot_name': snapshot_name})
def replication_enable(self, context, volume):
"""Enable replication and do switch role when needed."""
self.replica.enable_replica(volume)
def replication_disable(self, context, volume):
"""Disable replication."""
self.replica.disable_replica(volume)
def replication_failover(self, context, volume, secondary):
"""Disable replication and unprotect remote LUN."""
return self.replica.failover_replica(volume)
def list_replication_targets(self, context, vref):
"""Obtain volume repliction targets."""
return self.replica.list_replica_targets(vref)
def get_replication_updates(self, context):
# NOTE(jlc): The manager does not do aynthing with these updates.
# When that is changed, here must be modified as well.
return []
class HuaweiISCSIDriver(HuaweiBaseDriver, driver.ISCSIDriver):
"""ISCSI driver for Huawei storage arrays.
@ -1254,9 +1424,10 @@ class HuaweiISCSIDriver(HuaweiBaseDriver, driver.ISCSIDriver):
2.0.1 - Manage/unmanage volume support
2.0.2 - Refactor HuaweiISCSIDriver
2.0.3 - Manage/unmanage snapshot support
2.0.5 - Replication V2 support
"""
VERSION = "2.0.3"
VERSION = "2.0.5"
def __init__(self, *args, **kwargs):
super(HuaweiISCSIDriver, self).__init__(*args, **kwargs)
@ -1449,9 +1620,10 @@ class HuaweiFCDriver(HuaweiBaseDriver, driver.FibreChannelDriver):
2.0.2 - Refactor HuaweiFCDriver
2.0.3 - Manage/unmanage snapshot support
2.0.4 - Balanced FC port selection
2.0.5 - Replication V2 support
"""
VERSION = "2.0.4"
VERSION = "2.0.5"
def __init__(self, *args, **kwargs):
super(HuaweiFCDriver, self).__init__(*args, **kwargs)
@ -1515,7 +1687,7 @@ class HuaweiFCDriver(HuaweiBaseDriver, driver.FibreChannelDriver):
if not wwns_in_host and not iqns_in_host:
self.client.remove_host(host_id)
msg = (_('Can not add FC initiator to host.'))
msg = _('Can not add FC initiator to host.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)

View File

@ -0,0 +1,675 @@
# Copyright (c) 2016 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import json
import re
from oslo_log import log as logging
from oslo_utils import excutils
from cinder import exception
from cinder.i18n import _, _LW, _LE
from cinder.volume.drivers.huawei import constants
from cinder.volume.drivers.huawei import huawei_utils
from cinder.volume.drivers.huawei import rest_client
from cinder.volume import utils as volume_utils
LOG = logging.getLogger(__name__)
class AbsReplicaOp(object):
def __init__(self, client):
self.client = client
def create(self, **kwargs):
pass
def delete(self, replica_id):
pass
def protect_second(self, replica_id):
pass
def unprotect_second(self, replica_id):
pass
def sync(self, replica_id):
pass
def split(self, replica_id):
pass
def switch(self, replica_id):
pass
def is_primary(self, replica_info):
flag = replica_info.get('ISPRIMARY')
if flag and flag.lower() == 'true':
return True
return False
def get_replica_info(self, replica_id):
return {}
def _is_status(self, status_key, status, replica_info):
if type(status) in (list, tuple):
return replica_info.get(status_key, '') in status
if type(status) is str:
return replica_info.get(status_key, '') == status
return False
def is_running_status(self, status, replica_info):
return self._is_status(constants.REPLICA_RUNNING_STATUS_KEY,
status, replica_info)
def is_health_status(self, status, replica_info):
return self._is_status(constants.REPLICA_HEALTH_STATUS_KEY,
status, replica_info)
class PairOp(AbsReplicaOp):
def create(self, local_lun_id, rmt_lun_id, rmt_dev_id,
rmt_dev_name, replica_model,
speed=constants.REPLICA_SPEED,
period=constants.REPLICA_PERIOD,
**kwargs):
super(PairOp, self).create(**kwargs)
params = {
"LOCALRESID": local_lun_id,
"LOCALRESTYPE": '11',
"REMOTEDEVICEID": rmt_dev_id,
"REMOTEDEVICENAME": rmt_dev_name,
"REMOTERESID": rmt_lun_id,
"REPLICATIONMODEL": replica_model,
# recovery policy. 1: auto, 2: manual
"RECOVERYPOLICY": '2',
"SPEED": speed,
}
if replica_model == constants.REPLICA_ASYNC_MODEL:
# Synchronize type values:
# 1, manual
# 2, timed wait when synchronization begins
# 3, timed wait when synchronization ends
params['SYNCHRONIZETYPE'] = '2'
params['TIMINGVAL'] = period
try:
pair_info = self.client.create_pair(params)
except Exception as err:
msg = _('Create replication pair failed. Error: %s.') % err
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
return pair_info
def split(self, pair_id):
self.client.split_pair(pair_id)
def delete(self, pair_id, force=False):
self.client.delete_pair(pair_id, force)
def protect_second(self, pair_id):
self.client.set_pair_second_access(pair_id,
constants.REPLICA_SECOND_RO)
def unprotect_second(self, pair_id):
self.client.set_pair_second_access(pair_id,
constants.REPLICA_SECOND_RW)
def sync(self, pair_id):
self.client.sync_pair(pair_id)
def switch(self, pair_id):
self.client.switch_pair(pair_id)
def get_replica_info(self, pair_id):
return self.client.get_pair_by_id(pair_id)
class CGOp(AbsReplicaOp):
pass
class ReplicaCommonDriver(object):
def __init__(self, conf, replica_op):
self.conf = conf
self.op = replica_op
def protect_second(self, replica_id):
info = self.op.get_replica_info(replica_id)
if info.get('SECRESACCESS') == constants.REPLICA_SECOND_RO:
return
self.op.protect_second(replica_id)
self.wait_second_access(replica_id, constants.REPLICA_SECOND_RO)
def unprotect_second(self, replica_id):
info = self.op.get_replica_info(replica_id)
if info.get('SECRESACCESS') == constants.REPLICA_SECOND_RW:
return
self.op.unprotect_second(replica_id)
self.wait_second_access(replica_id, constants.REPLICA_SECOND_RW)
def sync(self, replica_id, wait_complete=False):
self.protect_second(replica_id)
expect_status = (constants.REPLICA_RUNNING_STATUS_NORMAL,
constants.REPLICA_RUNNING_STATUS_SYNC,
constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
info = self.op.get_replica_info(replica_id)
# When running status is synchronizing or normal,
# it's not necessary to do synchronize again.
if (info.get('REPLICATIONMODEL') == constants.REPLICA_SYNC_MODEL
and self.op.is_running_status(expect_status, info)):
return
self.op.sync(replica_id)
self.wait_expect_state(replica_id, expect_status)
if wait_complete:
self.wait_replica_ready(replica_id)
def split(self, replica_id):
running_status = (constants.REPLICA_RUNNING_STATUS_SPLIT,
constants.REPLICA_RUNNING_STATUS_INVALID)
info = self.op.get_replica_info(replica_id)
if self.op.is_running_status(running_status, info):
return
try:
self.op.split(replica_id)
except Exception as err:
LOG.warning(_LW('Split replication exception: %s.'), err)
try:
self.wait_expect_state(replica_id, running_status)
except Exception as err:
msg = _('Split replication failed.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
def enable(self, replica_id, wait_sync_complete=False):
info = self.op.get_replica_info(replica_id)
if not self.op.is_primary(info):
self.switch(replica_id)
self.sync(replica_id)
return None
def disable(self, replica_id):
self.split(replica_id)
return None
def switch(self, replica_id):
self.split(replica_id)
self.unprotect_second(replica_id)
self.op.switch(replica_id)
# Wait to be primary
def _wait_switch_to_primary():
info = self.op.get_replica_info(replica_id)
if self.op.is_primary(info):
return True
return False
interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
huawei_utils.wait_for_condition(_wait_switch_to_primary,
interval,
timeout)
def failover(self, replica_id):
"""Failover replication.
Purpose:
1. Split replication.
2. Set secondary access read & write.
"""
info = self.op.get_replica_info(replica_id)
if self.op.is_primary(info):
msg = _('We should not do switch over on primary array.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
sync_status_set = (constants.REPLICA_RUNNING_STATUS_SYNC,
constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
if self.op.is_running_status(sync_status_set, info):
self.wait_replica_ready(replica_id)
self.split(replica_id)
self.op.unprotect_second(replica_id)
def wait_replica_ready(self, replica_id, interval=None, timeout=None):
LOG.debug('Wait synchronize complete.')
running_status_normal = (constants.REPLICA_RUNNING_STATUS_NORMAL,
constants.REPLICA_RUNNING_STATUS_SYNCED)
running_status_sync = (constants.REPLICA_RUNNING_STATUS_SYNC,
constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
health_status_normal = constants.REPLICA_HEALTH_STATUS_NORMAL
def _replica_ready():
info = self.op.get_replica_info(replica_id)
if (self.op.is_running_status(running_status_normal, info)
and self.op.is_health_status(health_status_normal, info)):
return True
if not self.op.is_running_status(running_status_sync, info):
msg = (_('Wait synchronize failed. Running status: %s.') %
info.get(constants.REPLICA_RUNNING_STATUS_KEY))
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
return False
if not interval:
interval = constants.DEFAULT_WAIT_INTERVAL
if not timeout:
timeout = constants.DEFAULT_WAIT_TIMEOUT
huawei_utils.wait_for_condition(_replica_ready,
interval,
timeout)
def wait_second_access(self, replica_id, access_level):
def _check_access():
info = self.op.get_replica_info(replica_id)
if info.get('SECRESACCESS') == access_level:
return True
return False
interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
huawei_utils.wait_for_condition(_check_access,
interval,
timeout)
def wait_expect_state(self, replica_id,
running_status, health_status=None,
interval=None, timeout=None):
def _check_state():
info = self.op.get_replica_info(replica_id)
if self.op.is_running_status(running_status, info):
if (not health_status
or self.op.is_health_status(health_status, info)):
return True
return False
if not interval:
interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
if not timeout:
timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
huawei_utils.wait_for_condition(_check_state, interval, timeout)
def get_replication_driver_data(volume):
if volume.get('replication_driver_data'):
return json.loads(volume['replication_driver_data'])
return {}
def to_string(dict_data):
if dict_data:
return json.dumps(dict_data)
return ''
class ReplicaPairManager(object):
def __init__(self, local_client, conf):
self.local_client = local_client
self.conf = conf
self.replica_device = self.conf.safe_get('replication_device')
if not self.replica_device:
return
# managed_backed_name format: host_name@backend_name#pool_name
self.rmt_backend = self.replica_device[0]['managed_backend_name']
self.rmt_pool = volume_utils.extract_host(self.rmt_backend,
level='pool')
self.target_dev_id = self.replica_device[0]['target_device_id']
self._init_rmt_client()
self.local_op = PairOp(self.local_client)
self.local_driver = ReplicaCommonDriver(self.conf, self.local_op)
self.rmt_op = PairOp(self.rmt_client)
self.rmt_driver = ReplicaCommonDriver(self.conf, self.rmt_op)
self.try_login_remote_array()
def try_login_remote_array(self):
try:
self.rmt_client.login()
except Exception as err:
LOG.warning(_LW('Remote array login failed. Error: %s.'), err)
def try_get_remote_wwn(self):
try:
info = self.rmt_client.get_array_info()
return info.get('wwn')
except Exception as err:
LOG.warning(_LW('Get remote array wwn failed. Error: %s.'), err)
return None
def get_remote_device_by_wwn(self, wwn):
devices = {}
try:
devices = self.local_client.get_remote_devices()
except Exception as err:
LOG.warning(_LW('Get remote devices failed. Error: %s.'), err)
for device in devices:
if device.get('WWN') == wwn:
return device
return {}
def check_remote_available(self):
if not self.replica_device:
return False
# We get device wwn in every check time.
# If remote array changed, we can run normally.
wwn = self.try_get_remote_wwn()
if not wwn:
return False
device = self.get_remote_device_by_wwn(wwn)
# Check remote device is available to use.
# If array type is replication, 'ARRAYTYPE' == '1'.
# If health status is normal, 'HEALTHSTATUS' == '1'.
if (device and device.get('ARRAYTYPE') == '1'
and device.get('HEALTHSTATUS') == '1'
and device.get('RUNNINGSTATUS') == constants.STATUS_RUNNING):
return True
return False
def update_replica_capability(self, stats):
is_rmt_dev_available = self.check_remote_available()
if not is_rmt_dev_available:
if self.replica_device:
LOG.warning(_LW('Remote device is unavailable. '
'Remote backend: %s.'),
self.rmt_backend)
return stats
for pool in stats['pools']:
pool['replication_enabled'] = True
pool['replication_type'] = ['sync', 'async']
return stats
def _init_rmt_client(self):
# Multiple addresses support.
rmt_addrs = self.replica_device[0]['san_address'].split(';')
rmt_addrs = list(set([x.strip() for x in rmt_addrs if x.strip()]))
rmt_user = self.replica_device[0]['san_user']
rmt_password = self.replica_device[0]['san_password']
self.rmt_client = rest_client.RestClient(self.conf,
rmt_addrs,
rmt_user,
rmt_password)
def get_rmt_dev_info(self):
wwn = self.try_get_remote_wwn()
if not wwn:
return None, None
device = self.get_remote_device_by_wwn(wwn)
if not device:
return None, None
return device.get('ID'), device.get('NAME')
def build_rmt_lun_params(self, local_lun_info):
params = {
'TYPE': '11',
'NAME': local_lun_info['NAME'],
'PARENTTYPE': '216',
'PARENTID': self.rmt_client.get_pool_id(self.rmt_pool),
'DESCRIPTION': local_lun_info['DESCRIPTION'],
'ALLOCTYPE': local_lun_info['ALLOCTYPE'],
'CAPACITY': local_lun_info['CAPACITY'],
'WRITEPOLICY': self.conf.lun_write_type,
'MIRRORPOLICY': self.conf.lun_mirror_switch,
'PREFETCHPOLICY': self.conf.lun_prefetch_type,
'PREFETCHVALUE': self.conf.lun_prefetch_value,
'DATATRANSFERPOLICY': self.conf.lun_policy,
'READCACHEPOLICY': self.conf.lun_read_cache_policy,
'WRITECACHEPOLICY': self.conf.lun_write_cache_policy,
}
LOG.debug('Remote lun params: %s.', params)
return params
def wait_volume_online(self, client, lun_info,
interval=None, timeout=None):
online_status = constants.STATUS_VOLUME_READY
if lun_info.get('RUNNINGSTATUS') == online_status:
return
lun_id = lun_info['ID']
def _wait_online():
info = client.get_lun_info(lun_id)
return info.get('RUNNINGSTATUS') == online_status
if not interval:
interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
if not timeout:
timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
huawei_utils.wait_for_condition(_wait_online,
interval,
timeout)
def create_rmt_lun(self, local_lun_info):
# Create on rmt array. If failed, raise exception.
lun_params = self.build_rmt_lun_params(local_lun_info)
lun_info = self.rmt_client.create_lun(lun_params)
try:
self.wait_volume_online(self.rmt_client, lun_info)
except exception.VolumeBackendAPIException:
with excutils.save_and_reraise_exception():
self.rmt_client.delete_lun(lun_info['ID'])
return lun_info
def create_replica(self, local_lun_info, replica_model):
"""Create remote LUN and replication pair.
Purpose:
1. create remote lun
2. create replication pair
3. enable replication pair
"""
LOG.debug(('Create replication, local lun info: %(info)s, '
'replication model: %(model)s.'),
{'info': local_lun_info, 'model': replica_model})
local_lun_id = local_lun_info['ID']
self.wait_volume_online(self.local_client, local_lun_info)
# step1, create remote lun
rmt_lun_info = self.create_rmt_lun(local_lun_info)
rmt_lun_id = rmt_lun_info['ID']
# step2, get remote device info
rmt_dev_id, rmt_dev_name = self.get_rmt_dev_info()
if not rmt_lun_id or not rmt_dev_name:
self._delete_rmt_lun(rmt_lun_id)
msg = _('Get remote device info failed.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
# step3, create replication pair
try:
pair_info = self.local_op.create(local_lun_id,
rmt_lun_id, rmt_dev_id,
rmt_dev_name, replica_model)
pair_id = pair_info['ID']
except Exception as err:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Create pair failed. Error: %s.'), err)
self._delete_rmt_lun(rmt_lun_id)
# step4, start sync manually. If replication type is sync,
# then wait for sync complete.
wait_complete = (replica_model == constants.REPLICA_SYNC_MODEL)
try:
self.local_driver.sync(pair_id, wait_complete)
except Exception as err:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Start synchronization failed. Error: %s.'), err)
self._delete_pair(pair_id)
self._delete_rmt_lun(rmt_lun_id)
model_update = {}
driver_data = {'pair_id': pair_id,
'rmt_lun_id': rmt_lun_id}
model_update['replication_driver_data'] = to_string(driver_data)
model_update['replication_status'] = 'enabled'
LOG.debug('Create replication, return info: %s.', model_update)
return model_update
def _delete_pair(self, pair_id):
if (not pair_id
or not self.local_client.check_pair_exist(pair_id)):
return
self.local_driver.split(pair_id)
self.local_op.delete(pair_id)
def _delete_rmt_lun(self, lun_id):
if lun_id and self.rmt_client.check_lun_exist(lun_id):
self.rmt_client.delete_lun(lun_id)
def delete_replica(self, volume):
"""Delete replication pair and remote lun.
Purpose:
1. delete replication pair
2. delete remote_lun
"""
LOG.debug('Delete replication, volume: %s.', volume['id'])
info = get_replication_driver_data(volume)
pair_id = info.get('pair_id')
if pair_id:
self._delete_pair(pair_id)
# Delete remote_lun
rmt_lun_id = info.get('rmt_lun_id')
if rmt_lun_id:
self._delete_rmt_lun(rmt_lun_id)
def enable_replica(self, volume):
"""Enable replication.
Purpose:
1. If local backend's array is secondary, switch to primary
2. Synchronize data
"""
LOG.debug('Enable replication, volume: %s.', volume['id'])
info = get_replication_driver_data(volume)
pair_id = info.get('pair_id')
if not pair_id:
msg = _('No pair id in volume replication_driver_data.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
info = self.local_op.get_replica_info(pair_id)
if not info:
msg = _('Pair does not exist on array. Pair id: %s.') % pair_id
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
wait_sync_complete = False
if info.get('REPLICATIONMODEL') == constants.REPLICA_SYNC_MODEL:
wait_sync_complete = True
return self.local_driver.enable(pair_id, wait_sync_complete)
def disable_replica(self, volume):
"""We consider that all abnormal states is disabled."""
LOG.debug('Disable replication, volume: %s.', volume['id'])
info = get_replication_driver_data(volume)
pair_id = info.get('pair_id')
if not pair_id:
LOG.warning(_LW('No pair id in volume replication_driver_data.'))
return None
return self.local_driver.disable(pair_id)
def failover_replica(self, volume):
"""Just make the secondary available."""
LOG.debug('Failover replication, volume: %s.', volume['id'])
info = get_replication_driver_data(volume)
pair_id = info.get('pair_id')
if not pair_id:
msg = _('No pair id in volume replication_driver_data.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
rmt_lun_id = info.get('rmt_lun_id')
if not rmt_lun_id:
msg = _('No remote LUN id in volume replication_driver_data.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
# Remote array must be available. So we can get the real pool info.
lun_info = self.rmt_client.get_lun_info(rmt_lun_id)
lun_wwn = lun_info.get('WWN')
lun_pool = lun_info.get('PARENTNAME')
new_backend = re.sub(r'(?<=#).*$', lun_pool, self.rmt_backend)
self.rmt_driver.failover(pair_id)
metadata = huawei_utils.get_volume_metadata(volume)
metadata.update({'lun_wwn': lun_wwn})
new_driver_data = {'pair_id': pair_id,
'rmt_lun_id': volume['provider_location']}
new_driver_data = to_string(new_driver_data)
return {'host': new_backend,
'provider_location': rmt_lun_id,
'replication_driver_data': new_driver_data,
'metadata': metadata}
def list_replica_targets(self, volume):
info = get_replication_driver_data(volume)
if not info:
LOG.warning(_LW('Replication driver data does not exist. '
'Volume: %s'), volume['id'])
targets = [{'target_device_id': self.target_dev_id}]
return {'volume_id': volume['id'],
'targets': targets}
def get_replication_opts(opts):
if opts.get('replication_type') == 'sync':
opts['replication_type'] = constants.REPLICA_SYNC_MODEL
else:
opts['replication_type'] = constants.REPLICA_ASYNC_MODEL
return opts

View File

@ -231,7 +231,7 @@ class RestClient(object):
return info
def get_pool_id(self, volume, pool_name):
def get_pool_id(self, pool_name):
pools = self.get_all_pools()
pool_info = self.get_pool_info(pool_name, pools)
if not pool_info:
@ -1562,11 +1562,15 @@ class RestClient(object):
self._assert_rest_result(result, _('Add lun to cache error.'))
def find_array_version(self):
def get_array_info(self):
url = "/system/"
result = self.call(url, None, "GET")
self._assert_rest_result(result, _('Find array version error.'))
return result['data']['PRODUCTVERSION']
self._assert_rest_result(result, _('Get array info error.'))
return result.get('data', None)
def find_array_version(self):
info = self.get_array_info()
return info.get('PRODUCTVERSION', None)
def remove_host(self, host_id):
url = "/host/%s" % host_id
@ -2008,3 +2012,78 @@ class RestClient(object):
for item in result.get('data', []):
wwns.append(item['WWN'])
return wwns
def get_remote_devices(self):
url = "/remote_device"
result = self.call(url, None, "GET")
self._assert_rest_result(result, _('Get remote devices error.'))
return result.get('data', [])
def create_pair(self, pair_params):
url = "/REPLICATIONPAIR"
result = self.call(url, pair_params, "POST")
msg = _('Create replication error.')
self._assert_rest_result(result, msg)
self._assert_data_in_result(result, msg)
return result['data']
def get_pair_by_id(self, pair_id):
url = "/REPLICATIONPAIR/" + pair_id
result = self.call(url, None, "GET")
msg = _('Get pair failed.')
self._assert_rest_result(result, msg)
return result.get('data', {})
def switch_pair(self, pair_id):
url = '/REPLICATIONPAIR/switch'
data = {"ID": pair_id,
"TYPE": "263"}
result = self.call(url, data, "PUT")
msg = _('Switch over pair error.')
self._assert_rest_result(result, msg)
def split_pair(self, pair_id):
url = '/REPLICATIONPAIR/split'
data = {"ID": pair_id,
"TYPE": "263"}
result = self.call(url, data, "PUT")
msg = _('Split pair error.')
self._assert_rest_result(result, msg)
def delete_pair(self, pair_id, force=False):
url = "/REPLICATIONPAIR/" + pair_id
data = None
if force:
data = {"ISLOCALDELETE": force}
result = self.call(url, data, "DELETE")
msg = _('delete_replication error.')
self._assert_rest_result(result, msg)
def sync_pair(self, pair_id):
url = "/REPLICATIONPAIR/sync"
data = {"ID": pair_id,
"TYPE": "263"}
result = self.call(url, data, "PUT")
msg = _('Sync pair error.')
self._assert_rest_result(result, msg)
def check_pair_exist(self, pair_id):
url = "/REPLICATIONPAIR/" + pair_id
result = self.call(url, None, "GET")
return result['error']['code'] == 0
def set_pair_second_access(self, pair_id, access):
url = "/REPLICATIONPAIR/" + pair_id
data = {"ID": pair_id,
"SECRESACCESS": access}
result = self.call(url, data, "PUT")
msg = _('Set pair secondary access error.')
self._assert_rest_result(result, msg)

View File

@ -0,0 +1,2 @@
features:
- Added Replication V2 support for Huawei drivers.