Add LRUCache to common.utils
This decorator will memonize a function using a fixed size cache that evicts the oldest entries. It also supports a maxtime paramter to configure a "time-to-live" for entries in the cache. The reconciler code uses this to cache computations of the correct storage policy index for a container for 30 seconds. DocImpact Implements: blueprint storage-policies Change-Id: I0f220869e33c461a4100b21c6324ad725da864fa
This commit is contained in:
parent
a14d2c857c
commit
fbcfb83566
@ -2424,6 +2424,93 @@ class InputProxy(object):
|
||||
return line
|
||||
|
||||
|
||||
class LRUCache(object):
|
||||
"""
|
||||
Decorator for size/time bound memoization that evicts the least
|
||||
recently used members.
|
||||
"""
|
||||
|
||||
PREV, NEXT, KEY, CACHED_AT, VALUE = 0, 1, 2, 3, 4 # link fields
|
||||
|
||||
def __init__(self, maxsize=1000, maxtime=3600):
|
||||
self.maxsize = maxsize
|
||||
self.maxtime = maxtime
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self.mapping = {}
|
||||
self.head = [None, None, None, None, None] # oldest
|
||||
self.tail = [self.head, None, None, None, None] # newest
|
||||
self.head[self.NEXT] = self.tail
|
||||
|
||||
def set_cache(self, value, *key):
|
||||
while len(self.mapping) >= self.maxsize:
|
||||
old_next, old_key = self.head[self.NEXT][self.NEXT:self.NEXT + 2]
|
||||
self.head[self.NEXT], old_next[self.PREV] = old_next, self.head
|
||||
del self.mapping[old_key]
|
||||
last = self.tail[self.PREV]
|
||||
link = [last, self.tail, key, time.time(), value]
|
||||
self.mapping[key] = last[self.NEXT] = self.tail[self.PREV] = link
|
||||
return value
|
||||
|
||||
def get_cached(self, link, *key):
|
||||
link_prev, link_next, key, cached_at, value = link
|
||||
if cached_at + self.maxtime < time.time():
|
||||
raise KeyError('%r has timed out' % (key,))
|
||||
link_prev[self.NEXT] = link_next
|
||||
link_next[self.PREV] = link_prev
|
||||
last = self.tail[self.PREV]
|
||||
last[self.NEXT] = self.tail[self.PREV] = link
|
||||
link[self.PREV] = last
|
||||
link[self.NEXT] = self.tail
|
||||
return value
|
||||
|
||||
def __call__(self, f):
|
||||
|
||||
class LRUCacheWrapped(object):
|
||||
|
||||
@functools.wraps(f)
|
||||
def __call__(im_self, *key):
|
||||
link = self.mapping.get(key, self.head)
|
||||
if link is not self.head:
|
||||
try:
|
||||
return self.get_cached(link, *key)
|
||||
except KeyError:
|
||||
pass
|
||||
value = f(*key)
|
||||
self.set_cache(value, *key)
|
||||
return value
|
||||
|
||||
def size(im_self):
|
||||
"""
|
||||
Return the size of the cache
|
||||
"""
|
||||
return len(self.mapping)
|
||||
|
||||
def reset(im_self):
|
||||
return self.reset()
|
||||
|
||||
def get_maxsize(im_self):
|
||||
return self.maxsize
|
||||
|
||||
def set_maxsize(im_self, i):
|
||||
self.maxsize = i
|
||||
|
||||
def get_maxtime(im_self):
|
||||
return self.maxtime
|
||||
|
||||
def set_maxtime(im_self, i):
|
||||
self.maxtime = i
|
||||
|
||||
maxsize = property(get_maxsize, set_maxsize)
|
||||
maxtime = property(get_maxtime, set_maxtime)
|
||||
|
||||
def __repr__(im_self):
|
||||
return '<%s %r>' % (im_self.__class__.__name__, f)
|
||||
|
||||
return LRUCacheWrapped()
|
||||
|
||||
|
||||
def tpool_reraise(func, *args, **kwargs):
|
||||
"""
|
||||
Hack to work around Eventlet's tpool not catching and reraising Timeouts.
|
||||
|
@ -27,11 +27,13 @@ from swift.common.direct_client import (
|
||||
from swift.common.internal_client import InternalClient, UnexpectedResponse
|
||||
from swift.common.storage_policy import POLICY_INDEX
|
||||
from swift.common.utils import get_logger, split_path, quorum_size, \
|
||||
FileLikeIter, normalize_timestamp, last_modified_date_to_timestamp
|
||||
FileLikeIter, normalize_timestamp, last_modified_date_to_timestamp, \
|
||||
LRUCache
|
||||
|
||||
|
||||
MISPLACED_OBJECTS_ACCOUNT = '.misplaced_objects'
|
||||
MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour
|
||||
CONTAINER_POLICY_TTL = 30
|
||||
|
||||
|
||||
def cmp_policy_info(info, remote_info):
|
||||
@ -271,6 +273,7 @@ def parse_raw_obj(obj_info):
|
||||
}
|
||||
|
||||
|
||||
@LRUCache(maxtime=CONTAINER_POLICY_TTL)
|
||||
def direct_get_container_policy_index(container_ring, account_name,
|
||||
container_name):
|
||||
"""
|
||||
|
@ -30,6 +30,7 @@ import re
|
||||
import socket
|
||||
import sys
|
||||
import json
|
||||
import math
|
||||
|
||||
from textwrap import dedent
|
||||
|
||||
@ -3248,5 +3249,100 @@ class TestGreenAsyncPile(unittest.TestCase):
|
||||
self.assertEqual(completed[0], 2)
|
||||
|
||||
|
||||
class TestLRUCache(unittest.TestCase):
|
||||
|
||||
def test_maxsize(self):
|
||||
@utils.LRUCache(maxsize=10)
|
||||
def f(*args):
|
||||
return math.sqrt(*args)
|
||||
_orig_math_sqrt = math.sqrt
|
||||
# setup cache [0-10)
|
||||
for i in range(10):
|
||||
self.assertEqual(math.sqrt(i), f(i))
|
||||
self.assertEqual(f.size(), 10)
|
||||
# validate cache [0-10)
|
||||
with patch('math.sqrt'):
|
||||
for i in range(10):
|
||||
self.assertEqual(_orig_math_sqrt(i), f(i))
|
||||
self.assertEqual(f.size(), 10)
|
||||
# update cache [10-20)
|
||||
for i in range(10, 20):
|
||||
self.assertEqual(math.sqrt(i), f(i))
|
||||
# cache size is fixed
|
||||
self.assertEqual(f.size(), 10)
|
||||
# validate cache [10-20)
|
||||
with patch('math.sqrt'):
|
||||
for i in range(10, 20):
|
||||
self.assertEqual(_orig_math_sqrt(i), f(i))
|
||||
# validate un-cached [0-10)
|
||||
with patch('math.sqrt', new=None):
|
||||
for i in range(10):
|
||||
self.assertRaises(TypeError, f, i)
|
||||
# cache unchanged
|
||||
self.assertEqual(f.size(), 10)
|
||||
with patch('math.sqrt'):
|
||||
for i in range(10, 20):
|
||||
self.assertEqual(_orig_math_sqrt(i), f(i))
|
||||
self.assertEqual(f.size(), 10)
|
||||
|
||||
def test_maxtime(self):
|
||||
@utils.LRUCache(maxtime=30)
|
||||
def f(*args):
|
||||
return math.sqrt(*args)
|
||||
self.assertEqual(30, f.maxtime)
|
||||
_orig_math_sqrt = math.sqrt
|
||||
|
||||
now = time.time()
|
||||
the_future = now + 31
|
||||
# setup cache [0-10)
|
||||
with patch('time.time', lambda: now):
|
||||
for i in range(10):
|
||||
self.assertEqual(math.sqrt(i), f(i))
|
||||
self.assertEqual(f.size(), 10)
|
||||
# validate cache [0-10)
|
||||
with patch('math.sqrt'):
|
||||
for i in range(10):
|
||||
self.assertEqual(_orig_math_sqrt(i), f(i))
|
||||
self.assertEqual(f.size(), 10)
|
||||
|
||||
# validate expired [0-10)
|
||||
with patch('math.sqrt', new=None):
|
||||
with patch('time.time', lambda: the_future):
|
||||
for i in range(10):
|
||||
self.assertRaises(TypeError, f, i)
|
||||
|
||||
# validate repopulates [0-10)
|
||||
with patch('time.time', lambda: the_future):
|
||||
for i in range(10):
|
||||
self.assertEqual(math.sqrt(i), f(i))
|
||||
# reuses cache space
|
||||
self.assertEqual(f.size(), 10)
|
||||
|
||||
def test_set_maxtime(self):
|
||||
@utils.LRUCache(maxtime=30)
|
||||
def f(*args):
|
||||
return math.sqrt(*args)
|
||||
self.assertEqual(30, f.maxtime)
|
||||
self.assertEqual(2, f(4))
|
||||
self.assertEqual(1, f.size())
|
||||
# expire everything
|
||||
f.maxtime = -1
|
||||
# validate un-cached [0-10)
|
||||
with patch('math.sqrt', new=None):
|
||||
self.assertRaises(TypeError, f, 4)
|
||||
|
||||
def test_set_maxsize(self):
|
||||
@utils.LRUCache(maxsize=10)
|
||||
def f(*args):
|
||||
return math.sqrt(*args)
|
||||
for i in range(12):
|
||||
f(i)
|
||||
self.assertEqual(f.size(), 10)
|
||||
f.maxsize = 4
|
||||
for i in range(12):
|
||||
f(i)
|
||||
self.assertEqual(f.size(), 4)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -178,6 +178,7 @@ class TestReconcilerUtils(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.fake_ring = FakeRing()
|
||||
reconciler.direct_get_container_policy_index.reset()
|
||||
|
||||
def test_parse_raw_obj(self):
|
||||
got = reconciler.parse_raw_obj({
|
||||
@ -247,6 +248,7 @@ class TestReconcilerUtils(unittest.TestCase):
|
||||
),
|
||||
]
|
||||
for permutation in itertools.permutations((0, 1, 2)):
|
||||
reconciler.direct_get_container_policy_index.reset()
|
||||
resp_headers = [stub_resp_headers[i] for i in permutation]
|
||||
with mock.patch(mock_path) as direct_head:
|
||||
direct_head.side_effect = resp_headers
|
||||
@ -476,6 +478,53 @@ class TestReconcilerUtils(unittest.TestCase):
|
||||
self.fake_ring, 'a', 'con')
|
||||
self.assertEqual(oldest_spi, 1)
|
||||
|
||||
def test_get_container_policy_index_cache(self):
|
||||
now = time.time()
|
||||
ts = itertools.count(int(now))
|
||||
mock_path = 'swift.container.reconciler.direct_head_container'
|
||||
stub_resp_headers = [
|
||||
container_resp_headers(
|
||||
status_changed_at=normalize_timestamp(ts.next()),
|
||||
storage_policy_index=0,
|
||||
),
|
||||
container_resp_headers(
|
||||
status_changed_at=normalize_timestamp(ts.next()),
|
||||
storage_policy_index=1,
|
||||
),
|
||||
container_resp_headers(
|
||||
status_changed_at=normalize_timestamp(ts.next()),
|
||||
storage_policy_index=0,
|
||||
),
|
||||
]
|
||||
random.shuffle(stub_resp_headers)
|
||||
with mock.patch(mock_path) as direct_head:
|
||||
direct_head.side_effect = stub_resp_headers
|
||||
oldest_spi = reconciler.direct_get_container_policy_index(
|
||||
self.fake_ring, 'a', 'con')
|
||||
self.assertEqual(oldest_spi, 0)
|
||||
# re-mock with errors
|
||||
stub_resp_headers = [
|
||||
socket.error(errno.ECONNREFUSED, os.strerror(errno.ECONNREFUSED)),
|
||||
socket.error(errno.ECONNREFUSED, os.strerror(errno.ECONNREFUSED)),
|
||||
socket.error(errno.ECONNREFUSED, os.strerror(errno.ECONNREFUSED)),
|
||||
]
|
||||
with mock.patch('time.time', new=lambda: now):
|
||||
with mock.patch(mock_path) as direct_head:
|
||||
direct_head.side_effect = stub_resp_headers
|
||||
oldest_spi = reconciler.direct_get_container_policy_index(
|
||||
self.fake_ring, 'a', 'con')
|
||||
# still cached
|
||||
self.assertEqual(oldest_spi, 0)
|
||||
# propel time forward
|
||||
the_future = now + 31
|
||||
with mock.patch('time.time', new=lambda: the_future):
|
||||
with mock.patch(mock_path) as direct_head:
|
||||
direct_head.side_effect = stub_resp_headers
|
||||
oldest_spi = reconciler.direct_get_container_policy_index(
|
||||
self.fake_ring, 'a', 'con')
|
||||
# expired
|
||||
self.assertEqual(oldest_spi, None)
|
||||
|
||||
def test_direct_delete_container_entry(self):
|
||||
mock_path = 'swift.common.direct_client.http_connect'
|
||||
connect_args = []
|
||||
|
Loading…
x
Reference in New Issue
Block a user