Open the source code of ibm_storage driver

This change opens up the source code of the ibm_storage driver. The
driver supports the IBM Spectrum Accelerate family (XIV, Spectrum
Accelerate, A9000 and A9000R) and DS8000.

Change-Id: I1abc6062dd84eeb6868ebedfc7032ff41030741f
This commit is contained in:
Alon Marx 2016-11-29 12:32:33 +02:00
parent 7dbb000437
commit 1c7fe96ea3
19 changed files with 11056 additions and 41 deletions

View File

@ -135,8 +135,10 @@ from cinder.volume.drivers.ibm import flashsystem_fc as \
from cinder.volume.drivers.ibm import flashsystem_iscsi as \
cinder_volume_drivers_ibm_flashsystemiscsi
from cinder.volume.drivers.ibm import gpfs as cinder_volume_drivers_ibm_gpfs
from cinder.volume.drivers.ibm import ibm_storage as \
cinder_volume_drivers_ibm_ibmstorage
from cinder.volume.drivers.ibm.ibm_storage import ds8k_proxy as \
cinder_volume_drivers_ibm_ibm_storage_ds8kproxy
from cinder.volume.drivers.ibm.ibm_storage import ibm_storage as \
cinder_volume_drivers_ibm_ibm_storage_ibmstorage
from cinder.volume.drivers.ibm.storwize_svc import storwize_svc_common as \
cinder_volume_drivers_ibm_storwize_svc_storwizesvccommon
from cinder.volume.drivers.ibm.storwize_svc import storwize_svc_fc as \
@ -310,7 +312,8 @@ def list_opts():
cinder_volume_drivers_ibm_flashsystemiscsi.
flashsystem_iscsi_opts,
cinder_volume_drivers_ibm_gpfs.gpfs_opts,
cinder_volume_drivers_ibm_ibmstorage.driver_opts,
cinder_volume_drivers_ibm_ibm_storage_ds8kproxy.ds8k_opts,
cinder_volume_drivers_ibm_ibm_storage_ibmstorage.driver_opts,
cinder_volume_drivers_ibm_storwize_svc_storwizesvccommon.
storwize_svc_opts,
cinder_volume_drivers_ibm_storwize_svc_storwizesvcfc.

View File

@ -0,0 +1,31 @@
# Copyright (c) 2016 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
""" Fake pyxcli-client for testing the driver without installing pyxcli"""
import mock
import sys
from cinder.tests.unit.volume.drivers.ibm import fake_pyxcli_exceptions
pyxcli_client = mock.Mock()
pyxcli_client.errors = fake_pyxcli_exceptions
pyxcli_client.events = mock.Mock()
pyxcli_client.mirroring = mock.Mock()
pyxcli_client.transports = fake_pyxcli_exceptions
sys.modules['pyxcli'] = pyxcli_client
sys.modules['pyxcli.events'] = pyxcli_client.events
sys.modules['pyxcli.mirroring'] = pyxcli_client.mirroring

View File

@ -0,0 +1,88 @@
# Copyright (c) 2016 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
""" Fake pyxcli exceptions for testing the driver without installing pyxcli"""
class XCLIError(Exception):
pass
class VolumeBadNameError(XCLIError):
pass
class CredentialsError(XCLIError):
pass
class ConnectionError(XCLIError):
pass
class CgHasMirrorError(XCLIError):
pass
class CgDoesNotExistError(XCLIError):
pass
class CgEmptyError(XCLIError):
pass
class PoolSnapshotLimitReachedError(XCLIError):
pass
class CommandFailedRuntimeError(XCLIError):
pass
class PoolOutOfSpaceError(XCLIError):
pass
class CgLimitReachedError(XCLIError):
pass
class HostBadNameError(XCLIError):
pass
class CgNotEmptyError(XCLIError):
pass
class SystemOutOfSpaceError(XCLIError):
pass
class CgNameExistsError(XCLIError):
pass
class CgBadNameError(XCLIError):
pass
class SnapshotGroupDoesNotExistError(XCLIError):
pass
class ClosedTransportError(XCLIError):
pass

File diff suppressed because it is too large Load Diff

View File

@ -25,7 +25,7 @@ from cinder.i18n import _
from cinder.objects import fields
from cinder import test
from cinder.volume import configuration as conf
from cinder.volume.drivers.ibm import ibm_storage
from cinder.volume.drivers.ibm.ibm_storage import ibm_storage
from cinder.volume import volume_types
FAKE = "fake"
@ -439,7 +439,7 @@ class IBMStorageVolumeDriverTest(test.TestCase):
return_size = self.driver.manage_existing_get_size(
VOLUME,
existing_ref)
self.assertEqual(return_size, MANAGED_VOLUME['size'])
self.assertEqual(MANAGED_VOLUME['size'], return_size)
# cover both case, whether driver renames the volume or not
self.driver.delete_volume(VOLUME)
@ -463,7 +463,7 @@ class IBMStorageVolumeDriverTest(test.TestCase):
self.driver.create_volume(MANAGED_VOLUME)
existing_ref = {'source-name': MANAGED_VOLUME['name']}
self.driver.manage_existing(VOLUME, existing_ref)
self.assertEqual(VOLUME['size'], MANAGED_VOLUME['size'])
self.assertEqual(MANAGED_VOLUME['size'], VOLUME['size'])
# cover both case, whether driver renames the volume or not
self.driver.delete_volume(VOLUME)
@ -492,10 +492,7 @@ class IBMStorageVolumeDriverTest(test.TestCase):
CONTEXT,
replicated_volume
)
self.assertEqual(
model_update['replication_status'],
'active'
)
self.assertEqual('active', model_update['replication_status'])
def test_get_replication_status_fail_on_exception(self):
"""Test that get_replication_status fails on exception"""

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,138 @@
# Copyright (c) 2016 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from cinder import exception
from cinder.i18n import _
# Conversion base: a 17 GB XIV volume holds this many blocks
# (presumably 512-byte blocks: 33554432 * 512 B ~= 17 GB -- confirm).
BLOCKS_PER_17_GIGABYTES = 33554432.0

# Prefix used by the XIV proxy for its log messages.
XIV_LOG_PREFIX = "[IBM XIV STORAGE]:"

# Supported connection protocols.
XIV_CONNECTION_TYPE_ISCSI = 'iscsi'
XIV_CONNECTION_TYPE_FC = 'fibre_channel'
XIV_CONNECTION_TYPE_FC_ECKD = 'fibre_channel_eckd'

# CHAP authentication states.
CHAP_NONE = 'disabled'
CHAP_ENABLED = 'enabled'

# Storage backend family identifiers.
STORAGE_DRIVER_XIV = 'xiv'
STORAGE_DRIVER_DS8K = 'ds8k'

# Maps internal configuration keys to cinder.conf option names.
CONF_KEYS = {
    'driver': "volume_driver",
    'proxy': "proxy",
    'user': "san_login",
    'password': "san_password",
    'storage_pool': "san_clustername",
    'address': "san_ip",
    'driver_version': "ibm_storage_driver_version",
    'volume_api_class': "volume_api_class",
    'volume_backend': "volume_backend_name",
    'connection_type': "connection_type",
    'management_ips': "management_ips",
    'chap': 'chap',
    'system_id': 'system_id',
    'replication_device': 'replication_device'
}

# Subset of the configuration keys that apply per backend section.
CONF_BACKEND_KEYS = {
    'user': "san_login",
    'password': "san_password",
    'storage_pool': "san_clustername",
    'address': "san_ip",
    'volume_backend': "volume_backend_name",
    'connection_type': "connection_type",
    'management_ips': "management_ips",
}

# Maps internal configuration keys to driver flag names.
FLAG_KEYS = {
    'user': "user",
    'password': "password",
    'storage_pool': "vol_pool",
    'address': "address",
    'connection_type': "connection_type",
    'bypass_connection_check': "XIV_BYPASS_CONNECTION_CHECK",
    'management_ips': "management_ips"
}

# Keys used when storing driver/host information in volume metadata.
METADATA_KEYS = {
    'ibm_storage_version': 'openstack_ibm_storage_driver_version',
    'openstack_version': 'openstack_version',
    'pool_host_key': 'openstack_compute_node_%(hostname)s',
    'pool_volume_os': 'openstack_volume_os',
    'pool_volume_hostname': 'openstack_volume_hostname'
}
def get_host_or_create_from_iqn(connector, connection=None):
    """Get host name.

    Return the hostname if existing at the connector (nova-compute info)
    If not, generate one from the IQN or HBA

    :param connector: dict supplied by nova-compute; may contain 'host',
        'initiator' (iSCSI IQN) and 'wwpns' (FC) entries.
    :param connection: optional connection type; when given, the
        connector-supplied hostname is ignored and a name is derived for
        that protocol instead.
    :raises exception.VolumeDriverException: when no usable initiator or
        WWPN information is present.
    """
    # Prefer the hostname supplied by the connector, but only when no
    # specific connection type was requested.
    if connection is None and connector.get('host', None):
        return connector['host']

    # Not FC (i.e. iSCSI or unspecified): derive a name from the IQN's
    # last dot-separated component, with ':' mapped to '_'.
    if connection != XIV_CONNECTION_TYPE_FC and 'initiator' in connector:
        try:
            initiator = connector['initiator']
            iqn_suffix = initiator.split('.')[-1].replace(":", "_")
        except Exception:
            # 'initiator' was present but was not a usable IQN string
            # (e.g. not a str); distinguish empty from malformed.
            if connector.get('initiator', 'None'):
                raise exception.VolumeDriverException(message=(
                    _("Initiator format: %(iqn)s")) %
                    {'iqn': connector.get('initiator', 'None')})
            else:
                raise exception.VolumeDriverException(
                    message=_("Initiator is missing from connector object"))
        return "nova-compute-%s" % iqn_suffix

    # Not iSCSI: derive a name from the first FC WWPN.
    if connection != XIV_CONNECTION_TYPE_ISCSI and len(
            connector.get('wwpns', [])
    ) > 0:
        return "nova-compute-%s" % connector['wwpns'][0].replace(":", "_")

    raise exception.VolumeDriverException(
        message=_("Compute host missing either iSCSI initiator or FC wwpns"))
def gigabytes_to_blocks(gigabytes):
    """Convert a size in gigabytes to the equivalent number of blocks."""
    blocks_per_gigabyte = BLOCKS_PER_17_GIGABYTES / 17.0
    return int(round(blocks_per_gigabyte * float(gigabytes)))
def get_online_iscsi_ports(ibm_storage_cli):
    """Returns online iscsi ports."""
    iscsi_ports = []
    for iface in ibm_storage_cli.cmd.ipinterface_list():
        if iface.type != 'iSCSI':
            continue
        iscsi_ports.append({
            'ip': iface.get('address'),
            # ipinterface_list returns ports field in Gen3, and
            # port field in BlueRidge
            'port': iface.get('ports', iface.get('port')),
            'module': iface.get('module'),
        })

    connected = []
    for p in ibm_storage_cli.cmd.ipinterface_list_ports():
        if p.is_link_up == 'yes' and p.role == 'iSCSI':
            connected.append({'port': p.index, 'module': p.get('module_id')})

    # Keep only the IPs whose (port, module) pair has a live iSCSI link.
    online = []
    for entry in iscsi_ports:
        if any(c.get('port') == entry.get('port') and
               c.get('module') == entry.get('module') for c in connected):
            online.append(entry.get('ip'))
    return online

View File

@ -0,0 +1,66 @@
# Copyright (c) 2016 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import os
import tempfile
from oslo_log import log as logging
from cinder.i18n import _LE
LOG = logging.getLogger(__name__)
class CertificateCollector(object):
    """Collect XIV*.pem certificates into one temporary bundle file.

    Scans a fixed set of system certificate directories (plus any extra
    paths given) for files named ``XIV*.pem`` and concatenates them into
    a single temporary file that can be handed to an SSL client.
    """

    def __init__(self, paths=None):
        # Directories scanned for certificates; callers may extend the set.
        self.paths_checked = [
            '/etc/ssl/certs', '/etc/ssl/certs/xiv', '/etc/pki', '/etc/pki/xiv']
        if paths:
            self.paths_checked.extend(paths)
        # De-duplicate the scan paths.
        self.paths_checked = set(self.paths_checked)
        self.tmp_fd = None
        self.tmp_path = None

    def collect_certificate(self):
        """Concatenate all matching .pem files into a temporary file.

        :returns: path of the temporary bundle file, or None if no
                  certificate data was found.
        """
        self.tmp_fd, self.tmp_path = tempfile.mkstemp()
        try:
            for path in self.paths_checked:
                if not (os.path.exists(path) and os.path.isdir(path)):
                    continue
                for f in os.listdir(path):
                    full_path = os.path.join(path, f)
                    if (os.path.isfile(full_path) and
                            f.startswith('XIV') and
                            f.endswith('.pem')):
                        try:
                            # Open in binary mode: os.write() requires a
                            # bytes-like object on Python 3 (text mode
                            # here would raise TypeError).
                            with open(full_path, 'rb') as cert_file:
                                os.write(self.tmp_fd, cert_file.read())
                        except Exception:
                            LOG.exception(_LE("Failed to process certificate"))
        finally:
            # Always release the descriptor, even if listing a directory
            # fails unexpectedly.
            os.close(self.tmp_fd)
        fsize = os.path.getsize(self.tmp_path)
        if fsize > 0:
            return self.tmp_path
        else:
            return None

    def free_certificate(self):
        """Remove the temporary bundle file, ignoring any errors."""
        if self.tmp_path:
            try:
                os.remove(self.tmp_path)
            except Exception:
                # Best-effort cleanup; nothing useful to do on failure.
                pass
            self.tmp_path = None

View File

@ -0,0 +1,27 @@
# Copyright (c) 2016 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import base64
def encrypt(string):
    """Obfuscate *string* by returning its Base64 encoding (as bytes)."""
    encoded = string.encode('UTF-8')
    return base64.b64encode(encoded)
def decrypt(string):
    """Reverse encrypt(): Base64-decode *string*, restoring '=' padding.

    :param string: Base64 data as bytes or str; padding stripped by the
        caller is re-added before decoding.
    :returns: the decoded bytes.
    """
    if isinstance(string, str):
        # Padding below is appended as bytes; normalize str input first
        # so the concatenation works on Python 3.
        string = string.encode('UTF-8')
    missing_padding = len(string) % 4
    if missing_padding != 0:
        string += b'=' * (4 - missing_padding)
    return base64.b64decode(string)

View File

@ -0,0 +1,151 @@
# Copyright (c) 2016 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import datetime
import hashlib
import re
import ssl
from oslo_log import log as logging
from requests.packages.urllib3 import connection
from requests.packages.urllib3 import connectionpool
from requests.packages.urllib3 import poolmanager
from cinder.i18n import _LW, _
LOG = logging.getLogger(__name__)

