Merge "Make gearman calls async in ZuulWeb"

This commit is contained in:
Zuul 2018-04-19 18:13:07 +00:00 committed by Gerrit Code Review
commit e337f46c5d
1 changed files with 27 additions and 5 deletions

View File

@ -170,15 +170,35 @@ class GearmanHandler(object):
'key_get': self.key_get,
}
def setEventLoop(self, event_loop):
self.event_loop = event_loop
# TODO: At some point, we should make this use a gear.Client, rather than
# the RPC client, so we can use that to make async Gearman calls. This
# implementation will create additional threads by putting the call onto
# the asycio ThreadPool, which is not ideal.
async def asyncSubmitJob(self, name, data):
'''
Submit a job to Gearman asynchronously.
This will raise a asyncio.TimeoutError if we hit the timeout. It is
up to the caller to handle the exception.
'''
gear_task = self.event_loop.run_in_executor(
None, self.rpc.submitJob, name, data)
job = await asyncio.wait_for(gear_task, 300)
return job
async def tenant_list(self, request, result_filter=None):
job = self.rpc.submitJob('zuul:tenant_list', {})
job = await self.asyncSubmitJob('zuul:tenant_list', {})
return web.json_response(json.loads(job.data[0]))
async def status_get(self, request, result_filter=None):
tenant = request.match_info["tenant"]
if tenant not in self.cache or \
(time.time() - self.cache_time[tenant]) > self.cache_expiry:
job = self.rpc.submitJob('zuul:status_get', {'tenant': tenant})
job = await self.asyncSubmitJob('zuul:status_get',
{'tenant': tenant})
self.cache[tenant] = json.loads(job.data[0])
self.cache_time[tenant] = time.time()
payload = self.cache[tenant]
@ -194,14 +214,14 @@ class GearmanHandler(object):
async def job_list(self, request, result_filter=None):
tenant = request.match_info["tenant"]
job = self.rpc.submitJob('zuul:job_list', {'tenant': tenant})
job = await self.asyncSubmitJob('zuul:job_list', {'tenant': tenant})
return web.json_response(json.loads(job.data[0]))
async def key_get(self, request, result_filter=None):
tenant = request.match_info["tenant"]
project = request.match_info["project"]
job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant,
'project': project})
job = await self.asyncSubmitJob('zuul:key_get', {'tenant': tenant,
'project': project})
return web.Response(body=job.data[0])
async def processRequest(self, request, action, result_filter=None):
@ -364,6 +384,8 @@ class ZuulWeb(object):
self.event_loop = loop
self.log_streaming_handler.setEventLoop(loop)
self.gearman_handler.setEventLoop(loop)
for handler in self._connection_handlers:
if hasattr(handler, 'setEventLoop'):
handler.setEventLoop(loop)