diff --git a/doc/source/config/object_server_config.rst b/doc/source/config/object_server_config.rst
index 8dc5e3127d..7f5c4737ec 100644
--- a/doc/source/config/object_server_config.rst
+++ b/doc/source/config/object_server_config.rst
@@ -466,6 +466,63 @@ handoffs_only              false             The handoffs_only mode op
                                              temporary use and should be
                                              disabled as soon as the emergency
                                              situation has been resolved.
+rebuild_handoff_node_count 2                 The default strategy for unmounted
+                                             drives will stage
+                                             rebuilt data on a
+                                             handoff node until
+                                             updated rings are
+                                             deployed. Because
+                                             fragments are rebuilt on
+                                             offset handoffs based on
+                                             fragment index and the
+                                             proxy limits how deep it
+                                             will search for EC frags
+                                             we restrict how many
+                                             nodes we'll try.
+                                             Setting to 0 will
+                                             disable rebuilds to
+                                             handoffs and only
+                                             rebuild fragments for
+                                             unmounted devices to
+                                             mounted primaries after
+                                             a ring change. Setting
+                                             to -1 means "no limit".
+max_objects_per_revert     0                 By default the reconstructor
+                                             attempts to revert all
+                                             objects from handoff
+                                             partitions in a single
+                                             batch using a single
+                                             SSYNC request. In
+                                             exceptional
+                                             circumstances
+                                             max_objects_per_revert
+                                             can be used to
+                                             temporarily limit the
+                                             number of objects
+                                             reverted by each
+                                             reconstructor revert
+                                             type job. If more than
+                                             max_objects_per_revert
+                                             are available in a
+                                             sender's handoff
+                                             partition, the remaining
+                                             objects will remain in
+                                             the handoff partition
+                                             and will not be reverted
+                                             until the next time the
+                                             reconstructor visits
+                                             that handoff partition
+                                             i.e. with this option
+                                             set, a single cycle of
+                                             the reconstructor may
+                                             not completely revert
+                                             all handoff partitions.
+                                             The option has no effect
+                                             on reconstructor sync
+                                             type jobs between
+                                             primary partitions. A
+                                             value of 0 (the default)
+                                             means there is no limit.
 node_timeout               DEFAULT or 10     Request timeout to external
                                              services. The value used is the
                                              value set in this section, or the value set
diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index db4d226360..e9557b6c27 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -358,6 +358,7 @@ use = egg:swift#recon
 # lockup_timeout = 1800
 # ring_check_interval = 15.0
 # recon_cache_path = /var/cache/swift
+#
 # The handoffs_only mode option is for special case emergency situations during
 # rebalance such as disk full in the cluster. This option SHOULD NOT BE
 # CHANGED, except for extreme situations. When handoffs_only mode is enabled
@@ -380,6 +381,19 @@
 # Setting to -1 means "no limit".
 # rebuild_handoff_node_count = 2
 #
+# By default the reconstructor attempts to revert all objects from handoff
+# partitions in a single batch using a single SSYNC request. In exceptional
+# circumstances max_objects_per_revert can be used to temporarily limit the
+# number of objects reverted by each reconstructor revert type job. If more
+# than max_objects_per_revert are available in a sender's handoff partition,
+# the remaining objects will remain in the handoff partition and will not be
+# reverted until the next time the reconstructor visits that handoff partition
+# i.e. with this option set, a single cycle of the reconstructor may not
+# completely revert all handoff partitions. The option has no effect on
+# reconstructor sync type jobs between primary partitions. A value of 0 (the
+# default) means there is no limit.
+# max_objects_per_revert = 0
+#
 # You can set scheduling priority of processes. Niceness values range from -20
 # (most favorable to the process) to 19 (least favorable to the process).
 # nice_priority =
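For context, the option added above lives in the reconstructor section of the object server configuration. A minimal operator-facing illustration (not part of the patch; the section name follows the sample file above, the value 1000 is arbitrary, and 0, the default, means no limit):

    [object-reconstructor]
    # temporarily cap the number of objects each revert job sends via SSYNC
    max_objects_per_revert = 1000

As the documentation text notes, this is intended for exceptional circumstances; objects left behind are reverted on subsequent reconstructor cycles.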
diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index accda48510..c1c3636492 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -241,7 +241,8 @@ class ObjectReconstructor(Daemon):
             conf.get('reclaim_age', DEFAULT_RECLAIM_AGE)))
         self.request_node_count = config_request_node_count_value(
             conf.get('request_node_count', '2 * replicas'))
-
+        self.max_objects_per_revert = non_negative_int(
+            conf.get('max_objects_per_revert', 0))
         # When upgrading from liberasurecode<=1.5.0, you may want to continue
         # writing legacy CRCs until all nodes are upgraded and capabale of
         # reading fragments with zlib CRCs.
@@ -1058,9 +1059,13 @@ class ObjectReconstructor(Daemon):
             if not suffixes:
                 continue
 
-            # ssync any out-of-sync suffixes with the remote node
+            # ssync any out-of-sync suffixes with the remote node; do not
+            # limit max_objects - we need to check them all because, unlike a
+            # revert job, we don't purge any objects so we start with the
+            # same set each cycle
             success, _ = ssync_sender(
-                self, node, job, suffixes, include_non_durable=False)()
+                self, node, job, suffixes, include_non_durable=False,
+                max_objects=0)()
             # update stats for this attempt
             self.suffix_sync += len(suffixes)
             self.logger.update_stats('suffix.syncs', len(suffixes))
@@ -1088,7 +1093,8 @@
                 node['index'])
             success, in_sync_objs = ssync_sender(
                 self, node, job, job['suffixes'],
-                include_non_durable=True)()
+                include_non_durable=True,
+                max_objects=self.max_objects_per_revert)()
             if success:
                 syncd_with += 1
                 reverted_objs.update(in_sync_objs)
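To summarise the wiring above in one self-contained sketch (illustrative Python, not Swift's actual classes; FakeSender and FakeReconstructor are hypothetical names): the value is parsed once with a non-negative-int check, revert jobs receive it as the sender's max_objects, and sync jobs always pass 0 because they must examine every object:

    def non_negative_int(value):
        # stand-in for the validation helper used above: reject negatives
        value = int(value)
        if value < 0:
            raise ValueError('value must be a non-negative integer')
        return value

    class FakeSender(object):
        # stand-in for ssync_sender.Sender: just records its cap
        def __init__(self, max_objects=0):
            self.max_objects = max_objects  # 0 means "no limit"

    class FakeReconstructor(object):
        def __init__(self, conf):
            self.max_objects_per_revert = non_negative_int(
                conf.get('max_objects_per_revert', 0))

        def make_sender(self, job_type):
            # only revert jobs are capped; sync jobs check every object
            cap = self.max_objects_per_revert if job_type == 'revert' else 0
            return FakeSender(max_objects=cap)

    reconstructor = FakeReconstructor({'max_objects_per_revert': '2'})
    assert reconstructor.make_sender('revert').max_objects == 2
    assert reconstructor.make_sender('sync').max_objects == 0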
diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py
index 700b097c45..309bcc1b61 100644
--- a/swift/obj/ssync_sender.py
+++ b/swift/obj/ssync_sender.py
@@ -144,7 +144,7 @@ class Sender(object):
     """
 
     def __init__(self, daemon, node, job, suffixes, remote_check_objs=None,
-                 include_non_durable=False):
+                 include_non_durable=False, max_objects=0):
         self.daemon = daemon
         self.df_mgr = self.daemon._df_router[job['policy']]
         self.node = node
@@ -154,6 +154,7 @@
         # make sure those objects exist or not in remote.
         self.remote_check_objs = remote_check_objs
         self.include_non_durable = include_non_durable
+        self.max_objects = max_objects
 
     def __call__(self):
         """
@@ -319,6 +320,17 @@
                     sleep()  # Gives a chance for other greenthreads to run
                 nlines += 1
                 nbytes += len(msg)
+                if 0 < self.max_objects <= nlines:
+                    for _ in hash_gen:
+                        # only log truncation if there were more hashes to come...
+                        self.daemon.logger.info(
+                            'ssync missing_check truncated after %d objects: '
+                            'device: %s, part: %s, policy: %s, last object hash: '
+                            '%s', nlines, self.job['device'],
+                            self.job['partition'], int(self.job['policy']),
+                            object_hash)
+                        break
+                    break
         with exceptions.MessageTimeout(
                 self.daemon.node_timeout, 'missing_check end'):
             msg = b':MISSING_CHECK: END\r\n'
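The double break in missing_check above is a peek on the generator: once the cap is reached, draining one more item reveals whether anything was actually left unsent, so the truncation message is only logged when objects were genuinely skipped. A standalone sketch of the idiom (plain Python with hypothetical names, not the Swift implementation):

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('ssync-sketch')

    def bounded_send(hash_gen, max_objects):
        # yield at most max_objects items; 0 means "no limit"
        nlines = 0
        object_hash = None
        for object_hash in hash_gen:
            yield object_hash  # stand-in for sending one MISSING_CHECK line
            nlines += 1
            if 0 < max_objects <= nlines:
                # the inner loop body runs only if the generator still had
                # items, so only genuine truncation gets logged
                for _ in hash_gen:
                    logger.info('truncated after %d objects: last object '
                                'hash: %s', nlines, object_hash)
                    break
                break

    # three available, cap of two: 'ghi' is skipped and truncation is logged
    assert list(bounded_send(iter(['abc', 'def', 'ghi']), 2)) == ['abc', 'def']
    # exactly two available: cap reached but nothing skipped, nothing logged
    assert list(bounded_send(iter(['abc', 'def']), 2)) == ['abc', 'def']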
diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py
index ef3999296f..f8f71e3581 100644
--- a/test/unit/obj/test_reconstructor.py
+++ b/test/unit/obj/test_reconstructor.py
@@ -1108,12 +1108,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         """
         class _fake_ssync(object):
             def __init__(self, daemon, node, job, suffixes,
-                         include_non_durable=False, **kwargs):
+                         include_non_durable=False, max_objects=0,
+                         **kwargs):
                 # capture context and generate an available_map of objs
                 context = {}
                 context['node'] = node
                 context['job'] = job
                 context['suffixes'] = suffixes
+                context['max_objects'] = max_objects
                 self.suffixes = suffixes
                 self.daemon = daemon
                 self.job = job
@@ -1124,8 +1126,13 @@
                     frag_index=self.job.get('frag_index'),
                     frag_prefs=frag_prefs)
                 self.available_map = {}
+                nlines = 0
                 for hash_, timestamps in hash_gen:
                     self.available_map[hash_] = timestamps
+                    nlines += 1
+                    if 0 < max_objects <= nlines:
+                        break
+
                 context['available_map'] = self.available_map
                 ssync_calls.append(context)
                 self.success = True
@@ -1179,6 +1186,7 @@
                         context['available_map']))
             else:
                 self.assertFalse(context.get('include_non_durable'))
+                self.assertEqual(0, context.get('max_objects'))
 
         mock_delete.assert_has_calls(expected_calls, any_order=True)
 
@@ -1207,10 +1215,32 @@
                     filename.endswith(data_file_tail), filename)
             else:
                 self.assertFalse(context.get('include_non_durable'))
+                self.assertEqual(0, context.get('max_objects'))
 
         # sanity check that some files should were deleted
         self.assertGreater(n_files, n_files_after)
 
+    def test_max_objects_per_revert_only_for_revert_jobs(self):
+        # verify max_objects_per_revert option is only passed to revert jobs
+        ssync_calls = []
+        conf = dict(self.conf, max_objects_per_revert=2)
+        with mock.patch('swift.obj.reconstructor.ssync_sender',
+                        self._make_fake_ssync(ssync_calls)), \
+                mocked_http_conn(*[200] * 6, body=pickle.dumps({})):
+            reconstructor = object_reconstructor.ObjectReconstructor(
+                conf, logger=self.logger)
+            reconstructor.reconstruct()
+        reverts = syncs = 0
+        for context in ssync_calls:
+            if context['job']['job_type'] == REVERT:
+                self.assertEqual(2, context.get('max_objects'))
+                reverts += 1
+            else:
+                self.assertEqual(0, context.get('max_objects'))
+                syncs += 1
+        self.assertGreater(reverts, 0)
+        self.assertGreater(syncs, 0)
+
     def test_delete_reverted_nondurable(self):
         # verify reconstructor only deletes reverted nondurable fragments older
         # commit_window
@@ -1419,6 +1449,63 @@
             with self.assertRaises(DiskFileError):
                 df_older.writer().commit(ts_old)
 
+    def test_delete_reverted_max_objects_per_revert(self):
+        # verify reconstructor only deletes objects that were actually reverted
+        # when ssync is limited by max_objects_per_revert
+        shutil.rmtree(self.ec_obj_path)
+        ips = utils.whataremyips(self.reconstructor.bind_ip)
+        local_devs = [dev for dev in self.ec_obj_ring.devs
+                      if dev and dev['replication_ip'] in ips and
+                      dev['replication_port'] ==
+                      self.reconstructor.port]
+        partition = (local_devs[0]['id'] + 1) % 3
+        # 3 durable objects
+        df_0 = self._create_diskfile(
+            object_name='zero', part=partition)
+        datafile_0 = df_0.manager.cleanup_ondisk_files(
+            df_0._datadir, frag_prefs=[])['data_file']
+        self.assertTrue(os.path.exists(datafile_0))
+        df_1 = self._create_diskfile(
+            object_name='one', part=partition)
+        datafile_1 = df_1.manager.cleanup_ondisk_files(
+            df_1._datadir, frag_prefs=[])['data_file']
+        self.assertTrue(os.path.exists(datafile_1))
+        df_2 = self._create_diskfile(
+            object_name='two', part=partition)
+        datafile_2 = df_2.manager.cleanup_ondisk_files(
+            df_2._datadir, frag_prefs=[])['data_file']
+        self.assertTrue(os.path.exists(datafile_2))
+
+        datafiles = [datafile_0, datafile_1, datafile_2]
+        actual_datafiles = [df for df in datafiles if os.path.exists(df)]
+        self.assertEqual(datafiles, actual_datafiles)
+
+        # only two objects will be sync'd and purged...
+        ssync_calls = []
+        conf = dict(self.conf, max_objects_per_revert=2, handoffs_only=True)
+        self.reconstructor = object_reconstructor.ObjectReconstructor(
+            conf, logger=self.logger)
+        with mock.patch('swift.obj.reconstructor.ssync_sender',
+                        self._make_fake_ssync(ssync_calls)):
+            self.reconstructor.reconstruct()
+        for context in ssync_calls:
+            self.assertEqual(REVERT, context['job']['job_type'])
+            self.assertEqual(2, context.get('max_objects'))
+        actual_datafiles = [df for df in datafiles if os.path.exists(df)]
+        self.assertEqual(1, len(actual_datafiles), actual_datafiles)
+
+        # ...until next reconstructor run which will sync and purge the last
+        # object
+        ssync_calls = []
+        with mock.patch('swift.obj.reconstructor.ssync_sender',
+                        self._make_fake_ssync(ssync_calls)):
+            self.reconstructor.reconstruct()
+        for context in ssync_calls:
+            self.assertEqual(REVERT, context['job']['job_type'])
+            self.assertEqual(2, context.get('max_objects'))
+        actual_datafiles = [df for df in datafiles if os.path.exists(df)]
+        self.assertEqual([], actual_datafiles)
+
     def test_no_delete_failed_revert(self):
         # test will only process revert jobs
         self.reconstructor.handoffs_only = True
diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py
index d27ae79a01..cd7999e41e 100644
--- a/test/unit/obj/test_ssync_sender.py
+++ b/test/unit/obj/test_ssync_sender.py
@@ -102,10 +102,10 @@
         self.daemon_logger = debug_logger('test-ssync-sender')
         self.daemon = ObjectReplicator(self.daemon_conf,
                                        self.daemon_logger)
-        job = {'policy': POLICIES.legacy,
-               'device': 'test-dev',
-               'partition': '99'}  # sufficient for Sender.__init__
-        self.sender = ssync_sender.Sender(self.daemon, None, job, None)
+        self.job = {'policy': POLICIES.legacy,
+                    'device': 'test-dev',
+                    'partition': '99'}  # sufficient for Sender.__init__
+        self.sender = ssync_sender.Sender(self.daemon, None, self.job, None)
 
     def test_call_catches_MessageTimeout(self):
 
@@ -810,6 +810,9 @@
                     {'ts_data': Timestamp(1380144471.00000)})
         connection = FakeConnection()
         response = FakeResponse()
+        # max_objects unlimited
+        self.sender = ssync_sender.Sender(self.daemon, None, self.job, None,
+                                          max_objects=0)
         self.sender.daemon.node_timeout = 0.01
         self.sender.df_mgr.yield_hashes = yield_hashes
         sleeps = [0, 0, 1]
@@ -874,6 +877,10 @@
                     'No match for %r %r %r %r' % (device, partition,
                                                   policy, suffixes))
 
+        # note: max_objects > number that would yield
+        self.sender = ssync_sender.Sender(self.daemon, None, self.job, None,
+                                          max_objects=4)
+
         connection = FakeConnection()
         self.sender.job = {
             'device': 'dev',
@@ -908,6 +915,121 @@
                 ts_meta=Timestamp(1380144475.44444),
                 ts_ctype=Timestamp(1380144474.44448)))]
         self.assertEqual(available_map, dict(candidates))
+        self.assertEqual([], self.daemon_logger.get_lines_for_level('info'))
+
+    def test_missing_check_max_objects_less_than_actual_objects(self):
+        def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+            # verify missing_check stops after 2 objects even though more
+            # objects would yield
+            if (device == 'dev' and partition == '9' and
+                    policy == POLICIES.legacy and
+                    suffixes == ['abc', 'def']):
+                yield (
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    {'ts_data': Timestamp(1380144470.00000)})
+                yield (
+                    '9d41d8cd98f00b204e9800998ecf0def',
+                    {'ts_data': Timestamp(1380144472.22222),
+                     'ts_meta': Timestamp(1380144473.22222)})
+                yield (
+                    '9d41d8cd98f00b204e9800998ecf1def',
+                    {'ts_data': Timestamp(1380144474.44444),
+                     'ts_ctype': Timestamp(1380144474.44448),
+                     'ts_meta': Timestamp(1380144475.44444)})
+            else:
+                raise Exception(
+                    'No match for %r %r %r %r' % (device, partition,
+                                                  policy, suffixes))
+
+        # max_objects < number that would yield
+        self.sender = ssync_sender.Sender(self.daemon, None, self.job, None,
+                                          max_objects=2)
+
+        connection = FakeConnection()
+        self.sender.job = {
+            'device': 'dev',
+            'partition': '9',
+            'policy': POLICIES.legacy,
+        }
+        self.sender.suffixes = ['abc', 'def']
+        response = FakeResponse(
+            chunk_body=(
+                ':MISSING_CHECK: START\r\n'
+                ':MISSING_CHECK: END\r\n'))
+        self.sender.df_mgr.yield_hashes = yield_hashes
+        available_map, send_map = self.sender.missing_check(connection,
+                                                            response)
+        self.assertEqual(
+            b''.join(connection.sent),
+            b'17\r\n:MISSING_CHECK: START\r\n\r\n'
+            b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
+            b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 '
+            b'm:186a0\r\n\r\n'
+            b'15\r\n:MISSING_CHECK: END\r\n\r\n')
+        self.assertEqual(send_map, {})
+        candidates = [('9d41d8cd98f00b204e9800998ecf0abc',
+                       dict(ts_data=Timestamp(1380144470.00000))),
+                      ('9d41d8cd98f00b204e9800998ecf0def',
+                       dict(ts_data=Timestamp(1380144472.22222),
+                            ts_meta=Timestamp(1380144473.22222)))]
+        self.assertEqual(available_map, dict(candidates))
+        self.assertEqual(
+            ['ssync missing_check truncated after 2 objects: device: dev, '
+             'part: 9, policy: 0, last object hash: '
+             '9d41d8cd98f00b204e9800998ecf0def'],
+            self.daemon_logger.get_lines_for_level('info'))
+
+    def test_missing_check_max_objects_exactly_actual_objects(self):
+        def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+            if (device == 'dev' and partition == '9' and
+                    policy == POLICIES.legacy and
+                    suffixes == ['abc', 'def']):
+                yield (
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    {'ts_data': Timestamp(1380144470.00000)})
+                yield (
+                    '9d41d8cd98f00b204e9800998ecf0def',
+                    {'ts_data': Timestamp(1380144472.22222),
+                     'ts_meta': Timestamp(1380144473.22222)})
+            else:
+                raise Exception(
+                    'No match for %r %r %r %r' % (device, partition,
+                                                  policy, suffixes))
+
+        # max_objects == number that would yield
+        self.sender = ssync_sender.Sender(self.daemon, None, self.job, None,
+                                          max_objects=2)
+
+        connection = FakeConnection()
+        self.sender.job = {
+            'device': 'dev',
+            'partition': '9',
+            'policy': POLICIES.legacy,
+        }
+        self.sender.suffixes = ['abc', 'def']
+        response = FakeResponse(
+            chunk_body=(
+                ':MISSING_CHECK: START\r\n'
+                ':MISSING_CHECK: END\r\n'))
+        self.sender.df_mgr.yield_hashes = yield_hashes
+        available_map, send_map = self.sender.missing_check(connection,
+                                                            response)
+        self.assertEqual(
+            b''.join(connection.sent),
+            b'17\r\n:MISSING_CHECK: START\r\n\r\n'
+            b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
+            b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 '
+            b'm:186a0\r\n\r\n'
+            b'15\r\n:MISSING_CHECK: END\r\n\r\n')
+        self.assertEqual(send_map, {})
+        candidates = [('9d41d8cd98f00b204e9800998ecf0abc',
+                       dict(ts_data=Timestamp(1380144470.00000))),
+                      ('9d41d8cd98f00b204e9800998ecf0def',
+                       dict(ts_data=Timestamp(1380144472.22222),
+                            ts_meta=Timestamp(1380144473.22222)))]
+        self.assertEqual(available_map, dict(candidates))
+        # nothing logged re: truncation
+        self.assertEqual([], self.daemon_logger.get_lines_for_level('info'))
 
     def test_missing_check_far_end_disconnect(self):
         def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
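Taken together, the tests pin down the operator-visible logging: when a revert job's missing_check is truncated, the sender emits a single info line of the form (values taken from the test fixtures above)

    ssync missing_check truncated after 2 objects: device: dev, part: 9, policy: 0, last object hash: 9d41d8cd98f00b204e9800998ecf0def

and nothing is logged when the available objects fit within max_objects.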