author    Matthew Treinish <mtreinish@kortar.org>  2016-04-08 14:59:07 -0400
committer Matthew Treinish <mtreinish@kortar.org>  2016-04-08 14:59:07 -0400
commit    d31b5d971076b2586be827ce80b6d30043974fc9
tree      c4ed934e20ea5e0f8777da19a52a7f04cac49c69
parent    224ed54b1e614f0bbcceb695eb72d4029dbf1654
Make subunit2sql population single threaded
This commit switches the gearman worker to handle both retrieving the subunit file and populating the DB with that data in a single thread, as opposed to pushing it onto an in-memory queue and processing the streams in a separate thread. This should have two major benefits. First, it should make the behavior much more consistent: we'll only send work-completed to the gearman server after the data is already in the DB. It should also significantly improve our memory consumption.

Change-Id: Ifaac2f150e26158b0a22e1c33e0c5d39c10182a4
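For illustration only, here is a minimal sketch of the two patterns the message contrasts, using hypothetical retrieve() and write_to_db() stand-ins rather than the worker's real gearman and subunit2sql calls:

import queue
import threading

def retrieve(job):
    # Hypothetical stand-in for downloading a job's subunit stream.
    return 'subunit stream for %s' % job

def write_to_db(stream):
    # Hypothetical stand-in for subunit2sql DB population.
    print('stored:', stream)

def queued_worker(jobs):
    # Before: streams pile up on an in-memory queue while a separate
    # consumer thread drains it, so "work complete" can be reported to
    # gearman before the data is actually in the DB.
    q = queue.Queue()

    def consumer():
        while True:
            write_to_db(q.get())
            q.task_done()

    threading.Thread(target=consumer, daemon=True).start()
    for job in jobs:
        q.put(retrieve(job))  # completion reported here, DB write later
    q.join()

def single_threaded_worker(jobs):
    # After: retrieve and store in one thread; completion is only
    # reported once the DB write has finished, and at most one stream
    # is held in memory at a time.
    for job in jobs:
        write_to_db(retrieve(job))

single_threaded_worker(['job-1', 'job-2'])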
Notes (review):
    Code-Review+2: yolanda.robla <yolanda.robla-mota@hpe.com>
    Code-Review+2: James E. Blair <corvus@inaugust.com>
    Workflow+1: James E. Blair <corvus@inaugust.com>
    Verified+2: Jenkins
    Submitted-by: Jenkins
    Submitted-at: Fri, 15 Apr 2016 23:45:28 +0000
    Reviewed-on: https://review.openstack.org/303583
    Project: openstack-infra/puppet-subunit2sql
    Branch: refs/heads/master
 files/subunit-gearman-worker.py | 110
 1 file changed, 44 insertions(+), 66 deletions(-)
diff --git a/files/subunit-gearman-worker.py b/files/subunit-gearman-worker.py
index 36a11b2..5c64833 100644
--- a/files/subunit-gearman-worker.py
+++ b/files/subunit-gearman-worker.py
@@ -22,7 +22,6 @@ import io
 import json
 import logging
 import os
-import Queue
 import socket
 import threading
 import time
@@ -56,11 +55,50 @@ class FilterException(Exception):
 
 
 class SubunitRetriever(threading.Thread):
-    def __init__(self, gearman_worker, filters, subunitq):
+    def __init__(self, gearman_worker, filters, subunit2sql_conf):
         threading.Thread.__init__(self)
         self.gearman_worker = gearman_worker
         self.filters = filters
-        self.subunitq = subunitq
+        # Initialize subunit2sql settings
+        self.config = subunit2sql_conf
+        shell.cli_opts()
+        extensions = shell.get_extensions()
+        shell.parse_args([], [self.config])
+        self.extra_targets = shell.get_targets(extensions)
+
+    def _write_to_db(self, subunit):
+        subunit_v2 = subunit.pop('subunit')
+        # Set run metadata from gearman
+        log_url = subunit.pop('log_url', None)
+        if log_url:
+            log_dir = os.path.dirname(log_url)
+
+            # log_dir should be the top-level directory containing a job run,
+            # but the subunit file may be nested in 0 - 2 subdirectories (top,
+            # logs/, or logs/old/), so we need to safely correct the path here
+            log_base = os.path.basename(log_dir)
+            if log_base == 'logs':
+                log_dir = os.path.dirname(log_dir)
+            elif log_base == 'old':
+                log_dir = os.path.dirname(os.path.dirname(log_dir))
+
+            shell.CONF.set_override('artifacts', log_dir)
+        shell.CONF.set_override('run_meta', subunit)
+        # Parse subunit stream and store in DB
+        if subunit_v2.closed:
+            logging.debug('Trying to convert closed subunit v2 stream: %s to '
+                          'SQL' % subunit_v2)
+        else:
+            logging.debug('Converting Subunit V2 stream: %s to SQL' %
+                          subunit_v2)
+        stream = read_subunit.ReadSubunit(subunit_v2,
+                                          targets=self.extra_targets)
+        results = stream.get_results()
+        start_time = sorted(
+            [results[x]['start_time'] for x in results if x != 'run_time'])[0]
+        shell.CONF.set_override('run_at', start_time.isoformat())
+        shell.process_results(results)
+        subunit_v2.close()
 
     def run(self):
         while True:
@@ -93,7 +131,7 @@ class SubunitRetriever(threading.Thread):
             logging.debug("Pushing subunit file: %s" % subunit_io)
             out_event = fields.copy()
             out_event["subunit"] = subunit_io
-            self.subunitq.put(out_event)
+            self._write_to_db(out_event)
             job.sendWorkComplete()
         except Exception as e:
             logging.exception("Exception handling log event.")
@@ -163,53 +201,6 @@ class SubunitRetriever(threading.Thread):
         return gzipped, raw_buf
 
 
-class Subunit2SQLProcessor(object):
-    def __init__(self, subunitq, subunit2sql_conf):
-        self.subunitq = subunitq
-        self.config = subunit2sql_conf
-        # Initialize subunit2sql settings
-        shell.cli_opts()
-        extensions = shell.get_extensions()
-        shell.parse_args([], [self.config])
-        self.extra_targets = shell.get_targets(extensions)
-
-    def handle_subunit_event(self):
-        # Pull subunit event from queue and separate stream from metadata
-        subunit = self.subunitq.get()
-        subunit_v2 = subunit.pop('subunit')
-        # Set run metadata from gearman
-        log_url = subunit.pop('log_url', None)
-        if log_url:
-            log_dir = os.path.dirname(log_url)
-
-            # log_dir should be the top-level directory containing a job run,
-            # but the subunit file may be nested in 0 - 2 subdirectories (top,
-            # logs/, or logs/old/), so we need to safely correct the path here
-            log_base = os.path.basename(log_dir)
-            if log_base == 'logs':
-                log_dir = os.path.dirname(log_dir)
-            elif log_base == 'old':
-                log_dir = os.path.dirname(os.path.dirname(log_dir))
-
-            shell.CONF.set_override('artifacts', log_dir)
-        shell.CONF.set_override('run_meta', subunit)
-        # Parse subunit stream and store in DB
-        if subunit_v2.closed:
-            logging.debug('Trying to convert closed subunit v2 stream: %s to '
-                          'SQL' % subunit_v2)
-        else:
-            logging.debug('Converting Subunit V2 stream: %s to SQL' %
-                          subunit_v2)
-        stream = read_subunit.ReadSubunit(subunit_v2,
-                                          targets=self.extra_targets)
-        results = stream.get_results()
-        start_time = sorted(
-            [results[x]['start_time'] for x in results if x != 'run_time'])[0]
-        shell.CONF.set_override('run_at', start_time.isoformat())
-        shell.process_results(results)
-        subunit_v2.close()
-
-
 class Server(object):
     def __init__(self, config, debuglog):
         # Config init.
@@ -219,8 +210,6 @@ class Server(object):
         # Pythong logging output file.
         self.debuglog = debuglog
         self.retriever = None
-        self.subunitqueue = Queue.Queue(131072)
-        self.processor = None
         self.filter_factories = []
 
     def setup_logging(self):
@@ -238,28 +227,17 @@ class Server(object):
         gearman_worker.addServer(self.gearman_host,
                                  self.gearman_port)
         gearman_worker.registerFunction(b'push-subunit')
+        subunit2sql_conf = self.config['config']
         self.retriever = SubunitRetriever(gearman_worker,
                                           self.filter_factories,
-                                          self.subunitqueue)
-
-    def setup_processor(self):
-        subunit2sql_config = self.config['config']
-        self.processor = Subunit2SQLProcessor(self.subunitqueue,
-                                              subunit2sql_config)
+                                          subunit2sql_conf)
 
     def main(self):
         self.setup_retriever()
-        self.setup_processor()
 
         self.retriever.daemon = True
         self.retriever.start()
 
-        while True:
-            try:
-                self.processor.handle_subunit_event()
-            except:
-                logging.exception("Exception processing log event.")
-
 
 def main():
     parser = argparse.ArgumentParser()
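
The path correction in _write_to_db above can be exercised on its own. A minimal standalone sketch, with the example log URLs invented for illustration:

import os

def normalize_log_dir(log_url):
    # Same correction as in _write_to_db: drop the file name, then walk
    # up past an optional logs/ or logs/old/ nesting so the result is
    # the top-level directory for the job run.
    log_dir = os.path.dirname(log_url)
    log_base = os.path.basename(log_dir)
    if log_base == 'logs':
        log_dir = os.path.dirname(log_dir)
    elif log_base == 'old':
        log_dir = os.path.dirname(os.path.dirname(log_dir))
    return log_dir

# All three nesting levels normalize to the same job-run directory:
for url in ('http://logs.example.org/83/303583/1/run/testrepository.subunit',
            'http://logs.example.org/83/303583/1/run/logs/testrepository.subunit',
            'http://logs.example.org/83/303583/1/run/logs/old/testrepository.subunit'):
    print(normalize_log_dir(url))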