diff --git a/swift/common/memcached.py b/swift/common/memcached.py index d9cde54093..199e73066c 100644 --- a/swift/common/memcached.py +++ b/swift/common/memcached.py @@ -254,6 +254,7 @@ class MemcacheRing(object): """ pos = bisect(self._sorted, key) served = [] + any_yielded = False while len(served) < self._tries: pos = (pos + 1) % len(self._sorted) server = self._ring[self._sorted[pos]] @@ -266,6 +267,7 @@ class MemcacheRing(object): try: with MemcachePoolTimeout(self._pool_timeout): fp, sock = self._client_cache[server].get() + any_yielded = True yield server, fp, sock except MemcachePoolTimeout as e: self._exception_occurred( @@ -277,13 +279,15 @@ class MemcacheRing(object): # object. self._exception_occurred( server, e, action='connecting', sock=sock) + if not any_yielded: + self.logger.error('All memcached servers error-limited') def _return_conn(self, server, fp, sock): """Returns a server connection to the pool.""" self._client_cache[server].put((fp, sock)) def set(self, key, value, serialize=True, time=0, - min_compress_len=0): + min_compress_len=0, raise_on_error=False): """ Set a key/value pair in memcache @@ -296,6 +300,8 @@ class MemcacheRing(object): added to keep the signature compatible with python-memcached interface. This implementation ignores it. + :param raise_on_error: if True, propagate Timeouts and other errors. + By default, errors are ignored. """ key = md5hash(key) timeout = sanitize_timeout(time) @@ -332,13 +338,18 @@ class MemcacheRing(object): return except (Exception, Timeout) as e: self._exception_occurred(server, e, sock=sock, fp=fp) + if raise_on_error: + raise MemcacheConnectionError( + "No memcached connections succeeded.") - def get(self, key): + def get(self, key, raise_on_error=False): """ Gets the object specified by key. It will also unserialize the object before returning if it is serialized in memcache with JSON. :param key: key + :param raise_on_error: if True, propagate Timeouts and other errors. + By default, errors are treated as cache misses. :returns: value of the key in memcache """ key = md5hash(key) @@ -366,6 +377,9 @@ class MemcacheRing(object): return value except (Exception, Timeout) as e: self._exception_occurred(server, e, sock=sock, fp=fp) + if raise_on_error: + raise MemcacheConnectionError( + "No memcached connections succeeded.") def incr(self, key, delta=1, time=0): """ diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 825fbb7c2d..e0b95107e2 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -40,6 +40,7 @@ from eventlet import sleep from eventlet.timeout import Timeout import six +from swift.common.memcached import MemcacheConnectionError from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \ @@ -2400,9 +2401,13 @@ class Controller(object): if skip_chance and random.random() < skip_chance: self.logger.increment('shard_updating.cache.skip') else: - cached_ranges = memcache.get(cache_key) - self.logger.increment('shard_updating.cache.%s' % ( - 'hit' if cached_ranges else 'miss')) + try: + cached_ranges = memcache.get( + cache_key, raise_on_error=True) + cache_state = 'hit' if cached_ranges else 'miss' + except MemcacheConnectionError: + cache_state = 'error' + self.logger.increment('shard_updating.cache.%s' % cache_state) if cached_ranges: shard_ranges = [ diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index 4d893b96d8..562f9afa59 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -19,6 +19,7 @@ import random import six from six.moves.urllib.parse import unquote +from swift.common.memcached import MemcacheConnectionError from swift.common.utils import public, private, csv_append, Timestamp, \ config_true_value, ShardRange, cache_from_env, filter_shard_ranges from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT @@ -151,9 +152,14 @@ class ContainerController(Controller): if skip_chance and random.random() < skip_chance: self.logger.increment('shard_listing.cache.skip') else: - cached_ranges = memcache.get(cache_key) - self.logger.increment('shard_listing.cache.%s' % ( - 'hit' if cached_ranges else 'miss')) + try: + cached_ranges = memcache.get( + cache_key, raise_on_error=True) + cache_state = 'hit' if cached_ranges else 'miss' + except MemcacheConnectionError: + cache_state = 'error' + self.logger.increment( + 'shard_listing.cache.%s' % cache_state) if cached_ranges is not None: infocache[cache_key] = tuple(cached_ranges) diff --git a/test/unit/__init__.py b/test/unit/__init__.py index f7f4b22fe5..266763e0ab 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -410,17 +410,22 @@ def track(f): class FakeMemcache(object): - def __init__(self): + def __init__(self, error_on_set=None, error_on_get=None): self.store = {} self.calls = [] self.error_on_incr = False + self.error_on_get = error_on_get or [] + self.error_on_set = error_on_set or [] self.init_incr_return_neg = False def clear_calls(self): del self.calls[:] @track - def get(self, key): + def get(self, key, raise_on_error=False): + if self.error_on_get and self.error_on_get.pop(0): + if raise_on_error: + raise MemcacheConnectionError() return self.store.get(key) @property @@ -428,7 +433,10 @@ class FakeMemcache(object): return self.store.keys @track - def set(self, key, value, serialize=True, time=0): + def set(self, key, value, serialize=True, time=0, raise_on_error=False): + if self.error_on_set and self.error_on_set.pop(0): + if raise_on_error: + raise MemcacheConnectionError() if serialize: value = json.loads(json.dumps(value)) else: diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py index 69371fab6f..42c3cab837 100644 --- a/test/unit/common/test_memcached.py +++ b/test/unit/common/test_memcached.py @@ -32,6 +32,7 @@ from eventlet import GreenPool, sleep, Queue from eventlet.pools import Pool from swift.common import memcached +from swift.common.memcached import MemcacheConnectionError from swift.common.utils import md5, human_readable from mock import patch, MagicMock from test.debug_logger import debug_logger @@ -581,19 +582,27 @@ class TestMemcached(unittest.TestCase): self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' '[Errno 32] Broken pipe', - ] * 11 + [ - 'Error limiting server 1.2.3.4:11211' + ] * 10 + [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + 'Error limiting server 1.2.3.4:11211', + 'All memcached servers error-limited', ]) self.logger.clear() # continued requests just keep bypassing memcache for _ in range(12): memcache_client.set('some_key', [1, 2, 3]) - self.assertEqual(self.logger.get_lines_for_level('error'), []) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'All memcached servers error-limited', + ] * 12) + self.logger.clear() # and get()s are all a "cache miss" self.assertIsNone(memcache_client.get('some_key')) - self.assertEqual(self.logger.get_lines_for_level('error'), []) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'All memcached servers error-limited', + ]) def test_error_disabled(self): memcache_client = memcached.MemcacheRing( @@ -611,6 +620,44 @@ class TestMemcached(unittest.TestCase): '[Errno 32] Broken pipe', ] * 20) + def test_error_raising(self): + memcache_client = memcached.MemcacheRing( + ['1.2.3.4:11211'], logger=self.logger, error_limit_time=0) + mock1 = ExplodingMockMemcached() + memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool( + [(mock1, mock1)] * 20) + + # expect exception when requested... + with self.assertRaises(MemcacheConnectionError): + memcache_client.set('some_key', [1, 2, 3], raise_on_error=True) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + ]) + self.logger.clear() + + with self.assertRaises(MemcacheConnectionError): + memcache_client.get('some_key', raise_on_error=True) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + ]) + self.logger.clear() + + # ...but default is no exception + memcache_client.set('some_key', [1, 2, 3]) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + ]) + self.logger.clear() + + memcache_client.get('some_key') + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + ]) + def test_error_limiting_custom_config(self): def do_calls(time_step, num_calls, **memcache_kwargs): self.logger.clear() @@ -632,8 +679,11 @@ class TestMemcached(unittest.TestCase): self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' '[Errno 32] Broken pipe', - ] * 11 + [ - 'Error limiting server 1.2.3.5:11211' + ] * 10 + [ + 'Error talking to memcached: 1.2.3.5:11211: ' + '[Errno 32] Broken pipe', + 'Error limiting server 1.2.3.5:11211', + 'All memcached servers error-limited', ]) # with default error_limit_time of 60, one call per 6 secs, error limit @@ -650,8 +700,11 @@ class TestMemcached(unittest.TestCase): self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' '[Errno 32] Broken pipe', - ] * 11 + [ - 'Error limiting server 1.2.3.5:11211' + ] * 10 + [ + 'Error talking to memcached: 1.2.3.5:11211: ' + '[Errno 32] Broken pipe', + 'Error limiting server 1.2.3.5:11211', + 'All memcached servers error-limited', ]) # with error_limit_time of 70, one call per 6 secs, error_limit_count @@ -660,8 +713,11 @@ class TestMemcached(unittest.TestCase): self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' '[Errno 32] Broken pipe', - ] * 12 + [ - 'Error limiting server 1.2.3.5:11211' + ] * 11 + [ + 'Error talking to memcached: 1.2.3.5:11211: ' + '[Errno 32] Broken pipe', + 'Error limiting server 1.2.3.5:11211', + 'All memcached servers error-limited', ]) def test_delete(self): diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py index 27e888e1f1..ac73465da2 100644 --- a/test/unit/proxy/controllers/test_container.py +++ b/test/unit/proxy/controllers/test_container.py @@ -2148,7 +2148,7 @@ class TestContainerController(TestRingBase): 'X-Backend-Sharding-State': sharding_state}) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c'), + mock.call.get('shard-listing/a/c', raise_on_error=True), mock.call.set('shard-listing/a/c', self.sr_dicts, time=exp_recheck_listing), # Since there was a backend request, we go ahead and cache @@ -2177,7 +2177,7 @@ class TestContainerController(TestRingBase): 'X-Backend-Sharding-State': sharding_state}) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c')], + mock.call.get('shard-listing/a/c', raise_on_error=True)], self.memcache.calls) self.assertIn('swift.infocache', req.environ) self.assertIn('shard-listing/a/c', req.environ['swift.infocache']) @@ -2237,7 +2237,7 @@ class TestContainerController(TestRingBase): 'X-Backend-Sharding-State': sharding_state}) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c')], + mock.call.get('shard-listing/a/c', raise_on_error=True)], self.memcache.calls) self.assertIn('swift.infocache', req.environ) self.assertIn('shard-listing/a/c', req.environ['swift.infocache']) @@ -2390,7 +2390,7 @@ class TestContainerController(TestRingBase): # deleted from cache self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c'), + mock.call.get('shard-listing/a/c', raise_on_error=True), mock.call.set('container/a/c', mock.ANY, time=6.0)], self.memcache.calls) self.assertEqual(404, self.memcache.calls[2][1][1]['status']) @@ -2400,6 +2400,39 @@ class TestContainerController(TestRingBase): 'container.shard_listing.backend.404': 1}, self.logger.get_increment_counts()) + def test_GET_shard_ranges_read_from_cache_error(self): + self._setup_shard_range_stubs() + self.memcache = FakeMemcache() + self.memcache.delete_all() + self.logger.clear() + info = headers_to_container_info(self.root_resp_hdrs) + info['status'] = 200 + info['sharding_state'] = 'sharded' + self.memcache.set('container/a/c', info) + self.memcache.clear_calls() + self.memcache.error_on_get = [False, True] + + req = self._build_request({'X-Backend-Record-Type': 'shard'}, + {'states': 'listing'}, {}) + backend_req, resp = self._capture_backend_request( + req, 404, b'', {}, num_resp=2 * self.CONTAINER_REPLICAS) + self._check_backend_req( + req, backend_req, + extra_hdrs={'X-Backend-Record-Type': 'shard', + 'X-Backend-Override-Shard-Name-Filter': 'sharded'}) + self.assertNotIn('X-Backend-Cached-Results', resp.headers) + self.assertEqual( + [mock.call.get('container/a/c'), + mock.call.get('shard-listing/a/c', raise_on_error=True), + mock.call.set('container/a/c', mock.ANY, time=6.0)], + self.memcache.calls) + self.assertEqual(404, self.memcache.calls[2][1][1]['status']) + self.assertEqual(b'', resp.body) + self.assertEqual(404, resp.status_int) + self.assertEqual({'container.shard_listing.cache.error': 1, + 'container.shard_listing.backend.404': 1}, + self.logger.get_increment_counts()) + def _do_test_GET_shard_ranges_read_from_cache(self, params, record_type): # pre-warm cache with container metadata and shard ranges and verify # that shard range listing are read from cache when appropriate @@ -2417,7 +2450,7 @@ class TestContainerController(TestRingBase): resp = req.get_response(self.app) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c')], + mock.call.get('shard-listing/a/c', raise_on_error=True)], self.memcache.calls) self.assertEqual({'container.shard_listing.cache.hit': 1}, self.logger.get_increment_counts()) diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 5158a09cce..a03e8b43e4 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -4413,6 +4413,23 @@ class TestReplicatedObjectController( expected[device] = '10.0.0.%d:100%d' % (i, i) self.assertEqual(container_headers, expected) + # shard lookup in memcache may error... + req = Request.blank( + '/v1/a/c/o', {'swift.cache': cache}, + method=method, body='', headers={'Content-Type': 'text/plain'}) + cache.error_on_get = [False, True] + with mock.patch('random.random', return_value=1.0), \ + mocked_http_conn(*status_codes, headers=resp_headers, + body=body): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 202) + stats = self.app.logger.get_increment_counts() + self.assertEqual({'object.shard_updating.cache.skip': 1, + 'object.shard_updating.cache.hit': 1, + 'object.shard_updating.cache.error': 1, + 'object.shard_updating.backend.200': 2}, stats) + do_test('POST', 'sharding') do_test('POST', 'sharded') do_test('DELETE', 'sharding')