try:
    # pyOpenSSL is optional; when missing, certificate verification raises
    # a clear SSLError instead (see DS8KHTTPSConnection._verify_cert).
    from OpenSSL.crypto import FILETYPE_ASN1
    from OpenSSL.crypto import load_certificate
except ImportError:
    load_certificate = None
    FILETYPE_ASN1 = None

# Matches one PEM certificate block inside a bundle file.
_PEM_RE = re.compile(u"""-----BEGIN CERTIFICATE-----\r?
.+?\r?-----END CERTIFICATE-----\r?\n?""", re.DOTALL)
class DS8KHTTPSConnection(connection.VerifiedHTTPSConnection):
    """Extend the HTTPS Connection to do our own Certificate Verification.

    Also translates the internal 'httpsds8k://' URL scheme back to plain
    'https://' for the actual requests.
    """

    def _verify_cert(self, sock, ca_certs):
        """Check the peer certificate against the trusted bundle.

        :param sock: connected SSL socket.
        :param ca_certs: path of the trusted certificate bundle; falsy
            value disables verification.
        :raises ssl.SSLError: when the bundle cannot be read, the peer
            certificate is not in it, pyOpenSSL is missing, or the
            certificate has expired.
        """
        # If they asked us to not verify the Certificate then nothing to do
        if not ca_certs:
            return

        # Retrieve the Existing Certificates from the File in Binary Form
        peercert = sock.getpeercert(True)
        try:
            with open(ca_certs, 'r') as f:
                certs_str = f.read()
        except Exception:
            raise ssl.SSLError(_("Failed to read certificate from %s")
                               % ca_certs)

        # Verify the Existing Certificates: compare SHA-256 digests of the
        # peer certificate against every PEM block in the bundle.
        found = False
        certs = [match.group(0) for match in _PEM_RE.finditer(certs_str)]
        for cert in certs:
            existcert = ssl.PEM_cert_to_DER_cert(cert)
            # First check to make sure the 2 certificates are the same ones
            if (hashlib.sha256(existcert).digest() ==
                    hashlib.sha256(peercert).digest()):
                found = True
                break
        if not found:
            raise ssl.SSLError(
                _("The certificate doesn't match the trusted one "
                  "in %s.") % ca_certs)

        if load_certificate is None and FILETYPE_ASN1 is None:
            raise ssl.SSLError(
                _("Missing 'pyOpenSSL' python module, ensure the "
                  "library is installed."))

        # Throw an exception if the certificate given to us has expired
        x509 = load_certificate(FILETYPE_ASN1, peercert)
        if x509.has_expired():
            raise ssl.SSLError(
                _("The certificate expired: %s") % x509.get_notAfter())

    def connect(self):
        """Override the Connect Method to fix the Certificate Verification."""
        # Add certificate verification
        conn = self._new_conn()

        if getattr(self, '_tunnel_host', None):
            # _tunnel_host was added in Python 2.6.3
            # (See: http://hg.python.org/cpython/rev/0f57b30a152f)
            self.sock = conn
            # Calls self._set_hostport(), so self.host is
            # self._tunnel_host below.
            #
            # disable pylint because pylint doesn't support importing
            # from six.moves yet. see:
            # https://bitbucket.org/logilab/pylint/issue/550/
            self._tunnel()  # pylint: disable=E1101
            # Mark this connection as not reusable
            self.auto_open = 0

        # The RECENT_DATE is originally taken from requests. The date is just
        # an arbitrary value that is used as a sanity test to identify hosts
        # that are using the default time after bootup (e.g. 1970), and
        # provides information for debugging
        RECENT_DATE = datetime.date(2014, 1, 1)
        is_time_off = datetime.date.today() < RECENT_DATE
        if is_time_off:
            msg = _LW('System time is way off (before %s). This will probably '
                      'lead to SSL verification errors.')
            LOG.warning(msg, RECENT_DATE)

        # Wrap socket using verification with the root certs in
        # trusted_root_certs
        # NOTE(review): ssl.wrap_socket() without cert_reqs does not itself
        # validate the peer; validation is done manually by _verify_cert().
        self.sock = ssl.wrap_socket(conn)
        self._verify_cert(self.sock, self.ca_certs)
        self.is_verified = True

    def putrequest(self, method, url, **kwargs):
        """Override the Put Request method take the DS8K off of the URL."""
        if url and url.startswith('httpsds8k://'):
            url = 'https://' + url[12:]
        return super(DS8KHTTPSConnection,
                     self).putrequest(method, url, **kwargs)

    def request(self, method, url, **kwargs):
        """Override the Request method take the DS8K off of the URL."""
        if url and url.startswith('httpsds8k://'):
            url = 'https://' + url[12:]
        return super(DS8KHTTPSConnection, self).request(method, url, **kwargs)
class DS8KConnectionPool(connectionpool.HTTPSConnectionPool):
    """Extend the HTTPS Connection Pool to our own Certificate verification."""

    scheme = 'httpsds8k'
    ConnectionCls = DS8KHTTPSConnection

    def urlopen(self, method, url, **kwargs):
        """Override URL Open method to take DS8K out of the URL protocol."""
        prefix = 'httpsds8k://'
        if url and url.startswith(prefix):
            url = 'https://' + url[len(prefix):]
        return super(DS8KConnectionPool, self).urlopen(method, url, **kwargs)
# Register the custom scheme with urllib3's pool manager so URLs of the
# form 'httpsds8k://...' are routed through DS8KConnectionPool.  Newer
# urllib3 releases key pools by a per-scheme key function; older ones
# only consult pool_classes_by_scheme.
if hasattr(poolmanager, 'key_fn_by_scheme'):
    poolmanager.key_fn_by_scheme["httpsds8k"] = (
        poolmanager.key_fn_by_scheme["https"])
