update job processing structure

This commit is contained in:
Joshua Hesketh 2013-08-09 10:42:14 +10:00
parent 778aa176ac
commit a446f4ffef
2 changed files with 39 additions and 18 deletions

View File

@ -5,7 +5,7 @@
"gearman_port": 4730
},
"debug_log": "/home/josh/var/log/turbo-hipster/debug.log",
"job_working_dir": "/home/josh/var/lib/turbo-hipster/jobs",
"jobs_working_dir": "/home/josh/var/lib/turbo-hipster/jobs",
"git_working_dir": "/home/josh/var/lib/turbo-hipster/git",
"pip_download_cache": "/home/josh/var/cache/pip",
"plugins": ["gate_real_db_upgrade"]

View File

@ -17,12 +17,19 @@ import gear
import json
import logging
import os
import re
import threading
from lib import utils
import task_plugins.gate_real_db_upgrade.handle_results as handle_results
__worker_name__ = 'sql-migrate-test-runner-%s' % os.uname()[1]
# Regex for log checking
MIGRATION_START_RE = re.compile('([0-9]+) -> ([0-9]+)\.\.\.$')
MIGRATION_END_RE = re.compile('^done$')
class Runner(threading.Thread):
@ -106,7 +113,7 @@ class Runner(threading.Thread):
# Step 3: Analyse logs for errors
self._do_next_step()
self._check_log_for_errors()
self._check_all_dataset_logs_for_errors()
# Step 4: handle the results (and upload etc)
self._do_next_step()
@ -114,6 +121,8 @@ class Runner(threading.Thread):
# Finally, send completed packet
self._send_work_data()
return
if self.work_data['result'] is 'SUCCESS':
self.job.sendWorkComplete(
json.dumps(self._get_work_data()))
@ -124,21 +133,21 @@ class Runner(threading.Thread):
if not self.cancelled:
self.job.sendWorkException(str(e).encode('utf-8'))
def _get_logging_file(self, dataset):
return os.path.join(
self.config['job_working_dir'],
self.job.unique,
dataset['name'] + '.log'
)
def _handle_results(self):
""" pass over the results to handle_results.py for post-processing """
pass
index_url = handle_results.generate_push_results(self._get_datasets())
self.work_data['url'] = index_url
def _check_log_for_errors(self):
# logging_file = self._get_logging_file(job)
def _check_all_dataset_logs_for_errors(self):
failed = False
for dataset in self._get_datasets():
# Look for the beginning of the migration start
pass
self.work_data['result'] = "Failed: errors found in log"
if failed:
self.work_data['result'] = "Failed: errors found in dataset log(s)"
else:
self.work_data['result'] = "SUCCESS"
def _get_datasets(self):
if len(self.datasets) > 0:
@ -153,6 +162,14 @@ class Runner(threading.Thread):
dataset = {}
dataset['name'] = ent
dataset['path'] = os.path.join(datasets_path, ent)
dataset['job_working_dir'] = os.path.join(
self.config['jobs_working_dir'],
self.job.unique
)
dataset['log_file_path'] = os.path.join(
dataset['job_working_dir'],
dataset['name'] + '.log'
)
with open(os.path.join(dataset['path'], 'config.json'),
'r') as config_stream:
dataset['config'] = json.load(config_stream)
@ -177,12 +194,12 @@ class Runner(threading.Thread):
# $7 is the path to the dataset to test against
# $8 is the pip cache dir
cmd += (
(' %(unique_id)s %(working_dir)s %(git_path)s'
(' %(unique_id)s %(job_working_dir)s %(git_path)s'
' %(dbuser)s %(dbpassword)s %(db)s'
' %(dataset_path)s %(pip_cache_dir)s')
% {
'unique_id': self.job.unique,
'working_dir': self.config['job_working_dir'],
'job_working_dir': dataset['job_working_dir'],
'git_path': git_path,
'dbuser': dataset['config']['db_user'],
'dbpassword': dataset['config']['db_pass'],
@ -206,7 +223,7 @@ class Runner(threading.Thread):
utils.execute_to_log(
cmd,
self._get_logging_file(dataset),
dataset['log_file_path'],
watch_logs=[
('[syslog]', syslog),
('[sqlslo]', sqlslo),
@ -226,6 +243,10 @@ class Runner(threading.Thread):
)
)
# reset to zuul's master
repo.reset()
# Fetch patchset and checkout
repo.fetch(zuul_ref)
repo.checkout('FETCH_HEAD')
@ -254,12 +275,12 @@ class Runner(threading.Thread):
if self.stopped():
self.work_data['result'] = "Failed: Worker interrupted/stopped"
self.job.sendWorkStatus(self.current_step, self.total_steps)
raise Exception('Thread stopped', 'stopping')
raise Exception('Thread stopped')
elif self.cancelled:
self.work_data['result'] = "Failed: Job cancelled"
self.job.sendWorkStatus(self.current_step, self.total_steps)
self.job.sendWorkFail()
raise Exception('Job cancelled', 'stopping')
raise Exception('Job cancelled')
self.current_step += 1
self.job.sendWorkStatus(self.current_step, self.total_steps)