From a8951ef927ec9aab093b4915af3fc5e60fdb2a55 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 19 Dec 2013 09:51:43 -0800 Subject: [PATCH] Add statsd support Also correct the oversight of not documenting the SSL params to the server class. Also cap sphinx due to issues with 1.2 and ignore hacking. Change-Id: Ic4e8e942620d06a92696d5cf52bc2e9ce6e66bdc --- gear/__init__.py | 77 +++++++++++++++++++++++++++++++++++++++++-- gear/cmd/geard.py | 16 +++++++-- requirements.txt | 2 +- test-requirements.txt | 3 +- tox.ini | 3 +- 5 files changed, 94 insertions(+), 7 deletions(-) diff --git a/gear/__init__.py b/gear/__init__.py index 12ff764..71a47f0 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -29,6 +29,11 @@ try: except ImportError: import queue as queue +try: + import statsd +except ImportError: + statsd = None + PRECEDENCE_NORMAL = 0 PRECEDENCE_LOW = 1 PRECEDENCE_HIGH = 2 @@ -2109,13 +2114,31 @@ class Server(BaseClientServer): (not for production use). :arg int port: The TCP port on which to listen. + :arg str ssl_key: Path to the SSL private key. + :arg str ssl_cert: Path to the SSL certificate. + :arg str ssl_ca: Path to the CA certificate. + :arg str statsd_host: statsd hostname. None means disabled + (the default). + :arg str statsd_port: statsd port (defaults to 8125). + :arg str statsd_prefix: statsd key prefix. """ - def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None): + def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None, + statsd_host=None, statsd_port=8125, statsd_prefix=None): self.port = port self.ssl_key = ssl_key self.ssl_cert = ssl_cert self.ssl_ca = ssl_ca + if statsd_host: + if not statsd: + self.log.error("Unable to import statsd module") + self.statsd = None + else: + self.statsd = statsd.StatsClient(statsd_host, + statsd_port, + statsd_prefix) + else: + self.statsd = None self.high_queue = [] self.normal_queue = [] self.low_queue = [] @@ -2227,6 +2250,7 @@ class Server(BaseClientServer): if job.worker_connection: del job.worker_connection.related_jobs[job.handle] del self.jobs[job.handle] + self._updateStats() def getQueue(self): """Returns a copy of all internal queues in a flattened form. @@ -2257,11 +2281,12 @@ class Server(BaseClientServer): if handle == job.handle: queue.remove(job) del self.jobs[handle] + self._updateStats() request.connection.sendRaw(b'OK\n') return request.connection.sendRaw(b'ERR UNKNOWN_JOB\n') - def handleStatus(self, request): + def _getFunctionStats(self): functions = {} for function in self.functions: # Total, running, workers @@ -2273,6 +2298,10 @@ class Server(BaseClientServer): for connection in self.active_connections: for function in connection.functions: functions[function][2] += 1 + return functions + + def handleStatus(self, request): + functions = self._getFunctionStats() for name, values in functions.items(): request.connection.sendRaw(("%s\t%s\t%s\t%s\n" % (name, values[0], values[1], @@ -2297,6 +2326,47 @@ class Server(BaseClientServer): connection.changeState("AWAKE") connection.sendPacket(p) + def _updateStats(self): + if not self.statsd: + return + + # prefix.queue.JOB.waiting + # prefix.queue.JOB.running + # prefix.queue.JOB.workers + # prefix.queue.waiting + # prefix.queue.running + # prefix.queue.workers + functions = self._getFunctionStats() + base_key = 'queue' + total_waiting = 0 + total_running = 0 + for name, values in functions.items(): + (total, running, workers) = values + job_key = '.'.join([base_key, name]) + + key = '.'.join([job_key, 'waiting']) + self.statsd.gauge(key, total - running) + total_waiting += (total - running) + + key = '.'.join([job_key, 'running']) + self.statsd.gauge(key, running) + total_running += running + + key = '.'.join([job_key, 'workers']) + self.statsd.gauge(key, workers) + + key = '.'.join([base_key, 'waiting']) + self.statsd.gauge(key, total_waiting) + key = '.'.join([base_key, 'running']) + self.statsd.gauge(key, total_running) + + total_workers = 0 + for connection in self.active_connections: + if connection.functions: + total_workers += 1 + key = '.'.join([base_key, 'workers']) + self.statsd.gauge(key, total_workers) + def _handleSubmitJob(self, packet, precedence): name = packet.getArgument(0) unique = packet.getArgument(1) @@ -2317,6 +2387,7 @@ class Server(BaseClientServer): self.normal_queue.append(job) elif precedence == PRECEDENCE_LOW: self.low_queue.append(job) + self._updateStats() self.wakeConnections() def handleSubmitJob(self, packet): @@ -2337,6 +2408,7 @@ class Server(BaseClientServer): connection.related_jobs[job.handle] = job job.worker_connection = connection job.running = True + self._updateStats() return job return None @@ -2399,6 +2471,7 @@ class Server(BaseClientServer): del self.jobs[handle] del job.client_connection.related_jobs[handle] del job.worker_connection.related_jobs[handle] + self._updateStats() def handleSetClientID(self, packet): name = packet.getArgument(0) diff --git a/gear/cmd/geard.py b/gear/cmd/geard.py index 45df72a..fa33216 100644 --- a/gear/cmd/geard.py +++ b/gear/cmd/geard.py @@ -34,7 +34,13 @@ class Server(object): self.gear_server_pid = None def parse_arguments(self): - parser = argparse.ArgumentParser(description='Gearman server.') + parser = argparse.ArgumentParser(description=""" +Gearman server. + +If the statsd python module is available, set STATSD_HOST, +STATSD_PORT, and STATSD_PREFIX environment variables for statsd +support. +""") parser.add_argument('-d', dest='nodaemon', action='store_true', help='do not run as a daemon') parser.add_argument('-p', dest='port', default=4730, @@ -69,10 +75,16 @@ class Server(object): def main(self): self.setup_logging() + statsd_host = os.environ.get('STATSD_HOST') + statsd_port = int(os.environ.get('STATSD_PORT', 8125)) + statsd_prefix = os.environ.get('STATSD_PREFIX') self.server = gear.Server(self.args.port, self.args.ssl_key, self.args.ssl_cert, - self.args.ssl_ca) + self.args.ssl_ca, + statsd_host, + statsd_port, + statsd_prefix) signal.pause() diff --git a/requirements.txt b/requirements.txt index 4ecbf2a..4f48485 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ pbr>=0.5.21,<1.0 -python-daemon extras +python-daemon diff --git a/test-requirements.txt b/test-requirements.txt index 4dab357..c75f664 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,7 +3,8 @@ discover fixtures>=0.3.12 hacking>=0.5.3,<0.6 python-subunit -sphinx>=1.1.2 +statsd>=1.0.0,<3.0 +sphinx>=1.1.2,<1.2 testrepository>=0.0.13 testresources testscenarios diff --git a/tox.ini b/tox.ini index 3b544d1..572fff4 100644 --- a/tox.ini +++ b/tox.ini @@ -33,4 +33,5 @@ commands = {posargs} [flake8] exclude = .venv,.tox,dist,doc,*.egg show-source = true -ignore = E123,E125 +# E123, E125, and H ignored intentionally in this code-base +ignore = E123,E125,H