poolmanager.pool_classes_by_scheme["httpsds8k"] = DS8KConnectionPool

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,993 @@
# Copyright (c) 2016 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""
This is the driver that allows openstack to talk to DS8K.
All volumes are thin provisioned by default, if the machine is licensed for it.
This can be overridden by creating a volume type and specifying a key like so:
#> cinder type-create my_type
#> cinder type-key my_type set drivers:thin_provision=False
#> cinder create --volume-type my_type 123
Sample settings for cinder.conf:
--->
enabled_backends = ibm_ds8k_1, ibm_ds8k_2
[ibm_ds8k_1]
proxy = cinder.volume.drivers.ibm.ibm_storage.ds8k_proxy.DS8KProxy
volume_backend_name = ibm_ds8k_1
san_clustername = P2,P3
san_password = actual_password
san_login = actual_username
san_ip = foo.com
volume_driver =
cinder.volume.drivers.ibm.ibm_storage.ibm_storage.IBMStorageDriver
chap = disabled
connection_type = fibre_channel
replication_device = backend_id: bar,
san_ip: bar.com, san_login: actual_username,
san_password: actual_password, san_clustername: P4,
port_pairs: I0236-I0306; I0237-I0307
[ibm_ds8k_2]
proxy = cinder.volume.drivers.ibm.ibm_storage.ds8k_proxy.DS8KProxy
volume_backend_name = ibm_ds8k_2
san_clustername = P4,P5
san_password = actual_password
san_login = actual_username
san_ip = bar.com
volume_driver =
cinder.volume.drivers.ibm.ibm_storage.ibm_storage.IBMStorageDriver
chap = disabled
connection_type = fibre_channel
<---
"""
import ast
import json
import six
from oslo_config import cfg
from oslo_log import log as logging
from cinder import exception
from cinder.i18n import _, _LI, _LW, _LE
from cinder.objects import fields
from cinder.utils import synchronized
import cinder.volume.drivers.ibm.ibm_storage as storage
from cinder.volume.drivers.ibm.ibm_storage import ds8k_helper as helper
from cinder.volume.drivers.ibm.ibm_storage \
import ds8k_replication as replication
from cinder.volume.drivers.ibm.ibm_storage import ds8k_restclient as restclient
from cinder.volume.drivers.ibm.ibm_storage import proxy
from cinder.volume.drivers.ibm.ibm_storage import strings
from cinder.volume import group_types
from cinder.volume import volume_types
LOG = logging.getLogger(__name__)

# OS400 volume type -> fixed size in GB.  Types '050' and '099' have no
# fixed size (Lun.__init__ only overrides the size for the other types).
VALID_OS400_VOLUME_TYPES = {
    'A01': 8, 'A02': 17, 'A04': 66,
    'A05': 33, 'A06': 132, 'A07': 263,
    'A81': 8, 'A82': 17, 'A84': 66,
    'A85': 33, 'A86': 132, 'A87': 263,
    '050': '', '099': ''
}

# Defaults for the volume-type / group-type extra specs consumed by the
# Lun and Group helper classes below.
EXTRA_SPECS_DEFAULTS = {
    'thin': True,
    'replication_enabled': False,
    'consistency': False,
    'os400': '',
    'consistent_group_replication_enabled': False,
    'group_replication_enabled': False,
    'consistent_group_snapshot_enabled': False,
}

# DS8K-specific configuration options; also appended to the driver
# configuration in DS8KProxy.__init__.
ds8k_opts = [
    cfg.StrOpt(
        'ds8k_devadd_unitadd_mapping',
        default='',
        help='Mapping between IODevice address and unit address.'),
    cfg.StrOpt(
        'ds8k_ssid_prefix',
        default='FF',
        help='Set the first two digits of SSID'),
    cfg.StrOpt(
        'ds8k_host_type',
        default='auto',
        help='Set to zLinux if your OpenStack version is prior to '
             'Liberty and you\'re connecting to zLinux systems. '
             'Otherwise set to auto. Valid values for this parameter '
             'are: %s.' % six.text_type(helper.VALID_HOST_TYPES)[1:-1])
]

CONF = cfg.CONF
CONF.register_opts(ds8k_opts)
class Lun(object):
    """provide volume information for driver from volume db object."""

    class FakeLun(object):
        """Shallow copy of a Lun used before the copy exists on the DS8K.

        Carries the same descriptive fields as the source lun but no
        storage-side id yet (ds_id stays None until creation).
        """

        def __init__(self, lun, **overrides):
            self.size = lun.size
            self.os_id = 'fake_os_id'
            self.cinder_name = lun.cinder_name
            self.is_snapshot = lun.is_snapshot
            self.ds_name = lun.ds_name
            self.ds_id = None
            self.type_thin = lun.type_thin
            self.type_os400 = lun.type_os400
            self.data_type = lun.data_type
            self.type_replication = lun.type_replication
            # Replication attributes only exist on non-snapshot,
            # replicated luns (see Lun.__init__).
            if not self.is_snapshot and self.type_replication:
                self.replica_ds_name = lun.replica_ds_name
                self.replication_driver_data = lun.replication_driver_data
                self.replication_status = lun.replication_status
            self.lss_pair = lun.lss_pair

        def update_volume(self, lun):
            # Build the volume update from the real lun, then override
            # the storage id fields with this fake lun's id.
            volume_update = lun.get_volume_update()
            volume_update['provider_location'] = six.text_type({
                'vol_hex_id': self.ds_id})
            volume_update['metadata']['vol_hex_id'] = self.ds_id
            return volume_update

    def __init__(self, volume, is_snapshot=False):
        """Extract driver-relevant fields from a volume/snapshot object.

        :param volume: cinder volume (or snapshot) db object.
        :param is_snapshot: True when 'volume' is actually a snapshot.
        :raises restclient.APIException: for an invalid OS400 type spec.
        """
        volume_type_id = volume.get('volume_type_id')
        self.specs = volume_types.get_volume_type_extra_specs(
            volume_type_id) if volume_type_id else {}
        os400 = self.specs.get(
            'drivers:os400', EXTRA_SPECS_DEFAULTS['os400']
        ).strip().upper()
        self.type_thin = self.specs.get(
            'drivers:thin_provision', '%s' % EXTRA_SPECS_DEFAULTS['thin']
        ).upper() == 'True'.upper()
        self.type_replication = self.specs.get(
            'replication_enabled',
            '<is> %s' % EXTRA_SPECS_DEFAULTS['replication_enabled']
        ).upper() == strings.METADATA_IS_TRUE

        # provider_location stores the DS8K hex volume id.
        if volume.provider_location:
            provider_location = ast.literal_eval(volume.provider_location)
            self.ds_id = provider_location[six.text_type('vol_hex_id')]
        else:
            self.ds_id = None
        self.cinder_name = volume.display_name
        self.lss_pair = {}
        self.is_snapshot = is_snapshot
        if self.is_snapshot:
            self.size = volume.volume_size
            # ds8k supports at most 16 chars
            self.ds_name = (
                "OS%s:%s" % ('snap', helper.filter_alnum(self.cinder_name))
            )[:16]
        else:
            self.size = volume.size
            self.ds_name = (
                "OS%s:%s" % ('vol', helper.filter_alnum(self.cinder_name))
            )[:16]
            self.replica_ds_name = (
                "OS%s:%s" % ('Replica', helper.filter_alnum(self.cinder_name))
            )[:16]
            self.replication_status = volume.replication_status
            self.replication_driver_data = (
                json.loads(volume.replication_driver_data)
                if volume.replication_driver_data else {})
            if self.replication_driver_data:
                # now only support one replication target.
                replication_target = sorted(
                    self.replication_driver_data.values())[0]
                replica_id = replication_target[six.text_type('vol_hex_id')]
                # The first two hex digits of a volume id are its LSS.
                self.lss_pair = {
                    'source': (None, self.ds_id[0:2]),
                    'target': (None, replica_id[0:2])
                }

        if os400:
            if os400 not in VALID_OS400_VOLUME_TYPES.keys():
                msg = (_("The OS400 volume type provided, %s, is not "
                         "a valid volume type.") % os400)
                raise restclient.APIException(data=msg)
            self.type_os400 = os400
            # Types other than 050/099 imply a fixed volume size.
            if os400 not in ['050', '099']:
                self.size = VALID_OS400_VOLUME_TYPES[os400]
        else:
            self.type_os400 = EXTRA_SPECS_DEFAULTS['os400']

        self.data_type = self._create_datatype(self.type_os400)
        self.os_id = volume.id
        self.status = volume.status
        self.volume = volume

    def _get_volume_metadata(self, volume):
        # Volume objects may expose metadata either as a list of
        # key/value records or as a plain dict.
        if 'volume_metadata' in volume:
            metadata = volume.volume_metadata
            return {m['key']: m['value'] for m in metadata}
        if 'metadata' in volume:
            return volume.metadata
        return {}

    def _get_snapshot_metadata(self, snapshot):
        # Same dual representation as _get_volume_metadata, for snapshots.
        if 'snapshot_metadata' in snapshot:
            metadata = snapshot.snapshot_metadata
            return {m['key']: m['value'] for m in metadata}
        if 'metadata' in snapshot:
            return snapshot.metadata
        return {}

    def shallow_copy(self, **overrides):
        # Return a FakeLun mirroring this lun's descriptive attributes.
        return Lun.FakeLun(self, **overrides)

    def _create_datatype(self, t):
        # Map the OS400 type to the DS8K data type string; None for
        # regular (non-OS400) volumes.
        if t[0:2] == 'A0':
            datatype = t + ' FB 520P'
        elif t[0:2] == 'A8':
            datatype = t + ' FB 520U'
        elif t == '050':
            datatype = t + ' FB 520UV'
        elif t == '099':
            datatype = t + ' FB 520PV'
        else:
            datatype = None
        return datatype

    # Note: updating metadata in vol related funcs deletes all prior metadata
    def get_volume_update(self):
        """Build the model update dict to return to the volume manager."""
        volume_update = {}
        volume_update['provider_location'] = six.text_type(
            {'vol_hex_id': self.ds_id})
        # update metadata
        if self.is_snapshot:
            metadata = self._get_snapshot_metadata(self.volume)
        else:
            metadata = self._get_volume_metadata(self.volume)
            if self.type_replication:
                metadata['replication'] = six.text_type(
                    self.replication_driver_data)
            else:
                metadata.pop('replication', None)
            volume_update['replication_driver_data'] = json.dumps(
                self.replication_driver_data)
            volume_update['replication_status'] = self.replication_status
        metadata['data_type'] = (self.data_type if self.data_type else
                                 metadata['data_type'])
        metadata['vol_hex_id'] = self.ds_id
        volume_update['metadata'] = metadata
        # need to update volume size for OS400
        if self.type_os400:
            volume_update['size'] = self.size
        return volume_update
class Group(object):
    """provide group information for driver from group db object."""

    def __init__(self, group):
        group_type_id = group.get('group_type_id')
        if group_type_id:
            specs = group_types.get_group_type_specs(group_type_id)
        else:
            specs = {}
        default = ('<is> %s' %
                   EXTRA_SPECS_DEFAULTS['consistent_group_snapshot_enabled'])
        enabled = specs.get('consistent_group_snapshot_enabled', default)
        self.type_cg_snapshot = enabled.upper() == strings.METADATA_IS_TRUE
class DS8KProxy(proxy.IBMStorageProxy):
    """Proxy between the IBMStorageDriver and the DS8K backend."""

    # Prefix prepended to this proxy's log messages.
    prefix = "[IBM DS8K STORAGE]:"
def __init__(self, storage_info, logger, exception, driver,
active_backend_id=None, HTTPConnectorObject=None):
proxy.IBMStorageProxy.__init__(
self, storage_info, logger, exception, driver, active_backend_id)
self._helper = None
self._replication = None
self._connector_obj = HTTPConnectorObject
self._replication_enabled = False
self._active_backend_id = active_backend_id
self.configuration = driver.configuration
self.configuration.append_config_values(ds8k_opts)
@proxy._trace_time
def setup(self, ctxt):
LOG.info(_LI("Initiating connection to IBM DS8K storage system."))
connection_type = self.configuration.safe_get('connection_type')
replication_devices = self.configuration.safe_get('replication_device')
if connection_type == storage.XIV_CONNECTION_TYPE_FC:
if not replication_devices:
self._helper = helper.DS8KCommonHelper(self.configuration,
self._connector_obj)
else:
self._helper = (
helper.DS8KReplicationSourceHelper(self.configuration,
self._connector_obj))
elif connection_type == storage.XIV_CONNECTION_TYPE_FC_ECKD:
self._helper = helper.DS8KECKDHelper(self.configuration,
self._connector_obj)
else:
err = (_("Param [connection_type] %s is invalid.")
% connection_type)
raise exception.InvalidParameterValue(err=err)
if replication_devices:
self._do_replication_setup(replication_devices, self._helper)
@proxy.logger
def _do_replication_setup(self, devices, src_helper):
if len(devices) >= 2:
err = _("Param [replication_device] is invalid, Driver "
"support only one replication target.")
raise exception.InvalidParameterValue(err=err)
self._replication = replication.Replication(src_helper, devices[0])
self._replication.check_physical_links()
self._replication.check_connection_type()
if self._active_backend_id:
self._switch_backend_connection(self._active_backend_id)
self._replication_enabled = True
@proxy.logger
def _switch_backend_connection(self, backend_id, repl_luns=None):
repl_luns = self._replication.switch_source_and_target(backend_id,
repl_luns)
self._helper = self._replication._source_helper
return repl_luns
@staticmethod
def _b2gb(b):
return b // (2 ** 30)
@proxy._trace_time
def _update_stats(self):
if self._helper:
storage_pools = self._helper.get_pools()
if not len(storage_pools):
msg = _('No pools found - make sure san_clustername '
'is defined in the config file and that the '
'pools exist on the storage.')
LOG.error(msg)
raise exception.CinderException(message=msg)
else:
msg = (_('Backend %s is not initialized.')
% self.configuration.volume_backend_name)
raise exception.CinderException(data=msg)
stats = {
"volume_backend_name": self.configuration.volume_backend_name,
"serial_number": self._helper.backend['storage_unit'],
"extent_pools": self._helper.backend['pools_str'],
"vendor_name": 'IBM',
"driver_version": self.full_version,
"storage_protocol": self._helper.get_connection_type(),
"total_capacity_gb": self._b2gb(
sum(p['cap'] for p in storage_pools.values())),
"free_capacity_gb": self._b2gb(
sum(p['capavail'] for p in storage_pools.values())),
"reserved_percentage": self.configuration.reserved_percentage,
"consistencygroup_support": True,
"consistent_group_snapshot_enabled": True,
"multiattach": True
}
if self._replication_enabled:
stats['replication_enabled'] = self._replication_enabled
self.meta['stat'] = stats
def _assert(self, assert_condition, exception_message=''):
if not assert_condition:
LOG.error(exception_message)
raise restclient.APIException(data=exception_message)
    @proxy.logger
    def _create_lun_helper(self, lun, pool=None, find_new_pid=True):
        """Create the lun on the storage, retrying until an LSS has room.

        :param lun: Lun object to create.
        :param pool: optional pool to pick the LSS from.
        :param find_new_pid: passed through to find_available_lss.
        :raises restclient.APIException: when an ECKD backend cannot
            support a thin (ESE) volume.
        """
        # DS8K supports ECKD ESE volume from 8.1
        connection_type = self._helper.get_connection_type()
        if connection_type == storage.XIV_CONNECTION_TYPE_FC_ECKD:
            # NOTE(review): get_thin_provision() appears to be truthy when
            # ESE volumes can NOT be used (it gates the error below) --
            # confirm against the helper implementation.
            thin_provision = self._helper.get_thin_provision()
            if lun.type_thin and thin_provision:
                if lun.type_replication:
                    msg = _("The primary or the secondary storage "
                            "can not support ECKD ESE volume.")
                else:
                    msg = _("Backend can not support ECKD ESE volume.")
                LOG.error(msg)
                raise restclient.APIException(message=msg)
        # There is a time gap between find available LSS slot and
        # lun actually occupies it.
        excluded_lss = []
        while True:
            try:
                if lun.type_replication and not lun.is_snapshot:
                    lun.lss_pair = self._replication.find_available_lss_pair(
                        excluded_lss)
                else:
                    lun.lss_pair['source'] = self._helper.find_available_lss(
                        pool, find_new_pid, excluded_lss)
                return self._helper.create_lun(lun)
            except restclient.LssFullException:
                # The chosen LSS filled up in the meantime; exclude it and
                # pick another one.
                msg = _LW("LSS %s is full, find another one.")
                LOG.warning(msg, lun.lss_pair['source'][1])
                excluded_lss.append(lun.lss_pair['source'][1])
    @proxy.logger
    def _clone_lun(self, src_lun, tgt_lun):
        """FlashCopy src_lun onto tgt_lun, creating the target if needed.

        :param src_lun: source Lun (must not be a FlashCopy target).
        :param tgt_lun: target Lun; created here when it has no ds_id.
        :returns: tgt_lun, with the copy completed.
        :raises restclient.APIException: on size mismatch or when the
            source is a FlashCopy target.
        """
        self._assert(src_lun.size <= tgt_lun.size,
                     _('Target volume should be bigger or equal '
                       'to the Source volume in size.'))
        self._ensure_vol_not_fc_target(src_lun.ds_id)

        # volume ID of src_lun and tgt_lun will be the same one if tgt_lun is
        # image-volume, because _clone_image_volume in manager.py does not pop
        # the provider_location.
        if (tgt_lun.ds_id is None) or (src_lun.ds_id == tgt_lun.ds_id):
            # It is a preferred practice to locate the FlashCopy target
            # volume on the same DS8000 server as the FlashCopy source volume.
            pool = self._helper.get_pool(src_lun.ds_id[0:2])
            # flashcopy to larger target only works with thick vols, so we
            # emulate for thin by extending after copy
            if tgt_lun.type_thin and tgt_lun.size > src_lun.size:
                tmp_size = tgt_lun.size
                tgt_lun.size = src_lun.size
                self._create_lun_helper(tgt_lun, pool)
                tgt_lun.size = tmp_size
            else:
                self._create_lun_helper(tgt_lun, pool)
        else:
            self._assert(
                src_lun.size == tgt_lun.size,
                _('When target volume is pre-created, it must be equal '
                  'in size to source volume.'))

        finished = False
        try:
            vol_pairs = [{
                "source_volume": src_lun.ds_id,
                "target_volume": tgt_lun.ds_id
            }]
            self._helper.start_flashcopy(vol_pairs)
            fc_finished = self._helper.wait_flashcopy_finished(
                [src_lun], [tgt_lun])
            # For the thin-target emulation above, grow the target to its
            # requested size once the copy has completed.
            if (fc_finished and
                    tgt_lun.type_thin and
                    tgt_lun.size > src_lun.size):
                param = {
                    'cap': self._helper._gb2b(tgt_lun.size),
                    'captype': 'bytes'
                }
                self._helper.change_lun(tgt_lun.ds_id, param)
            finished = fc_finished
        finally:
            # On any failure, do not leak the half-copied target volume.
            if not finished:
                self._helper.delete_lun(tgt_lun)

        return tgt_lun
def _ensure_vol_not_fc_target(self, vol_hex_id):
    """Raise if the given volume is the target of an active FlashCopy."""
    flashcopies = self._helper.get_flashcopy(vol_hex_id)
    if any(fc['targetvolume']['id'] == vol_hex_id for fc in flashcopies):
        msg = (_('Volume %s is currently a target of another '
                 'FlashCopy operation') % vol_hex_id)
        raise restclient.APIException(data=msg)
@proxy._trace_time
def create_volume(self, volume):
    """Create a new volume; add a replica when the type requires it."""
    new_lun = self._create_lun_helper(Lun(volume))
    if new_lun.type_replication:
        new_lun = self._replication.create_replica(new_lun)
    return new_lun.get_volume_update()
@proxy._trace_time
def create_cloned_volume(self, target_vol, source_vol):
    """Clone an existing volume via FlashCopy, replicating if needed."""
    cloned = self._clone_lun(Lun(source_vol), Lun(target_vol))
    if cloned.type_replication:
        cloned = self._replication.create_replica(cloned)
    return cloned.get_volume_update()
@proxy._trace_time
def create_volume_from_snapshot(self, volume, snapshot):
    """Create a volume by FlashCopying an existing snapshot LUN."""
    snap_lun = Lun(snapshot, is_snapshot=True)
    new_lun = self._clone_lun(snap_lun, Lun(volume))
    if new_lun.type_replication:
        new_lun = self._replication.create_replica(new_lun)
    return new_lun.get_volume_update()
@proxy._trace_time
def extend_volume(self, volume, new_size):
    """Extend a volume to new_size.

    For a replicated volume the PPRC pairs must be removed first, both
    the source and the replica are extended, and the pairs are then
    re-created.  Extending is refused while the host is failed over.

    :param volume: the volume to extend.
    :param new_size: the new size.
    :raises exception.CinderException: if the volume is replicated and
                                       the host has been failed over.
    """
    lun = Lun(volume)
    param = {
        'cap': self._helper._gb2b(new_size),
        'captype': 'bytes'
    }
    if lun.type_replication:
        if not self._active_backend_id:
            # PPRC pairs must be dropped before either side can be
            # resized, then re-established afterwards.
            self._replication.delete_pprc_pairs(lun)
            self._helper.change_lun(lun.ds_id, param)
            self._replication.extend_replica(lun, param)
            self._replication.create_pprc_pairs(lun)
        else:
            msg = (_("The volume %s has been failed over, it is "
                     "not suggested to extend it.") % lun.ds_id)
            # Bug fix: CinderException takes the message positionally;
            # passing it as ``data=`` discarded the message text
            # (``data`` is the restclient.APIException convention).
            raise exception.CinderException(msg)
    else:
        self._helper.change_lun(lun.ds_id, param)
@proxy._trace_time
def volume_exists(self, volume):
    """Report whether the backing LUN still exists on the DS8K."""
    vol_id = Lun(volume).ds_id
    return self._helper.lun_exists(vol_id)
@proxy._trace_time
def delete_volume(self, volume):
    """Delete a volume, removing its replica first when present."""
    doomed = Lun(volume)
    if doomed.type_replication:
        doomed = self._replication.delete_replica(doomed)
    self._helper.delete_lun(doomed)
@proxy._trace_time
def create_snapshot(self, snapshot):
    """FlashCopy the parent volume onto a new snapshot LUN."""
    parent_lun = Lun(snapshot['volume'])
    snap_lun = Lun(snapshot, is_snapshot=True)
    return self._clone_lun(parent_lun, snap_lun).get_volume_update()
@proxy._trace_time
def delete_snapshot(self, snapshot):
    """Remove the LUN that backs the given snapshot."""
    snap_lun = Lun(snapshot, is_snapshot=True)
    self._helper.delete_lun(snap_lun)
@proxy._trace_time
def migrate_volume(self, ctxt, volume, backend):
    """Migrate a volume between pools of backends on the same DS8K.

    :param ctxt: context of the caller.
    :param volume: the volume to migrate.
    :param backend: destination host/capabilities dictionary; must be
                    the same vendor and same DS8K serial number.
    :returns: (moved, model_update) tuple.
    :raises exception.VolumeDriverException: if the destination is a
        different vendor or a different physical DS8K.
    """
    # this and retype is a complete mess, pending cinder changes for fix.
    # currently this is only for migrating between pools on the same
    # physical machine but different cinder.conf backends.
    # volume not allowed to get here if cg or repl
    # should probably check volume['status'] in ['available', 'in-use'],
    # especially for flashcopy
    stats = self.meta['stat']
    if backend['capabilities']['vendor_name'] != stats['vendor_name']:
        raise exception.VolumeDriverException(_(
            'source and destination vendors differ.'))
    if backend['capabilities']['serial_number'] != stats['serial_number']:
        raise exception.VolumeDriverException(_(
            'source and destination serial numbers differ.'))

    new_pools = self._helper.get_pools(
        backend['capabilities']['extent_pools'])

    lun = Lun(volume)
    cur_pool_id = self._helper.get_lun(lun.ds_id)['pool']['id']
    cur_node = self._helper.get_storage_pools()[cur_pool_id]['node']

    # try pools in same rank: a same-node move is a cheap pool change,
    # no data copy needed.
    for pid, pool in new_pools.items():
        if pool['node'] == cur_node:
            try:
                self._helper.change_lun(lun.ds_id, {'pool': pid})
                return (True, None)
            # NOTE(review): deliberately best-effort -- a failed pool
            # change just falls through to the next candidate.
            except Exception:
                pass

    # try pools in opposite rank: requires creating a new LUN and
    # FlashCopying the data across, then deleting the old LUN.
    for pid, pool in new_pools.items():
        if pool['node'] != cur_node:
            try:
                new_lun = lun.shallow_copy()
                self._create_lun_helper(new_lun, pid, False)
                lun.data_type = new_lun.data_type
                self._clone_lun(lun, new_lun)
                volume_update = new_lun.update_volume(lun)
                try:
                    self._helper.delete_lun(lun)
                except Exception:
                    # Old LUN left behind is tolerable; migration
                    # itself succeeded.
                    pass
                return (True, volume_update)
            except Exception:
                # will ignore missing ds_id if failed create volume
                self._helper.delete_lun(new_lun)

    return (False, None)
@proxy._trace_time
def retype(self, ctxt, volume, new_type, diff, host):
    """retype the volume.

    :param ctxt: Context
    :param volume: A dictionary describing the volume to migrate
    :param new_type: A dictionary describing the volume type to convert to
    :param diff: A dictionary with the difference between the two types
    :param host: A dictionary describing the host to migrate to, where
                 host['host'] is its name, and host['capabilities'] is a
                 dictionary of its reported capabilities.
    """
    def _get_volume_type(key, value):
        # Read the old/new values of one extra-spec from ``diff`` and
        # report, for each side, whether it equals ``value``
        # (case-insensitively).  Returns (None, None) when the spec is
        # not part of the diff.
        extra_specs = diff.get('extra_specs')
        specific_type = extra_specs.get(key) if extra_specs else None
        if specific_type:
            old_type = (True if str(specific_type[0]).upper() == value
                        else False)
            new_type = (True if str(specific_type[1]).upper() == value
                        else False)
        else:
            old_type = None
            new_type = None
        return old_type, new_type

    def _convert_thin_and_thick(lun, new_type):
        # Thin<->thick conversion is done by creating a new LUN of the
        # desired type, FlashCopying the data, and dropping the old LUN.
        new_lun = lun.shallow_copy()
        new_lun.type_thin = new_type
        self._create_lun_helper(new_lun)
        self._clone_lun(lun, new_lun)
        try:
            self._helper.delete_lun(lun)
        except Exception:
            # Best-effort cleanup; the converted LUN is already in use.
            pass
        lun.ds_id = new_lun.ds_id
        lun.data_type = new_lun.data_type
        lun.type_thin = new_type
        return lun

    lun = Lun(volume)
    # check thin or thick
    old_type_thin, new_type_thin = _get_volume_type(
        'drivers:thin_provision', 'True'.upper())
    # check replication capability
    old_type_replication, new_type_replication = _get_volume_type(
        'replication_enabled', strings.METADATA_IS_TRUE)

    # start retype
    if old_type_thin != new_type_thin:
        if old_type_replication:
            if not new_type_replication:
                # Replication is being dropped anyway, so remove the
                # replica first, then convert the provisioning type.
                lun = self._replication.delete_replica(lun)
                lun = _convert_thin_and_thick(lun, new_type_thin)
            else:
                msg = (_("The volume %s is in replication relationship, "
                         "it is not supported to retype from thin to "
                         "thick or vice versus.") % lun.ds_id)
                raise exception.CinderException(msg)
        else:
            lun = _convert_thin_and_thick(lun, new_type_thin)
            if new_type_replication:
                lun.type_replication = True
                lun = self._replication.enable_replication(lun)
    else:
        # Provisioning type unchanged; only toggle replication.
        if not old_type_replication and new_type_replication:
            lun.type_replication = True
            lun = self._replication.enable_replication(lun)
        elif old_type_replication and not new_type_replication:
            lun = self._replication.delete_replica(lun)
            lun.type_replication = False

    return True, lun.get_volume_update()
@synchronized('OpenStackCinderIBMDS8KMutexConnect-', external=True)
@proxy._trace_time
@proxy.logger
def initialize_connection(self, volume, connector, **kwargs):
    """Attach a volume to the host."""
    lun = Lun(volume)
    LOG.info(_LI('Attach the volume %s.'), lun.ds_id)
    return self._helper.initialize_connection(lun.ds_id, connector,
                                              **kwargs)
@synchronized('OpenStackCinderIBMDS8KMutexConnect-', external=True)
@proxy._trace_time
@proxy.logger
def terminate_connection(self, volume, connector, force=False, **kwargs):
    """Detach a volume from a host."""
    lun = Lun(volume)
    LOG.info(_LI('Detach the volume %s.'), lun.ds_id)
    return self._helper.terminate_connection(lun.ds_id, connector, force,
                                             **kwargs)
@proxy.logger
def create_consistencygroup(self, ctxt, group):
    """Create a consistency group (delegates to the DS8K helper)."""
    return self._helper.create_group(ctxt, group)
@proxy.logger
def delete_consistencygroup(self, ctxt, group, volumes):
    """Delete a consistency group together with its member volumes."""
    member_luns = [Lun(vol) for vol in volumes]
    return self._helper.delete_group(ctxt, group, member_luns)
@proxy._trace_time
def create_cgsnapshot(self, ctxt, cgsnapshot, snapshots):
    """Create a consistency group snapshot (always CG-consistent)."""
    return self._create_group_snapshot(ctxt, cgsnapshot, snapshots,
                                       cg_enabled=True)
def _create_group_snapshot(self, ctxt, cgsnapshot, snapshots,
                           cg_enabled=False):
    """FlashCopy every member volume onto its snapshot LUN.

    :param cg_enabled: when True the FlashCopy is started with freeze
                       so all snapshots are point-in-time consistent.
    :returns: (model_update, snapshots_model_update) tuple; on failure
              the status is set to ERROR for the group and all members.
    """
    snapshots_model_update = []
    model_update = {'status': fields.GroupStatus.AVAILABLE}

    src_luns = [Lun(snapshot['volume']) for snapshot in snapshots]
    tgt_luns = [Lun(snapshot, is_snapshot=True) for snapshot in snapshots]

    try:
        if src_luns and tgt_luns:
            self._clone_group(src_luns, tgt_luns, cg_enabled)
    except restclient.APIException:
        model_update['status'] = fields.GroupStatus.ERROR
        LOG.exception(_LE('Failed to create group snapshot.'))

    # Propagate the group status to each member snapshot.
    for tgt_lun in tgt_luns:
        snapshot_model_update = tgt_lun.get_volume_update()
        snapshot_model_update.update({
            'id': tgt_lun.os_id,
            'status': model_update['status']
        })
        snapshots_model_update.append(snapshot_model_update)

    return model_update, snapshots_model_update
@proxy._trace_time
@proxy.logger
def delete_cgsnapshot(self, ctxt, cgsnapshot, snapshots):
    """Delete a consistency group snapshot."""
    return self._delete_group_snapshot(ctxt, cgsnapshot, snapshots)
def _delete_group_snapshot(self, ctxt, group_snapshot, snapshots):
    """Delete the LUNs backing a group snapshot.

    Returns a (model_update, snapshots_model_update) tuple; the group
    status (DELETED or ERROR_DELETING) is mirrored onto every member.
    """
    model_update = {'status': fields.GroupStatus.DELETED}
    snapshot_luns = [Lun(snap, is_snapshot=True) for snap in snapshots]
    if snapshot_luns:
        try:
            self._helper.delete_lun(snapshot_luns)
        except restclient.APIException as e:
            model_update['status'] = fields.GroupStatus.ERROR_DELETING
            LOG.error(_LE("Failed to delete group snapshot. "
                          "Error: %(err)s"),
                      {'err': e})

    snapshots_model_update = [
        {'id': lun.os_id, 'status': model_update['status']}
        for lun in snapshot_luns
    ]
    return model_update, snapshots_model_update
@proxy.logger
def update_consistencygroup(self, ctxt, group,
                            add_volumes, remove_volumes):
    """Add or remove volume(s) to/from an existing consistency group."""
    return self._helper.update_group(
        ctxt, group, add_volumes, remove_volumes)
@proxy._trace_time
def create_consistencygroup_from_src(self, ctxt, group, volumes,
                                     cgsnapshot, snapshots,
                                     source_cg, sorted_source_vols):
    """Create a consistencygroup from source.

    :param ctxt: the context of the caller.
    :param group: the dictionary of the consistency group to be created.
    :param volumes: a list of volume dictionaries in the group.
    :param cgsnapshot: the dictionary of the cgsnapshot as source.
    :param snapshots: a list of snapshot dictionaries in the cgsnapshot.
    :param source_cg: the dictionary of the consisgroup as source.
    :param sorted_source_vols: a list of volume dictionaries
                               in the consisgroup.
    :return model_update, volumes_model_update
    """
    return self._create_group_from_src(ctxt, group, volumes, cgsnapshot,
                                       snapshots, source_cg,
                                       sorted_source_vols,
                                       cg_enabled=True)
def _create_group_from_src(self, ctxt, group, volumes, cgsnapshot,
                           snapshots, source_cg, sorted_source_vols,
                           cg_enabled=False):
    """Clone a group from a group snapshot or from another group.

    Exactly one of (cgsnapshot, snapshots) or
    (source_cg, sorted_source_vols) must be provided as the source.

    :param cg_enabled: when True the clone is done with freeze so all
                       new volumes are point-in-time consistent.
    :returns: (model_update, volumes_model_update) tuple.
    :raises exception.InvalidInput: when no usable source is given.
    """
    model_update = {'status': fields.GroupStatus.AVAILABLE}
    volumes_model_update = []

    if cgsnapshot and snapshots:
        src_luns = [Lun(snapshot, is_snapshot=True)
                    for snapshot in snapshots]
    elif source_cg and sorted_source_vols:
        src_luns = [Lun(source_vol)
                    for source_vol in sorted_source_vols]
    else:
        msg = _("_create_group_from_src supports a group snapshot "
                "source or a group source, other sources can not "
                "be used.")
        LOG.error(msg)
        raise exception.InvalidInput(message=msg)

    try:
        tgt_luns = [Lun(volume) for volume in volumes]
        if src_luns and tgt_luns:
            self._clone_group(src_luns, tgt_luns, cg_enabled)
    except restclient.APIException:
        model_update['status'] = fields.GroupStatus.ERROR
        msg = _LE("Failed to create group from group snapshot.")
        LOG.exception(msg)

    # Propagate the group status to each new member volume.
    for tgt_lun in tgt_luns:
        volume_model_update = tgt_lun.get_volume_update()
        volume_model_update.update({
            'id': tgt_lun.os_id,
            'status': model_update['status']
        })
        volumes_model_update.append(volume_model_update)

    return model_update, volumes_model_update
def _clone_group(self, src_luns, tgt_luns, cg_enabled):
    """FlashCopy a list of source LUNs onto their paired target LUNs.

    Target LUNs that do not exist yet are created in the pool of the
    corresponding source LSS.  With ``cg_enabled`` the FlashCopy is
    started with freeze for point-in-time consistency.  On failure all
    target LUNs are deleted.
    """
    # None of the sources may still be a FlashCopy target.
    for src_lun in src_luns:
        self._ensure_vol_not_fc_target(src_lun.ds_id)

    finished = False
    try:
        vol_pairs = []
        for src_lun, tgt_lun in zip(src_luns, tgt_luns):
            # Place the target in the same pool as the source's LSS.
            pool = self._helper.get_pool(src_lun.ds_id[0:2])
            if tgt_lun.ds_id is None:
                self._create_lun_helper(tgt_lun, pool)
            vol_pairs.append({
                "source_volume": src_lun.ds_id,
                "target_volume": tgt_lun.ds_id
            })
        if cg_enabled:
            self._do_flashcopy_with_freeze(vol_pairs)
        else:
            self._helper.start_flashcopy(vol_pairs)
        finished = self._helper.wait_flashcopy_finished(src_luns, tgt_luns)
    finally:
        if not finished:
            self._helper.delete_lun(tgt_luns)
@synchronized('OpenStackCinderIBMDS8KMutex-CG-', external=True)
@proxy._trace_time
def _do_flashcopy_with_freeze(self, vol_pairs):
    """Start a frozen FlashCopy, then unfreeze the source LSSes."""
    # Freeze guarantees all pairs share one point-in-time image.
    self._helper.start_flashcopy(vol_pairs, True)
    # Every source LSS that was frozen must be unfrozen again.
    source_lss = {pair['source_volume'][0:2] for pair in vol_pairs}
    lss_ids = list(source_lss)
    LOG.debug('Unfreezing the LSS: %s', ','.join(lss_ids))
    self._helper.unfreeze_lss(lss_ids)
@proxy.logger
def create_group(self, ctxt, group):
    """Create a generic volume group (delegates to the helper)."""
    return self._helper.create_group(ctxt, group)
@proxy.logger
def delete_group(self, ctxt, group, volumes):
    """Delete a generic volume group and the volumes in it."""
    member_luns = [Lun(vol) for vol in volumes]
    return self._helper.delete_group(ctxt, group, member_luns)
@proxy.logger
def update_group(self, ctxt, group, add_volumes, remove_volumes):
    """Update generic volume group membership."""
    return self._helper.update_group(
        ctxt, group, add_volumes, remove_volumes)
@proxy.logger
def create_group_snapshot(self, ctxt, group_snapshot, snapshots):
    """Create a volume group snapshot.

    Consistency (freeze) is requested only when the group type asks
    for consistent group snapshots.
    """
    snapshot_group = Group(group_snapshot)
    # Idiom fix: ``True if x else False`` is just ``bool(x)``.
    cg_enabled = bool(snapshot_group.type_cg_snapshot)
    return self._create_group_snapshot(ctxt, group_snapshot,
                                       snapshots, cg_enabled)
@proxy.logger
def delete_group_snapshot(self, ctxt, group_snapshot, snapshots):
    """Delete volume group snapshot."""
    return self._delete_group_snapshot(ctxt, group_snapshot, snapshots)
@proxy._trace_time
def create_group_from_src(self, ctxt, group, volumes, group_snapshot,
                          sorted_snapshots, source_group,
                          sorted_source_vols):
    """Create volume group from volume group or volume group snapshot.

    Consistency (freeze) is requested only when the new group's type
    asks for consistent group snapshots.
    """
    volume_group = Group(group)
    # Idiom fix: ``True if x else False`` is just ``bool(x)``.
    cg_enabled = bool(volume_group.type_cg_snapshot)
    return self._create_group_from_src(ctxt, group, volumes,
                                       group_snapshot, sorted_snapshots,
                                       source_group, sorted_source_vols,
                                       cg_enabled)
def freeze_backend(self, ctxt):
    """Notify the backend that it's frozen."""
    # Intentionally a no-op for DS8K; presumably Cinder itself blocks
    # control-plane operations while frozen -- nothing to do here.
    pass
