Refactor rate-limiting helper into a class

Replaces the ratelimit_sleep helper function with an
EventletRateLimiter class that encapsulates the rate-limiting state
that previously needed to be maintained by the caller of the function.

The ratelimit_sleep function is retained but deprecated, and now
forwards to the EventletRateLimiter class.
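
For callers the change is a mechanical migration from threading the
running_time value through each call to holding a limiter instance; a
minimal sketch taken from the db auditor hunk in this patch:

    # before: the caller maintains the rate-limiting state itself
    self.running_time = ratelimit_sleep(
        self.running_time, self.max_dbs_per_second)

    # after: the state is encapsulated by the limiter instance
    self.rate_limiter = EventletRateLimiter(self.max_dbs_per_second)
    self.rate_limiter.wait()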

The object updater's BucketizedUpdateSkippingLimiter is refactored to
take advantage of the new EventletRateLimiter class.

The rate limiting algorithm is corrected to make the allowed request
rate more uniform: previously pairs of requests would be allowed in
rapid succession before the rate limiter would then sleep for the time
allowance consumed by those two requests; now the rate limiter sleeps
as required after each allowed request. For example, a max_rate of 1
per second previously might allow 2 requests in quick succession
followed by a 2 second sleep; that is corrected to a sleep of 1 second
after each request.
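
A minimal sketch of the corrected behaviour, loosely following the new
unit tests (the fake clock/sleep helpers are illustrative, not part of
the patch):

    from unittest import mock
    from swift.common.utils import EventletRateLimiter

    fake_now = [0.0]

    def fake_time():
        return fake_now[0]

    def fake_sleep(duration):
        # advance the fake clock by exactly the sleep duration
        fake_now[0] += duration

    grant_times = []
    with mock.patch('time.time', fake_time), \
            mock.patch('eventlet.sleep', fake_sleep):
        limiter = EventletRateLimiter(max_rate=1, rate_buffer=0)
        for _ in range(3):
            limiter.wait()
            grant_times.append(fake_now[0])

    # grant_times == [0.0, 1.0, 2.0]: a 1 second sleep after each allowed
    # request, rather than a burst of 2 requests followed by a 2 second sleep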

Change-Id: Ibcf4dbeb4332dee7e9e233473d4ceaf75a5a85c7
Alistair Coles 2022-03-23 18:26:50 +00:00
parent 954032d5d2
commit 5227cb702b
8 changed files with 271 additions and 100 deletions

@ -22,7 +22,7 @@ from eventlet import Timeout
import swift.common.db
from swift.common.utils import get_logger, audit_location_generator, \
config_true_value, dump_recon_cache, ratelimit_sleep
config_true_value, dump_recon_cache, EventletRateLimiter
from swift.common.daemon import Daemon
from swift.common.exceptions import DatabaseAuditorException
from swift.common.recon import DEFAULT_RECON_CACHE_PATH, \
@ -56,9 +56,9 @@ class DatabaseAuditor(Daemon):
self.logging_interval = 3600 # once an hour
self.passes = 0
self.failures = 0
self.running_time = 0
self.max_dbs_per_second = \
float(conf.get('{}s_per_second'.format(self.server_type), 200))
self.rate_limiter = EventletRateLimiter(self.max_dbs_per_second)
swift.common.db.DB_PREALLOCATION = \
config_true_value(conf.get('db_preallocation', 'f'))
self.recon_cache_path = conf.get('recon_cache_path',
@ -88,8 +88,7 @@ class DatabaseAuditor(Daemon):
reported = time.time()
self.passes = 0
self.failures = 0
self.running_time = ratelimit_sleep(
self.running_time, self.max_dbs_per_second)
self.rate_limiter.wait()
return reported
def run_forever(self, *args, **kwargs):

@ -1723,7 +1723,7 @@ class RateLimitedIterator(object):
self.iterator = iter(iterable)
self.elements_per_second = elements_per_second
self.limit_after = limit_after
self.running_time = 0
self.rate_limiter = EventletRateLimiter(elements_per_second)
self.ratelimit_if = ratelimit_if
def __iter__(self):
@ -1736,8 +1736,7 @@ class RateLimitedIterator(object):
if self.limit_after > 0:
self.limit_after -= 1
else:
self.running_time = ratelimit_sleep(self.running_time,
self.elements_per_second)
self.rate_limiter.wait()
return next_value
__next__ = next
@ -3451,6 +3450,91 @@ def audit_location_generator(devices, datadir, suffix='',
hook_post_device(os.path.join(devices, device))
class AbstractRateLimiter(object):
# 1,000 milliseconds = 1 second
clock_accuracy = 1000.0
def __init__(self, max_rate, rate_buffer=5, running_time=0):
"""
:param max_rate: The maximum rate per second allowed for the process.
Must be > 0 to engage rate-limiting behavior.
:param rate_buffer: Number of seconds the rate counter can drop and be
allowed to catch up (at a faster than listed rate). A larger number
will result in larger spikes in rate but better average accuracy.
:param running_time: The running time in milliseconds of the next
allowable request. Setting this to any time in the past will cause
the rate limiter to immediately allow requests; setting this to a
future time will cause the rate limiter to deny requests until that
time.
"""
self.max_rate = max_rate
self.rate_buffer_ms = rate_buffer * self.clock_accuracy
self.running_time = running_time
self.time_per_incr = (self.clock_accuracy / self.max_rate
if self.max_rate else 0)
def _sleep(self, seconds):
# subclasses should override to implement a sleep
raise NotImplementedError
def is_allowed(self, incr_by=1, now=None, block=False):
"""
Check if the calling process is allowed to proceed according to the
rate limit.
:param incr_by: How much to increment the counter. Useful if you want
to ratelimit 1024 bytes/sec and have differing sizes
of requests. Must be > 0 to engage rate-limiting
behavior.
:param now: The time in seconds; defaults to time.time()
:param block: if True, the call will sleep until the calling process
is allowed to proceed; otherwise the call returns immediately.
:return: True if the calling process is allowed to proceed, False
otherwise.
"""
if self.max_rate <= 0 or incr_by <= 0:
return True
now = now or time.time()
# Convert seconds to milliseconds
now = now * self.clock_accuracy
# Calculate time per request in milliseconds
time_per_request = self.time_per_incr * float(incr_by)
# Convert rate_buffer to milliseconds and compare
if now - self.running_time > self.rate_buffer_ms:
self.running_time = now
if now >= self.running_time:
self.running_time += time_per_request
allowed = True
elif block:
sleep_time = (self.running_time - now) / self.clock_accuracy
# increment running time before sleeping in case the sleep allows
# another thread to inspect the rate limiter state
self.running_time += time_per_request
# Convert diff to a floating point number of seconds and sleep
self._sleep(sleep_time)
allowed = True
else:
allowed = False
return allowed
def wait(self, incr_by=1, now=None):
self.is_allowed(incr_by=incr_by, now=now, block=True)
class EventletRateLimiter(AbstractRateLimiter):
def __init__(self, max_rate, rate_buffer=5, running_time=0):
super(EventletRateLimiter, self).__init__(
max_rate, rate_buffer, running_time)
def _sleep(self, seconds):
eventlet.sleep(seconds)
def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
"""
Will eventlet.sleep() for the appropriate time so that the max_rate
@ -3471,30 +3555,18 @@ def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
A larger number will result in larger spikes in rate
but better average accuracy. Must be > 0 to engage
rate-limiting behavior.
:return: The absolute time for the next interval in milliseconds; note
that time could have passed well beyond that point, but the next call
will catch that and skip the sleep.
"""
if max_rate <= 0 or incr_by <= 0:
return running_time
# 1,000 milliseconds = 1 second
clock_accuracy = 1000.0
# Convert seconds to milliseconds
now = time.time() * clock_accuracy
# Calculate time per request in milliseconds
time_per_request = clock_accuracy * (float(incr_by) / max_rate)
# Convert rate_buffer to milliseconds and compare
if now - running_time > rate_buffer * clock_accuracy:
running_time = now
elif running_time - now > time_per_request:
# Convert diff back to a floating point number of seconds and sleep
eventlet.sleep((running_time - now) / clock_accuracy)
# Return the absolute time for the next interval in milliseconds; note
# that time could have passed well beyond that point, but the next call
# will catch that and skip the sleep.
return running_time + time_per_request
warnings.warn(
'ratelimit_sleep() is deprecated; use the ``EventletRateLimiter`` '
'class instead.', DeprecationWarning
)
rate_limit = EventletRateLimiter(max_rate, rate_buffer=rate_buffer,
running_time=running_time)
rate_limit.wait(incr_by=incr_by)
return rate_limit.running_time
class ContextPool(GreenPool):

@ -31,7 +31,7 @@ from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout, LockTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, config_true_value, \
dump_recon_cache, majority_size, Timestamp, ratelimit_sleep, \
dump_recon_cache, majority_size, Timestamp, EventletRateLimiter, \
eventlet_monkey_patch
from swift.common.daemon import Daemon
from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
@ -59,10 +59,10 @@ class ContainerUpdater(Daemon):
float(conf.get('slowdown', '0.01')) + 0.01)
else:
containers_per_second = 50
self.containers_running_time = 0
self.max_containers_per_second = \
float(conf.get('containers_per_second',
containers_per_second))
self.rate_limiter = EventletRateLimiter(self.max_containers_per_second)
self.node_timeout = float(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.no_changes = 0
@ -226,9 +226,7 @@ class ContainerUpdater(Daemon):
self.logger.exception(
"Error processing container %s: %s", dbfile, e)
self.containers_running_time = ratelimit_sleep(
self.containers_running_time,
self.max_containers_per_second)
self.rate_limiter.wait()
def process_container(self, dbfile):
"""

@ -30,7 +30,7 @@ from swift.common.daemon import Daemon
from swift.common.storage_policy import POLICIES
from swift.common.utils import (
config_auto_int_value, dump_recon_cache, get_logger, list_from_csv,
listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep,
listdir, load_pkg_resource, parse_prefixed_conf, EventletRateLimiter,
readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter)
from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH
@ -85,8 +85,10 @@ class AuditorWorker(object):
self.auditor_type = 'ZBF'
self.log_time = int(conf.get('log_time', 3600))
self.last_logged = 0
self.files_running_time = 0
self.bytes_running_time = 0
self.files_rate_limiter = EventletRateLimiter(
self.max_files_per_second)
self.bytes_rate_limiter = EventletRateLimiter(
self.max_bytes_per_second)
self.bytes_processed = 0
self.total_bytes_processed = 0
self.total_files_processed = 0
@ -146,8 +148,7 @@ class AuditorWorker(object):
loop_time = time.time()
self.failsafe_object_audit(location)
self.logger.timing_since('timing', loop_time)
self.files_running_time = ratelimit_sleep(
self.files_running_time, self.max_files_per_second)
self.files_rate_limiter.wait()
self.total_files_processed += 1
now = time.time()
if now - self.last_logged >= self.log_time:
@ -266,10 +267,7 @@ class AuditorWorker(object):
with closing(reader):
for chunk in reader:
chunk_len = len(chunk)
self.bytes_running_time = ratelimit_sleep(
self.bytes_running_time,
self.max_bytes_per_second,
incr_by=chunk_len)
self.bytes_rate_limiter.wait(incr_by=chunk_len)
self.bytes_processed += chunk_len
self.total_bytes_processed += chunk_len
for watcher in self.watchers:

@ -33,7 +33,8 @@ from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
non_negative_float, config_positive_int_value, non_negative_int
non_negative_float, config_positive_int_value, non_negative_int, \
EventletRateLimiter
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@ -43,16 +44,17 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
HTTP_MOVED_PERMANENTLY
class RateLimiterBucket(object):
def __init__(self, update_delta):
self.update_delta = update_delta
self.last_time = 0
class RateLimiterBucket(EventletRateLimiter):
"""
Extends EventletRateLimiter to also maintain a deque of items that have
been deferred due to rate-limiting, and to provide a comparator for sorting
instances by readiness.
"""
def __init__(self, max_updates_per_second):
super(RateLimiterBucket, self).__init__(max_updates_per_second,
rate_buffer=0)
self.deque = deque()
@property
def wait_until(self):
return self.last_time + self.update_delta
def __len__(self):
return len(self.deque)
@ -62,10 +64,10 @@ class RateLimiterBucket(object):
__nonzero__ = __bool__ # py2
def __lt__(self, other):
# used to sort buckets by readiness
# used to sort RateLimiterBuckets by readiness
if isinstance(other, RateLimiterBucket):
return self.wait_until < other.wait_until
return self.wait_until < other
return self.running_time < other.running_time
return self.running_time < other
class BucketizedUpdateSkippingLimiter(object):
@ -124,15 +126,11 @@ class BucketizedUpdateSkippingLimiter(object):
self.stats = stats
# if we want a smaller "blast radius" we could make this number bigger
self.num_buckets = max(num_buckets, 1)
try:
self.bucket_update_delta = 1.0 / max_elements_per_group_per_second
except ZeroDivisionError:
self.bucket_update_delta = -1
self.max_deferred_elements = max_deferred_elements
self.deferred_buckets = deque()
self.drain_until = drain_until
self.salt = str(uuid.uuid4())
self.buckets = [RateLimiterBucket(self.bucket_update_delta)
self.buckets = [RateLimiterBucket(max_elements_per_group_per_second)
for _ in range(self.num_buckets)]
self.buckets_ordered_by_readiness = None
@ -151,9 +149,8 @@ class BucketizedUpdateSkippingLimiter(object):
for update_ctx in self.iterator:
bucket = self.buckets[self._bucket_key(update_ctx['update'])]
now = self._get_time()
if now >= bucket.wait_until:
if bucket.is_allowed(now=now):
# no need to ratelimit, just return next update
bucket.last_time = now
return update_ctx
self.stats.deferrals += 1
@ -194,13 +191,12 @@ class BucketizedUpdateSkippingLimiter(object):
bucket = self.buckets_ordered_by_readiness.get_nowait()
if now < self.drain_until:
# wait for next element to be ready
time.sleep(max(0, bucket.wait_until - now))
bucket.wait(now=now)
# drain the most recently deferred element
item = bucket.deque.pop()
if bucket:
# bucket has more deferred elements, re-insert in queue in
# correct chronological position
bucket.last_time = self._get_time()
self.buckets_ordered_by_readiness.put(bucket)
self.stats.drains += 1
self.logger.increment("drains")

@ -2423,7 +2423,7 @@ class TestSloGetManifest(SloTestCase):
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK') # sanity check
self.assertEqual(sleeps, [2.0, 2.0, 2.0, 2.0, 2.0])
self.assertEqual(sleeps, [1.0] * 11)
# give the client the first 4 segments without ratelimiting; we'll
# sleep less
@ -2435,7 +2435,7 @@ class TestSloGetManifest(SloTestCase):
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK') # sanity check
self.assertEqual(sleeps, [2.0, 2.0, 2.0])
self.assertEqual(sleeps, [1.0] * 7)
# ratelimit segments under 35 bytes; this affects a-f
del sleeps[:]
@ -2446,7 +2446,7 @@ class TestSloGetManifest(SloTestCase):
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK') # sanity check
self.assertEqual(sleeps, [2.0, 2.0])
self.assertEqual(sleeps, [1.0] * 5)
# ratelimit segments under 36 bytes; this now affects a-g, netting
# us one more sleep than before
@ -2458,7 +2458,7 @@ class TestSloGetManifest(SloTestCase):
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK') # sanity check
self.assertEqual(sleeps, [2.0, 2.0, 2.0])
self.assertEqual(sleeps, [1.0] * 6)
def test_get_manifest_with_submanifest(self):
req = Request.blank(

@ -5832,6 +5832,125 @@ class TestAffinityLocalityPredicate(unittest.TestCase):
utils.affinity_locality_predicate, 'r1z1=1')
class TestEventletRateLimiter(unittest.TestCase):
def test_init(self):
rl = utils.EventletRateLimiter(0.1)
self.assertEqual(0.1, rl.max_rate)
self.assertEqual(0.0, rl.running_time)
self.assertEqual(5000, rl.rate_buffer_ms)
rl = utils.EventletRateLimiter(
0.2, rate_buffer=2, running_time=1234567.8)
self.assertEqual(0.2, rl.max_rate)
self.assertEqual(1234567.8, rl.running_time)
self.assertEqual(2000, rl.rate_buffer_ms)
def test_non_blocking(self):
rate_limiter = utils.EventletRateLimiter(0.1, rate_buffer=0)
with patch('time.time',) as mock_time:
with patch('eventlet.sleep') as mock_sleep:
mock_time.return_value = 0
self.assertTrue(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
self.assertFalse(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
mock_time.return_value = 9.99
self.assertFalse(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
mock_time.return_value = 10.0
self.assertTrue(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
self.assertFalse(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
rate_limiter = utils.EventletRateLimiter(0.1, rate_buffer=20)
with patch('time.time',) as mock_time:
with patch('eventlet.sleep') as mock_sleep:
mock_time.return_value = 20.0
self.assertTrue(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
self.assertTrue(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
self.assertTrue(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
self.assertFalse(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
def _do_test(self, max_rate, running_time, start_time, rate_buffer,
incr_by=1.0):
rate_limiter = utils.EventletRateLimiter(
max_rate,
running_time=1000 * running_time, # msecs
rate_buffer=rate_buffer)
grant_times = []
current_time = [start_time]
def mock_time():
return current_time[0]
def mock_sleep(duration):
current_time[0] += duration
with patch('time.time', mock_time):
with patch('eventlet.sleep', mock_sleep):
for i in range(5):
rate_limiter.wait(incr_by=incr_by)
grant_times.append(current_time[0])
return [round(t, 6) for t in grant_times]
def test_ratelimit(self):
grant_times = self._do_test(1, 0, 1, 0)
self.assertEqual([1, 2, 3, 4, 5], grant_times)
grant_times = self._do_test(10, 0, 1, 0)
self.assertEqual([1, 1.1, 1.2, 1.3, 1.4], grant_times)
grant_times = self._do_test(.1, 0, 1, 0)
self.assertEqual([1, 11, 21, 31, 41], grant_times)
grant_times = self._do_test(.1, 11, 1, 0)
self.assertEqual([11, 21, 31, 41, 51], grant_times)
def test_incr_by(self):
grant_times = self._do_test(1, 0, 1, 0, incr_by=2.5)
self.assertEqual([1, 3.5, 6, 8.5, 11], grant_times)
def test_burst(self):
grant_times = self._do_test(1, 1, 4, 0)
self.assertEqual([4, 5, 6, 7, 8], grant_times)
grant_times = self._do_test(1, 1, 4, 1)
self.assertEqual([4, 5, 6, 7, 8], grant_times)
grant_times = self._do_test(1, 1, 4, 2)
self.assertEqual([4, 5, 6, 7, 8], grant_times)
grant_times = self._do_test(1, 1, 4, 3)
self.assertEqual([4, 4, 4, 4, 5], grant_times)
grant_times = self._do_test(1, 1, 4, 4)
self.assertEqual([4, 4, 4, 4, 5], grant_times)
grant_times = self._do_test(1, 1, 3, 3)
self.assertEqual([3, 3, 3, 4, 5], grant_times)
grant_times = self._do_test(1, 0, 2, 3)
self.assertEqual([2, 2, 2, 3, 4], grant_times)
grant_times = self._do_test(1, 1, 3, 3)
self.assertEqual([3, 3, 3, 4, 5], grant_times)
grant_times = self._do_test(1, 0, 3, 3)
self.assertEqual([3, 3, 3, 3, 4], grant_times)
grant_times = self._do_test(1, 1, 3, 3)
self.assertEqual([3, 3, 3, 4, 5], grant_times)
grant_times = self._do_test(1, 0, 4, 3)
self.assertEqual([4, 5, 6, 7, 8], grant_times)
class TestRateLimitedIterator(unittest.TestCase):
def run_under_pseudo_time(

@ -1652,7 +1652,7 @@ class TestObjectUpdater(unittest.TestCase):
len(self._find_async_pending_files()))
# indexes 0, 2 succeed; 1, 3, 4 deferred but 1 is bumped from deferral
# queue by 4; 4, 3 are then drained
latencies = [0, 0.05, .051, 0, 0, 0, .11, .01]
latencies = [0, 0.05, .051, 0, 0, 0, .11]
expected_success = 4
contexts_fed_in = []
@ -1693,7 +1693,7 @@ class TestObjectUpdater(unittest.TestCase):
fake_object_update), \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator), \
mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
@ -1719,7 +1719,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual([aorder[o] for o in expected_updates_sent],
[aorder[o] for o in actual_updates_sent])
self.assertEqual([0, 0, 0, 0, 0, 1, 1, 1], captured_skips_stats)
self.assertEqual([0, 0, 0, 0, 0, 1, 1], captured_skips_stats)
expected_deferrals = [
[],
@ -1729,7 +1729,6 @@ class TestObjectUpdater(unittest.TestCase):
[objs_fed_in[1], objs_fed_in[3]],
[objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[3]], # note: rightmost element is drained
[objs_fed_in[3]],
]
self.assertEqual(
expected_deferrals,
@ -1776,7 +1775,7 @@ class TestObjectUpdater(unittest.TestCase):
# first pass: 0, 2 and 5 succeed, 1, 3, 4, 6 deferred
# last 2 deferred items sent before interval elapses
latencies = [0, .05, 0.051, 0, 0, .11, 0, 0,
0.1, 0, 0.1, 0] # total 0.42
0.1, 0.1, 0] # total 0.411
expected_success = 5
contexts_fed_in = []
@ -1820,7 +1819,7 @@ class TestObjectUpdater(unittest.TestCase):
fake_object_update), \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator), \
mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
@ -1840,7 +1839,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(expected_updates_sent, actual_updates_sent)
# skips (un-drained deferrals) not reported until end of cycle
self.assertEqual([0] * 12, captured_skips_stats)
self.assertEqual([0] * 10, captured_skips_stats)
objs_fed_in = [ctx['update']['obj'] for ctx in contexts_fed_in]
expected_deferrals = [
@ -1856,8 +1855,6 @@ class TestObjectUpdater(unittest.TestCase):
# note: rightmost element is drained
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4], objs_fed_in[6]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[1], objs_fed_in[3]],
[objs_fed_in[1], objs_fed_in[3]],
]
self.assertEqual(
@ -1911,21 +1908,21 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
it = object_updater.BucketizedUpdateSkippingLimiter(
[3, 1], self.logger, self.stats, 1000, 10)
self.assertEqual(1000, it.num_buckets)
self.assertEqual(0.1, it.bucket_update_delta)
self.assertEqual([10] * 1000, [b.max_rate for b in it.buckets])
self.assertEqual([3, 1], [x for x in it.iterator])
# rate of 0 implies unlimited
it = object_updater.BucketizedUpdateSkippingLimiter(
iter([3, 1]), self.logger, self.stats, 9, 0)
self.assertEqual(9, it.num_buckets)
self.assertEqual(-1, it.bucket_update_delta)
self.assertEqual([0] * 9, [b.max_rate for b in it.buckets])
self.assertEqual([3, 1], [x for x in it.iterator])
# num_buckets is collared at 1
it = object_updater.BucketizedUpdateSkippingLimiter(
iter([3, 1]), self.logger, self.stats, 0, 1)
self.assertEqual(1, it.num_buckets)
self.assertEqual(1, it.bucket_update_delta)
self.assertEqual([1], [b.max_rate for b in it.buckets])
self.assertEqual([3, 1], [x for x in it.iterator])
def test_iteration_unlimited(self):
@ -1963,7 +1960,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# enough capacity for all deferrals
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now, now]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
max_deferred_elements=2,
@ -1982,7 +1979,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# only space for one deferral
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
max_deferred_elements=1,
@ -2000,7 +1997,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# only time for one deferral
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now + 20, now + 20]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
max_deferred_elements=2,
@ -2019,7 +2016,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now,
now + 20, now + 20]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs), self.logger, self.stats, 1, 10,
max_deferred_elements=2,
@ -2048,7 +2045,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# deferrals stick in both buckets
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(12)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
@ -2073,7 +2070,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# oldest deferral bumped from one bucket due to max_deferrals == 3
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(10)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
@ -2097,7 +2094,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# older deferrals bumped from one bucket due to max_deferrals == 2
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(10)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
@ -2119,16 +2116,8 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
class TestRateLimiterBucket(unittest.TestCase):
def test_wait_until(self):
b1 = object_updater.RateLimiterBucket(10)
self.assertEqual(10, b1.wait_until)
b1.last_time = b1.wait_until
self.assertEqual(20, b1.wait_until)
b1.last_time = 12345.678
self.assertEqual(12355.678, b1.wait_until)
def test_len(self):
b1 = object_updater.RateLimiterBucket(10)
b1 = object_updater.RateLimiterBucket(0.1)
b1.deque.append(1)
b1.deque.append(2)
self.assertEqual(2, len(b1))
@ -2136,7 +2125,7 @@ class TestRateLimiterBucket(unittest.TestCase):
self.assertEqual(1, len(b1))
def test_bool(self):
b1 = object_updater.RateLimiterBucket(10)
b1 = object_updater.RateLimiterBucket(0.1)
self.assertFalse(b1)
b1.deque.append(1)
self.assertTrue(b1)
@ -2148,13 +2137,13 @@ class TestRateLimiterBucket(unittest.TestCase):
b1 = object_updater.RateLimiterBucket(10)
b2 = object_updater.RateLimiterBucket(10)
b2.last_time = next(time_iter)
b2.running_time = next(time_iter)
buckets = PriorityQueue()
buckets.put(b1)
buckets.put(b2)
self.assertEqual([b1, b2], [buckets.get_nowait() for _ in range(2)])
b1.last_time = next(time_iter)
b1.running_time = next(time_iter)
buckets.put(b1)
buckets.put(b2)
self.assertEqual([b2, b1], [buckets.get_nowait() for _ in range(2)])