diff --git a/cinder/backup/chunkeddriver.py b/cinder/backup/chunkeddriver.py index 527a527d6f2..1752f12e663 100644 --- a/cinder/backup/chunkeddriver.py +++ b/cinder/backup/chunkeddriver.py @@ -76,12 +76,18 @@ class ChunkedBackupDriver(driver.BackupDriver): try: if algorithm.lower() in ('none', 'off', 'no'): return None - elif algorithm.lower() in ('zlib', 'gzip'): + if algorithm.lower() in ('zlib', 'gzip'): import zlib as compressor - return compressor + result = compressor elif algorithm.lower() in ('bz2', 'bzip2'): import bz2 as compressor - return compressor + result = compressor + else: + result = None + if result: + # NOTE(geguileo): Compression/Decompression starves + # greenthreads so we use a native thread instead. + return eventlet.tpool.Proxy(result) except ImportError: pass @@ -105,6 +111,16 @@ class ChunkedBackupDriver(driver.BackupDriver): self._get_compressor(CONF.backup_compression_algorithm) self.support_force_delete = True + def _get_object_writer(self, container, object_name, extra_metadata=None): + """Return writer proxy-wrapped to execute methods in native thread.""" + writer = self.get_object_writer(container, object_name, extra_metadata) + return eventlet.tpool.Proxy(writer) + + def _get_object_reader(self, container, object_name, extra_metadata=None): + """Return reader proxy-wrapped to execute methods in native thread.""" + reader = self.get_object_reader(container, object_name, extra_metadata) + return eventlet.tpool.Proxy(reader) + # To create your own "chunked" backup driver, implement the following # abstract methods. @@ -222,7 +238,7 @@ class ChunkedBackupDriver(driver.BackupDriver): metadata_json = json.dumps(metadata, sort_keys=True, indent=2) if six.PY3: metadata_json = metadata_json.encode('utf-8') - with self.get_object_writer(container, filename) as writer: + with self._get_object_writer(container, filename) as writer: writer.write(metadata_json) LOG.debug('_write_metadata finished. Metadata: %s.', metadata_json) @@ -243,7 +259,7 @@ class ChunkedBackupDriver(driver.BackupDriver): sha256file_json = json.dumps(sha256file, sort_keys=True, indent=2) if six.PY3: sha256file_json = sha256file_json.encode('utf-8') - with self.get_object_writer(container, filename) as writer: + with self._get_object_writer(container, filename) as writer: writer.write(sha256file_json) LOG.debug('_write_sha256file finished.') @@ -253,7 +269,7 @@ class ChunkedBackupDriver(driver.BackupDriver): LOG.debug('_read_metadata started, container name: %(container)s, ' 'metadata filename: %(filename)s.', {'container': container, 'filename': filename}) - with self.get_object_reader(container, filename) as reader: + with self._get_object_reader(container, filename) as reader: metadata_json = reader.read() if six.PY3: metadata_json = metadata_json.decode('utf-8') @@ -267,7 +283,7 @@ class ChunkedBackupDriver(driver.BackupDriver): LOG.debug('_read_sha256file started, container name: %(container)s, ' 'sha256 filename: %(filename)s.', {'container': container, 'filename': filename}) - with self.get_object_reader(container, filename) as reader: + with self._get_object_reader(container, filename) as reader: sha256file_json = reader.read() if six.PY3: sha256file_json = sha256file_json.decode('utf-8') @@ -327,7 +343,7 @@ class ChunkedBackupDriver(driver.BackupDriver): algorithm, output_data = self._prepare_output_data(data) obj[object_name]['compression'] = algorithm LOG.debug('About to put_object') - with self.get_object_writer( + with self._get_object_writer( container, object_name, extra_metadata=extra_metadata ) as writer: writer.write(output_data) @@ -349,8 +365,7 @@ class ChunkedBackupDriver(driver.BackupDriver): data_size_bytes = len(data) # Execute compression in native thread so it doesn't prevent # cooperative greenthread switching. - compressed_data = eventlet.tpool.execute(self.compressor.compress, - data) + compressed_data = self.compressor.compress(data) comp_size_bytes = len(compressed_data) algorithm = CONF.backup_compression_algorithm.lower() if comp_size_bytes >= data_size_bytes: @@ -618,7 +633,7 @@ class ChunkedBackupDriver(driver.BackupDriver): 'volume_id': volume_id, }) - with self.get_object_reader( + with self._get_object_reader( container, object_name, extra_metadata=extra_metadata) as reader: body = reader.read() diff --git a/cinder/backup/driver.py b/cinder/backup/driver.py index 10598300181..b085cdbe7d6 100644 --- a/cinder/backup/driver.py +++ b/cinder/backup/driver.py @@ -374,12 +374,24 @@ class BackupDriver(base.Base): @abc.abstractmethod def backup(self, backup, volume_file, backup_metadata=False): - """Start a backup of a specified volume.""" + """Start a backup of a specified volume. + + Some I/O operations may block greenthreads, so in order to prevent + starvation parameter volume_file will be a proxy that will execute all + methods in native threads, so the method implementation doesn't need to + worry about that.. + """ return @abc.abstractmethod def restore(self, backup, volume_id, volume_file): - """Restore a saved backup.""" + """Restore a saved backup. + + Some I/O operations may block greenthreads, so in order to prevent + starvation parameter volume_file will be a proxy that will execute all + methods in native threads, so the method implementation doesn't need to + worry about that.. + """ return @abc.abstractmethod diff --git a/cinder/backup/manager.py b/cinder/backup/manager.py index 10231db8d89..0cb3900b65b 100644 --- a/cinder/backup/manager.py +++ b/cinder/backup/manager.py @@ -32,6 +32,8 @@ Volume backups can be created, restored, deleted and listed. """ import os + +from eventlet import tpool from oslo_config import cfg from oslo_log import log as logging from oslo_log import versionutils @@ -82,6 +84,12 @@ CONF.import_opt('num_volume_device_scan_tries', 'cinder.volume.driver') QUOTAS = quota.QUOTAS +# TODO(geguileo): Once Eventlet issue #432 gets fixed we can just tpool.execute +# the whole call to the driver's backup and restore methods instead of proxy +# wrapping the device_file and having the drivers also proxy wrap their +# writes/reads and the compression/decompression calls. +# (https://github.com/eventlet/eventlet/issues/432) + class BackupManager(manager.ThreadPoolManager): """Manages backup of block storage devices.""" @@ -407,6 +415,10 @@ class BackupManager(manager.ThreadPoolManager): backup_service = self.get_backup_driver(context) properties = utils.brick_get_connector_properties() + + # NOTE(geguileo): Not all I/O disk operations properly do greenthread + # context switching and may end up blocking the greenthread, so we go + # with native threads proxy-wrapping the device file object. try: backup_device = self.volume_rpcapi.get_backup_device(context, backup, @@ -422,16 +434,16 @@ class BackupManager(manager.ThreadPoolManager): if backup_device.secure_enabled: with open(device_path) as device_file: updates = backup_service.backup( - backup, device_file) + backup, tpool.Proxy(device_file)) else: with utils.temporary_chown(device_path): with open(device_path) as device_file: updates = backup_service.backup( - backup, device_file) + backup, tpool.Proxy(device_file)) # device_path is already file-like so no need to open it else: - updates = backup_service.backup( - backup, device_path) + updates = backup_service.backup(backup, + tpool.Proxy(device_path)) finally: self._detach_device(context, attach_info, @@ -533,21 +545,27 @@ class BackupManager(manager.ThreadPoolManager): self.volume_rpcapi.secure_file_operations_enabled(context, volume)) attach_info = self._attach_device(context, volume, properties) + + # NOTE(geguileo): Not all I/O disk operations properly do greenthread + # context switching and may end up blocking the greenthread, so we go + # with native threads proxy-wrapping the device file object. try: device_path = attach_info['device']['path'] if (isinstance(device_path, six.string_types) and not os.path.isdir(device_path)): if secure_enabled: with open(device_path, 'wb') as device_file: - backup_service.restore(backup, volume.id, device_file) + backup_service.restore(backup, volume.id, + tpool.Proxy(device_file)) else: with utils.temporary_chown(device_path): with open(device_path, 'wb') as device_file: backup_service.restore(backup, volume.id, - device_file) + tpool.Proxy(device_file)) # device_path is already file-like so no need to open it else: - backup_service.restore(backup, volume.id, device_path) + backup_service.restore(backup, volume.id, + tpool.Proxy(device_path)) finally: self._detach_device(context, attach_info, volume, properties, force=True) diff --git a/cinder/tests/unit/backup/drivers/test_backup_google.py b/cinder/tests/unit/backup/drivers/test_backup_google.py index b09e74dc40b..9e2eeddb1a8 100644 --- a/cinder/tests/unit/backup/drivers/test_backup_google.py +++ b/cinder/tests/unit/backup/drivers/test_backup_google.py @@ -28,6 +28,7 @@ import tempfile import threading import zlib +from eventlet import tpool import mock from oslo_utils import units @@ -551,8 +552,10 @@ class GoogleBackupDriverTestCase(test.TestCase): self.assertIsNone(compressor) compressor = service._get_compressor('zlib') self.assertEqual(zlib, compressor) + self.assertIsInstance(compressor, tpool.Proxy) compressor = service._get_compressor('bz2') self.assertEqual(bz2, compressor) + self.assertIsInstance(compressor, tpool.Proxy) self.assertRaises(ValueError, service._get_compressor, 'fake') @gcs_client @@ -562,17 +565,17 @@ class GoogleBackupDriverTestCase(test.TestCase): thread_dict = {} original_compress = zlib.compress - def my_compress(data, *args, **kwargs): + def my_compress(data): thread_dict['compress'] = threading.current_thread() return original_compress(data) + self.mock_object(zlib, 'compress', side_effect=my_compress) + service = google_dr.GoogleBackupDriver(self.ctxt) # Set up buffer of 128 zeroed bytes fake_data = b'\0' * 128 - with mock.patch.object(service.compressor, 'compress', - side_effect=my_compress): - result = service._prepare_output_data(fake_data) + result = service._prepare_output_data(fake_data) self.assertEqual('zlib', result[0]) self.assertGreater(len(fake_data), len(result[1])) diff --git a/cinder/tests/unit/backup/drivers/test_backup_nfs.py b/cinder/tests/unit/backup/drivers/test_backup_nfs.py index 1b1ab6a69d2..4b29a47edd2 100644 --- a/cinder/tests/unit/backup/drivers/test_backup_nfs.py +++ b/cinder/tests/unit/backup/drivers/test_backup_nfs.py @@ -25,6 +25,7 @@ import tempfile import threading import zlib +from eventlet import tpool import mock from os_brick.remotefs import remotefs as remotefs_brick from oslo_config import cfg @@ -149,6 +150,10 @@ class BackupNFSSwiftBasedTestCase(test.TestCase): self.volume_file.write(bytes([65] * data_size)) self.volume_file.seek(0) + def _store_thread(self, *args, **kwargs): + self.thread_dict['thread'] = threading.current_thread() + return self.thread_original_method(*args, **kwargs) + def setUp(self): super(BackupNFSSwiftBasedTestCase, self).setUp() @@ -173,6 +178,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase): self.volume_file.write(os.urandom(1024)) self.size_volume_file += 1024 + # Use dictionary to share data between threads + self.thread_dict = {} + def test_backup_uncompressed(self): volume_id = fake.VOLUME_ID self._create_backup_db_entry(volume_id=volume_id) @@ -573,7 +581,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase): restored_file.name)) def test_restore_bz2(self): + self.thread_original_method = bz2.decompress volume_id = fake.VOLUME_ID + self.mock_object(bz2, 'decompress', side_effect=self._store_thread) self._create_backup_db_entry(volume_id=volume_id) self.flags(backup_compression_algorithm='bz2') @@ -591,7 +601,12 @@ class BackupNFSSwiftBasedTestCase(test.TestCase): self.assertTrue(filecmp.cmp(self.volume_file.name, restored_file.name)) + self.assertNotEqual(threading.current_thread(), + self.thread_dict['thread']) + def test_restore_zlib(self): + self.thread_original_method = zlib.decompress + self.mock_object(zlib, 'decompress', side_effect=self._store_thread) volume_id = fake.VOLUME_ID self._create_backup_db_entry(volume_id=volume_id) @@ -610,6 +625,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase): self.assertTrue(filecmp.cmp(self.volume_file.name, restored_file.name)) + self.assertNotEqual(threading.current_thread(), + self.thread_dict['thread']) + def test_restore_delta(self): volume_id = fake.VOLUME_ID @@ -672,8 +690,10 @@ class BackupNFSSwiftBasedTestCase(test.TestCase): self.assertIsNone(compressor) compressor = service._get_compressor('zlib') self.assertEqual(compressor, zlib) + self.assertIsInstance(compressor, tpool.Proxy) compressor = service._get_compressor('bz2') self.assertEqual(compressor, bz2) + self.assertIsInstance(compressor, tpool.Proxy) self.assertRaises(ValueError, service._get_compressor, 'fake') def create_buffer(self, size): @@ -688,24 +708,18 @@ class BackupNFSSwiftBasedTestCase(test.TestCase): def test_prepare_output_data_effective_compression(self): """Test compression works on a native thread.""" - # Use dictionary to share data between threads - thread_dict = {} - original_compress = zlib.compress - - def my_compress(data, *args, **kwargs): - thread_dict['compress'] = threading.current_thread() - return original_compress(data) + self.thread_original_method = zlib.compress + self.mock_object(zlib, 'compress', side_effect=self._store_thread) service = nfs.NFSBackupDriver(self.ctxt) fake_data = self.create_buffer(128) - with mock.patch.object(service.compressor, 'compress', - side_effect=my_compress): - result = service._prepare_output_data(fake_data) + + result = service._prepare_output_data(fake_data) self.assertEqual('zlib', result[0]) self.assertGreater(len(fake_data), len(result[1])) self.assertNotEqual(threading.current_thread(), - thread_dict['compress']) + self.thread_dict['thread']) def test_prepare_output_data_no_compresssion(self): self.flags(backup_compression_algorithm='none') diff --git a/cinder/tests/unit/backup/drivers/test_backup_swift.py b/cinder/tests/unit/backup/drivers/test_backup_swift.py index 5ebee45d0b6..d7faf0a4b04 100644 --- a/cinder/tests/unit/backup/drivers/test_backup_swift.py +++ b/cinder/tests/unit/backup/drivers/test_backup_swift.py @@ -27,6 +27,7 @@ import tempfile import threading import zlib +from eventlet import tpool import mock from oslo_config import cfg from swiftclient import client as swift @@ -821,8 +822,10 @@ class BackupSwiftTestCase(test.TestCase): self.assertIsNone(compressor) compressor = service._get_compressor('zlib') self.assertEqual(zlib, compressor) + self.assertIsInstance(compressor, tpool.Proxy) compressor = service._get_compressor('bz2') self.assertEqual(bz2, compressor) + self.assertIsInstance(compressor, tpool.Proxy) self.assertRaises(ValueError, service._get_compressor, 'fake') def test_prepare_output_data_effective_compression(self): @@ -831,17 +834,17 @@ class BackupSwiftTestCase(test.TestCase): thread_dict = {} original_compress = zlib.compress - def my_compress(data, *args, **kwargs): + def my_compress(data): thread_dict['compress'] = threading.current_thread() return original_compress(data) + self.mock_object(zlib, 'compress', side_effect=my_compress) + service = swift_dr.SwiftBackupDriver(self.ctxt) # Set up buffer of 128 zeroed bytes fake_data = b'\0' * 128 - with mock.patch.object(service.compressor, 'compress', - side_effect=my_compress): - result = service._prepare_output_data(fake_data) + result = service._prepare_output_data(fake_data) self.assertEqual('zlib', result[0]) self.assertGreater(len(fake_data), len(result[1]))