def thaw_backend(self, ctxt):
    """Notify the backend that it's unfrozen/thawed."""
    # Intentionally a no-op for DS8K; see freeze_backend.
    pass
@proxy.logger
@proxy._trace_time
def failover_host(self, ctxt, volumes, secondary_id):
    """Fail over the volume back and forth.

    if secondary_id is 'default', volumes will be failed back,
    otherwize failed over.

    :param ctxt: context of the caller.
    :param volumes: list of volumes to fail over/back.
    :param secondary_id: the replication target id, or 'default' to
                         fail back, or None to pick the configured
                         target.
    :returns: (secondary_id, volume_update_list) tuple.
    :raises exception.InvalidReplicationTarget: for an unknown target.
    :raises exception.UnableToFailOver: when the PPRC operation fails.
    """
    volume_update_list = []
    if secondary_id == strings.PRIMARY_BACKEND_ID:
        # Fail-back requested while already on the primary: no-op.
        if not self._active_backend_id:
            msg = _LI("Host has been failed back. doesn't need "
                      "to fail back again.")
            LOG.info(msg)
            return self._active_backend_id, volume_update_list
    else:
        # Fail-over requested while already failed over: no-op.
        if self._active_backend_id:
            msg = _LI("Host has been failed over to %s.")
            LOG.info(msg, self._active_backend_id)
            return self._active_backend_id, volume_update_list

        backend_id = self._replication._target_helper.backend['id']
        if secondary_id is None:
            secondary_id = backend_id
        elif secondary_id != backend_id:
            msg = (_('Invalid secondary_backend_id specified. '
                     'Valid backend id is %s.') % backend_id)
            raise exception.InvalidReplicationTarget(message=msg)

    LOG.debug("Starting failover to %s.", secondary_id)

    # Split the volumes into replicated ones (handled via PPRC) and
    # the rest (status is flipped to error/original by the helper).
    replicated_luns = []
    for volume in volumes:
        lun = Lun(volume)
        if lun.type_replication and lun.status == "available":
            replicated_luns.append(lun)
        else:
            volume_update = (
                self._replication.failover_unreplicated_volume(lun))
            volume_update_list.append(volume_update)

    if replicated_luns:
        try:
            if secondary_id != strings.PRIMARY_BACKEND_ID:
                self._replication.do_pprc_failover(replicated_luns,
                                                   secondary_id)
                self._active_backend_id = secondary_id
                replicated_luns = self._switch_backend_connection(
                    secondary_id, replicated_luns)
            else:
                self._replication.start_pprc_failback(
                    replicated_luns, self._active_backend_id)
                self._active_backend_id = ""
                self._helper = self._replication._source_helper
        except restclient.APIException as e:
            msg = (_("Unable to failover host to %(id)s. "
                     "Exception= %(ex)s")
                   % {'id': secondary_id, 'ex': six.text_type(e)})
            raise exception.UnableToFailOver(reason=msg)

        for lun in replicated_luns:
            volume_update = lun.get_volume_update()
            volume_update['replication_status'] = (
                'failed-over' if self._active_backend_id else 'enabled')
            model_update = {'volume_id': lun.os_id,
                            'updates': volume_update}
            volume_update_list.append(model_update)
    else:
        # No replicated volumes: still switch the REST connection so
        # subsequent requests go to the requested site.
        LOG.info(_LI("No volume has replication capability."))
        if secondary_id != strings.PRIMARY_BACKEND_ID:
            LOG.info(_LI("Switch to the target %s"), secondary_id)
            self._switch_backend_connection(secondary_id)
            self._active_backend_id = secondary_id
        else:
            LOG.info(_LI("Switch to the primary %s"), secondary_id)
            self._switch_backend_connection(self._active_backend_id)
            self._active_backend_id = ""

    return secondary_id, volume_update_list

