Asynchronous job management(part 2)

Implement asynchronous job management to ensure jobs can be
successfully completed even if they temporarily fail for some
reason. The detailed design can be found in section 9 of the
design document.

This patch focuses on enabling workers to rerun failed jobs.
Workers started with the configuration option 'periodic_enable'
set to 'True' are responsible for this work.

Purging old job records will be covered in later patches.

Change-Id: I2631a98af67e663f929f293bdfb7e7779fe8018e
This commit is contained in:
zhiyuan_cai 2016-04-06 16:36:07 +08:00 committed by Chaoyi Huang
parent 53cfdb2e9e
commit bbe0977cd3
5 changed files with 87 additions and 5 deletions

View File

@ -66,3 +66,7 @@ JS_Success = 'Success'
JS_Fail = 'Fail'
SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000'
TOP = 'top'
# job type
JT_ROUTER = 'router'

View File

@ -24,6 +24,9 @@ import rpc
from serializer import TricircleSerializer as Serializer
import topics
from tricircle.common import constants
CONF = cfg.CONF
rpcapi_cap_opt = cfg.StrOpt('xjobapi',
@ -78,4 +81,5 @@ class XJobAPI(object):
# specifying its control exchange, so the default value "openstack" is
# used, thus we need to pass exchange as "openstack" here.
self.client.prepare(exchange='openstack').cast(
ctxt, 'configure_extra_routes', payload={'router': router_id})
ctxt, 'configure_extra_routes',
payload={constants.JT_ROUTER: router_id})

View File

@ -14,6 +14,7 @@
# under the License.
import functools
import sqlalchemy as sql
import time
import uuid
@ -236,6 +237,24 @@ def register_job(context, _type, resource_id):
context.session.close()
def get_latest_failed_jobs(context):
jobs = []
query = context.session.query(models.Job.type, models.Job.resource_id,
sql.func.count(models.Job.id))
query = query.group_by(models.Job.type, models.Job.resource_id)
for job_type, resource_id, count in query:
_query = context.session.query(models.Job)
_query = _query.filter_by(type=job_type, resource_id=resource_id)
_query = _query.order_by(sql.desc('timestamp'))
# when timestamps of job entries are the same, sort entries by status
# so "Fail" job is placed before "New" and "Success" jobs
_query = _query.order_by(sql.asc('status'))
latest_job = _query[0].to_dict()
if latest_job['status'] == constants.JS_Fail:
jobs.append(latest_job)
return jobs
def get_latest_timestamp(context, status, _type, resource_id):
jobs = core.query_resource(
context, models.Job,

View File

@ -267,6 +267,41 @@ class XManagerTest(unittest.TestCase):
# nothing to assert, what we test is that fake_handle can exit when
# timeout
def test_get_failed_jobs(self):
job_dict_list = [
{'timestamp': datetime.datetime(2000, 1, 1, 12, 0, 0),
'resource_id': 'uuid1', 'type': 'res1',
'status': constants.JS_Fail}, # job_uuid1
{'timestamp': datetime.datetime(2000, 1, 1, 12, 5, 0),
'resource_id': 'uuid1', 'type': 'res1',
'status': constants.JS_Fail}, # job_uuid3
{'timestamp': datetime.datetime(2000, 1, 1, 12, 20, 0),
'resource_id': 'uuid2', 'type': 'res2',
'status': constants.JS_Fail}, # job_uuid5
{'timestamp': datetime.datetime(2000, 1, 1, 12, 15, 0),
'resource_id': 'uuid2', 'type': 'res2',
'status': constants.JS_Fail}, # job_uuid7
{'timestamp': datetime.datetime(2000, 1, 1, 12, 25, 0),
'resource_id': 'uuid3', 'type': 'res3',
'status': constants.JS_Fail}, # job_uuid9
{'timestamp': datetime.datetime(2000, 1, 1, 12, 30, 0),
'resource_id': 'uuid3', 'type': 'res3',
'status': constants.JS_Success}]
for i, job_dict in enumerate(job_dict_list, 1):
job_dict['id'] = 'job_uuid%d' % (2 * i - 1)
job_dict['extra_id'] = 'extra_uuid%d' % (2 * i - 1)
core.create_resource(self.context, models.Job, job_dict)
job_dict['id'] = 'job_uuid%d' % (2 * i)
job_dict['extra_id'] = 'extra_uuid%d' % (2 * i)
job_dict['status'] = constants.JS_New
core.create_resource(self.context, models.Job, job_dict)
# for res3 + uuid3, the latest job's status is "Success", not returned
expected_ids = ['job_uuid3', 'job_uuid5']
returned_jobs = db_api.get_latest_failed_jobs(self.context)
actual_ids = [job['id'] for job in returned_jobs]
self.assertItemsEqual(expected_ids, actual_ids)
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())
for res in RES_LIST:

View File

@ -16,6 +16,7 @@
import datetime
import eventlet
import netaddr
import random
import six
from oslo_config import cfg
@ -127,12 +128,13 @@ class XManager(PeriodicTasks):
self.service_name = service_name
# self.notifier = rpc.get_notifier(self.service_name, self.host)
self.additional_endpoints = []
self.clients = {'top': client.Client()}
self.clients = {constants.TOP: client.Client()}
self.job_handles = {constants.JT_ROUTER: self.configure_extra_routes}
super(XManager, self).__init__()
def _get_client(self, pod_name=None):
if not pod_name:
return self.clients['top']
return self.clients[constants.TOP]
if pod_name not in self.clients:
self.clients[pod_name] = client.Client(pod_name)
return self.clients[pod_name]
@ -205,11 +207,29 @@ class XManager(PeriodicTasks):
return info_text
@_job_handle('router')
@periodic_task.periodic_task
def redo_failed_job(self, ctx):
failed_jobs = db_api.get_latest_failed_jobs(ctx)
failed_jobs = [
job for job in failed_jobs if job['type'] in self.job_handles]
if not failed_jobs:
return
# in one run we only pick one job to handle
job_index = random.randint(0, len(failed_jobs) - 1)
failed_job = failed_jobs[job_index]
job_type = failed_job['type']
payload = {job_type: failed_job['resource_id']}
LOG.debug(_('Redo failed job for %(resource_id)s of type '
'%(job_type)s'),
{'resource_id': failed_job['resource_id'],
'job_type': job_type})
self.job_handles[job_type](ctx, payload=payload)
@_job_handle(constants.JT_ROUTER)
def configure_extra_routes(self, ctx, payload):
# TODO(zhiyuan) performance and reliability issue
# better have a job tracking mechanism
t_router_id = payload['router']
t_router_id = payload[constants.JT_ROUTER]
b_pods, b_router_ids = zip(*db_api.get_bottom_mappings_by_top_id(
ctx, t_router_id, constants.RT_ROUTER))