From b548b141ced9853456bb0c16b90d8ed30d423577 Mon Sep 17 00:00:00 2001 From: K Jonathan Harker Date: Wed, 25 Nov 2015 11:44:37 -0800 Subject: [PATCH] Switch to using the new log_processor project The python scripts have been moved to their own project at openstack-infra/log_processor. Delete the files here and start installing that project from source. As a part of this split, the .py extension has been dropped from the filename of the installed executables. Change-Id: Ied3025df46b5014a092be0c26e43d4f90699a43f --- files/jenkins-log-client.init | 2 +- files/log-gearman-client.py | 244 -------------- files/log-gearman-worker.py | 446 -------------------------- manifests/client.pp | 2 +- manifests/init.pp | 51 +-- manifests/worker.pp | 2 +- templates/jenkins-log-worker.init.erb | 2 +- 7 files changed, 20 insertions(+), 729 deletions(-) delete mode 100644 files/log-gearman-client.py delete mode 100644 files/log-gearman-worker.py diff --git a/files/jenkins-log-client.init b/files/jenkins-log-client.init index 49f9d65..a62dae6 100755 --- a/files/jenkins-log-client.init +++ b/files/jenkins-log-client.init @@ -15,7 +15,7 @@ PATH=/sbin:/usr/sbin:/bin:/usr/bin DESC="Jenkins Log Client" NAME=jenkins-log-client -DAEMON=/usr/local/bin/log-gearman-client.py +DAEMON=/usr/local/bin/log-gearman-client PIDFILE=/var/run/$NAME/$NAME.pid DAEMON_ARGS="-c /etc/logstash/jenkins-log-client.yaml -d /var/log/logstash/log-client-debug.log -p $PIDFILE" SCRIPTNAME=/etc/init.d/$NAME diff --git a/files/log-gearman-client.py b/files/log-gearman-client.py deleted file mode 100644 index 4957fe7..0000000 --- a/files/log-gearman-client.py +++ /dev/null @@ -1,244 +0,0 @@ -#!/usr/bin/python2 -# -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import argparse -import daemon -import gear -import json -import logging -import os -import os.path -import re -import signal -import socket -import threading -import time -import yaml -import zmq - - -try: - import daemon.pidlockfile as pidfile_mod -except ImportError: - import daemon.pidfile as pidfile_mod - - -class EventProcessor(threading.Thread): - def __init__(self, zmq_address, gearman_client, files, source_url): - threading.Thread.__init__(self) - self.files = files - self.source_url = source_url - self.gearman_client = gearman_client - self.zmq_address = zmq_address - self._connect_zmq() - - def run(self): - while True: - try: - self._read_event() - except: - # Assume that an error reading data from zmq or deserializing - # data received from zmq indicates a zmq error and reconnect. - logging.exception("ZMQ exception.") - self._connect_zmq() - - def _connect_zmq(self): - logging.debug("Connecting to zmq endpoint.") - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) - event_filter = b"onFinalized" - self.socket.setsockopt(zmq.SUBSCRIBE, event_filter) - self.socket.connect(self.zmq_address) - - def _read_event(self): - string = self.socket.recv().decode('utf-8') - event = json.loads(string.split(None, 1)[1]) - logging.debug("Jenkins event received: " + json.dumps(event)) - for fileopts in self.files: - output = {} - source_url, out_event = self._parse_event(event, fileopts) - job_filter = fileopts.get('job-filter') - if (job_filter and - not re.match(job_filter, out_event['fields']['build_name'])): - continue - build_queue_filter = fileopts.get('build-queue-filter') - if (build_queue_filter and - not re.match(build_queue_filter, - out_event['fields']['build_queue'])): - continue - project_filter = fileopts.get('project-filter') - if (project_filter and - not re.match(project_filter, out_event['fields']['project'])): - continue - output['source_url'] = source_url - output['retry'] = fileopts.get('retry-get', False) - output['event'] = out_event - if 'subunit' in fileopts.get('name'): - job = gear.Job(b'push-subunit', - json.dumps(output).encode('utf8')) - else: - job = gear.Job(b'push-log', json.dumps(output).encode('utf8')) - try: - self.gearman_client.submitJob(job) - except: - logging.exception("Exception submitting job to Gearman.") - - def _get_log_dir(self, event): - parameters = event["build"].get("parameters", {}) - base = parameters.get('LOG_PATH', 'UNKNOWN') - return base - - def _parse_fields(self, event, filename): - fields = {} - fields["filename"] = filename - fields["build_name"] = event.get("name", "UNKNOWN") - fields["build_status"] = event["build"].get("status", "UNKNOWN") - fields["build_node"] = event["build"].get("node_name", "UNKNOWN") - fields["build_master"] = event["build"].get("host_name", "UNKNOWN") - parameters = event["build"].get("parameters", {}) - fields["project"] = parameters.get("ZUUL_PROJECT", "UNKNOWN") - # The voting value is "1" for voting, "0" for non-voting - fields["voting"] = parameters.get("ZUUL_VOTING", "UNKNOWN") - # TODO(clarkb) can we do better without duplicated data here? - fields["build_uuid"] = parameters.get("ZUUL_UUID", "UNKNOWN") - fields["build_short_uuid"] = fields["build_uuid"][:7] - fields["build_queue"] = parameters.get("ZUUL_PIPELINE", "UNKNOWN") - fields["build_ref"] = parameters.get("ZUUL_REF", "UNKNOWN") - fields["build_branch"] = parameters.get("ZUUL_BRANCH", "UNKNOWN") - fields["build_zuul_url"] = parameters.get("ZUUL_URL", "UNKNOWN") - if parameters.get("ZUUL_CHANGE"): - fields["build_change"] = parameters.get("ZUUL_CHANGE", "UNKNOWN") - fields["build_patchset"] = parameters.get("ZUUL_PATCHSET", - "UNKNOWN") - elif parameters.get("ZUUL_NEWREV"): - fields["build_newrev"] = parameters.get("ZUUL_NEWREV", - "UNKNOWN") - if ["build_node"] != "UNKNOWN": - node_region = '-'.join( - fields["build_node"].split('-')[-3:-1]) - fields["node_region"] = node_region or "UNKNOWN" - else: - fields["node_region"] = "UNKNOWN" - return fields - - def _parse_event(self, event, fileopts): - fields = self._parse_fields(event, fileopts['name']) - log_dir = self._get_log_dir(event) - source_url = fileopts.get('source-url', self.source_url) + '/' + \ - os.path.join(log_dir, fileopts['name']) - fields["log_url"] = source_url - out_event = {} - out_event["fields"] = fields - out_event["tags"] = [os.path.basename(fileopts['name'])] + \ - fileopts.get('tags', []) - return source_url, out_event - - -class Server(object): - def __init__(self, config, debuglog): - # Config init. - self.config = config - self.source_url = self.config['source-url'] - # Pythong logging output file. - self.debuglog = debuglog - self.processors = [] - - def setup_logging(self): - if self.debuglog: - logging.basicConfig(format='%(asctime)s %(message)s', - filename=self.debuglog, level=logging.DEBUG) - else: - # Prevent leakage into the logstash log stream. - logging.basicConfig(level=logging.CRITICAL) - logging.debug("Log pusher starting.") - - def setup_processors(self): - for publisher in self.config['zmq-publishers']: - gearclient = gear.Client() - gearclient.addServer('localhost') - gearclient.waitForServer() - log_processor = EventProcessor( - publisher, gearclient, - self.config['source-files'], self.source_url) - subunit_processor = EventProcessor( - publisher, gearclient, - self.config['subunit-files'], self.source_url) - self.processors.append(log_processor) - self.processors.append(subunit_processor) - - def wait_for_name_resolution(self, host, port): - while True: - try: - socket.getaddrinfo(host, port) - except socket.gaierror as e: - if e.errno == socket.EAI_AGAIN: - logging.debug("Temporary failure in name resolution") - time.sleep(2) - continue - else: - raise - break - - def main(self): - statsd_host = os.environ.get('STATSD_HOST') - statsd_port = int(os.environ.get('STATSD_PORT', 8125)) - statsd_prefix = os.environ.get('STATSD_PREFIX', 'logstash.geard') - if statsd_host: - self.wait_for_name_resolution(statsd_host, statsd_port) - self.gearserver = gear.Server( - statsd_host=statsd_host, - statsd_port=statsd_port, - statsd_prefix=statsd_prefix) - - self.setup_processors() - for processor in self.processors: - processor.daemon = True - processor.start() - while True: - signal.pause() - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("-c", "--config", required=True, - help="Path to yaml config file.") - parser.add_argument("-d", "--debuglog", - help="Enable debug log. " - "Specifies file to write log to.") - parser.add_argument("--foreground", action='store_true', - help="Run in the foreground.") - parser.add_argument("-p", "--pidfile", - default="/var/run/jenkins-log-pusher/" - "jenkins-log-gearman-client.pid", - help="PID file to lock during daemonization.") - args = parser.parse_args() - - with open(args.config, 'r') as config_stream: - config = yaml.load(config_stream) - server = Server(config, args.debuglog) - - if args.foreground: - server.setup_logging() - server.main() - else: - pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10) - with daemon.DaemonContext(pidfile=pidfile): - server.setup_logging() - server.main() - - -if __name__ == '__main__': - main() diff --git a/files/log-gearman-worker.py b/files/log-gearman-worker.py deleted file mode 100644 index 74acc00..0000000 --- a/files/log-gearman-worker.py +++ /dev/null @@ -1,446 +0,0 @@ -#!/usr/bin/python2 -# -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import argparse -import cStringIO -import daemon -import gear -import gzip -import json -import logging -import os -import Queue -import re -import select -import socket -import subprocess -import sys -import threading -import time -import urllib2 -import yaml - - -try: - import daemon.pidlockfile as pidfile_mod -except ImportError: - import daemon.pidfile as pidfile_mod - - -def semi_busy_wait(seconds): - # time.sleep() may return early. If it does sleep() again and repeat - # until at least the number of seconds specified has elapsed. - start_time = time.time() - while True: - time.sleep(seconds) - cur_time = time.time() - seconds = seconds - (cur_time - start_time) - if seconds <= 0.0: - return - - -class FilterException(Exception): - pass - - -class CRM114Filter(object): - def __init__(self, script, path, build_status): - self.p = None - self.script = script - self.path = path - self.build_status = build_status - if build_status not in ['SUCCESS', 'FAILURE']: - return - if not os.path.exists(path): - os.makedirs(path) - args = [script, path, build_status] - self.p = subprocess.Popen(args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE, - close_fds=True) - - def process(self, data): - if not self.p: - return - self.p.stdin.write(data['message'].encode('utf-8') + '\n') - (r, w, x) = select.select([self.p.stdout], [], - [self.p.stdin, self.p.stdout], 20) - if not r: - self.p.kill() - raise FilterException('Timeout reading from CRM114') - r = self.p.stdout.readline() - if not r: - err = self.p.stderr.read() - if err: - raise FilterException(err) - else: - raise FilterException('Early EOF from CRM114') - r = r.strip() - data['error_pr'] = float(r) - - def _catchOSError(self, method): - try: - method() - except OSError: - logging.exception("Subprocess cleanup failed.") - - def close(self): - if not self.p: - return - # CRM114 should die when its stdinput is closed. Close that - # fd along with stdout and stderr then return. - self._catchOSError(self.p.stdin.close) - self._catchOSError(self.p.stdout.close) - self._catchOSError(self.p.stderr.close) - self._catchOSError(self.p.wait) - - -class CRM114FilterFactory(object): - name = "CRM114" - - def __init__(self, script, basepath): - self.script = script - self.basepath = basepath - - def create(self, fields): - filename = re.sub('\.', '_', fields['filename']) - path = os.path.join(self.basepath, filename) - return CRM114Filter(self.script, path, fields['build_status']) - - -class LogRetriever(threading.Thread): - def __init__(self, gearman_worker, filters, logq): - threading.Thread.__init__(self) - self.gearman_worker = gearman_worker - self.filters = filters - self.logq = logq - - def run(self): - while True: - try: - self._handle_event() - except: - logging.exception("Exception retrieving log event.") - - def _handle_event(self): - job = self.gearman_worker.getJob() - try: - arguments = json.loads(job.arguments.decode('utf-8')) - source_url = arguments['source_url'] - retry = arguments['retry'] - event = arguments['event'] - logging.debug("Handling event: " + json.dumps(event)) - fields = event.get('fields') or event.get('@fields') - tags = event.get('tags') or event.get('@tags') - if fields['build_status'] != 'ABORTED': - # Handle events ignoring aborted builds. These builds are - # discarded by zuul. - log_lines = self._retrieve_log(source_url, retry) - - try: - all_filters = [] - for f in self.filters: - logging.debug("Adding filter: %s" % f.name) - all_filters.append(f.create(fields)) - filters = all_filters - - logging.debug("Pushing " + str(len(log_lines)) + - " log lines.") - base_event = {} - base_event.update(fields) - base_event["tags"] = tags - for line in log_lines: - out_event = base_event.copy() - out_event["message"] = line - new_filters = [] - for f in filters: - try: - f.process(out_event) - new_filters.append(f) - except FilterException: - logging.exception("Exception filtering event: " - "%s" % line.encode("utf-8")) - filters = new_filters - self.logq.put(out_event) - finally: - for f in all_filters: - f.close() - job.sendWorkComplete() - except Exception as e: - logging.exception("Exception handling log event.") - job.sendWorkException(str(e).encode('utf-8')) - - def _retrieve_log(self, source_url, retry): - # TODO (clarkb): This should check the content type instead of file - # extension for determining if gzip was used. - gzipped = False - raw_buf = b'' - try: - gzipped, raw_buf = self._get_log_data(source_url, retry) - except urllib2.HTTPError as e: - if e.code == 404: - logging.info("Unable to retrieve %s: HTTP error 404" % - source_url) - else: - logging.exception("Unable to get log data.") - except Exception: - # Silently drop fatal errors when retrieving logs. - # TODO (clarkb): Handle these errors. - # Perhaps simply add a log message to raw_buf? - logging.exception("Unable to get log data.") - if gzipped: - logging.debug("Decompressing gzipped source file.") - raw_strIO = cStringIO.StringIO(raw_buf) - f = gzip.GzipFile(fileobj=raw_strIO) - buf = f.read().decode('utf-8') - raw_strIO.close() - f.close() - else: - logging.debug("Decoding source file.") - buf = raw_buf.decode('utf-8') - return buf.splitlines() - - def _get_log_data(self, source_url, retry): - gzipped = False - try: - # TODO(clarkb): We really should be using requests instead - # of urllib2. urllib2 will automatically perform a POST - # instead of a GET if we provide urlencoded data to urlopen - # but we need to do a GET. The parameters are currently - # hardcoded so this should be ok for now. - logging.debug("Retrieving: " + source_url + ".gz?level=INFO") - req = urllib2.Request(source_url + ".gz?level=INFO") - req.add_header('Accept-encoding', 'gzip') - r = urllib2.urlopen(req) - except urllib2.URLError: - try: - # Fallback on GETting unzipped data. - logging.debug("Retrieving: " + source_url + "?level=INFO") - r = urllib2.urlopen(source_url + "?level=INFO") - except: - logging.exception("Unable to retrieve source file.") - raise - except: - logging.exception("Unable to retrieve source file.") - raise - if ('gzip' in r.info().get('Content-Type', '') or - 'gzip' in r.info().get('Content-Encoding', '')): - gzipped = True - - raw_buf = r.read() - # Hack to read all of Jenkins console logs as they upload - # asynchronously. After each attempt do an exponential backup before - # the next request for up to 255 seconds total, if we do not - # retrieve the entire file. Short circuit when the end of file string - # for console logs, '\n\n', is read. - if (retry and not gzipped and - raw_buf[-8:].decode('utf-8') != '\n\n'): - content_len = len(raw_buf) - backoff = 1 - while backoff < 129: - # Try for up to 255 seconds to retrieve the complete log file. - try: - logging.debug(str(backoff) + " Retrying fetch of: " + - source_url + "?level=INFO") - logging.debug("Fetching bytes=" + str(content_len) + '-') - req = urllib2.Request(source_url + "?level=INFO") - req.add_header('Range', 'bytes=' + str(content_len) + '-') - r = urllib2.urlopen(req) - raw_buf += r.read() - content_len = len(raw_buf) - except urllib2.HTTPError as e: - if e.code == 416: - logging.exception("Index out of range.") - else: - raise - finally: - if raw_buf[-8:].decode('utf-8') == '\n\n': - break - semi_busy_wait(backoff) - backoff += backoff - - return gzipped, raw_buf - - -class StdOutLogProcessor(object): - def __init__(self, logq, pretty_print=False): - self.logq = logq - self.pretty_print = pretty_print - - def handle_log_event(self): - log = self.logq.get() - if self.pretty_print: - print(json.dumps(log, sort_keys=True, - indent=4, separators=(',', ': '))) - else: - print(json.dumps(log)) - # Push each log event through to keep logstash up to date. - sys.stdout.flush() - - -class INETLogProcessor(object): - socket_type = None - - def __init__(self, logq, host, port): - self.logq = logq - self.host = host - self.port = port - self.socket = None - - def _connect_socket(self): - logging.debug("Creating socket.") - self.socket = socket.socket(socket.AF_INET, self.socket_type) - self.socket.connect((self.host, self.port)) - - def handle_log_event(self): - log = self.logq.get() - try: - if self.socket is None: - self._connect_socket() - self.socket.sendall((json.dumps(log) + '\n').encode('utf-8')) - except: - logging.exception("Exception sending INET event.") - # Logstash seems to take about a minute to start again. Wait 90 - # seconds before attempting to reconnect. If logstash is not - # available after 90 seconds we will throw another exception and - # die. - semi_busy_wait(90) - self._connect_socket() - self.socket.sendall((json.dumps(log) + '\n').encode('utf-8')) - - -class UDPLogProcessor(INETLogProcessor): - socket_type = socket.SOCK_DGRAM - - -class TCPLogProcessor(INETLogProcessor): - socket_type = socket.SOCK_STREAM - - -class Server(object): - def __init__(self, config, debuglog): - # Config init. - self.config = config - self.gearman_host = self.config['gearman-host'] - self.gearman_port = self.config['gearman-port'] - self.output_host = self.config['output-host'] - self.output_port = self.config['output-port'] - self.output_mode = self.config['output-mode'] - # Pythong logging output file. - self.debuglog = debuglog - self.retriever = None - self.logqueue = Queue.Queue(131072) - self.processor = None - self.filter_factories = [] - crmscript = self.config.get('crm114-script') - crmdata = self.config.get('crm114-data') - if crmscript and crmdata: - self.filter_factories.append( - CRM114FilterFactory(crmscript, crmdata)) - - def setup_logging(self): - if self.debuglog: - logging.basicConfig(format='%(asctime)s %(message)s', - filename=self.debuglog, level=logging.DEBUG) - else: - # Prevent leakage into the logstash log stream. - logging.basicConfig(level=logging.CRITICAL) - logging.debug("Log pusher starting.") - - def wait_for_name_resolution(self, host, port): - while True: - try: - socket.getaddrinfo(host, port) - except socket.gaierror as e: - if e.errno == socket.EAI_AGAIN: - logging.debug("Temporary failure in name resolution") - time.sleep(2) - continue - else: - raise - break - - def setup_retriever(self): - hostname = socket.gethostname() - gearman_worker = gear.Worker(hostname + b'-pusher') - self.wait_for_name_resolution(self.gearman_host, self.gearman_port) - gearman_worker.addServer(self.gearman_host, - self.gearman_port) - gearman_worker.registerFunction(b'push-log') - self.retriever = LogRetriever(gearman_worker, self.filter_factories, - self.logqueue) - - def setup_processor(self): - if self.output_mode == "tcp": - self.processor = TCPLogProcessor(self.logqueue, - self.output_host, - self.output_port) - elif self.output_mode == "udp": - self.processor = UDPLogProcessor(self.logqueue, - self.output_host, - self.output_port) - else: - # Note this processor will not work if the process is run as a - # daemon. You must use the --foreground option. - self.processor = StdOutLogProcessor(self.logqueue) - - def main(self): - self.setup_retriever() - self.setup_processor() - - self.retriever.daemon = True - self.retriever.start() - - while True: - try: - self.processor.handle_log_event() - except: - logging.exception("Exception processing log event.") - raise - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("-c", "--config", required=True, - help="Path to yaml config file.") - parser.add_argument("-d", "--debuglog", - help="Enable debug log. " - "Specifies file to write log to.") - parser.add_argument("--foreground", action='store_true', - help="Run in the foreground.") - parser.add_argument("-p", "--pidfile", - default="/var/run/jenkins-log-pusher/" - "jenkins-log-gearman-worker.pid", - help="PID file to lock during daemonization.") - args = parser.parse_args() - - with open(args.config, 'r') as config_stream: - config = yaml.load(config_stream) - server = Server(config, args.debuglog) - - if args.foreground: - server.setup_logging() - server.main() - else: - pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10) - with daemon.DaemonContext(pidfile=pidfile): - server.setup_logging() - server.main() - - -if __name__ == '__main__': - main() diff --git a/manifests/client.pp b/manifests/client.pp index 999a4af..2e7f1fb 100644 --- a/manifests/client.pp +++ b/manifests/client.pp @@ -36,7 +36,7 @@ class log_processor::client ( mode => '0555', source => 'puppet:///modules/log_processor/jenkins-log-client.init', require => [ - File['/usr/local/bin/log-gearman-client.py'], + Vcsrepo['/opt/log_processor'], File['/etc/logstash/jenkins-log-client.yaml'], File['/etc/default/jenkins-log-client'], ], diff --git a/manifests/init.pp b/manifests/init.pp index eb08cf4..f5d5c5f 100644 --- a/manifests/init.pp +++ b/manifests/init.pp @@ -16,6 +16,8 @@ # == Class: log_processor # class log_processor ( + $git_source = 'git://git.openstack.org/openstack-infra/log_processor', + $git_rev = 'master', ) { package { 'python-daemon': ensure => present, @@ -33,19 +35,6 @@ class log_processor ( ensure => present, } - include ::pip - package { 'gear': - ensure => latest, - provider => 'pip', - require => Class['pip'], - } - - package { 'statsd': - ensure => latest, - provider => 'pip', - require => Class['pip'], - } - file { '/var/lib/crm114': ensure => directory, owner => 'logstash', @@ -64,31 +53,23 @@ class log_processor ( ], } - file { '/usr/local/bin/log-gearman-client.py': - ensure => present, - owner => 'root', - group => 'root', - mode => '0755', - source => 'puppet:///modules/log_processor/log-gearman-client.py', - require => [ + vcsrepo { '/opt/log_processor': + ensure => latest, + provider => git, + source => $git_source, + revision => $git_rev, + } + + exec { 'install-log_processor': + command => 'pip install --install-option="--install-scripts=/usr/local/bin" --install-option="--install-lib=/usr/local/lib/python2.7/dist-packages" /opt/log_processor', + path => '/usr/local/bin:/usr/bin:/bin', + refreshonly => true, + subscribe => Vcsrepo['/opt/log_processor'], + require => [ Package['python-daemon'], - Package['python-zmq'], Package['python-yaml'], - Package['gear'], + Package['python-zmq'], ], } - file { '/usr/local/bin/log-gearman-worker.py': - ensure => present, - owner => 'root', - group => 'root', - mode => '0755', - source => 'puppet:///modules/log_processor/log-gearman-worker.py', - require => [ - Package['python-daemon'], - Package['python-zmq'], - Package['python-yaml'], - Package['gear'], - ], - } } diff --git a/manifests/worker.pp b/manifests/worker.pp index cfe173d..bc65f08 100644 --- a/manifests/worker.pp +++ b/manifests/worker.pp @@ -36,7 +36,7 @@ define log_processor::worker ( mode => '0555', content => template('log_processor/jenkins-log-worker.init.erb'), require => [ - File['/usr/local/bin/log-gearman-worker.py'], + Vcsrepo['/opt/log_processor'], File["/etc/logstash/jenkins-log-worker${suffix}.yaml"], ], } diff --git a/templates/jenkins-log-worker.init.erb b/templates/jenkins-log-worker.init.erb index f122623..af016d8 100755 --- a/templates/jenkins-log-worker.init.erb +++ b/templates/jenkins-log-worker.init.erb @@ -15,7 +15,7 @@ PATH=/sbin:/usr/sbin:/bin:/usr/bin DESC="Jenkins Log Worker" NAME=jenkins-log-worker<%= suffix %> -DAEMON=/usr/local/bin/log-gearman-worker.py +DAEMON=/usr/local/bin/log-gearman-worker PIDFILE=/var/run/$NAME/$NAME.pid DAEMON_ARGS="-c /etc/logstash/jenkins-log-worker<%= suffix %>.yaml -d /var/log/logstash/log-worker<%= suffix %>-debug.log -p $PIDFILE" SCRIPTNAME=/etc/init.d/$NAME