diff --git a/swift/common/middleware/bulk.py b/swift/common/middleware/bulk.py index eb6465f16e..cf26c7ef5a 100644 --- a/swift/common/middleware/bulk.py +++ b/swift/common/middleware/bulk.py @@ -194,9 +194,8 @@ swift.source set and the content length will reflect the size of the payload sent to the proxy (the list of objects/containers to be deleted). """ -import json +from swift.common.request_helpers import get_heartbeat_response_body import tarfile -from xml.sax.saxutils import escape # nosec B406 from time import time from eventlet import sleep import zlib @@ -224,50 +223,6 @@ ACCEPTABLE_FORMATS = ['text/plain', 'application/json', 'application/xml', 'text/xml'] -def get_response_body(data_format, data_dict, error_list, root_tag): - """ - Returns a properly formatted response body according to format. - - Handles json and xml, otherwise will return text/plain. - Note: xml response does not include xml declaration. - - :params data_format: resulting format - :params data_dict: generated data about results. - :params error_list: list of quoted filenames that failed - :params root_tag: the tag name to use for root elements when returning XML; - e.g. 'extract' or 'delete' - """ - if data_format == 'application/json': - data_dict['Errors'] = error_list - return json.dumps(data_dict).encode('ascii') - if data_format and data_format.endswith('/xml'): - output = ['<', root_tag, '>\n'] - for key in sorted(data_dict): - xml_key = key.replace(' ', '_').lower() - output.extend([ - '<', xml_key, '>', - escape(str(data_dict[key])), - '\n', - ]) - output.append('\n') - for name, status in error_list: - output.extend([ - '', escape(name), '', - escape(status), '\n', - ]) - output.extend(['\n\n']) - return ''.join(output).encode('utf-8') - - output = [] - for key in sorted(data_dict): - output.append('%s: %s\n' % (key, data_dict[key])) - output.append('Errors:\n') - output.extend( - '%s, %s\n' % (name, status) - for name, status in error_list) - return ''.join(output).encode('utf-8') - - def pax_key_to_swift_header(pax_key): if (pax_key == u"SCHILY.xattr.user.mime_type" or pax_key == u"LIBARCHIVE.xattr.user.mime_type"): @@ -506,8 +461,9 @@ class Bulk(object): self.logger.exception('Error in bulk delete.') resp_dict['Response Status'] = HTTPServerError().status - yield separator + get_response_body(out_content_type, - resp_dict, failed_files, 'delete') + yield separator + get_heartbeat_response_body(out_content_type, + resp_dict, failed_files, + 'delete') def handle_extract_iter(self, req, compress_type, out_content_type='text/plain'): @@ -671,7 +627,7 @@ class Bulk(object): self.logger.exception('Error in extract archive.') resp_dict['Response Status'] = HTTPServerError().status - yield separator + get_response_body( + yield separator + get_heartbeat_response_body( out_content_type, resp_dict, failed_files, 'extract') def _process_delete(self, resp, pile, obj_name, resp_dict, diff --git a/swift/common/middleware/copy.py b/swift/common/middleware/copy.py index 598653c69a..fed8f860fe 100644 --- a/swift/common/middleware/copy.py +++ b/swift/common/middleware/copy.py @@ -125,6 +125,8 @@ from swift.common.request_helpers import copy_header_subset, remove_items, \ is_sys_meta, is_sys_or_user_meta, is_object_transient_sysmeta, \ check_path_header, OBJECT_SYSMETA_CONTAINER_UPDATE_OVERRIDE_PREFIX from swift.common.wsgi import WSGIContext, make_subrequest +import eventlet +from swift.common.request_helpers import get_heartbeat_response_body def _check_copy_from_header(req): @@ -175,10 +177,11 @@ def _copy_headers(src, dest): class ServerSideCopyWebContext(WSGIContext): - def __init__(self, app, logger): + def __init__(self, app, logger, yield_frequency=10): super(ServerSideCopyWebContext, self).__init__(app) self.app = app self.logger = logger + self.yield_frequency = yield_frequency def get_source_resp(self, req): sub_req = make_subrequest( @@ -187,12 +190,73 @@ class ServerSideCopyWebContext(WSGIContext): return sub_req.get_response(self.app) def send_put_req(self, req, additional_resp_headers, start_response): - app_resp = self._app_call(req.environ) - self._adjust_put_response(req, additional_resp_headers) - start_response(self._response_status, - self._response_headers, - self._response_exc_info) - return app_resp + heartbeat = config_true_value(req.params.get('heartbeat')) + ACCEPTABLE_FORMATS = ['text/plain', 'application/json'] + + try: + out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS) + except ValueError: + out_content_type = 'text/plain' + if not out_content_type: + out_content_type = 'text/plain' + + if heartbeat: + gt = eventlet.spawn(self._app_call, + req.environ) + start_response('202 Accepted', + [('Content-Type', out_content_type)]) + + def resp_iter(): + # Send an initial heartbeat + yield b' ' + app_iter = [b''] + try: + while not gt.dead: + try: + with eventlet.Timeout(self.yield_frequency): + app_iter = gt.wait() + except eventlet.Timeout: + yield b' ' + except Exception as e: + # Send back the status to the client if error + self._response_status = '500 Internal Error' + app_iter = [str(e).encode('utf8')] + finally: + response_body = b''.join(app_iter).decode('utf8') + resp_dict = {'Response Status': self._response_status, + 'Response Body': response_body} + errors = [] + + if not is_success(self._get_status_int()): + src_path = additional_resp_headers['X-Copied-From'] + errors.append(( + wsgi_quote(src_path), + self._response_status, + )) + + for k, v in additional_resp_headers.items(): + if not k.lower().startswith(('x-object-sysmeta-', + 'x-backend')): + resp_dict[k] = v + + for k, v in self._response_headers: + if not k.lower().startswith(('x-object-sysmeta-', + 'x-backend')): + resp_dict[k] = v + yield get_heartbeat_response_body(out_content_type, + resp_dict, + errors, 'copy') + close_if_possible(gt) + + return resp_iter() + + else: + app_resp = self._app_call(req.environ) + self._adjust_put_response(req, additional_resp_headers) + start_response(self._response_status, + self._response_headers, + self._response_exc_info) + return app_resp def _adjust_put_response(self, req, additional_resp_headers): if is_success(self._get_status_int()): @@ -220,6 +284,7 @@ class ServerSideCopyMiddleware(object): def __init__(self, app, conf): self.app = app self.logger = get_logger(conf, log_route="copy") + self.yield_frequency = int(conf.get('yield_frequency', 10)) def __call__(self, env, start_response): req = Request(env) @@ -333,6 +398,15 @@ class ServerSideCopyMiddleware(object): 'body', request=req, content_type='text/plain')(req.environ, start_response) + # If heartbeat is enabled, set minimum_write_chunk_size directly + # in the original client request before making subrequests + if config_true_value(req.params.get('heartbeat')): + wsgi_input = req.environ.get('wsgi.input') + if hasattr(wsgi_input, 'environ'): + wsgi_input.environ['eventlet.minimum_write_chunk_size'] = 0 + # Not sure if we also need to set it in + # the current request's environ + req.environ['eventlet.minimum_write_chunk_size'] = 0 # Form the path of source object to be fetched ver, acct, _rest = req.split_path(2, 3, True) @@ -347,7 +421,8 @@ class ServerSideCopyMiddleware(object): src_container_name, src_obj_name) # GET the source object, bail out on error - ssc_ctx = ServerSideCopyWebContext(self.app, self.logger) + ssc_ctx = ServerSideCopyWebContext(self.app, self.logger, + self.yield_frequency) source_resp = self._get_source_object(ssc_ctx, source_path, req) if source_resp.status_int >= HTTP_MULTIPLE_CHOICES: return source_resp(source_resp.environ, start_response) @@ -434,6 +509,17 @@ class ServerSideCopyMiddleware(object): source_resp, sink_req) put_resp = ssc_ctx.send_put_req(sink_req, resp_headers, start_response) + + # For heartbeat=on, we need to cleanup the resp iter + if config_true_value(req.params.get('heartbeat')): + def clean_iter(app_iter): + try: + for chunk in app_iter: + yield chunk + finally: + close_if_possible(source_resp.app_iter) + return clean_iter(put_resp) + close_if_possible(source_resp.app_iter) return put_resp diff --git a/swift/common/middleware/slo.py b/swift/common/middleware/slo.py index 6c7389ac2c..46f4bde9cc 100644 --- a/swift/common/middleware/slo.py +++ b/swift/common/middleware/slo.py @@ -366,13 +366,12 @@ from swift.common.registry import register_swift_info from swift.common.request_helpers import SegmentedIterable, \ get_sys_meta_prefix, update_etag_is_at_header, resolve_etag_is_at_header, \ get_container_update_override_key, update_ignore_range_header, \ - get_param, get_valid_part_num + get_param, get_valid_part_num, get_heartbeat_response_body from swift.common.constraints import check_utf8 from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED from swift.common.wsgi import WSGIContext, make_subrequest, make_env, \ make_pre_authed_request -from swift.common.middleware.bulk import get_response_body, \ - ACCEPTABLE_FORMATS, Bulk +from swift.common.middleware.bulk import ACCEPTABLE_FORMATS, Bulk from swift.obj import expirer from swift.proxy.controllers.base import get_container_info @@ -1527,7 +1526,7 @@ class StaticLargeObject(object): start_response(err.status, [(h, v) for h, v in err.headers.items() if h.lower() != 'content-length']) - yield separator + get_response_body( + yield separator + get_heartbeat_response_body( out_content_type, resp_dict, problem_segments, 'upload') return @@ -1554,7 +1553,7 @@ class StaticLargeObject(object): err_body = err_body.decode('utf-8', errors='replace') resp_dict['Response Body'] = err_body or '\n'.join( RESPONSE_REASONS.get(err.status_int, [''])) - yield separator + get_response_body( + yield separator + get_heartbeat_response_body( out_content_type, resp_dict, problem_segments, 'upload') else: @@ -1600,7 +1599,7 @@ class StaticLargeObject(object): if isinstance(resp_body, bytes): resp_body = resp_body.decode('utf-8') resp_dict['Response Body'] = resp_body - yield separator + get_response_body( + yield separator + get_heartbeat_response_body( out_content_type, resp_dict, [], 'upload') else: for chunk in resp(req.environ, start_response): diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index 60a862aa6c..63e63ac69c 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -22,6 +22,8 @@ from swob in here without creating circular imports. import itertools import time +import json +from xml.sax.saxutils import escape # nosec B406 from swift.common.header_key_dict import HeaderKeyDict @@ -1005,3 +1007,47 @@ def append_log_info(environ, log_info): def get_log_info(environ): return ','.join(environ.get('swift.log_info', [])) + + +def get_heartbeat_response_body(data_format, data_dict, error_list, root_tag): + """ + Returns a response body for heartbeat according to format. + + Handles json and xml, otherwise will return text/plain. + Note: xml response does not include xml declaration. + + :params data_format: resulting format + :params data_dict: generated data about results. + :params error_list: list of quoted filenames that failed + :params root_tag: the tag name to use for root elements when returning XML; + e.g. 'extract' or 'delete' + """ + if data_format == 'application/json': + data_dict['Errors'] = error_list + return json.dumps(data_dict).encode('ascii') + if data_format and data_format.endswith('/xml'): + output = ['<', root_tag, '>\n'] + for key in sorted(data_dict): + xml_key = key.replace(' ', '_').lower() + output.extend([ + '<', xml_key, '>', + escape(str(data_dict[key])), + '\n', + ]) + output.append('\n') + for name, status in error_list: + output.extend([ + '', escape(name), '', + escape(status), '\n', + ]) + output.extend(['\n\n']) + return ''.join(output).encode('utf-8') + + output = [] + for key in sorted(data_dict): + output.append('%s: %s\n' % (key, data_dict[key])) + output.append('Errors:\n') + output.extend( + '%s, %s\n' % (name, status) + for name, status in error_list) + return ''.join(output).encode('utf-8') diff --git a/test/unit/common/middleware/test_bulk.py b/test/unit/common/middleware/test_bulk.py index cff78b4060..f8ae6e8ce6 100644 --- a/test/unit/common/middleware/test_bulk.py +++ b/test/unit/common/middleware/test_bulk.py @@ -645,11 +645,11 @@ class TestUntar(unittest.TestCase): ]) def test_get_response_body(self): - txt_body = bulk.get_response_body( + txt_body = bulk.get_heartbeat_response_body( 'bad_formay', {'hey': 'there'}, [['json > xml', '202 Accepted']], "doesn't matter for text") self.assertIn(b'hey: there', txt_body) - xml_body = bulk.get_response_body( + xml_body = bulk.get_heartbeat_response_body( 'text/xml', {'hey': 'there'}, [['json > xml', '202 Accepted']], 'root_tag') self.assertIn(b'>', xml_body) diff --git a/test/unit/common/middleware/test_copy.py b/test/unit/common/middleware/test_copy.py index f9d62d997e..37201f5a5f 100644 --- a/test/unit/common/middleware/test_copy.py +++ b/test/unit/common/middleware/test_copy.py @@ -17,12 +17,13 @@ from unittest import mock import unittest import urllib.parse +import eventlet from swift.common import swob from swift.common.middleware import copy from swift.common.storage_policy import POLICIES from swift.common.swob import Request, HTTPException -from swift.common.utils import closing_if_possible, md5 +from swift.common.utils import close_if_possible, closing_if_possible, md5 from test.debug_logger import debug_logger from test.unit import patch_policies, FakeRing from test.unit.common.middleware.helpers import FakeSwift @@ -1426,3 +1427,141 @@ class TestServerSideCopyMiddlewareWithEC(unittest.TestCase): self.assertEqual(resp.body, range_not_satisfiable_body) self.assertEqual(resp.etag, body_etag) self.assertEqual(resp.headers['Accept-Ranges'], 'bytes') + + +class TestServerSideCopyHeartbeat(unittest.TestCase): + def setUp(self): + self.app = FakeSwift() + self.ssc = copy.filter_factory({'yield_frequency': '1'})(self.app) + + def tearDown(self): + pass + + def call_app(self, req, app=None): + if app is None: + app = self.app + + self.authorized = [] + + def authorize(req): + self.authorized.append(req) + + if 'swift.authorize' not in req.environ: + req.environ['swift.authorize'] = authorize + + req.headers.setdefault("User-Agent", "Test User Agent") + + status = [None] + headers = [None] + + def start_response(s, h, ei=None): + status[0] = s + headers[0] = h + + body_iter = app(req.environ, start_response) + body = b'' + try: + for chunk in body_iter: + body += chunk + finally: + close_if_possible(body_iter) + + return status[0], headers[0], body + + def test_copy_with_heartbeat_success(self): + original_spawn = eventlet.spawn + self.app.register('GET', '/v1/a/c/o?heartbeat=true', swob.HTTPOk, + {'Content-Length': '10'}, b'X' * 10) + self.app.register('PUT', '/v1/a/c/o2?heartbeat=true', + swob.HTTPCreated, {}) + heartbeats = [] + + def mock_spawn(func, *args, **kwargs): + def delayed_func(*a, **kw): + eventlet.sleep(2.5) + return func(*a, **kw) + return original_spawn(delayed_func, *args, **kwargs) + req = swob.Request.blank( + '/v1/a/c/o2?heartbeat=true', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'Content-Length': '0', 'X-Copy-From': 'c/o'}) + + with mock.patch('eventlet.spawn', mock_spawn): + status = [None] + headers_list = [None] + + def start_response(s, h, ei=None): + status[0] = s + headers_list[0] = h + body_iter = self.ssc(req.environ, start_response) + self.assertEqual('202 Accepted', status[0]) + + try: + for chunk in body_iter: + heartbeats.append(chunk) + finally: + close_if_possible(body_iter) + self.assertTrue(len(heartbeats) >= 3, + f"Expected 3 heartbeats, got {len(heartbeats)}") + self.assertEqual(heartbeats[0], b' ') + + for i in range(1, len(heartbeats) - 1): + self.assertEqual(heartbeats[i], b' ') + self.assertIn(b'201 Created', heartbeats[-1]) + self.assertEqual(req.environ.get('eventlet.minimum_write_chunk_size'), + 0) + self.assertEqual(2, len(self.app.calls)) + self.assertEqual(('GET', '/v1/a/c/o?heartbeat=true'), + self.app.calls[0]) + self.assertEqual(('PUT', '/v1/a/c/o2?heartbeat=true'), + self.app.calls[1]) + + def test_copy_with_heartbeat_failure(self): + original_spawn = eventlet.spawn + self.app.register('GET', '/v1/a/c/o?heartbeat=true', swob.HTTPOk, + {'Content-Length': '10'}, b'X' * 10) + self.app.register('PUT', '/v1/a/c/o2?heartbeat=true', + swob.HTTPServiceUnavailable, {}) + heartbeats = [] + + def mock_spawn(func, *args, **kwargs): + + def delayed_func(*a, **kw): + eventlet.sleep(2.5) + return func(*a, **kw) + return original_spawn(delayed_func, *args, **kwargs) + + req = swob.Request.blank( + '/v1/a/c/o2?heartbeat=true', + environ={'REQUEST_METHOD': 'PUT'}, + headers={'Content-Length': '0', 'X-Copy-From': 'c/o'}) + + with mock.patch('eventlet.spawn', mock_spawn): + status = [None] + headers_list = [None] + + def start_response(s, h, ei=None): + status[0] = s + headers_list[0] = h + body_iter = self.ssc(req.environ, start_response) + self.assertEqual('202 Accepted', status[0]) + + try: + for chunk in body_iter: + heartbeats.append(chunk) + finally: + close_if_possible(body_iter) + self.assertTrue(len(heartbeats) >= 3, + f"Expected 3 heartbeats, got {len(heartbeats)}") + self.assertEqual(heartbeats[0], b' ') + + for i in range(1, len(heartbeats) - 1): + self.assertEqual(heartbeats[i], b' ') + self.assertIn(b'503 Service Unavailable', heartbeats[-1]) + self.assertEqual(req.environ.get('eventlet.minimum_write_chunk_size'), + 0) + self.assertEqual(2, len(self.app.calls)) + self.assertEqual(('GET', '/v1/a/c/o?heartbeat=true'), + self.app.calls[0]) + self.assertEqual(('PUT', '/v1/a/c/o2?heartbeat=true'), + self.app.calls[1])