From 9d84647dee3395b876c838889b70005fa0aebf41 Mon Sep 17 00:00:00 2001 From: David Shrewsbury Date: Tue, 10 Apr 2018 09:33:55 -0400 Subject: [PATCH] Make gearman calls async in ZuulWeb These need to be scheduled on the event loop, or else they'll block. We choose a 300s gearman timeout to match the change in: I12741bb259c1a78fa2446d764318f84df34bac67 Change-Id: I2785d945c8032f73bfdc240cf09954b5ed9a3978 --- zuul/web/__init__.py | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index 61e2ad376d..f634fa9988 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -170,15 +170,35 @@ class GearmanHandler(object): 'key_get': self.key_get, } + def setEventLoop(self, event_loop): + self.event_loop = event_loop + + # TODO: At some point, we should make this use a gear.Client, rather than + # the RPC client, so we can use that to make async Gearman calls. This + # implementation will create additional threads by putting the call onto + # the asycio ThreadPool, which is not ideal. + async def asyncSubmitJob(self, name, data): + ''' + Submit a job to Gearman asynchronously. + + This will raise a asyncio.TimeoutError if we hit the timeout. It is + up to the caller to handle the exception. + ''' + gear_task = self.event_loop.run_in_executor( + None, self.rpc.submitJob, name, data) + job = await asyncio.wait_for(gear_task, 300) + return job + async def tenant_list(self, request, result_filter=None): - job = self.rpc.submitJob('zuul:tenant_list', {}) + job = await self.asyncSubmitJob('zuul:tenant_list', {}) return web.json_response(json.loads(job.data[0])) async def status_get(self, request, result_filter=None): tenant = request.match_info["tenant"] if tenant not in self.cache or \ (time.time() - self.cache_time[tenant]) > self.cache_expiry: - job = self.rpc.submitJob('zuul:status_get', {'tenant': tenant}) + job = await self.asyncSubmitJob('zuul:status_get', + {'tenant': tenant}) self.cache[tenant] = json.loads(job.data[0]) self.cache_time[tenant] = time.time() payload = self.cache[tenant] @@ -194,14 +214,14 @@ class GearmanHandler(object): async def job_list(self, request, result_filter=None): tenant = request.match_info["tenant"] - job = self.rpc.submitJob('zuul:job_list', {'tenant': tenant}) + job = await self.asyncSubmitJob('zuul:job_list', {'tenant': tenant}) return web.json_response(json.loads(job.data[0])) async def key_get(self, request, result_filter=None): tenant = request.match_info["tenant"] project = request.match_info["project"] - job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant, - 'project': project}) + job = await self.asyncSubmitJob('zuul:key_get', {'tenant': tenant, + 'project': project}) return web.Response(body=job.data[0]) async def processRequest(self, request, action, result_filter=None): @@ -364,6 +384,8 @@ class ZuulWeb(object): self.event_loop = loop self.log_streaming_handler.setEventLoop(loop) + self.gearman_handler.setEventLoop(loop) + for handler in self._connection_handlers: if hasattr(handler, 'setEventLoop'): handler.setEventLoop(loop)