Simplify interface to sharder shrinking

Calls to process_compactible_shard_sequences were always followed by
calls to finalize_shrinking, so simplify the sharder module interface
by making process_compactible_shard_sequences call finalize_shrinking.

Change-Id: I22b8d23f32a5e776c37f711a913e4b40425d5e54
This commit is contained in:
Alistair Coles 2021-02-10 11:57:28 +00:00
parent 2aaeab6f5d
commit a3d77cac07
3 changed files with 48 additions and 43 deletions

View File

@ -168,8 +168,7 @@ from swift.common.utils import Timestamp, get_logger, ShardRange
from swift.container.backend import ContainerBroker, UNSHARDED
from swift.container.sharder import make_shard_ranges, sharding_enabled, \
CleavingContext, process_compactible_shard_sequences, \
find_compactible_shard_sequences, find_overlapping_ranges, \
finalize_shrinking
find_compactible_shard_sequences, find_overlapping_ranges
DEFAULT_ROWS_PER_SHARD = 500000
DEFAULT_SHRINK_THRESHOLD = 10000
@ -473,10 +472,7 @@ def compact_shard_ranges(broker, args):
print('No changes applied')
return 0
timestamp = Timestamp.now()
acceptor_ranges, shrinking_ranges = process_compactible_shard_sequences(
compactible, timestamp)
finalize_shrinking(broker, acceptor_ranges, shrinking_ranges, timestamp)
process_compactible_shard_sequences(broker, compactible)
print('Updated %s shard sequences for compaction.' % len(compactible))
print('Run container-replicator to replicate the changes to other '
'nodes.')

View File

@ -305,18 +305,18 @@ def finalize_shrinking(broker, acceptor_ranges, donor_ranges, timestamp):
broker.merge_shard_ranges(acceptor_ranges + donor_ranges)
def process_compactible_shard_sequences(sequences, timestamp):
def process_compactible_shard_sequences(broker, sequences):
"""
Transform the given sequences of shard ranges into a list of acceptors and
a list of shrinking donors. For each given sequence the final ShardRange in
the sequence (the acceptor) is expanded to accommodate the other
ShardRanges in the sequence (the donors).
ShardRanges in the sequence (the donors). The donors and acceptors are then
merged into the broker.
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
:param sequences: A list of :class:`~swift.common.utils.ShardRangeList`
:param timestamp: an instance of :class:`~swift.common.utils.Timestamp`
that is used when updating acceptor range bounds or state
:return: a tuple (acceptor_ranges, shrinking_ranges)
"""
timestamp = Timestamp.now()
acceptor_ranges = []
shrinking_ranges = []
for sequence in sequences:
@ -333,7 +333,7 @@ def process_compactible_shard_sequences(sequences, timestamp):
# Ensure acceptor state is ACTIVE (when acceptor is root)
acceptor.state_timestamp = timestamp
acceptor_ranges.append(acceptor)
return acceptor_ranges, shrinking_ranges
finalize_shrinking(broker, acceptor_ranges, shrinking_ranges, timestamp)
class CleavingContext(object):
@ -622,8 +622,8 @@ class ContainerSharder(ContainerReplicator):
sequences = find_compactible_shard_sequences(
broker, self.shrink_size, self.merge_size,
1, -1)
_, compactible_ranges = process_compactible_shard_sequences(
sequences, Timestamp.now())
# compactible_ranges are all apart from final acceptor in each sequence
compactible_ranges = sum(len(seq) - 1 for seq in sequences)
if compactible_ranges:
own_shard_range = broker.get_own_shard_range()
@ -632,7 +632,7 @@ class ContainerSharder(ContainerReplicator):
# The number of ranges/donors that can be shrunk if the
# tool is used with the current max_shrinking, max_expanding
# settings.
shrink_candidate['compactible_ranges'] = len(compactible_ranges)
shrink_candidate['compactible_ranges'] = compactible_ranges
self.shrinking_candidates.append(shrink_candidate)
def _transform_candidate_stats(self, category, candidates, sort_keys):
@ -1681,10 +1681,7 @@ class ContainerSharder(ContainerReplicator):
self.logger.debug('Found %s compactible sequences of length(s) %s' %
(len(compactible_sequences),
[len(s) for s in compactible_sequences]))
timestamp = Timestamp.now()
acceptors, donors = process_compactible_shard_sequences(
compactible_sequences, timestamp)
finalize_shrinking(broker, acceptors, donors, timestamp)
process_compactible_shard_sequences(broker, compactible_sequences)
own_shard_range = broker.get_own_shard_range()
for sequence in compactible_sequences:
acceptor = sequence[-1]

View File