View File

@ -0,0 +1,577 @@
# Copyright (c) 2016 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import ast
import eventlet
import six
from oslo_log import log as logging
from oslo_utils import excutils
from cinder import exception
from cinder.i18n import _, _LE, _LI
from cinder.utils import synchronized
import cinder.volume.drivers.ibm.ibm_storage as storage
from cinder.volume.drivers.ibm.ibm_storage import ds8k_helper as helper
from cinder.volume.drivers.ibm.ibm_storage import ds8k_restclient as restclient
from cinder.volume.drivers.ibm.ibm_storage import proxy
LOG = logging.getLogger(__name__)

# States reported when probing PPRC paths between the primary and the
# secondary DS8K (see MetroMirrorManager.find_available_pprc_path).
PPRC_PATH_NOT_EXIST = 0x00
PPRC_PATH_HEALTHY = 0x01
PPRC_PATH_UNHEALTHY = 0x02
PPRC_PATH_FULL = 0x03
class MetroMirrorManager(object):
    """Manage metro mirror for replication.

    Wraps two DS8K helper objects -- the primary (source) and the
    secondary (target) backend -- and drives PPRC path and pair
    management between them.
    """

    def __init__(self, source, target):
        # source/target are DS8K helper instances for the primary and
        # the secondary backend respectively.
        self._source = source
        self._target = target

    def switch_source_and_target(self):
        """Swap the source and target helpers (used on failover)."""
        self._source, self._target = self._target, self._source

    def check_physical_links(self):
        """Verify FC connectivity between the two DS8Ks, pick port pairs.

        When no port pairs are configured, up to eight discovered
        physical links are used; otherwise the configured pairs are
        validated against the discovered links.

        :raises exception.CinderException: when the DS8Ks are not
            connected or a configured port pair is invalid.
        """
        ports = self._source.get_physical_links(
            self._target.backend['storage_wwnn'])
        if not ports:
            msg = (_("DS8K %(tgt)s is not connected to the DS8K %(src)s!") %
                   {'tgt': self._target.backend['storage_wwnn'],
                    'src': self._source.backend['storage_wwnn']})
            raise exception.CinderException(msg)

        pairs = [{
            'source_port_id': p['source_port_id'],
            'target_port_id': p['target_port_id']
        } for p in ports]
        if not self._target.backend['port_pairs']:
            # if there are more than eight physical links,
            # choose eight of them.
            self._target.backend['port_pairs'] = (
                pairs[:8] if len(pairs) > 8 else pairs)
        else:
            # verify the port pairs user set
            for pair in self._target.backend['port_pairs']:
                if pair not in pairs:
                    valid_pairs = ';'.join(
                        ["%s-%s" % (p['source_port_id'],
                                    p['target_port_id'])
                         for p in pairs])
                    invalid_pair = "%s-%s" % (pair['source_port_id'],
                                              pair['target_port_id'])
                    msg = (_("Invalid port pair: %(invalid)s, valid port "
                             "pair(s) are: %(valid)s") %
                           {'invalid': invalid_pair,
                            'valid': valid_pairs})
                    raise exception.CinderException(msg)
        # Mirror of the chosen pairs for the reverse (failback) direction.
        self._source.backend['port_pairs'] = [{
            'source_port_id': p['target_port_id'],
            'target_port_id': p['source_port_id']
        } for p in self._target.backend['port_pairs']]

    def is_target_alive(self):
        """Return True when the secondary DS8K answers REST requests."""
        try:
            self._target.get_systems()
        except restclient.TimeoutException as e:
            msg = _LI("REST request time out, backend may be not available "
                      "any more. Exception: %s")
            LOG.info(msg, e)
            return False

        return True

    def find_available_pprc_path(self, lss=None, excluded_lss=None):
        """find lss from existed pprc path.

        the format of lss_pair returned is as below:
        {'source': (pid, lss), 'target': (pid, lss)}

        :param lss: restrict the search to this source LSS, or None.
        :param excluded_lss: source LSS ids that must not be used.
        :returns: (state, lss_pair) -- lss_pair is None unless the
                  state is PPRC_PATH_HEALTHY.
        """
        state, paths = self._filter_pprc_paths(lss)
        if state != PPRC_PATH_HEALTHY:
            # check whether the physical links are available or not,
            # or have been changed.
            self.check_physical_links()
            return state, None
        if excluded_lss:
            paths = [p for p in paths
                     if p['source_lss_id'] not in excluded_lss]

        lss_pair = {}
        if len(paths) == 1:
            path = paths[0]
            pid = self._source.get_pool(path['source_lss_id'])
            lss_pair['source'] = (pid, path['source_lss_id'])
        else:
            # sort the lss pairs according to the number of luns,
            # get the lss pair which has least luns.
            candidates = []
            source_lss_set = set(p['source_lss_id'] for p in paths)
            for lss in source_lss_set:
                # get the number of lun in source.
                src_luns = self._source.get_lun_number_in_lss(lss)
                if src_luns == helper.LSS_VOL_SLOTS:
                    continue

                spec_paths = [p for p in paths if p['source_lss_id'] == lss]
                for path in spec_paths:
                    # get the number of lun in target.
                    tgt_luns = self._target.get_lun_number_in_lss(
                        path['target_lss_id'])
                    candidates.append((lss, path, src_luns + tgt_luns))

            if candidates:
                candidate = sorted(candidates, key=lambda c: c[2])[0]
                pid = self._source.get_pool(candidate[0])
                lss_pair['source'] = (pid, candidate[0])
                path = candidate[1]
            else:
                return PPRC_PATH_FULL, None

        # format the target in lss_pair.
        pid = self._target.get_pool(path['target_lss_id'])
        lss_pair['target'] = (pid, path['target_lss_id'])

        return PPRC_PATH_HEALTHY, lss_pair

    def _filter_pprc_paths(self, lss):
        """Return (state, paths) of PPRC paths usable for replication.

        Filters the existing paths by: target system, configured port
        pairs, volume type (fb/ckd), pool/node consistency and port
        health; logs the reason whenever everything is filtered out.
        """
        paths = self._source.get_pprc_paths(lss)
        if paths:
            # get the paths only connected to replication target
            paths = [p for p in paths if p['target_system_wwnn'] in
                     self._target.backend['storage_wwnn']]
        else:
            msg = _LI("No PPRC paths found in primary DS8K.")
            LOG.info(msg)
            return PPRC_PATH_NOT_EXIST, None

        # get the paths whose port pairs have been set in configuration file.
        expected_port_pairs = [(p['source_port_id'], p['target_port_id'])
                               for p in self._target.backend['port_pairs']]
        for path in paths[:]:
            port_pairs = [(p['source_port_id'], p['target_port_id'])
                          for p in path['port_pairs']]
            if not (set(port_pairs) & set(expected_port_pairs)):
                paths.remove(path)
        if not paths:
            msg = _LI("Existing PPRC paths do not use port pairs that "
                      "are set.")
            LOG.info(msg)
            return PPRC_PATH_NOT_EXIST, None

        # abandon PPRC paths according to volume type(fb/ckd)
        source_lss_set = set(p['source_lss_id'] for p in paths)
        if self._source.backend.get('device_mapping'):
            source_lss_set = source_lss_set & set(
                self._source.backend['device_mapping'].keys())
        else:
            all_lss = self._source.get_all_lss(['id', 'type'])
            fb_lss = set(
                lss['id'] for lss in all_lss if lss['type'] == 'fb')
            source_lss_set = source_lss_set & fb_lss
        paths = [p for p in paths if p['source_lss_id'] in source_lss_set]
        if not paths:
            msg = _LI("No source LSS in PPRC paths has correct volume type.")
            LOG.info(msg)
            return PPRC_PATH_NOT_EXIST, None

        # if the group property of lss doesn't match pool node,
        # abandon these paths.
        discarded_src_lss = []
        discarded_tgt_lss = []
        for lss in source_lss_set:
            spec_paths = [p for p in paths if p['source_lss_id'] == lss]
            if self._source.get_pool(lss) is None:
                discarded_src_lss.append(lss)
                continue

            for spec_path in spec_paths:
                tgt_lss = spec_path['target_lss_id']
                if self._target.get_pool(tgt_lss) is None:
                    discarded_tgt_lss.append(tgt_lss)

        if discarded_src_lss:
            paths = [p for p in paths if p['source_lss_id'] not in
                     discarded_src_lss]
        if discarded_tgt_lss:
            paths = [p for p in paths if p['target_lss_id'] not in
                     discarded_tgt_lss]
        if not paths:
            msg = _LI("No PPRC paths can be re-used.")
            LOG.info(msg)
            return PPRC_PATH_NOT_EXIST, None

        # abandon unhealthy PPRC paths.
        for path in paths[:]:
            failed_port_pairs = [
                p for p in path['port_pairs'] if p['state'] != 'success']
            if len(failed_port_pairs) == len(path['port_pairs']):
                paths.remove(path)
        if not paths:
            msg = _LI("PPRC paths between primary and target DS8K "
                      "are unhealthy.")
            LOG.info(msg)
            return PPRC_PATH_UNHEALTHY, None

        return PPRC_PATH_HEALTHY, paths

    def create_pprc_path(self, lss_pair):
        """Create (or reuse) a PPRC path for the given LSS pair.

        :param lss_pair: {'source': (pid, lss), 'target': (pid, lss)}.
        :raises restclient.APIException: when the new path does not
            become healthy within the retry window.
        """
        src_lss = lss_pair['source'][1]
        tgt_lss = lss_pair['target'][1]
        # check whether the pprc path exists and is healthy or not firstly.
        pid = (self._source.backend['storage_wwnn'] + '_' + src_lss + ':' +
               self._target.backend['storage_wwnn'] + '_' + tgt_lss)
        state = self._is_pprc_paths_healthy(pid)
        msg = _LI("The state of PPRC path %(path)s is %(state)s.")
        LOG.info(msg, {'path': pid, 'state': state})
        if state == PPRC_PATH_HEALTHY:
            return

        # create the pprc path
        pathData = {
            'target_system_wwnn': self._target.backend['storage_wwnn'],
            'source_lss_id': src_lss,
            'target_lss_id': tgt_lss,
            'port_pairs': self._target.backend['port_pairs']
        }
        msg = _LI("PPRC path %(src)s:%(tgt)s will be created.")
        LOG.info(msg, {'src': src_lss, 'tgt': tgt_lss})
        self._source.create_pprc_path(pathData)

        # check the state of the pprc path
        LOG.debug("Checking the state of the new PPRC path.")
        for retry in range(4):
            eventlet.sleep(2)
            if self._is_pprc_paths_healthy(pid) == PPRC_PATH_HEALTHY:
                break
            if retry == 3:
                # Give up after the last retry: remove the half-built
                # path and surface the failure.
                self._source.delete_pprc_path(pid)
                msg = (_("Fail to create PPRC path %(src)s:%(tgt)s.") %
                       {'src': src_lss, 'tgt': tgt_lss})
                raise restclient.APIException(data=msg)
        LOG.debug("Create the new PPRC path successfully.")

    def _is_pprc_paths_healthy(self, path_id):
        """Classify one PPRC path: healthy if any port pair succeeded."""
        try:
            path = self._source.get_pprc_path(path_id)
        except restclient.APIException:
            return PPRC_PATH_NOT_EXIST

        for port in path['port_pairs']:
            if port['state'] == 'success':
                return PPRC_PATH_HEALTHY

        return PPRC_PATH_UNHEALTHY

    def create_pprc_pairs(self, lun):
        """Create a Metro Mirror pair for *lun* and wait for full_duplex."""
        tgt_vol_id = lun.replication_driver_data[
            self._target.backend['id']]['vol_hex_id']
        tgt_stg_id = self._target.backend['storage_unit']

        vol_pairs = [{
            'source_volume': lun.ds_id,
            'source_system_id':
                self._source.backend['storage_unit'],
            'target_volume': tgt_vol_id,
            'target_system_id': tgt_stg_id
        }]
        pairData = {
            "volume_pairs": vol_pairs,
            "type": "metro_mirror",
            "options": ["permit_space_efficient_target",
                        "initial_copy_full"]
        }
        LOG.debug("Creating pprc pair, pairData is %s.", pairData)
        self._source.create_pprc_pair(pairData)
        self._source.wait_pprc_copy_finished([lun.ds_id], 'full_duplex')
        LOG.info(_LI("The state of PPRC pair has become full_duplex."))

    def delete_pprc_pairs(self, lun):
        """Delete the PPRC pair of *lun*, on both sides when reachable."""
        self._source.delete_pprc_pair(lun.ds_id)
        if self.is_target_alive():
            # NOTE(review): sorting dict values works here only while
            # there is a single replica entry; with several entries the
            # dict-vs-dict comparison would fail on Python 3 -- confirm.
            replica = sorted(lun.replication_driver_data.values())[0]
            self._target.delete_pprc_pair(
                six.text_type(replica['vol_hex_id']))

    def do_pprc_failover(self, luns, backend_id):
        """Fail the given LUNs over to the secondary DS8K.

        LUNs whose target volume no longer exists are skipped.
        """
        vol_pairs = []
        target_vol_ids = []
        for lun in luns:
            target_vol_id = (
                lun.replication_driver_data[backend_id]['vol_hex_id'])
            if not self._target.lun_exists(target_vol_id):
                msg = _LI("Target volume %(volid)s doesn't exist in "
                          "DS8K %(storage)s.")
                LOG.info(msg, {
                    'volid': target_vol_id,
                    'storage': self._target.backend['storage_unit']
                })
                continue

            # The pair is created from the target's point of view:
            # its volume becomes the new source.
            vol_pairs.append({
                'source_volume': six.text_type(target_vol_id),
                'source_system_id': six.text_type(
                    self._target.backend['storage_unit']),
                'target_volume': six.text_type(lun.ds_id),
                'target_system_id': six.text_type(
                    self._source.backend['storage_unit'])
            })
            target_vol_ids.append(target_vol_id)

        pairData = {
            "volume_pairs": vol_pairs,
            "type": "metro_mirror",
            "options": ["failover"]
        }

        LOG.info(_LI("Begin to fail over to %s"),
                 self._target.backend['storage_unit'])
        self._target.create_pprc_pair(pairData)
        self._target.wait_pprc_copy_finished(target_vol_ids,
                                             'suspended', False)
        LOG.info(_LI("Failover from %(src)s to %(tgt)s is finished."), {
            'src': self._source.backend['storage_unit'],
            'tgt': self._target.backend['storage_unit']
        })

    def do_pprc_failback(self, luns, backend_id):
        """Resume replication (failback) for the given LUNs."""
        pprc_ids = []
        vol_ids = []
        for lun in luns:
            target_vol_id = (
                lun.replication_driver_data[backend_id]['vol_hex_id'])
            if not self._target.lun_exists(target_vol_id):
                # NOTE(review): an _LE message is logged via LOG.info,
                # and lun.ds_id is reported although the check is about
                # target_vol_id -- confirm intended level and id.
                msg = _LE("Target volume %(volume)s doesn't exist in "
                          "DS8K %(storage)s.")
                LOG.info(msg, {
                    'volume': lun.ds_id,
                    'storage': self._target.backend['storage_unit']
                })
                continue

            pprc_id = (self._source.backend['storage_unit'] + '_' +
                       lun.ds_id + ':' +
                       self._target.backend['storage_unit'] +
                       '_' + target_vol_id)
            pprc_ids.append(pprc_id)
            vol_ids.append(lun.ds_id)

        pairData = {"pprc_ids": pprc_ids,
                    "type": "metro_mirror",
                    "options": ["failback"]}

        LOG.info(_LI("Begin to run failback in %s."),
                 self._source.backend['storage_unit'])
        self._source.do_failback(pairData)
        self._source.wait_pprc_copy_finished(vol_ids, 'full_duplex', False)
        LOG.info(_LI("Run failback in %s is finished."),
                 self._source.backend['storage_unit'])
