From 7154c9bd06eb8f95ce156d035f5abfeadb258090 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Tue, 24 Oct 2017 09:57:51 +1300 Subject: [PATCH] Fix the job task failure Change-Id: Ief9eec6833c0379f9c04fdd4c5440d9984247f79 Closes-Bug: #1726594 --- qinling/db/api.py | 4 ++-- qinling/db/sqlalchemy/api.py | 4 ++-- qinling/services/periodics.py | 22 ++++++++++++---------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/qinling/db/api.py b/qinling/db/api.py index cd766533..16567137 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -66,8 +66,8 @@ def conditional_update(model, values, expected_values, **kwargs): return IMPL.conditional_update(model, values, expected_values, **kwargs) -def get_function(id): - return IMPL.get_function(id) +def get_function(id, insecure=False): + return IMPL.get_function(id, insecure=insecure) def get_functions(limit=None, marker=None, sort_keys=None, diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 2cba8f5c..4a824791 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -216,8 +216,8 @@ def conditional_update(model, values, expected_values, insecure=False, @db_base.session_aware() -def get_function(id, session=None): - function = _get_db_object_by_id(models.Function, id) +def get_function(id, insecure=False, session=None): + function = _get_db_object_by_id(models.Function, id, insecure=insecure) if not function: raise exc.DBEntityNotFoundError("Function not found [id=%s]" % id) diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py index 174def70..3b063841 100644 --- a/qinling/services/periodics.py +++ b/qinling/services/periodics.py @@ -68,10 +68,13 @@ def handle_function_service_expiration(ctx, engine_client, orchestrator): def handle_job(engine_client): + """Execute job task with no db transactions.""" for job in db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3)): - LOG.debug("Processing job: %s, function: %s", job.id, job.function_id) + job_id = job.id + func_id = job.function_id + LOG.debug("Processing job: %s, function: %s", job_id, func_id) - func_db = db_api.get_function(job.function_id) + func_db = db_api.get_function(func_id, insecure=True) trust_id = func_db.trust_id try: @@ -94,7 +97,7 @@ def handle_job(engine_client): 'count': 0 }, { - 'id': job.id, + 'id': job_id, 'status': status.RUNNING }, insecure=True, @@ -112,7 +115,7 @@ def handle_job(engine_client): 'count': job.count }, { - 'id': job.id, + 'id': job_id, 'next_execution_time': job.next_execution_time }, insecure=True, @@ -121,24 +124,23 @@ def handle_job(engine_client): if not modified: LOG.warning( 'Job %s has been already handled by another periodic ' - 'task.', job.id + 'task.', job_id ) continue LOG.debug( - "Starting to execute function %s by job %s", - job.function_id, job.id + "Starting to execute function %s by job %s", func_id, job_id ) params = { - 'function_id': job.function_id, + 'function_id': func_id, 'input': job.function_input, 'sync': False, - 'description': constants.EXECUTION_BY_JOB % job.id + 'description': constants.EXECUTION_BY_JOB % job_id } executions.create_execution(engine_client, params) except Exception: - LOG.exception("Failed to process job %s", job.id) + LOG.exception("Failed to process job %s", job_id) finally: context.set_ctx(None)