Merge "Add shrink candidates to recon dump"
This commit is contained in:
commit
8767bbcb99
@ -17,6 +17,7 @@ import errno
|
||||
import json
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from operator import itemgetter
|
||||
from random import random
|
||||
|
||||
import os
|
||||
@ -514,6 +515,7 @@ class ContainerSharder(ContainerReplicator):
|
||||
conf.get('cleave_row_batch_size', 10000))
|
||||
self.auto_shard = config_true_value(conf.get('auto_shard', False))
|
||||
self.sharding_candidates = []
|
||||
self.shrinking_candidates = []
|
||||
self.recon_candidates_limit = int(
|
||||
conf.get('recon_candidates_limit', 5))
|
||||
self.broker_timeout = config_positive_int_value(
|
||||
@ -567,6 +569,7 @@ class ContainerSharder(ContainerReplicator):
|
||||
# stats are maintained under the 'sharding' key in self.stats
|
||||
self.stats['sharding'] = defaultdict(lambda: defaultdict(int))
|
||||
self.sharding_candidates = []
|
||||
self.shrinking_candidates = []
|
||||
|
||||
def _append_stat(self, category, key, value):
|
||||
if not self.stats['sharding'][category][key]:
|
||||
@ -615,11 +618,26 @@ class ContainerSharder(ContainerReplicator):
|
||||
self.sharding_candidates.append(
|
||||
self._make_stats_info(broker, node, own_shard_range))
|
||||
|
||||
def _transform_sharding_candidate_stats(self):
|
||||
category = self.stats['sharding']['sharding_candidates']
|
||||
candidates = self.sharding_candidates
|
||||
def _identify_shrinking_candidate(self, broker, node):
|
||||
sequences = find_compactible_shard_sequences(
|
||||
broker, self.shrink_size, self.merge_size,
|
||||
1, -1)
|
||||
_, compactible_ranges = process_compactible_shard_sequences(
|
||||
sequences, Timestamp.now())
|
||||
|
||||
if compactible_ranges:
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
shrink_candidate = self._make_stats_info(
|
||||
broker, node, own_shard_range)
|
||||
# The number of ranges/donors that can be shrunk if the
|
||||
# tool is used with the current max_shrinking, max_expanding
|
||||
# settings.
|
||||
shrink_candidate['compactible_ranges'] = len(compactible_ranges)
|
||||
self.shrinking_candidates.append(shrink_candidate)
|
||||
|
||||
def _transform_candidate_stats(self, category, candidates, sort_keys):
|
||||
category['found'] = len(candidates)
|
||||
candidates.sort(key=lambda c: c['object_count'], reverse=True)
|
||||
candidates.sort(key=itemgetter(*sort_keys), reverse=True)
|
||||
if self.recon_candidates_limit >= 0:
|
||||
category['top'] = candidates[:self.recon_candidates_limit]
|
||||
else:
|
||||
@ -667,7 +685,16 @@ class ContainerSharder(ContainerReplicator):
|
||||
msg = ' '.join(['%s:%s' % (k, str(stats[k])) for k in keys])
|
||||
self.logger.info('Since %s %s - %s', last_report, category, msg)
|
||||
|
||||
self._transform_sharding_candidate_stats()
|
||||
# transform the sharding and shrinking candidate states
|
||||
# first sharding
|
||||
category = self.stats['sharding']['sharding_candidates']
|
||||
self._transform_candidate_stats(category, self.sharding_candidates,
|
||||
sort_keys=('object_count',))
|
||||
|
||||
# next shrinking
|
||||
category = self.stats['sharding']['shrinking_candidates']
|
||||
self._transform_candidate_stats(category, self.shrinking_candidates,
|
||||
sort_keys=('compactible_ranges',))
|
||||
|
||||
dump_recon_cache(
|
||||
{'sharding_stats': self.stats,
|
||||
@ -1770,6 +1797,8 @@ class ContainerSharder(ContainerReplicator):
|
||||
quote(broker.path))
|
||||
|
||||
if state == SHARDED and broker.is_root_container():
|
||||
# look for shrink stats
|
||||
self._identify_shrinking_candidate(broker, node)
|
||||
if is_leader:
|
||||
self._find_and_enable_shrinking_candidates(broker)
|
||||
self._find_and_enable_sharding_candidates(broker)
|
||||
|
@ -469,6 +469,12 @@ class TestSharder(BaseTestSharder):
|
||||
for call in fake_process_broker_calls[:2]]
|
||||
}
|
||||
})
|
||||
fake_stats.update({
|
||||
'shrinking_candidates': {
|
||||
'found': 0,
|
||||
'top': []
|
||||
}
|
||||
})
|
||||
check_recon(recon_data[0], sum(fake_periods[1:3]),
|
||||
sum(fake_periods[:3]), fake_stats)
|
||||
# periodic stats report after first broker has been visited during
|
||||
@ -5464,6 +5470,231 @@ class TestSharder(BaseTestSharder):
|
||||
newish_ctx = get_context(id_newish, broker)
|
||||
self.assertEqual(newish_ctx, "")
|
||||
|
||||
def test_shrinking_candidate_recon_dump(self):
|
||||
conf = {'recon_cache_path': self.tempdir,
|
||||
'devices': self.tempdir}
|
||||
|
||||
shard_bounds = (
|
||||
('', 'd'), ('d', 'g'), ('g', 'l'), ('l', 'o'), ('o', 't'),
|
||||
('t', 'x'), ('x', ''))
|
||||
|
||||
with self._mock_sharder(conf) as sharder:
|
||||
brokers = []
|
||||
shard_ranges = []
|
||||
C1, C2, C3 = 0, 1, 2
|
||||
|
||||
for container in ('c1', 'c2', 'c3'):
|
||||
broker = self._make_broker(
|
||||
container=container, hash_=container + 'hash',
|
||||
device=sharder.ring.devs[0]['device'], part=0)
|
||||
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||
('true', next(self.ts_iter).internal)})
|
||||
my_sr = broker.get_own_shard_range()
|
||||
my_sr.epoch = Timestamp.now()
|
||||
broker.merge_shard_ranges([my_sr])
|
||||
brokers.append(broker)
|
||||
shard_ranges.append(self._make_shard_ranges(
|
||||
shard_bounds, state=ShardRange.ACTIVE,
|
||||
object_count=(DEFAULT_SHARD_CONTAINER_THRESHOLD / 2),
|
||||
timestamp=next(self.ts_iter)))
|
||||
|
||||
# we want c2 to have 2 shrink pairs
|
||||
shard_ranges[C2][1].object_count = 0
|
||||
shard_ranges[C2][3].object_count = 0
|
||||
brokers[C2].merge_shard_ranges(shard_ranges[C2])
|
||||
brokers[C2].set_sharding_state()
|
||||
brokers[C2].set_sharded_state()
|
||||
|
||||
# we want c1 to have the same, but one can't be shrunk
|
||||
shard_ranges[C1][1].object_count = 0
|
||||
shard_ranges[C1][2].object_count = \
|
||||
DEFAULT_SHARD_CONTAINER_THRESHOLD - 1
|
||||
shard_ranges[C1][3].object_count = 0
|
||||
brokers[C1].merge_shard_ranges(shard_ranges[C1])
|
||||
brokers[C1].set_sharding_state()
|
||||
brokers[C1].set_sharded_state()
|
||||
|
||||
# c3 we want to have more total_sharding donors then can be sharded
|
||||
# in one go.
|
||||
shard_ranges[C3][0].object_count = 0
|
||||
shard_ranges[C3][1].object_count = 0
|
||||
shard_ranges[C3][2].object_count = 0
|
||||
shard_ranges[C3][3].object_count = 0
|
||||
shard_ranges[C3][4].object_count = 0
|
||||
shard_ranges[C3][5].object_count = 0
|
||||
brokers[C3].merge_shard_ranges(shard_ranges[C3])
|
||||
brokers[C3].set_sharding_state()
|
||||
brokers[C3].set_sharded_state()
|
||||
|
||||
node = {'ip': '10.0.0.0', 'replication_ip': '10.0.1.0',
|
||||
'port': 1000, 'replication_port': 1100,
|
||||
'device': 'sda', 'zone': 0, 'region': 0, 'id': 1,
|
||||
'index': 0}
|
||||
|
||||
for broker in brokers:
|
||||
sharder._identify_shrinking_candidate(broker, node)
|
||||
|
||||
sharder._report_stats()
|
||||
expected_shrinking_candidates_data = {
|
||||
'found': 3,
|
||||
'top': [
|
||||
{
|
||||
'object_count': mock.ANY,
|
||||
'account': brokers[C3].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C3].container,
|
||||
'file_size': os.stat(brokers[C3].db_file).st_size,
|
||||
'path': brokers[C3].db_file,
|
||||
'root': brokers[C3].path,
|
||||
'node_index': 0,
|
||||
'compactible_ranges': 3
|
||||
}, {
|
||||
'object_count': mock.ANY,
|
||||
'account': brokers[C2].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C2].container,
|
||||
'file_size': os.stat(brokers[1].db_file).st_size,
|
||||
'path': brokers[C2].db_file,
|
||||
'root': brokers[C2].path,
|
||||
'node_index': 0,
|
||||
'compactible_ranges': 2
|
||||
}, {
|
||||
'object_count': mock.ANY,
|
||||
'account': brokers[C1].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C1].container,
|
||||
'file_size': os.stat(brokers[C1].db_file).st_size,
|
||||
'path': brokers[C1].db_file,
|
||||
'root': brokers[C1].path,
|
||||
'node_index': 0,
|
||||
'compactible_ranges': 1
|
||||
}
|
||||
]}
|
||||
self._assert_recon_stats(expected_shrinking_candidates_data,
|
||||
sharder, 'shrinking_candidates')
|
||||
|
||||
# check shrinking stats are reset
|
||||
sharder._zero_stats()
|
||||
for broker in brokers:
|
||||
sharder._identify_shrinking_candidate(broker, node)
|
||||
sharder._report_stats()
|
||||
self._assert_recon_stats(expected_shrinking_candidates_data,
|
||||
sharder, 'shrinking_candidates')
|
||||
|
||||
# set some ranges to shrinking and check that stats are updated; in
|
||||
# this case the container C2 no longer has any shrinkable ranges
|
||||
# and no longer appears in stats
|
||||
def shrink_actionable_ranges(broker):
|
||||
compactible = find_compactible_shard_sequences(
|
||||
broker, sharder.shrink_size, sharder.merge_size, 1, -1)
|
||||
self.assertNotEqual([], compactible)
|
||||
timestamp = next(self.ts_iter)
|
||||
acceptors, donors = process_compactible_shard_sequences(
|
||||
compactible, timestamp)
|
||||
finalize_shrinking(broker, acceptors, donors, timestamp)
|
||||
|
||||
shrink_actionable_ranges(brokers[C2])
|
||||
sharder._zero_stats()
|
||||
for broker in brokers:
|
||||
sharder._identify_shrinking_candidate(broker, node)
|
||||
sharder._report_stats()
|
||||
expected_shrinking_candidates_data = {
|
||||
'found': 2,
|
||||
'top': [
|
||||
{
|
||||
'object_count': mock.ANY,
|
||||
'account': brokers[C3].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C3].container,
|
||||
'file_size': os.stat(brokers[C3].db_file).st_size,
|
||||
'path': brokers[C3].db_file,
|
||||
'root': brokers[C3].path,
|
||||
'node_index': 0,
|
||||
'compactible_ranges': 3
|
||||
}, {
|
||||
'object_count': mock.ANY,
|
||||
'account': brokers[C1].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C1].container,
|
||||
'file_size': os.stat(brokers[C1].db_file).st_size,
|
||||
'path': brokers[C1].db_file,
|
||||
'root': brokers[C1].path,
|
||||
'node_index': 0,
|
||||
'compactible_ranges': 1
|
||||
}
|
||||
]}
|
||||
self._assert_recon_stats(expected_shrinking_candidates_data,
|
||||
sharder, 'shrinking_candidates')
|
||||
|
||||
# set some ranges to shrinking and check that stats are updated; in
|
||||
# this case the container C3 no longer has any actionable ranges
|
||||
# and no longer appears in stats
|
||||
shrink_actionable_ranges(brokers[C3])
|
||||
sharder._zero_stats()
|
||||
for broker in brokers:
|
||||
sharder._identify_shrinking_candidate(broker, node)
|
||||
sharder._report_stats()
|
||||
expected_shrinking_candidates_data = {
|
||||
'found': 1,
|
||||
'top': [
|
||||
{
|
||||
'object_count': mock.ANY,
|
||||
'account': brokers[C1].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C1].container,
|
||||
'file_size': os.stat(brokers[C1].db_file).st_size,
|
||||
'path': brokers[C1].db_file,
|
||||
'root': brokers[C1].path,
|
||||
'node_index': 0,
|
||||
'compactible_ranges': 1
|
||||
}
|
||||
]}
|
||||
self._assert_recon_stats(expected_shrinking_candidates_data,
|
||||
sharder, 'shrinking_candidates')
|
||||
|
||||
# set some ranges to shrunk in C3 so that other sequences become
|
||||
# compactible
|
||||
now = next(self.ts_iter)
|
||||
shard_ranges = brokers[C3].get_shard_ranges()
|
||||
for (donor, acceptor) in zip(shard_ranges, shard_ranges[1:]):
|
||||
if donor.state == ShardRange.SHRINKING:
|
||||
donor.update_state(ShardRange.SHRUNK, state_timestamp=now)
|
||||
donor.set_deleted(timestamp=now)
|
||||
acceptor.lower = donor.lower
|
||||
acceptor.timestamp = now
|
||||
brokers[C3].merge_shard_ranges(shard_ranges)
|
||||
sharder._zero_stats()
|
||||
for broker in brokers:
|
||||
sharder._identify_shrinking_candidate(broker, node)
|
||||
sharder._report_stats()
|
||||
expected_shrinking_candidates_data = {
|
||||
'found': 2,
|
||||
'top': [
|
||||
{
|
||||
'object_count': mock.ANY,
|
||||
'account': brokers[C3].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C3].container,
|
||||
'file_size': os.stat(brokers[C3].db_file).st_size,
|
||||
'path': brokers[C3].db_file,
|
||||
'root': brokers[C3].path,
|
||||
'node_index': 0,
|
||||
'compactible_ranges': 2
|
||||
}, {
|
||||
'object_count': mock.ANY,
|
||||
'account': brokers[C1].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C1].container,
|
||||
'file_size': os.stat(brokers[C1].db_file).st_size,
|
||||
'path': brokers[C1].db_file,
|
||||
'root': brokers[C1].path,
|
||||
'node_index': 0,
|
||||
'compactible_ranges': 1
|
||||
}
|
||||
]}
|
||||
self._assert_recon_stats(expected_shrinking_candidates_data,
|
||||
sharder, 'shrinking_candidates')
|
||||
|
||||
|
||||
class TestCleavingContext(BaseTestSharder):
|
||||
def test_init(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user