class Replication(object):
"""Metro Mirror and Global Mirror will be used by it."""
def __init__(self, source_helper, target_device):
    """Build helpers for both sites and the Metro Mirror manager.

    :param source_helper: DS8K helper of the primary backend.
    :param target_device: replication_device dictionary from the
                          configuration, describing the secondary.
    """
    self._source_helper = source_helper
    connection_type = target_device.get('connection_type')
    # Pick the FC (fb) or ECKD flavor of the replication target helper
    # based on the configured connection type.
    if connection_type == storage.XIV_CONNECTION_TYPE_FC:
        self._target_helper = (
            helper.DS8KReplicationTargetHelper(target_device))
    else:
        self._target_helper = (
            helper.DS8KReplicationTargetECKDHelper(target_device))

    self._mm_manager = MetroMirrorManager(self._source_helper,
                                          self._target_helper)
def check_connection_type(self):
    """Ensure both backends use the same connection type.

    For ECKD backends, thin provisioning is additionally disabled on
    both sides when their settings disagree, because PPRC cannot copy
    between ESE and standard volumes.

    :raises exception.CinderException: when the connection types of
        the primary and the secondary backend differ.
    """
    src_conn_type = self._source_helper.get_connection_type()
    tgt_conn_type = self._target_helper.get_connection_type()
    if src_conn_type != tgt_conn_type:
        msg = (_("The connection type in primary backend is "
                 "%(primary)s, but in secondary backend it is "
                 "%(secondary)s") %
               {'primary': src_conn_type, 'secondary': tgt_conn_type})
        raise exception.CinderException(msg)
    # PPRC can not copy from ESE volume to standard volume or vice versus.
    if src_conn_type == storage.XIV_CONNECTION_TYPE_FC_ECKD:
        src_thin = self._source_helper.get_thin_provision()
        tgt_thin = self._target_helper.get_thin_provision()
        if src_thin != tgt_thin:
            self._source_helper.disable_thin_provision()
            self._target_helper.disable_thin_provision()
def check_physical_links(self):
    """Validate the FC links between the two DS8Ks (delegated)."""
    self._mm_manager.check_physical_links()
def switch_source_and_target(self, secondary_id, luns=None):
    """Swap source/target roles after a failover or failback.

    :param secondary_id: id of the backend that takes the source role.
    :param luns: optional Lun list whose ds_id/replica ids are swapped
                 in place.
    :returns: the (possibly updated) luns list.
    """
    # switch the helper in metro mirror manager
    self._mm_manager.switch_source_and_target()
    # switch the helper
    self._source_helper, self._target_helper = (
        self._target_helper, self._source_helper)
    # switch the volume id
    if luns:
        for lun in luns:
            backend = lun.replication_driver_data.get(secondary_id, None)
            lun.replication_driver_data.update(
                {secondary_id: {'vol_hex_id': lun.ds_id}})
            # NOTE(review): assumes a replica entry for secondary_id
            # exists; ``backend`` would be None otherwise -- confirm.
            lun.ds_id = backend['vol_hex_id']
    return luns
@proxy.logger
def find_available_lss_pair(self, excluded_lss):
    """Pick an LSS pair for a new replicated volume.

    Reuses an existing healthy PPRC path when one is available;
    otherwise selects fresh LSSes on both sites.
    """
    _state, lss_pair = self._mm_manager.find_available_pprc_path(
        None, excluded_lss)
    if lss_pair is not None:
        return lss_pair
    new_pair = self.find_new_lss_for_source(excluded_lss)
    new_pair.update(self.find_new_lss_for_target())
    return new_pair
@proxy.logger
def find_new_lss_for_source(self, excluded_lss):
    """Choose a pool/LSS on the primary, avoiding excluded LSSes."""
    pid, lss = self._source_helper.find_pool_and_lss(excluded_lss)
    return {'source': (pid, lss)}
@proxy.logger
def find_new_lss_for_target(self):
    """Choose a pool/LSS on the secondary for the replica volume."""
    pid, lss = self._target_helper.find_pool_and_lss()
    return {'target': (pid, lss)}
@proxy.logger
def enable_replication(self, lun):
    """Enable Metro Mirror replication for an existing volume.

    Reuses a healthy PPRC path on the volume's LSS when possible;
    otherwise picks a fresh target LSS.  Creates the replica volume
    without deleting the source.

    :param lun: Lun object of the volume to replicate.
    :returns: the updated Lun.
    :raises restclient.APIException: when the existing path for the
        volume's LSS is unhealthy.
    """
    state, lun.lss_pair = (
        self._mm_manager.find_available_pprc_path(lun.ds_id[0:2]))
    if state == PPRC_PATH_UNHEALTHY:
        msg = (_("The path(s) for volume %(name)s isn't available "
                 "any more, please make sure the state of the path(s) "
                 "which source LSS is %(lss)s is success.") %
               {'name': lun.cinder_name, 'lss': lun.ds_id[0:2]})
        raise restclient.APIException(data=msg)
    elif state == PPRC_PATH_NOT_EXIST:
        pid = self._source_helper.get_pool(lun.ds_id[0:2])
        lss_pair = {'source': (pid, lun.ds_id[0:2])}
        lss_pair.update(self.find_new_lss_for_target())
        lun.lss_pair = lss_pair
    # Bug fix: pass the value lazily to the logger instead of eagerly
    # %-formatting it inside the call.
    LOG.debug("Begin to create replication volume, lss_pair is %s.",
              lun.lss_pair)
    lun = self.create_replica(lun, False)
    return lun
@proxy.logger
@synchronized('OpenStackCinderIBMDS8KMutexCreateReplica-', external=True)
def create_replica(self, lun, delete_source=True):
    """Create the target volume, PPRC path and pair for *lun*.

    :param lun: Lun with lss_pair already selected.
    :param delete_source: also delete the source volume on failure
                          (True for fresh volume creation, False when
                          enabling replication on an existing volume).
    :returns: the Lun with replication_status set to 'enabled'.
    """
    try:
        self._target_helper.create_lun(lun)
        # create PPRC paths if need.
        self._mm_manager.create_pprc_path(lun.lss_pair)
        # create pprc pair
        self._mm_manager.create_pprc_pairs(lun)
    except restclient.APIException:
        # Clean up whatever was created, then re-raise the original
        # exception for the caller.
        with excutils.save_and_reraise_exception():
            self.delete_replica(lun)
            if delete_source:
                self._source_helper.delete_lun(lun)

    lun.replication_status = 'enabled'
    return lun
@proxy.logger
def delete_replica(self, lun):
    """Remove the PPRC pair and the target-side replica of *lun*.

    :returns: the Lun with replication disabled and its
              replication_driver_data cleared.
    :raises exception.CinderException: when deleting the target
        volume fails.
    """
    if lun.ds_id is not None:
        try:
            self._mm_manager.delete_pprc_pairs(lun)
            self._delete_replica(lun)
        except restclient.APIException as e:
            msg = (_('Failed to delete the target volume for volume '
                     '%(volume)s, Exception: %(ex)s.') %
                   {'volume': lun.ds_id, 'ex': six.text_type(e)})
            raise exception.CinderException(msg)

    lun.replication_status = 'disabled'
    lun.replication_driver_data = {}
    return lun
@proxy.logger
def _delete_replica(self, lun):
    """Delete every target-side replica volume recorded on *lun*."""
    if not lun.replication_driver_data:
        msg = _LE("No replica ID for lun %s, maybe there is something "
                  "wrong when creating the replica for lun.")
        LOG.error(msg, lun.ds_id)
        return None

    for backend_id, backend in lun.replication_driver_data.items():
        # Stop early when the secondary is unreachable; the remaining
        # replicas cannot be deleted either.
        if not self._mm_manager.is_target_alive():
            return None

        if not self._target_helper.lun_exists(backend['vol_hex_id']):
            LOG.debug("Replica %s not found.", backend['vol_hex_id'])
            continue

        LOG.debug("Deleting replica %s.", backend['vol_hex_id'])
        self._target_helper.delete_lun_by_id(backend['vol_hex_id'])
def extend_replica(self, lun, param):
    """Resize every target-side replica of *lun* using *param*."""
    replicas = lun.replication_driver_data
    for info in replicas.values():
        self._target_helper.change_lun(info['vol_hex_id'], param)
def delete_pprc_pairs(self, lun):
    """Remove the PPRC pairs associated with *lun*."""
    manager = self._mm_manager
    manager.delete_pprc_pairs(lun)
