Optimize record post-processing

The post-processing phase contains 7 different functions that iterate
over the full set of records, and 2 of these functions iterate twice.
This causes the whole set to be iterated 9 times in total, which is
very time-consuming (the bottleneck is reading the records from
memcached, not CPU utilization).

With this patch the post-processing is refactored so that all
iterations are grouped together: every record is fed to all record
handlers belonging to the same pass. The overall number of iterations
over the full data set is thus reduced to only 2.
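
For illustration, here is a minimal sketch of the processor protocol
introduced by this patch (the names are made up for the example and
are not code from the patch): a processor is a generator that yields
one record handler per pass, and every record of that pass is fed to
the handler.

    # hypothetical processor, for illustration only
    def count_commits():
        totals = {'commits': 0}

        def record_handler(record):  # called once per record of the pass
            if record.get('record_type') == 'commit':
                totals['commits'] += 1
                yield record         # handlers may emit updated records

        yield record_handler         # one yield == one pass over all records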

Change-Id: Ia4369798d32cf0e8d6660f6bca0130f934223018
Ilya Shakhat 2015-10-16 16:28:34 +03:00
parent fa0b8ec9d4
commit 433b1d1365
3 changed files with 163 additions and 31 deletions

View File

@@ -16,6 +16,7 @@
 import bisect
 import collections
 import copy
+import functools
 import time

 from oslo_log import log as logging
@@ -599,7 +600,7 @@ class RecordProcessor(object):
     def _update_records_with_releases(self, release_index):
         LOG.info('Update records with releases')

-        for record in self.runtime_storage_inst.get_all_records():
+        def record_handler(record):
             if record['primary_key'] in release_index:
                 release = release_index[record['primary_key']]
             else:
@@ -609,10 +610,12 @@ class RecordProcessor(object):
                 record['release'] = release
                 yield record

+        yield record_handler
+
     def _update_records_with_user_info(self):
         LOG.info('Update user info in records')

-        for record in self.runtime_storage_inst.get_all_records():
+        def record_handler(record):
             company_name = record['company_name']
             user_id = record['user_id']
             author_name = record['author_name']
@@ -628,16 +631,23 @@ class RecordProcessor(object):
                     'company': company_name, 'record': record})
                 yield record

+        yield record_handler
+
     def _update_commits_with_merge_date(self):
         LOG.info('Update commits with merge date')

         change_id_to_date = {}
-        for record in self.runtime_storage_inst.get_all_records():
+
+        def record_handler_pass_1(record):
             if (record['record_type'] == 'review' and
                     record.get('status') == 'MERGED'):
                 change_id_to_date[record['id']] = record['lastUpdated']

-        for record in self.runtime_storage_inst.get_all_records():
+        yield record_handler_pass_1
+
+        LOG.info('Update commits with merge date: pass 2')
+
+        def record_handler_pass_2(record):
             if record['record_type'] == 'commit':
                 change_id_list = record.get('change_id')
                 if change_id_list and len(change_id_list) == 1:
@@ -652,12 +662,15 @@ class RecordProcessor(object):
                                  'record': record})
                     yield record

+        yield record_handler_pass_2
+
     def _update_blueprints_with_mention_info(self):
         LOG.info('Process blueprints and calculate mention info')

         valid_blueprints = {}
         mentioned_blueprints = {}
-        for record in self.runtime_storage_inst.get_all_records():
+
+        def record_handler_pass_1(record):
             for bp in record.get('blueprint_id', []):
                 if bp in mentioned_blueprints:
                     mentioned_blueprints[bp]['count'] += 1
@@ -675,6 +688,8 @@ class RecordProcessor(object):
                     'date': record['date']
                 }

+        yield record_handler_pass_1
+
         for bp_name, bp in six.iteritems(valid_blueprints):
             if bp_name in mentioned_blueprints:
                 bp['count'] = mentioned_blueprints[bp_name]['count']
@@ -683,7 +698,9 @@ class RecordProcessor(object):
                 bp['count'] = 0
                 bp['date'] = 0

-        for record in self.runtime_storage_inst.get_all_records():
+        LOG.info('Process blueprints and calculate mention info: pass 2')
+
+        def record_handler_pass_2(record):
             need_update = False
             valid_bp = set([])
@@ -709,13 +726,15 @@ class RecordProcessor(object):
             if need_update:
                 yield record

+        yield record_handler_pass_2
+
     def _determine_core_contributors(self):
         LOG.info('Determine core contributors')

         module_branches = collections.defaultdict(set)
         quarter_ago = int(time.time()) - 60 * 60 * 24 * 30 * 3  # a quarter ago

-        for record in self.runtime_storage_inst.get_all_records():
+        def record_handler(record):
             if (record['record_type'] == 'mark' and
                     record['date'] > quarter_ago and
                     record['value'] in [2, -2]):
@@ -723,6 +742,8 @@ class RecordProcessor(object):
                 user_id = record['user_id']
                 module_branches[user_id].add(module_branch)

+        yield record_handler
+
         for user in self.runtime_storage_inst.get_all_users():
             core_old = user.get('core')
             user_module_branch = module_branches.get(user['user_id'])
@@ -767,7 +788,7 @@ class RecordProcessor(object):
         marks_per_patch = collections.defaultdict(
             lambda: {'patch_number': 0, 'marks': []})

-        for record in self.runtime_storage_inst.get_all_records():
+        def record_handler(record):
             if (record['record_type'] == 'mark' and
                     record['type'] == 'Code-Review'):
                 review_id = record['review_id']
@@ -786,17 +807,19 @@ class RecordProcessor(object):
                 marks_per_patch[review_id]['patch_number'] = patch_number
                 marks_per_patch[review_id]['marks'].append(record)

+        yield record_handler
+
         # purge the rest
         for marks_patch in marks_per_patch.values():
-            for processed in self._close_patch(cores, marks_patch['marks']):
-                yield processed
+            self.runtime_storage_inst.set_records(
+                self._close_patch(cores, marks_patch['marks']))

     def _update_members_company_name(self):
         LOG.info('Update members with company names')

-        for record in self.runtime_storage_inst.get_all_records():
+        def record_handler(record):
             if record['record_type'] != 'member':
-                continue
+                return

             company_draft = record['company_draft']
             company_name = self.domains_index.get(
@@ -804,7 +827,7 @@ class RecordProcessor(object):
                 utils.normalize_company_draft(company_draft))

             if company_name == record['company_name']:
-                continue
+                return

             LOG.debug('Update record %s, company name changed to %s',
                       record, company_name)
@@ -822,24 +845,21 @@ class RecordProcessor(object):
                 }]
                 user_processor.store_user(self.runtime_storage_inst, user)

+        yield record_handler
+
     def post_processing(self, release_index):
-        self.runtime_storage_inst.set_records(
-            self._update_records_with_user_info())
-
-        self.runtime_storage_inst.set_records(
-            self._update_commits_with_merge_date())
-
-        self.runtime_storage_inst.set_records(
-            self._update_records_with_releases(release_index))
-
-        self.runtime_storage_inst.set_records(
-            self._update_blueprints_with_mention_info())
-
-        self._determine_core_contributors()
-
-        # disagreement calculation must go after determining core contributors
-        self.runtime_storage_inst.set_records(
-            self._update_marks_with_disagreement())
-
-        self.runtime_storage_inst.set_records(
-            self._update_members_company_name())
+        processors = [
+            self._update_records_with_user_info,
+            self._update_commits_with_merge_date,
+            functools.partial(self._update_records_with_releases,
+                              release_index),
+            self._update_blueprints_with_mention_info,
+            self._determine_core_contributors,
+            self._update_members_company_name,
+            self._update_marks_with_disagreement,
+        ]
+
+        pipeline_processor = utils.make_pipeline_processor(processors)
+
+        self.runtime_storage_inst.set_records(pipeline_processor(
+            self.runtime_storage_inst.get_all_records))
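
Note on the pass count: the get_passes() helper added in the next file
keeps producing passes while at least one processor still yields a
record handler, so the number of full iterations over the data set
equals the maximum number of handlers yielded by any single processor.
Every processor in the list above yields either one or two handlers,
which gives the 2 passes claimed in the commit message.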

View File

@@ -317,3 +317,41 @@ def validate_lp_display_name(lp_profile):
     if lp_profile:
         if "<email address hidden>" == lp_profile['display_name']:
             lp_profile['display_name'] = lp_profile['name']
+
+
+def make_pipeline_processor(processors):
+
+    def get_passes(_processors):
+        # every processor yields one or more record handlers;
+        # this function groups the handlers by pass and yields one
+        # list of handlers per pass
+        processor_generators = [p() for p in _processors]
+
+        work = True
+        while work:
+            work = False
+            record_handlers = []
+            for generator in processor_generators:
+                try:
+                    record_handlers.append(next(generator))
+                except StopIteration:
+                    pass
+
+            if record_handlers:
+                work = True
+                yield record_handlers
+
+    def pipeline_processor(record_generator):
+        # for every pass
+        for one_pass in get_passes(processors):
+            # iterate over every record from the producer
+            for record in record_generator():
+                # iterate over the record handlers of the single pass
+                for record_handler in one_pass:
+                    # feed the record to the handler
+                    for r in record_handler(record) or []:
+                        # yield the processed record
+                        yield r
+
+    return pipeline_processor
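
A minimal usage sketch of make_pipeline_processor (the processor and
records here are toy examples, not part of the patch). The record
producer is passed as a callable so that the pipeline can restart the
iteration once per pass:

    def tag_records():                    # processor with a single pass
        def record_handler(record):
            record['seen'] = True
            yield record                  # emit the updated record
        yield record_handler

    processor = make_pipeline_processor([tag_records])
    records = [{'id': 1}, {'id': 2}]
    for updated in processor(lambda: iter(records)):
        print(updated)                    # {'id': 1, 'seen': True} ...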

View File

@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import mock
 import testtools

 from stackalytics.processor import utils
@@ -123,3 +124,76 @@ class TestUtils(testtools.TestCase):
         profile = None
         utils.validate_lp_display_name(profile)
         self.assertEqual(None, profile)
+
+    def test_pipeline_processor(self):
+        counter = dict(n=0)
+        consumed = []
+        log = mock.Mock()
+
+        def get_all_items():
+            for i in range(5):
+                counter['n'] += 1
+                yield i
+
+        def single_pass_uno():
+            log('single_pass_uno:begin')
+
+            def pass_1(s):
+                yield s
+
+            yield pass_1
+            log('single_pass_uno:end')
+
+        def single_pass_duo():
+            log('single_pass_duo:begin')
+
+            def pass_1(s):
+                yield s + 10
+
+            yield pass_1
+            log('single_pass_duo:end')
+
+        def double_pass():
+            log('double_pass:begin')
+            r = set()
+
+            def pass_1(s):
+                if s % 2:
+                    r.add(s)
+
+            yield pass_1
+            log('double_pass:middle')
+
+            def pass_2(s):
+                if s in r:
+                    yield s * 100
+
+            yield pass_2
+            log('double_pass:end')
+
+        def consume(r):
+            for x in r:
+                consumed.append(x)
+
+        processors = [single_pass_uno, double_pass, single_pass_duo]
+        pipeline_processor = utils.make_pipeline_processor(processors)
+        consume(pipeline_processor(get_all_items))
+
+        self.assertEqual(10, counter['n'])  # twice by 5 elements
+
+        expected = [0, 10, 1, 11, 2, 12, 3, 13, 4, 14, 100, 300]
+        self.assertEqual(expected, consumed)
+
+        log.assert_has_calls([
+            mock.call('single_pass_uno:begin'),
+            mock.call('double_pass:begin'),
+            mock.call('single_pass_duo:begin'),
+            mock.call('single_pass_uno:end'),
+            mock.call('double_pass:middle'),
+            mock.call('single_pass_duo:end'),
+            mock.call('double_pass:end'),
+        ])