Merge "Make gearman calls async in ZuulWeb"
This commit is contained in:
commit
e337f46c5d
|
@ -170,15 +170,35 @@ class GearmanHandler(object):
|
|||
'key_get': self.key_get,
|
||||
}
|
||||
|
||||
def setEventLoop(self, event_loop):
|
||||
self.event_loop = event_loop
|
||||
|
||||
# TODO: At some point, we should make this use a gear.Client, rather than
|
||||
# the RPC client, so we can use that to make async Gearman calls. This
|
||||
# implementation will create additional threads by putting the call onto
|
||||
# the asycio ThreadPool, which is not ideal.
|
||||
async def asyncSubmitJob(self, name, data):
|
||||
'''
|
||||
Submit a job to Gearman asynchronously.
|
||||
|
||||
This will raise a asyncio.TimeoutError if we hit the timeout. It is
|
||||
up to the caller to handle the exception.
|
||||
'''
|
||||
gear_task = self.event_loop.run_in_executor(
|
||||
None, self.rpc.submitJob, name, data)
|
||||
job = await asyncio.wait_for(gear_task, 300)
|
||||
return job
|
||||
|
||||
async def tenant_list(self, request, result_filter=None):
|
||||
job = self.rpc.submitJob('zuul:tenant_list', {})
|
||||
job = await self.asyncSubmitJob('zuul:tenant_list', {})
|
||||
return web.json_response(json.loads(job.data[0]))
|
||||
|
||||
async def status_get(self, request, result_filter=None):
|
||||
tenant = request.match_info["tenant"]
|
||||
if tenant not in self.cache or \
|
||||
(time.time() - self.cache_time[tenant]) > self.cache_expiry:
|
||||
job = self.rpc.submitJob('zuul:status_get', {'tenant': tenant})
|
||||
job = await self.asyncSubmitJob('zuul:status_get',
|
||||
{'tenant': tenant})
|
||||
self.cache[tenant] = json.loads(job.data[0])
|
||||
self.cache_time[tenant] = time.time()
|
||||
payload = self.cache[tenant]
|
||||
|
@ -194,14 +214,14 @@ class GearmanHandler(object):
|
|||
|
||||
async def job_list(self, request, result_filter=None):
|
||||
tenant = request.match_info["tenant"]
|
||||
job = self.rpc.submitJob('zuul:job_list', {'tenant': tenant})
|
||||
job = await self.asyncSubmitJob('zuul:job_list', {'tenant': tenant})
|
||||
return web.json_response(json.loads(job.data[0]))
|
||||
|
||||
async def key_get(self, request, result_filter=None):
|
||||
tenant = request.match_info["tenant"]
|
||||
project = request.match_info["project"]
|
||||
job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant,
|
||||
'project': project})
|
||||
job = await self.asyncSubmitJob('zuul:key_get', {'tenant': tenant,
|
||||
'project': project})
|
||||
return web.Response(body=job.data[0])
|
||||
|
||||
async def processRequest(self, request, action, result_filter=None):
|
||||
|
@ -364,6 +384,8 @@ class ZuulWeb(object):
|
|||
|
||||
self.event_loop = loop
|
||||
self.log_streaming_handler.setEventLoop(loop)
|
||||
self.gearman_handler.setEventLoop(loop)
|
||||
|
||||
for handler in self._connection_handlers:
|
||||
if hasattr(handler, 'setEventLoop'):
|
||||
handler.setEventLoop(loop)
|
||||
|
|
Loading…
Reference in New Issue