def create_pprc_pairs(self, lun):
    """Create the PPRC pairs for *lun*."""
    manager = self._mm_manager
    manager.create_pprc_pairs(lun)
def do_pprc_failover(self, luns, backend_id):
    """Fail the given LUNs over to *backend_id* via PPRC."""
    manager = self._mm_manager
    manager.do_pprc_failover(luns, backend_id)
@proxy.logger
def start_pprc_failback(self, luns, backend_id):
    """Fail the given LUNs back to the primary backend.

    Requires the primary to be reachable again (UnableToFailOver is
    raised otherwise).  The source/target roles are reverted
    (failover + switch) between the two failback calls — the call
    order here is significant.
    """
    # check whether primary client is alive or not.
    if not self._mm_manager.is_target_alive():
        try:
            self._target_helper.update_client()
        except restclient.APIException:
            msg = _("Can not connect to the primary backend, "
                    "please make sure it is back.")
            LOG.error(msg)
            raise exception.UnableToFailOver(reason=msg)
    LOG.debug("Failback starts, backend id is %s.", backend_id)
    for lun in luns:
        self._mm_manager.create_pprc_path(lun.lss_pair)
    self._mm_manager.do_pprc_failback(luns, backend_id)
    # revert the relationship of source volume and target volume
    self.do_pprc_failover(luns, backend_id)
    self.switch_source_and_target(backend_id, luns)
    self._mm_manager.do_pprc_failback(luns, backend_id)
    LOG.debug("Failback ends, backend id is %s.", backend_id)
@proxy.logger
def failover_unreplicated_volume(self, lun):
    """Toggle the error state of a volume that has no replica.

    On failover the current status is stashed in provider_location
    under 'old_status' and the volume is set to 'error'; on failback
    (when 'old_status' is already present) the stashed status is
    restored.  Returns the model-update dict for the volume.
    """
    provider_location = ast.literal_eval(lun.volume['provider_location'])
    if 'old_status' in provider_location:
        # Failback: restore the previously stashed status.
        updates = {'status': provider_location['old_status']}
        del provider_location['old_status']
        updates['provider_location'] = six.text_type(provider_location)
    else:
        # Failover: remember the current status and flag the volume.
        provider_location['old_status'] = lun.status
        updates = {
            'status': 'error',
            'provider_location': six.text_type(provider_location)
        }
    return {'volume_id': lun.os_id, 'updates': updates}

View File

@ -0,0 +1,337 @@
# Copyright (c) 2016 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import abc
import eventlet
import importlib
import json
import six
from six.moves import urllib
import requests
from requests import exceptions as req_exception
from cinder import exception
from cinder.i18n import _
# DS8K REST error codes meaning the auth token expired or is invalid;
# the scheduler re-authenticates and retries on these.
TOKEN_ERROR_CODES = ('BE7A001B', 'BE7A001A')
# remove BE7A0032 after REST fixed the problem of throwing message
# which shows all LSS are full but actually only one LSS is full.
LSS_ERROR_CODES = ('BE7A0031', 'BE7A0032')
# Error codes that indicate the supplied credentials were rejected.
AUTHENTICATION_ERROR_CODES = (
    'BE7A001B', 'BE7A001A', 'BE7A0027',
    'BE7A0028', 'BE7A0029', 'BE7A002A',
    'BE7A002B', 'BE7A002C', 'BE7A002D'
)
class APIException(exception.VolumeBackendAPIException):
    """Exception raised for errors in the REST APIs.

    Attributes:
        message -- explanation of the error
    """
    # NOTE: the original had a second, free-standing triple-quoted
    # string after the docstring (a no-op statement) plus a redundant
    # ``pass``; both merged into the single docstring above.
class APIAuthenticationException(APIException):
    """Exception raised for errors in the Authentication.

    Attributes:
        message -- explanation of the error
    """
    # Dead second string-literal and redundant ``pass`` folded into
    # the docstring.
class LssFullException(APIException):
    """Exception raised for errors when LSS is full.

    Attributes:
        message -- explanation of the error
    """
    # Dead second string-literal and redundant ``pass`` folded into
    # the docstring.
class LssIDExhaustError(exception.VolumeBackendAPIException):
    """Exception raised for errors when can not find available LSS.

    Attributes:
        message -- explanation of the error
    """
    # Dead second string-literal and redundant ``pass`` folded into
    # the docstring.
class TimeoutException(APIException):
    """Exception raised when the request is time out.

    Attributes:
        message -- explanation of the error
    """
    # Dead second string-literal and redundant ``pass`` folded into
    # the docstring.
@six.add_metaclass(abc.ABCMeta)
class AbstractRESTConnector(object):
    """Inherit this class when you define your own connector."""

    @abc.abstractmethod
    def close(self):
        """close the connector.

        If the connector uses persistent connection, please provide
        a way to close it in this method, otherwise you can just leave
        this method empty.

        Input: None
        Output: None
        Exception: can raise any exceptions
        """
        pass

    @abc.abstractmethod
    def send(self, method='', url='', headers=None, payload='', timeout=900):
        """send the request.

        Input: see above
        Output:
            if we reached the server and read an HTTP response:
                (INTEGER__HTTP_RESPONSE_STATUS_CODE,
                 STRING__BODY_OF_RESPONSE_EVEN_IF_STATUS_NOT_200)
            if we were not able to reach the server or response
            was invalid HTTP(like certificate error, or could not
            resolve domain etc):
                (False, STRING__SHORT_EXPLANATION_OF_REASON_FOR_NOT_
                 REACHING_SERVER_OR_GETTING_INVALID_RESPONSE)
        Exception: should not raise any exceptions itself as all
            the expected scenarios are covered above. Unexpected
            exceptions are permitted.
        """
        pass
class DefaultRESTConnector(AbstractRESTConnector):
    """Default REST connector built on python-requests.

    User can write their own connector and pass it to RESTScheduler.
    """

    def __init__(self, verify):
        # overwrite certificate validation method only when using
        # default connector, and not globally import the new scheme.
        if isinstance(verify, six.string_types):
            importlib.import_module("cinder.volume.drivers.ibm.ibm_storage."
                                    "ds8k_connection")
        self.session = None
        self.verify = verify

    def connect(self):
        """Lazily create and configure the requests session."""
        if self.session is None:
            self.session = requests.Session()
            if isinstance(self.verify, six.string_types):
                # custom 'httpsds8k' scheme registered by ds8k_connection.
                self.session.mount('httpsds8k://',
                                   requests.adapters.HTTPAdapter())
            else:
                self.session.mount('https://',
                                   requests.adapters.HTTPAdapter())
            self.session.verify = self.verify

    def close(self):
        """Close and drop the session so the next send() reconnects."""
        self.session.close()
        self.session = None

    def send(self, method='', url='', headers=None, payload='', timeout=900):
        """Send one HTTP request; contract per AbstractRESTConnector.send."""
        self.connect()
        try:
            if isinstance(self.verify, six.string_types):
                url = url.replace('https://', 'httpsds8k://')
            resp = self.session.request(method,
                                        url,
                                        headers=headers,
                                        data=payload,
                                        timeout=timeout)
            return resp.status_code, resp.text
        except req_exception.ConnectTimeout as e:
            self.close()
            return 408, "Connection time out: %s" % six.text_type(e)
        except req_exception.SSLError as e:
            self.close()
            return False, "SSL error: %s" % six.text_type(e)
        except Exception as e:
            self.close()
            # fixed typo in the returned message ("Unexcepted").
            return False, "Unexpected exception: %s" % six.text_type(e)
class RESTScheduler(object):
    """This class is multithread friendly.

    it isn't optimally (token handling) but good enough for low-mid traffic.
    """

    def __init__(self, host, user, passw, connector_obj, verify=False):
        if not host:
            raise APIException('The host parameter must not be empty.')
        # the api incorrectly transforms an empty password to a missing
        # password parameter, so we have to catch it here
        if not user or not passw:
            raise APIAuthenticationException(
                _('The username and the password parameters must '
                  'not be empty.'))
        self.token = ''
        self.host = host
        self.port = '8452'
        self.user = user
        self.passw = passw
        self.connector = connector_obj or DefaultRESTConnector(verify)
        self.connect()

    def connect(self):
        """Log in and cache the auth token for subsequent requests."""
        # one retry when connecting, 60s should be enough to get the token,
        # usually it is within 30s.
        try:
            response = self.send(
                'POST', '/tokens',
                {'username': self.user, 'password': self.passw},
                timeout=60)
        except Exception:
            eventlet.sleep(2)
            response = self.send(
                'POST', '/tokens',
                {'username': self.user, 'password': self.passw},
                timeout=60)
        self.token = response['token']['token']

    def close(self):
        """Close the underlying connector."""
        self.connector.close()

    # usually NI responses within 15min.
    def send(self, method, endpoint, data=None, badStatusException=True,
             params=None, fields=None, timeout=900):
        """Issue one REST request and return the parsed response dict.

        Re-authenticates and retries once on token errors, retries
        once on transport-level failures, and raises the matching
        APIException subclass for the known error classes.
        """
        # verify the method
        if method not in ('GET', 'POST', 'PUT', 'DELETE'):
            msg = _("Invalid HTTP method: %s") % method
            raise APIException(data=msg)
        # prepare the url
        url = "https://%s:%s/api/v1%s" % (self.host, self.port, endpoint)
        if fields:
            params = params or {}
            params['data_fields'] = ','.join(fields)
        if params:
            url += (('&' if '?' in url else '?') +
                    urllib.parse.urlencode(params))
        # prepare the data
        data = json.dumps({'request': {'params': data}}) if data else None
        # make a REST request to DS8K and get one retry if logged out
        for attempts in range(2):
            headers = {'Content-Type': 'application/json',
                       'X-Auth-Token': self.token}
            code, body = self.connector.send(method, url, headers,
                                             data, timeout)
            # parse the returned code
            if code == 200:
                try:
                    response = json.loads(body)
                except ValueError:
                    response = {'server': {
                        'status': 'failed',
                        'message': 'Unable to parse server response into json.'
                    }}
            elif code == 408:
                response = {'server': {'status': 'timeout', 'message': body}}
            elif code is not False:
                try:
                    response = json.loads(body)
                    # make sure has useful message
                    response['server']['message']
                except Exception:
                    response = {'server': {
                        'status': 'failed',
                        'message': 'HTTP %s: %s' % (code, body)
                    }}
            else:
                response = {'server': {'status': 'failed', 'message': body}}
            # handle the response
            if (response['server'].get('code') in TOKEN_ERROR_CODES and
                    attempts == 0):
                # token expired: re-authenticate and retry once.
                self.connect()
            elif response['server'].get('code') in AUTHENTICATION_ERROR_CODES:
                msg = (_('Authentication failed for host %(host)s. '
                         'Exception= %(e)s') %
                       {'host': self.host, 'e': response['server']['message']})
                raise APIAuthenticationException(data=msg)
            elif response['server'].get('code') in LSS_ERROR_CODES:
                msg = (_('Can not put the volume in LSS: %s') %
                       response['server']['message'])
                raise LssFullException(data=msg)
            elif response['server']['status'] == 'timeout':
                msg = (_('Request to storage API time out: %s') %
                       response['server']['message'])
                raise TimeoutException(data=msg)
            elif (response['server']['status'] != 'ok' and
                    (badStatusException or 'code' not in response['server'])):
                # if code is not in response means that error was in
                # transport so we raise exception even if asked not to
                # via badStatusException=False, but will retry it to
                # confirm the problem.
                if attempts == 1:
                    msg = (_("Request to storage API failed: %(err)s, "
                             "(%(url)s).") %
                           {'err': response['server']['message'], 'url': url})
                    raise APIException(data=msg)
                eventlet.sleep(2)
            else:
                return response
        # NOTE(review): if a token error occurs on the retry as well,
        # the loop falls through and this method implicitly returns
        # None — confirm that callers tolerate that.

    # same as the send method above but returns first item from
    # response data, must receive only one item.
    def fetchall(self, *args, **kwargs):
        r = self.send(*args, **kwargs)['data']
        if len(r) != 1:
            msg = _('Expected one result but got %d.') % len(r)
            raise APIException(data=msg)
        else:
            return r.popitem()[1]

    # the api for some reason returns a list when you request details
    # of a specific item.
    def fetchone(self, *args, **kwargs):
        r = self.fetchall(*args, **kwargs)
        if len(r) != 1:
            msg = _('Expected one item in result but got %d.') % len(r)
            raise APIException(data=msg)
        return r[0]

    # same as the send method above but returns the last element of the
    # link property in the response.
    def fetchid(self, *args, **kwargs):
        r = self.send(*args, **kwargs)
        if 'responses' in r:
            if len(r['responses']) != 1:
                msg = (_('Expected one item in result responses but '
                         'got %d.') % len(r['responses']))
                raise APIException(data=msg)
            r = r['responses'][0]
        return r['link']['href'].split('/')[-1]

    # the api unfortunately has no way to differentiate between api error
    # and error in DS8K resources. this method returns True if "ok", False
    # if "failed", exception otherwise.
    def statusok(self, *args, **kwargs):
        return self.send(*args, badStatusException=False,
                         **kwargs)['server']['status'] == 'ok'

View File