@ -5588,10 +5588,8 @@ class TestSharder(BaseTestSharder):
compactible = find_compactible_shard_sequences(
broker, sharder.shrink_size, sharder.merge_size, 1, -1)
self.assertNotEqual([], compactible)
timestamp = next(self.ts_iter)
acceptors, donors = process_compactible_shard_sequences(
compactible, timestamp)
finalize_shrinking(broker, acceptors, donors, timestamp)
with mock_timestamp_now(next(self.ts_iter)):
process_compactible_shard_sequences(broker, compactible)
shrink_actionable_ranges(brokers[C2])
sharder._zero_stats()
@ -6223,29 +6221,34 @@ class TestSharderFunctions(BaseTestSharder):
[sr.epoch for sr in updated_ranges[:2]])
def test_process_compactible(self):
ts_0 = next(self.ts_iter)
# no sequences...
acceptors, donors = process_compactible_shard_sequences([], ts_0)
self.assertEqual([], acceptors)
self.assertEqual([], donors)
broker = self._make_broker()
with mock.patch('swift.container.sharder.finalize_shrinking') as fs:
with mock_timestamp_now(next(self.ts_iter)) as now:
process_compactible_shard_sequences(broker, [])
fs.assert_called_once_with(broker, [], [], now)
# two sequences with acceptor bounds needing to be updated
ts_0 = next(self.ts_iter)
sequence_1 = self._make_shard_ranges(
(('a', 'b'), ('b', 'c'), ('c', 'd')),
state=ShardRange.ACTIVE, timestamp=ts_0)
sequence_2 = self._make_shard_ranges(
(('x', 'y'), ('y', 'z')),
state=ShardRange.ACTIVE, timestamp=ts_0)
ts_1 = next(self.ts_iter)
acceptors, donors = process_compactible_shard_sequences(
[sequence_1, sequence_2], ts_1)
with mock.patch('swift.container.sharder.finalize_shrinking') as fs:
with mock_timestamp_now(next(self.ts_iter)) as now:
process_compactible_shard_sequences(
broker, [sequence_1, sequence_2])
expected_donors = sequence_1[:-1] + sequence_2[:-1]
expected_acceptors = [sequence_1[-1].copy(lower='a', timestamp=ts_1),
sequence_2[-1].copy(lower='x', timestamp=ts_1)]
expected_acceptors = [sequence_1[-1].copy(lower='a', timestamp=now),
sequence_2[-1].copy(lower='x', timestamp=now)]
fs.assert_called_once_with(
broker, expected_acceptors, expected_donors, now)
self.assertEqual([dict(sr) for sr in expected_acceptors],
[dict(sr) for sr in acceptors])
[dict(sr) for sr in fs.call_args[0][1]])
self.assertEqual([dict(sr) for sr in expected_donors],
[dict(sr) for sr in donors])
[dict(sr) for sr in fs.call_args[0][2]])
# sequences have already been processed - acceptors expanded
sequence_1 = self._make_shard_ranges(
@ -6254,29 +6257,38 @@ class TestSharderFunctions(BaseTestSharder):
sequence_2 = self._make_shard_ranges(
(('x', 'y'), ('x', 'z')),
state=ShardRange.ACTIVE, timestamp=ts_0)
acceptors, donors = process_compactible_shard_sequences(
[sequence_1, sequence_2], ts_1)
with mock.patch('swift.container.sharder.finalize_shrinking') as fs:
with mock_timestamp_now(next(self.ts_iter)) as now:
process_compactible_shard_sequences(
broker, [sequence_1, sequence_2])
expected_donors = sequence_1[:-1] + sequence_2[:-1]
expected_acceptors = [sequence_1[-1], sequence_2[-1]]
fs.assert_called_once_with(
broker, expected_acceptors, expected_donors, now)
self.assertEqual([dict(sr) for sr in expected_acceptors],
[dict(sr) for sr in acceptors])
[dict(sr) for sr in fs.call_args[0][1]])
self.assertEqual([dict(sr) for sr in expected_donors],
[dict(sr) for sr in donors])
[dict(sr) for sr in fs.call_args[0][2]])
# acceptor is root - needs state to be updated, but not bounds
sequence_1 = self._make_shard_ranges(
(('a', 'b'), ('b', 'c'), ('a', 'd'), ('d', ''), ('', '')),
state=[ShardRange.ACTIVE] * 4 + [ShardRange.SHARDED],
timestamp=ts_0)
acceptors, donors = process_compactible_shard_sequences(
[sequence_1], ts_1)
with mock.patch('swift.container.sharder.finalize_shrinking') as fs:
with mock_timestamp_now(next(self.ts_iter)) as now:
process_compactible_shard_sequences(broker, [sequence_1])
expected_donors = sequence_1[:-1]
expected_acceptors = [sequence_1[-1].copy(state=ShardRange.ACTIVE,
state_timestamp=ts_1)]
state_timestamp=now)]
fs.assert_called_once_with(
broker, expected_acceptors, expected_donors, now)
self.assertEqual([dict(sr) for sr in expected_acceptors],
[dict(sr) for sr in acceptors])
[dict(sr) for sr in fs.call_args[0][1]])
self.assertEqual([dict(sr) for sr in expected_donors],
[dict(sr) for sr in donors])
[dict(sr) for sr in fs.call_args[0][2]])
def test_find_compactible_shard_ranges_in_found_state(self):
broker = self._make_broker()