Merge "Implement heartbeat response for COPY request"
This commit is contained in:
commit
2c5bcfdf0a
@ -194,9 +194,8 @@ swift.source set and the content length will reflect the size of the
|
||||
payload sent to the proxy (the list of objects/containers to be deleted).
|
||||
"""
|
||||
|
||||
import json
|
||||
from swift.common.request_helpers import get_heartbeat_response_body
|
||||
import tarfile
|
||||
from xml.sax.saxutils import escape # nosec B406
|
||||
from time import time
|
||||
from eventlet import sleep
|
||||
import zlib
|
||||
@ -224,50 +223,6 @@ ACCEPTABLE_FORMATS = ['text/plain', 'application/json', 'application/xml',
|
||||
'text/xml']
|
||||
|
||||
|
||||
def get_response_body(data_format, data_dict, error_list, root_tag):
|
||||
"""
|
||||
Returns a properly formatted response body according to format.
|
||||
|
||||
Handles json and xml, otherwise will return text/plain.
|
||||
Note: xml response does not include xml declaration.
|
||||
|
||||
:params data_format: resulting format
|
||||
:params data_dict: generated data about results.
|
||||
:params error_list: list of quoted filenames that failed
|
||||
:params root_tag: the tag name to use for root elements when returning XML;
|
||||
e.g. 'extract' or 'delete'
|
||||
"""
|
||||
if data_format == 'application/json':
|
||||
data_dict['Errors'] = error_list
|
||||
return json.dumps(data_dict).encode('ascii')
|
||||
if data_format and data_format.endswith('/xml'):
|
||||
output = ['<', root_tag, '>\n']
|
||||
for key in sorted(data_dict):
|
||||
xml_key = key.replace(' ', '_').lower()
|
||||
output.extend([
|
||||
'<', xml_key, '>',
|
||||
escape(str(data_dict[key])),
|
||||
'</', xml_key, '>\n',
|
||||
])
|
||||
output.append('<errors>\n')
|
||||
for name, status in error_list:
|
||||
output.extend([
|
||||
'<object><name>', escape(name), '</name><status>',
|
||||
escape(status), '</status></object>\n',
|
||||
])
|
||||
output.extend(['</errors>\n</', root_tag, '>\n'])
|
||||
return ''.join(output).encode('utf-8')
|
||||
|
||||
output = []
|
||||
for key in sorted(data_dict):
|
||||
output.append('%s: %s\n' % (key, data_dict[key]))
|
||||
output.append('Errors:\n')
|
||||
output.extend(
|
||||
'%s, %s\n' % (name, status)
|
||||
for name, status in error_list)
|
||||
return ''.join(output).encode('utf-8')
|
||||
|
||||
|
||||
def pax_key_to_swift_header(pax_key):
|
||||
if (pax_key == u"SCHILY.xattr.user.mime_type" or
|
||||
pax_key == u"LIBARCHIVE.xattr.user.mime_type"):
|
||||
@ -506,8 +461,9 @@ class Bulk(object):
|
||||
self.logger.exception('Error in bulk delete.')
|
||||
resp_dict['Response Status'] = HTTPServerError().status
|
||||
|
||||
yield separator + get_response_body(out_content_type,
|
||||
resp_dict, failed_files, 'delete')
|
||||
yield separator + get_heartbeat_response_body(out_content_type,
|
||||
resp_dict, failed_files,
|
||||
'delete')
|
||||
|
||||
def handle_extract_iter(self, req, compress_type,
|
||||
out_content_type='text/plain'):
|
||||
@ -671,7 +627,7 @@ class Bulk(object):
|
||||
self.logger.exception('Error in extract archive.')
|
||||
resp_dict['Response Status'] = HTTPServerError().status
|
||||
|
||||
yield separator + get_response_body(
|
||||
yield separator + get_heartbeat_response_body(
|
||||
out_content_type, resp_dict, failed_files, 'extract')
|
||||
|
||||
def _process_delete(self, resp, pile, obj_name, resp_dict,
|
||||
|
@ -125,6 +125,8 @@ from swift.common.request_helpers import copy_header_subset, remove_items, \
|
||||
is_sys_meta, is_sys_or_user_meta, is_object_transient_sysmeta, \
|
||||
check_path_header, OBJECT_SYSMETA_CONTAINER_UPDATE_OVERRIDE_PREFIX
|
||||
from swift.common.wsgi import WSGIContext, make_subrequest
|
||||
import eventlet
|
||||
from swift.common.request_helpers import get_heartbeat_response_body
|
||||
|
||||
|
||||
def _check_copy_from_header(req):
|
||||
@ -175,10 +177,11 @@ def _copy_headers(src, dest):
|
||||
|
||||
class ServerSideCopyWebContext(WSGIContext):
|
||||
|
||||
def __init__(self, app, logger):
|
||||
def __init__(self, app, logger, yield_frequency=10):
|
||||
super(ServerSideCopyWebContext, self).__init__(app)
|
||||
self.app = app
|
||||
self.logger = logger
|
||||
self.yield_frequency = yield_frequency
|
||||
|
||||
def get_source_resp(self, req):
|
||||
sub_req = make_subrequest(
|
||||
@ -187,12 +190,73 @@ class ServerSideCopyWebContext(WSGIContext):
|
||||
return sub_req.get_response(self.app)
|
||||
|
||||
def send_put_req(self, req, additional_resp_headers, start_response):
|
||||
app_resp = self._app_call(req.environ)
|
||||
self._adjust_put_response(req, additional_resp_headers)
|
||||
start_response(self._response_status,
|
||||
self._response_headers,
|
||||
self._response_exc_info)
|
||||
return app_resp
|
||||
heartbeat = config_true_value(req.params.get('heartbeat'))
|
||||
ACCEPTABLE_FORMATS = ['text/plain', 'application/json']
|
||||
|
||||
try:
|
||||
out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
|
||||
except ValueError:
|
||||
out_content_type = 'text/plain'
|
||||
if not out_content_type:
|
||||
out_content_type = 'text/plain'
|
||||
|
||||
if heartbeat:
|
||||
gt = eventlet.spawn(self._app_call,
|
||||
req.environ)
|
||||
start_response('202 Accepted',
|
||||
[('Content-Type', out_content_type)])
|
||||
|
||||
def resp_iter():
|
||||
# Send an initial heartbeat
|
||||
yield b' '
|
||||
app_iter = [b'']
|
||||
try:
|
||||
while not gt.dead:
|
||||
try:
|
||||
with eventlet.Timeout(self.yield_frequency):
|
||||
app_iter = gt.wait()
|
||||
except eventlet.Timeout:
|
||||
yield b' '
|
||||
except Exception as e:
|
||||
# Send back the status to the client if error
|
||||
self._response_status = '500 Internal Error'
|
||||
app_iter = [str(e).encode('utf8')]
|
||||
finally:
|
||||
response_body = b''.join(app_iter).decode('utf8')
|
||||
resp_dict = {'Response Status': self._response_status,
|
||||
'Response Body': response_body}
|
||||
errors = []
|
||||
|
||||
if not is_success(self._get_status_int()):
|
||||
src_path = additional_resp_headers['X-Copied-From']
|
||||
errors.append((
|
||||
wsgi_quote(src_path),
|
||||
self._response_status,
|
||||
))
|
||||
|
||||
for k, v in additional_resp_headers.items():
|
||||
if not k.lower().startswith(('x-object-sysmeta-',
|
||||
'x-backend')):
|
||||
resp_dict[k] = v
|
||||
|
||||
for k, v in self._response_headers:
|
||||
if not k.lower().startswith(('x-object-sysmeta-',
|
||||
'x-backend')):
|
||||
resp_dict[k] = v
|
||||
yield get_heartbeat_response_body(out_content_type,
|
||||
resp_dict,
|
||||
errors, 'copy')
|
||||
close_if_possible(gt)
|
||||
|
||||
return resp_iter()
|
||||
|
||||
else:
|
||||
app_resp = self._app_call(req.environ)
|
||||
self._adjust_put_response(req, additional_resp_headers)
|
||||
start_response(self._response_status,
|
||||
self._response_headers,
|
||||
self._response_exc_info)
|
||||
return app_resp
|
||||
|
||||
def _adjust_put_response(self, req, additional_resp_headers):
|
||||
if is_success(self._get_status_int()):
|
||||
@ -220,6 +284,7 @@ class ServerSideCopyMiddleware(object):
|
||||
def __init__(self, app, conf):
|
||||
self.app = app
|
||||
self.logger = get_logger(conf, log_route="copy")
|
||||
self.yield_frequency = int(conf.get('yield_frequency', 10))
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
req = Request(env)
|
||||
@ -333,6 +398,15 @@ class ServerSideCopyMiddleware(object):
|
||||
'body', request=req,
|
||||
content_type='text/plain')(req.environ,
|
||||
start_response)
|
||||
# If heartbeat is enabled, set minimum_write_chunk_size directly
|
||||
# in the original client request before making subrequests
|
||||
if config_true_value(req.params.get('heartbeat')):
|
||||
wsgi_input = req.environ.get('wsgi.input')
|
||||
if hasattr(wsgi_input, 'environ'):
|
||||
wsgi_input.environ['eventlet.minimum_write_chunk_size'] = 0
|
||||
# Not sure if we also need to set it in
|
||||
# the current request's environ
|
||||
req.environ['eventlet.minimum_write_chunk_size'] = 0
|
||||
|
||||
# Form the path of source object to be fetched
|
||||
ver, acct, _rest = req.split_path(2, 3, True)
|
||||
@ -347,7 +421,8 @@ class ServerSideCopyMiddleware(object):
|
||||
src_container_name, src_obj_name)
|
||||
|
||||
# GET the source object, bail out on error
|
||||
ssc_ctx = ServerSideCopyWebContext(self.app, self.logger)
|
||||
ssc_ctx = ServerSideCopyWebContext(self.app, self.logger,
|
||||
self.yield_frequency)
|
||||
source_resp = self._get_source_object(ssc_ctx, source_path, req)
|
||||
if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
|
||||
return source_resp(source_resp.environ, start_response)
|
||||
@ -434,6 +509,17 @@ class ServerSideCopyMiddleware(object):
|
||||
source_resp, sink_req)
|
||||
|
||||
put_resp = ssc_ctx.send_put_req(sink_req, resp_headers, start_response)
|
||||
|
||||
# For heartbeat=on, we need to cleanup the resp iter
|
||||
if config_true_value(req.params.get('heartbeat')):
|
||||
def clean_iter(app_iter):
|
||||
try:
|
||||
for chunk in app_iter:
|
||||
yield chunk
|
||||
finally:
|
||||
close_if_possible(source_resp.app_iter)
|
||||
return clean_iter(put_resp)
|
||||
|
||||
close_if_possible(source_resp.app_iter)
|
||||
return put_resp
|
||||
|
||||
|
@ -366,13 +366,12 @@ from swift.common.registry import register_swift_info
|
||||
from swift.common.request_helpers import SegmentedIterable, \
|
||||
get_sys_meta_prefix, update_etag_is_at_header, resolve_etag_is_at_header, \
|
||||
get_container_update_override_key, update_ignore_range_header, \
|
||||
get_param, get_valid_part_num
|
||||
get_param, get_valid_part_num, get_heartbeat_response_body
|
||||
from swift.common.constraints import check_utf8
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED
|
||||
from swift.common.wsgi import WSGIContext, make_subrequest, make_env, \
|
||||
make_pre_authed_request
|
||||
from swift.common.middleware.bulk import get_response_body, \
|
||||
ACCEPTABLE_FORMATS, Bulk
|
||||
from swift.common.middleware.bulk import ACCEPTABLE_FORMATS, Bulk
|
||||
from swift.obj import expirer
|
||||
from swift.proxy.controllers.base import get_container_info
|
||||
|
||||
@ -1527,7 +1526,7 @@ class StaticLargeObject(object):
|
||||
start_response(err.status,
|
||||
[(h, v) for h, v in err.headers.items()
|
||||
if h.lower() != 'content-length'])
|
||||
yield separator + get_response_body(
|
||||
yield separator + get_heartbeat_response_body(
|
||||
out_content_type, resp_dict, problem_segments, 'upload')
|
||||
return
|
||||
|
||||
@ -1554,7 +1553,7 @@ class StaticLargeObject(object):
|
||||
err_body = err_body.decode('utf-8', errors='replace')
|
||||
resp_dict['Response Body'] = err_body or '\n'.join(
|
||||
RESPONSE_REASONS.get(err.status_int, ['']))
|
||||
yield separator + get_response_body(
|
||||
yield separator + get_heartbeat_response_body(
|
||||
out_content_type, resp_dict, problem_segments,
|
||||
'upload')
|
||||
else:
|
||||
@ -1600,7 +1599,7 @@ class StaticLargeObject(object):
|
||||
if isinstance(resp_body, bytes):
|
||||
resp_body = resp_body.decode('utf-8')
|
||||
resp_dict['Response Body'] = resp_body
|
||||
yield separator + get_response_body(
|
||||
yield separator + get_heartbeat_response_body(
|
||||
out_content_type, resp_dict, [], 'upload')
|
||||
else:
|
||||
for chunk in resp(req.environ, start_response):
|
||||
|
@ -22,6 +22,8 @@ from swob in here without creating circular imports.
|
||||
|
||||
import itertools
|
||||
import time
|
||||
import json
|
||||
from xml.sax.saxutils import escape # nosec B406
|
||||
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
|
||||
@ -1005,3 +1007,47 @@ def append_log_info(environ, log_info):
|
||||
|
||||
def get_log_info(environ):
|
||||
return ','.join(environ.get('swift.log_info', []))
|
||||
|
||||
|
||||
def get_heartbeat_response_body(data_format, data_dict, error_list, root_tag):
|
||||
"""
|
||||
Returns a response body for heartbeat according to format.
|
||||
|
||||
Handles json and xml, otherwise will return text/plain.
|
||||
Note: xml response does not include xml declaration.
|
||||
|
||||
:params data_format: resulting format
|
||||
:params data_dict: generated data about results.
|
||||
:params error_list: list of quoted filenames that failed
|
||||
:params root_tag: the tag name to use for root elements when returning XML;
|
||||
e.g. 'extract' or 'delete'
|
||||
"""
|
||||
if data_format == 'application/json':
|
||||
data_dict['Errors'] = error_list
|
||||
return json.dumps(data_dict).encode('ascii')
|
||||
if data_format and data_format.endswith('/xml'):
|
||||
output = ['<', root_tag, '>\n']
|
||||
for key in sorted(data_dict):
|
||||
xml_key = key.replace(' ', '_').lower()
|
||||
output.extend([
|
||||
'<', xml_key, '>',
|
||||
escape(str(data_dict[key])),
|
||||
'</', xml_key, '>\n',
|
||||
])
|
||||
output.append('<errors>\n')
|
||||
for name, status in error_list:
|
||||
output.extend([
|
||||
'<object><name>', escape(name), '</name><status>',
|
||||
escape(status), '</status></object>\n',
|
||||
])
|
||||
output.extend(['</errors>\n</', root_tag, '>\n'])
|
||||
return ''.join(output).encode('utf-8')
|
||||
|
||||
output = []
|
||||
for key in sorted(data_dict):
|
||||
output.append('%s: %s\n' % (key, data_dict[key]))
|
||||
output.append('Errors:\n')
|
||||
output.extend(
|
||||
'%s, %s\n' % (name, status)
|
||||
for name, status in error_list)
|
||||
return ''.join(output).encode('utf-8')
|
||||
|
@ -645,11 +645,11 @@ class TestUntar(unittest.TestCase):
|
||||
])
|
||||
|
||||
def test_get_response_body(self):
|
||||
txt_body = bulk.get_response_body(
|
||||
txt_body = bulk.get_heartbeat_response_body(
|
||||
'bad_formay', {'hey': 'there'}, [['json > xml', '202 Accepted']],
|
||||
"doesn't matter for text")
|
||||
self.assertIn(b'hey: there', txt_body)
|
||||
xml_body = bulk.get_response_body(
|
||||
xml_body = bulk.get_heartbeat_response_body(
|
||||
'text/xml', {'hey': 'there'}, [['json > xml', '202 Accepted']],
|
||||
'root_tag')
|
||||
self.assertIn(b'>', xml_body)
|
||||
|
@ -17,12 +17,13 @@
|
||||
from unittest import mock
|
||||
import unittest
|
||||
import urllib.parse
|
||||
import eventlet
|
||||
|
||||
from swift.common import swob
|
||||
from swift.common.middleware import copy
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.swob import Request, HTTPException
|
||||
from swift.common.utils import closing_if_possible, md5
|
||||
from swift.common.utils import close_if_possible, closing_if_possible, md5
|
||||
from test.debug_logger import debug_logger
|
||||
from test.unit import patch_policies, FakeRing
|
||||
from test.unit.common.middleware.helpers import FakeSwift
|
||||
@ -1426,3 +1427,141 @@ class TestServerSideCopyMiddlewareWithEC(unittest.TestCase):
|
||||
self.assertEqual(resp.body, range_not_satisfiable_body)
|
||||
self.assertEqual(resp.etag, body_etag)
|
||||
self.assertEqual(resp.headers['Accept-Ranges'], 'bytes')
|
||||
|
||||
|
||||
class TestServerSideCopyHeartbeat(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.app = FakeSwift()
|
||||
self.ssc = copy.filter_factory({'yield_frequency': '1'})(self.app)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def call_app(self, req, app=None):
|
||||
if app is None:
|
||||
app = self.app
|
||||
|
||||
self.authorized = []
|
||||
|
||||
def authorize(req):
|
||||
self.authorized.append(req)
|
||||
|
||||
if 'swift.authorize' not in req.environ:
|
||||
req.environ['swift.authorize'] = authorize
|
||||
|
||||
req.headers.setdefault("User-Agent", "Test User Agent")
|
||||
|
||||
status = [None]
|
||||
headers = [None]
|
||||
|
||||
def start_response(s, h, ei=None):
|
||||
status[0] = s
|
||||
headers[0] = h
|
||||
|
||||
body_iter = app(req.environ, start_response)
|
||||
body = b''
|
||||
try:
|
||||
for chunk in body_iter:
|
||||
body += chunk
|
||||
finally:
|
||||
close_if_possible(body_iter)
|
||||
|
||||
return status[0], headers[0], body
|
||||
|
||||
def test_copy_with_heartbeat_success(self):
|
||||
original_spawn = eventlet.spawn
|
||||
self.app.register('GET', '/v1/a/c/o?heartbeat=true', swob.HTTPOk,
|
||||
{'Content-Length': '10'}, b'X' * 10)
|
||||
self.app.register('PUT', '/v1/a/c/o2?heartbeat=true',
|
||||
swob.HTTPCreated, {})
|
||||
heartbeats = []
|
||||
|
||||
def mock_spawn(func, *args, **kwargs):
|
||||
def delayed_func(*a, **kw):
|
||||
eventlet.sleep(2.5)
|
||||
return func(*a, **kw)
|
||||
return original_spawn(delayed_func, *args, **kwargs)
|
||||
req = swob.Request.blank(
|
||||
'/v1/a/c/o2?heartbeat=true',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '0', 'X-Copy-From': 'c/o'})
|
||||
|
||||
with mock.patch('eventlet.spawn', mock_spawn):
|
||||
status = [None]
|
||||
headers_list = [None]
|
||||
|
||||
def start_response(s, h, ei=None):
|
||||
status[0] = s
|
||||
headers_list[0] = h
|
||||
body_iter = self.ssc(req.environ, start_response)
|
||||
self.assertEqual('202 Accepted', status[0])
|
||||
|
||||
try:
|
||||
for chunk in body_iter:
|
||||
heartbeats.append(chunk)
|
||||
finally:
|
||||
close_if_possible(body_iter)
|
||||
self.assertTrue(len(heartbeats) >= 3,
|
||||
f"Expected 3 heartbeats, got {len(heartbeats)}")
|
||||
self.assertEqual(heartbeats[0], b' ')
|
||||
|
||||
for i in range(1, len(heartbeats) - 1):
|
||||
self.assertEqual(heartbeats[i], b' ')
|
||||
self.assertIn(b'201 Created', heartbeats[-1])
|
||||
self.assertEqual(req.environ.get('eventlet.minimum_write_chunk_size'),
|
||||
0)
|
||||
self.assertEqual(2, len(self.app.calls))
|
||||
self.assertEqual(('GET', '/v1/a/c/o?heartbeat=true'),
|
||||
self.app.calls[0])
|
||||
self.assertEqual(('PUT', '/v1/a/c/o2?heartbeat=true'),
|
||||
self.app.calls[1])
|
||||
|
||||
def test_copy_with_heartbeat_failure(self):
|
||||
original_spawn = eventlet.spawn
|
||||
self.app.register('GET', '/v1/a/c/o?heartbeat=true', swob.HTTPOk,
|
||||
{'Content-Length': '10'}, b'X' * 10)
|
||||
self.app.register('PUT', '/v1/a/c/o2?heartbeat=true',
|
||||
swob.HTTPServiceUnavailable, {})
|
||||
heartbeats = []
|
||||
|
||||
def mock_spawn(func, *args, **kwargs):
|
||||
|
||||
def delayed_func(*a, **kw):
|
||||
eventlet.sleep(2.5)
|
||||
return func(*a, **kw)
|
||||
return original_spawn(delayed_func, *args, **kwargs)
|
||||
|
||||
req = swob.Request.blank(
|
||||
'/v1/a/c/o2?heartbeat=true',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '0', 'X-Copy-From': 'c/o'})
|
||||
|
||||
with mock.patch('eventlet.spawn', mock_spawn):
|
||||
status = [None]
|
||||
headers_list = [None]
|
||||
|
||||
def start_response(s, h, ei=None):
|
||||
status[0] = s
|
||||
headers_list[0] = h
|
||||
body_iter = self.ssc(req.environ, start_response)
|
||||
self.assertEqual('202 Accepted', status[0])
|
||||
|
||||
try:
|
||||
for chunk in body_iter:
|
||||
heartbeats.append(chunk)
|
||||
finally:
|
||||
close_if_possible(body_iter)
|
||||
self.assertTrue(len(heartbeats) >= 3,
|
||||
f"Expected 3 heartbeats, got {len(heartbeats)}")
|
||||
self.assertEqual(heartbeats[0], b' ')
|
||||
|
||||
for i in range(1, len(heartbeats) - 1):
|
||||
self.assertEqual(heartbeats[i], b' ')
|
||||
self.assertIn(b'503 Service Unavailable', heartbeats[-1])
|
||||
self.assertEqual(req.environ.get('eventlet.minimum_write_chunk_size'),
|
||||
0)
|
||||
self.assertEqual(2, len(self.app.calls))
|
||||
self.assertEqual(('GET', '/v1/a/c/o?heartbeat=true'),
|
||||
self.app.calls[0])
|
||||
self.assertEqual(('PUT', '/v1/a/c/o2?heartbeat=true'),
|
||||
self.app.calls[1])
|
||||
|
Loading…
x
Reference in New Issue
Block a user