@ -32,7 +32,7 @@ from cinder.zonemanager import utils as fczm_utils
driver_opts = [
cfg.StrOpt(
'proxy',
default='storage.proxy.IBMStorageProxy',
default='cinder.volume.drivers.ibm.ibm_storage.proxy.IBMStorageProxy',
help='Proxy driver that connects to the IBM Storage Array'),
cfg.StrOpt(
'connection_type',
@ -69,7 +69,7 @@ class IBMStorageDriver(san.SanDriver,
systems.
"""
VERSION = "1.8.0"
VERSION = "2.0.0"
# ThirdPartySystems wiki page
CI_WIKI_NAME = "IBM_XIV-DS8K_CI"
@ -183,36 +183,8 @@ class IBMStorageDriver(san.SanDriver,
return self.proxy.migrate_volume(context, volume, host)
def manage_existing(self, volume, existing_ref):
"""Brings an existing backend storage object under Cinder management.
"""Brings an existing backend storage object to Cinder management."""
existing_ref is passed straight through from the API request's
manage_existing_ref value, and it is up to the driver how this should
be interpreted. It should be sufficient to identify a storage object
that the driver should somehow associate with the newly-created cinder
volume structure.
In the case of XIV family and FlashSystem A9000 family, the
existing_ref consists of a single field named 'existing_ref'
representing the name of the volume on the storage.
There are two ways to do this:
1. Rename the backend storage object so that it matches the,
volume['name'] which is how drivers traditionally map between a
cinder volume and the associated backend storage object.
2. Place some metadata on the volume, or somewhere in the backend, that
allows other driver requests (e.g. delete, clone, attach, detach...)
to locate the backend storage object when required.
If the existing_ref doesn't make sense, or doesn't refer to an existing
backend storage object, raise a ManageExistingInvalidReference
exception.
The volume may have a volume_type, and the driver can inspect that and
compare against the properties of the referenced backend storage
object. If they are incompatible, raise a
ManageExistingVolumeTypeMismatch, specifying a reason for the failure.
"""
return self.proxy.manage_volume(volume, existing_ref)
def manage_existing_get_size(self, volume, existing_ref):

View File

@ -0,0 +1,409 @@
# Copyright (c) 2016 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import functools
import gettext
import inspect
import platform
from oslo_log import log as logging
from oslo_utils import timeutils
from cinder.i18n import _, _LE, _LI
from cinder import version
import cinder.volume.drivers.ibm.ibm_storage as storage
from cinder.volume.drivers.ibm.ibm_storage import strings
LOG = logging.getLogger(__name__)
# installs _() into builtins for these modules (legacy i18n setup).
gettext.install('cinder')
def get_total_seconds(td):
    """Return the duration of timedelta *td* in seconds, as a float.

    Uses the stdlib ``timedelta.total_seconds()``, which computes
    exactly the same value as the manual microsecond arithmetic it
    replaces.
    """
    return td.total_seconds()
def logger(func):
    """Decorator that logs entry, args, exit and return value of *func*.

    The messages are written to the ``LOG`` attribute of the module
    owning the frame that invoked the wrapped function (resolved via
    ``inspect.stack()[1]`` at call time), not this module's LOG.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        frm = inspect.stack()[1]
        log = getattr(inspect.getmodule(frm[0]), 'LOG')
        log.debug("Enter %s()", func.__name__)
        log.debug("Args: %(args)s %(kwargs)s",
                  {'args': args, 'kwargs': kwargs})
        result = func(*args, **kwargs)
        log.debug("Exit %s()", func.__name__)
        log.debug("Return: %s", result)
        return result
    return wrapper
def _trace_time(fnc):
    """Method decorator logging entry/exit timestamps and elapsed time."""
    @functools.wraps(fnc)
    def wrapper(self, *args, **kwargs):
        method = fnc.__name__
        start = timeutils.utcnow()
        LOG.debug("Entered '%(method)s' at %(when)s.",
                  {'method': method, 'when': start})
        result = fnc(self, *args, **kwargs)
        current = timeutils.utcnow()
        delta = current - start
        LOG.debug(
            "Exited '%(method)s' at %(when)s, after %(seconds)f seconds.",
            {'method': method, 'when': start,
             'seconds': get_total_seconds(delta)})
        return result
    return wrapper
class IBMStorageProxy(object):
    """Base class for connecting to storage.

    Abstract Proxy between the XIV/DS8K Cinder Volume and Spectrum
    Accelerate Storage (e.g. XIV, Spectrum Accelerate, A9000, A9000R)
    """

    prefix = storage.XIV_LOG_PREFIX

    def __init__(self, storage_info, logger, exception,
                 driver=None, active_backend_id=None):
        """Initialize Proxy."""
        self.storage_info = storage_info
        self.meta = dict()
        self.logger = logger
        self.meta['exception'] = exception
        self.meta['openstack_version'] = "cinder-%s" % version.version_string()
        self.meta['stat'] = None
        self.driver = driver
        if driver is not None:
            self.full_version = "%(title)s (v%(version)s)" % {
                'title': strings.TITLE,
                'version': driver.VERSION}
        else:
            self.full_version = strings.TITLE
        self.active_backend_id = active_backend_id
        self.targets = {}
        self._read_replication_devices()
        self.meta['bypass_connection_check'] = (
            self._get_safely_from_configuration(
                storage.FLAG_KEYS['bypass_connection_check'], False))

    @_trace_time
    def setup(self, context):
        """Driver setup."""
        pass

    @_trace_time
    def create_volume(self, volume):
        """Creates a volume."""
        pass

    @_trace_time
    def ensure_export(self, context, volume):
        ctxt = context.as_dict() if hasattr(context, 'as_dict') else "Empty"
        LOG.debug("ensure_export: %(volume)s context : %(ctxt)s",
                  {'volume': volume['name'], 'ctxt': ctxt})
        return 1

    @_trace_time
    def create_export(self, context, volume):
        ctxt = context.as_dict() if hasattr(context, 'as_dict') else "Empty"
        LOG.debug("create_export: %(volume)s context : %(ctxt)s",
                  {'volume': volume['name'], 'ctxt': ctxt})
        return {}

    @_trace_time
    def delete_volume(self, volume):
        """Deletes a volume on the IBM Storage machine."""
        pass

    @_trace_time
    def remove_export(self, context, volume):
        """Remove export.

        Disconnect a volume from an attached instance
        """
        ctxt = context.as_dict() if hasattr(context, 'as_dict') else "Empty"
        LOG.debug("remove_export: %(volume)s context : %(ctxt)s",
                  {'volume': volume['name'], 'ctxt': ctxt})

    @_trace_time
    def initialize_connection(self, volume, connector):
        """Initialize connection.

        Maps the created volume to the cinder volume node,
        and returns the iSCSI/FC targets to be used in the instance
        """
        pass

    @_trace_time
    def terminate_connection(self, volume, connector):
        """Terminate connection."""
        pass

    @_trace_time
    def create_volume_from_snapshot(self, volume, snapshot):
        """create volume from snapshot."""
        pass

    @_trace_time
    def create_snapshot(self, snapshot):
        """create snapshot"""
        pass

    @_trace_time
    def delete_snapshot(self, snapshot):
        """delete snapshot."""
        pass

    @_trace_time
    def get_volume_stats(self, refresh=False):
        """get volume stats."""
        if self.meta['stat'] is None or refresh:
            self._update_stats()
        return self.meta['stat']

    @_trace_time
    def _update_stats(self):
        """fetch and update stats."""
        pass

    @_trace_time
    def check_for_export(self, context, volume_id):
        pass

    @_trace_time
    def copy_volume_to_image(self, context, volume, image_service, image_id):
        """Copy volume to image.

        Handled by ISCSiDriver
        """
        LOG.info(_LI("The copy_volume_to_image feature is not implemented."))
        raise NotImplementedError()

    @_trace_time
    def create_cloned_volume(self, volume, src_vref):
        """Create cloned volume."""
        pass

    @_trace_time
    def volume_exists(self, volume):
        """Checks if a volume exists on xiv."""
        pass

    @_trace_time
    def validate_connection(self):
        """Validates ibm_storage connection info."""
        pass

    @_trace_time
    def retype(self, ctxt, volume, new_type, diff, host):
        """Convert the volume to be of the new type."""
        pass

    @_trace_time
    def _get_bunch_from_host(
            self, connector, host_id=0, host_name=None, chap=None):
        """Get's a Bunch describing a host"""
        if not host_name:
            LOG.debug("Connector %(conn)s", {'conn': connector})
        current_host_name = host_name or storage.get_host_or_create_from_iqn(
            connector)
        initiator = connector.get('initiator', None)
        wwpns = connector.get("wwpns", [])
        # NOTE(review): the condition tests key "wwnns" but reads key
        # "wwns" — looks like a typo; confirm the intended connector key.
        if len(wwpns) == 0 and "wwnns" in connector:
            wwpns = connector.get("wwns", [])
        return {'name': current_host_name,
                'initiator': initiator,
                'id': host_id,
                'wwpns': wwpns,
                'chap': chap}

    @_trace_time
    def _get_os_type(self):
        """Gets a string representation of the current os"""
        # NOTE(review): platform.dist() is deprecated (removed in
        # python 3.8) — switch to a supported API when the minimum
        # python version moves.
        dist = platform.dist()
        return "%s-%s-%s" % (dist[0], dist[1], platform.processor())

    def _log(self, level, message, **kwargs):
        """Wrapper around the logger"""
        to_log = _(self.prefix + message)  # NOQA
        if len(kwargs) > 0:
            to_log = to_log % kwargs
        getattr(self.logger, level)(to_log)

    def _get_exception(self):
        """Get's Cinder exception"""
        return self.meta['exception'].CinderException

    def _get_code_and_status_or_message(self, exception):
        """Returns status message

        returns a string made out of code and status if present, else message
        """
        if (getattr(exception, "code", None) is not None and
                getattr(exception, "status", None) is not None):
            return "Status: '%s', Code: %s" % (
                exception.status, exception.code)
        try:
            msg = exception.message
        except AttributeError:
            msg = exception
        return msg

    def _get_driver_super(self):
        """Gets the IBM Storage Drivers super class

        returns driver super class
        """
        return super(self.driver.__class__, self.driver)

    def _get_connection_type(self):
        """Get Connection Type(iscsi|fibre_channel)

        :returns: iscsi|fibre_channel
        """
        return self._get_safely_from_configuration(
            storage.CONF_KEYS['connection_type'],
            default=storage.XIV_CONNECTION_TYPE_ISCSI)

    def _is_iscsi(self):
        """Checks if connection type is iscsi"""
        connection_type = self._get_connection_type()
        return connection_type == storage.XIV_CONNECTION_TYPE_ISCSI

    def _get_management_ips(self):
        """Gets the management IP addresses from conf"""
        return self._get_safely_from_configuration(
            storage.CONF_KEYS['management_ips'],
            default='')

    def _get_chap_type(self):
        """Get CHAP Type(disabled|enabled)

        :returns: disabled|enabled
        """
        LOG.debug("_get_chap_type chap: %(chap)s",
                  {'chap': storage.CONF_KEYS['chap']})
        return self._get_safely_from_configuration(
            storage.CONF_KEYS['chap'],
            default=storage.CHAP_NONE)

    def _get_safely_from_configuration(self, key, default=None):
        """Get value of key from configuration

        Get's a key from the backend configuration if available.
        If not available returns default value
        """
        if not self.driver:
            LOG.debug("self.driver is missing")
            return default
        config_value = self.driver.configuration.safe_get(key)
        if not config_value:
            LOG.debug("missing key %(key)s ", {'key': key})
            return default
        return config_value

    # Backend_id values:
    # - The primary backend_id is marked 'default'
    # - The secondary backend_ids are the values of the targets.
    # - In most cases the given value is one of the above, but in some cases
    #   it can be None. For example in failover_host, the value None means
    #   that the function should select a target by itself (consider multiple
    #   targets)
    def _get_primary_backend_id(self):
        return strings.PRIMARY_BACKEND_ID

    def _get_secondary_backend_id(self):
        return self._get_target()

    def _get_active_backend_id(self):
        if self.active_backend_id == strings.PRIMARY_BACKEND_ID:
            return self._get_primary_backend_id()
        else:
            return self._get_secondary_backend_id()

    def _get_inactive_backend_id(self):
        if self.active_backend_id != strings.PRIMARY_BACKEND_ID:
            return self._get_primary_backend_id()
        else:
            return self._get_secondary_backend_id()

    def _get_target_params(self, target):
        if not self.targets:
            LOG.debug("No targets available")
            return None
        try:
            params = self.targets[target]
            return params
        except Exception:
            LOG.debug("No target called '%(target)s'", {'target': target})
            return None

    def _get_target(self):
        """returns an arbitrary target if available"""
        if not self.targets:
            return None
        try:
            # next(iter(...)) works on python 2 and 3 alike;
            # dict.iterkeys().next() does not exist on python 3.
            target = next(iter(self.targets))
            return target
        except Exception:
            return None

    def _get_targets(self):
        return self.targets

    def _is_replication_supported(self):
        if self.targets:
            return True
        return False

    @_trace_time
    def _read_replication_devices(self):
        """Read replication devices from configuration

        Several replication devices are permitted.
        If an entry already exists an error is assumed.

        The format is:
        replication_device = backend_id:vendor-id-1,unique_key:val....
        """
        if not self.driver:
            return
        replication_devices = self._get_safely_from_configuration(
            'replication_device', default={})
        if not replication_devices:
            LOG.debug('No replication devices were found')
        for dev in replication_devices:
            LOG.debug('Replication device found: %(dev)s', {'dev': dev})
            backend_id = dev.get('backend_id', None)
            if backend_id is None:
                LOG.error(_LE("Replication is missing backend_id: %(dev)s"),
                          {'dev': dev})
            elif self.targets.get(backend_id, None):
                LOG.error(_LE("Multiple entries for replication %(dev)s"),
                          {'dev': dev})
            else:
                self.targets[backend_id] = {}
                device = self.targets[backend_id]
                # dict.iteritems() is python-2 only; items() works on both.
                for k, v in dev.items():
                    if k != 'backend_id':
                        device[k] = v

View File

@ -0,0 +1,44 @@
# Copyright (c) 2016 IBM Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# General
TITLE = "IBM Storage"
DEFAULT = "Default"
# PROMPTS
CERTIFICATES_PATH = "/opt/ibm/ds8k_certs/"
# DEFAULT INSTALLED VALUES
XIV_BACKEND_PREFIX = "IBM-XIV"
DS8K_BACKEND_PREFIX = "IBM-DS8K"
# Replication Status Strings
REPLICATION_STATUS_DISABLED = 'disabled'  # no replication
REPLICATION_STATUS_ERROR = 'error'  # replication in error state
# replication copying data to secondary (inconsistent)
REPLICATION_STATUS_COPYING = 'copying'
# replication copying data to secondary (consistent)
REPLICATION_STATUS_ACTIVE = 'active'
# replication stopped, secondary presumed consistent
# NOTE(review): original comment was a copy-paste of the 'active'
# one — confirm the intended semantics.
REPLICATION_STATUS_ACTIVE_STOPPED = 'active-stopped'
# replication not currently running
# NOTE(review): original comment was a copy-paste of the 'active' one.
REPLICATION_STATUS_INACTIVE = 'inactive'
# Replication Failback String
PRIMARY_BACKEND_ID = 'default'
# Volume Extra Metadata Default Value
METADATA_IS_TRUE = '<IS> TRUE'

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,14 @@
---
features:
- |
The IBM_Storage driver has been open sourced. This means that there is no
more need to download the package from the IBM site. The only requirement
remaining is to install pyxcli, which is available through pypi:
'sudo pip install pyxcli'
upgrade:
- |
Previous installations of the IBM Storage driver must be uninstalled first,
and the new driver installed in their place. In addition, the cinder.conf
values should be updated to reflect the new paths. For example, the proxy
setting of 'storage.proxy.IBMStorageProxy' should be updated to
'cinder.volume.drivers.ibm.ibm_storage.proxy.IBMStorageProxy'.