Merge "Optimize ec duplication and its md5 hashing"

This commit is contained in:
Jenkins 2017-03-09 21:55:56 +00:00 committed by Gerrit Code Review
commit 455827716d
2 changed files with 72 additions and 64 deletions

View File

@ -1747,7 +1747,12 @@ class MIMEPutter(Putter):
mime_boundary, multiphase=need_multiphase)
def chunk_transformer(policy, nstreams):
def chunk_transformer(policy):
"""
A generator to transform a source chunk to erasure coded chunks for each
`send` call. The number of erasure coded chunks yielded per call is
policy.ec_n_unique_fragments.
"""
segment_size = policy.ec_segment_size
buf = collections.deque()
@ -1775,9 +1780,8 @@ def chunk_transformer(policy, nstreams):
frags_by_byte_order = []
for chunk_to_encode in chunks_to_encode:
encoded_chunks = policy.pyeclib_driver.encode(chunk_to_encode)
send_chunks = encoded_chunks * policy.ec_duplication_factor
frags_by_byte_order.append(send_chunks)
frags_by_byte_order.append(
policy.pyeclib_driver.encode(chunk_to_encode))
# Sequential calls to encode() have given us a list that
# looks like this:
#
@ -1802,9 +1806,9 @@ def chunk_transformer(policy, nstreams):
last_bytes = ''.join(buf)
if last_bytes:
last_frags = policy.pyeclib_driver.encode(last_bytes)
yield last_frags * policy.ec_duplication_factor
yield last_frags
else:
yield [''] * nstreams
yield [''] * policy.ec_n_unique_fragments
def trailing_metadata(policy, client_obj_hasher,
@ -2330,28 +2334,29 @@ class ECObjectController(BaseObjectController):
def _determine_chunk_destinations(self, putters, policy):
"""
Given a list of putters, return a dict where the key is the putter
and the value is the node index to use.
and the value is the frag index to use.
This is done so that we line up handoffs using the same node index
This is done so that we line up handoffs using the same frag index
(in the primary part list) as the primary that the handoff is standing
in for. This lets erasure-code fragment archives wind up on the
preferred local primary nodes when possible.
:param putters: a list of swift.proxy.controllers.obj.MIMEPutter
instances
:param policy: A policy instance
:param policy: A policy instance which should be an ECStoragePolicy
"""
# Give each putter a "chunk index": the index of the
# Give each putter a "frag index": the index of the
# transformed chunk that we'll send to it.
#
# For primary nodes, that's just its index (primary 0 gets
# chunk 0, primary 1 gets chunk 1, and so on). For handoffs,
# we assign the chunk index of a missing primary.
handoff_conns = []
chunk_index = {}
putter_to_frag_index = {}
for p in putters:
if p.node_index is not None:
chunk_index[p] = p.node_index
putter_to_frag_index[p] = policy.get_backend_index(
p.node_index)
else:
handoff_conns.append(p)
@ -2362,35 +2367,33 @@ class ECObjectController(BaseObjectController):
# returns 507, in which case a handoff is used to replace it.
# lack_list is a dict of lists to keep hole indexes
# e.g. if we have 2 holes for index 0 with ec_duplication_factor=2
# e.g. if we have 2 holes for frag index 0 with ec_duplication_factor=2
# lack_list is like {0: [0], 1: [0]}, and then, if 1 hole found
# for index 1, lack_list will be {0: [0, 1], 1: [0]}.
# for frag index 1, lack_list will be {0: [0, 1], 1: [0]}.
# After that, holes will be filled from bigger key
# (i.e. 1:[0] at first)
# Grouping all missing fragment indexes for each unique_index
unique_index_to_holes = collections.defaultdict(list)
available_indexes = chunk_index.values()
for node_index in range(policy.object_ring.replica_count):
if node_index not in available_indexes:
unique_index = policy.get_backend_index(node_index)
unique_index_to_holes[unique_index].append(node_index)
# Set the missing index to lack_list
# Grouping all missing fragment indexes for each frag_index
available_indexes = putter_to_frag_index.values()
lack_list = collections.defaultdict(list)
for unique_index, holes in unique_index_to_holes.items():
for lack_tier, hole_node_index in enumerate(holes):
lack_list[lack_tier].append(hole_node_index)
for frag_index in range(policy.ec_n_unique_fragments):
# Set the missing index to lack_list
available_count = available_indexes.count(frag_index)
# N.B. it should be duplication_factor >= lack >= 0
lack = policy.ec_duplication_factor - available_count
# now we are missing one or more nodes to store the frag index
for lack_tier in range(lack):
lack_list[lack_tier].append(frag_index)
# Extract the lack_list to a flat list
holes = []
for lack_tier, indexes in sorted(lack_list.items(), reverse=True):
holes.extend(indexes)
# Fill chunk_index list with the hole list
# Fill putter_to_frag_index list with the hole list
for hole, p in zip(holes, handoff_conns):
chunk_index[p] = hole
return chunk_index
putter_to_frag_index[p] = hole
return putter_to_frag_index
def _transfer_data(self, req, policy, data_source, putters, nodes,
min_conns, etag_hasher):
@ -2400,15 +2403,15 @@ class ECObjectController(BaseObjectController):
This method was added in the PUT method extraction change
"""
bytes_transferred = 0
chunk_transform = chunk_transformer(policy, len(nodes))
chunk_transform = chunk_transformer(policy)
chunk_transform.send(None)
chunk_hashers = collections.defaultdict(md5)
frag_hashers = collections.defaultdict(md5)
def send_chunk(chunk):
# Note: there are two different hashers in here. etag_hasher is
# hashing the original object so that we can validate the ETag
# that the client sent (and etag_hasher is None if the client
# didn't send one). The hasher in chunk_hashers is hashing the
# didn't send one). The hasher in frag_hashers is hashing the
# fragment archive being sent to the object server; this lets us guard
# against data corruption on the network between proxy and
# object server.
@ -2420,11 +2423,17 @@ class ECObjectController(BaseObjectController):
# or whatever we're doing, the transform will give us None.
return
updated_frag_indexes = set()
for putter in list(putters):
ci = chunk_index[putter]
backend_chunk = backend_chunks[ci]
frag_index = putter_to_frag_index[putter]
backend_chunk = backend_chunks[frag_index]
if not putter.failed:
chunk_hashers[ci].update(backend_chunk)
# N.B. the same frag_index will appear more than once when using
# ec_duplication_factor >= 2, so skip feeding the chunk to the
# hasher if that frag was already updated.
if frag_index not in updated_frag_indexes:
frag_hashers[frag_index].update(backend_chunk)
updated_frag_indexes.add(frag_index)
putter.send_chunk(backend_chunk)
else:
putter.close()
@ -2437,9 +2446,9 @@ class ECObjectController(BaseObjectController):
try:
with ContextPool(len(putters)) as pool:
# build our chunk index dict to place handoffs in the
# build our putter_to_frag_index dict to place handoffs in the
# same part nodes index as the primaries they are covering
chunk_index = self._determine_chunk_destinations(
putter_to_frag_index = self._determine_chunk_destinations(
putters, policy)
for putter in putters:
@ -2487,14 +2496,14 @@ class ECObjectController(BaseObjectController):
footers = {(k, v) for k, v in footers.items()
if not k.lower().startswith('x-object-sysmeta-ec-')}
for putter in putters:
ci = chunk_index[putter]
frag_index = putter_to_frag_index[putter]
# Update any footers set by middleware with EC footers
trail_md = trailing_metadata(
policy, etag_hasher,
bytes_transferred, policy.get_backend_index(ci))
bytes_transferred, frag_index)
trail_md.update(footers)
# Etag footer must always be hash of what we sent
trail_md['Etag'] = chunk_hashers[ci].hexdigest()
trail_md['Etag'] = frag_hashers[frag_index].hexdigest()
putter.end_of_object_data(footer_metadata=trail_md)
for putter in putters:

View File

@ -3663,21 +3663,20 @@ class TestECFunctions(unittest.TestCase):
ec_segment_size=segment_size,
ec_duplication_factor=dup)
expected = policy.pyeclib_driver.encode(orig_chunk)
transform = obj.chunk_transformer(
policy, policy.object_ring.replica_count)
transform = obj.chunk_transformer(policy)
transform.send(None)
backend_chunks = transform.send(orig_chunk)
self.assertIsNotNone(backend_chunks) # sanity
self.assertEqual(
len(backend_chunks), policy.object_ring.replica_count)
self.assertEqual(expected * dup, backend_chunks)
len(backend_chunks), policy.ec_n_unique_fragments)
self.assertEqual(expected, backend_chunks)
# flush out last chunk buffer
backend_chunks = transform.send('')
self.assertEqual(
len(backend_chunks), policy.object_ring.replica_count)
self.assertEqual([''] * policy.object_ring.replica_count,
len(backend_chunks), policy.ec_n_unique_fragments)
self.assertEqual([''] * policy.ec_n_unique_fragments,
backend_chunks)
do_test(1)
do_test(2)
@ -3693,8 +3692,7 @@ class TestECFunctions(unittest.TestCase):
ec_segment_size=1024,
ec_duplication_factor=dup)
expected = policy.pyeclib_driver.encode(last_chunk)
transform = obj.chunk_transformer(
policy, policy.object_ring.replica_count)
transform = obj.chunk_transformer(policy)
transform.send(None)
transform.send(last_chunk)
@ -3702,8 +3700,9 @@ class TestECFunctions(unittest.TestCase):
backend_chunks = transform.send('')
self.assertEqual(
len(backend_chunks), policy.object_ring.replica_count)
self.assertEqual(expected * dup, backend_chunks)
len(backend_chunks), policy.ec_n_unique_fragments)
self.assertEqual(expected, backend_chunks)
do_test(1)
do_test(2)
@ -4310,12 +4309,13 @@ class TestECDuplicationObjController(
got = controller._determine_chunk_destinations(putters, self.policy)
expected = {}
for i, p in enumerate(putters):
expected[p] = i
expected[p] = self.policy.get_backend_index(i)
# sanity
self.assertEqual(got, expected)
# now let's make a unique fragment a handoff
putters[unique].node_index = None
handoff_putter = putters[unique]
handoff_putter.node_index = None
# and then, pop a fragment which has the same fragment index as unique
self.assertEqual(
@ -4329,21 +4329,20 @@ class TestECDuplicationObjController(
# index 0 missing 2 copies and unique frag index 1 missing 1 copy
# i.e. the handoff node should be assigned to unique frag index 1
got = controller._determine_chunk_destinations(putters, self.policy)
self.assertEqual(len(expected) - 2, len(got))
# index one_more_missing should not be chosen
self.assertNotIn(one_more_missing, got.values())
# either index unique or duplicated should be in the got dict
self.assertTrue(
any([unique in got.values(), duplicated in got.values()]))
# but it's not both
self.assertFalse(
all([unique in got.values(), duplicated in got.values()]))
# N.B. len(putters) is now len(expected) - 2 due to popping twice
self.assertEqual(len(putters), len(got))
# sanity, no node index - for handoff putter
self.assertIsNone(handoff_putter.node_index)
self.assertEqual(got[handoff_putter], unique)
# sanity, other nodes except handoff_putter have node_index
self.assertTrue(all(
[putter.node_index for putter in got if
putter != handoff_putter]))
def test_determine_chunk_destinations_prioritize_more_missing(self):
# drop 0, 14 and 1 should work
# drop node_index 0, 14 and 1 should work
self._test_determine_chunk_destinations_prioritize(0, 14, 1)
# drop 1, 15 and 0 should work, too
# drop node_index 1, 15 and 0 should work, too
self._test_determine_chunk_destinations_prioritize(1, 15, 0)