Merge "Add parallelization options"

Jenkins 2015-12-23 22:43:49 +00:00 committed by Gerrit Code Review
commit 0fc023b911
8 changed files with 330 additions and 36 deletions

View File

@ -160,6 +160,13 @@ arguments after the job definition path. To update Foo1 and Foo2 run::
jenkins-jobs update /path/to/defs Foo1 Foo2
You can also enable parallel execution by passing the workers option with a
value of 0, or of 2 or higher. Use 0 to run as many workers as there are cores
on the host, or 2 or higher to set the number of workers explicitly::
jenkins-jobs update --workers 0 /path/to/defs
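For example, on a hypothetical four-core host you could pin the run to four
workers explicitly::
jenkins-jobs update --workers 4 /path/to/defs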
Passing Multiple Paths
^^^^^^^^^^^^^^^^^^^^^^
It is possible to pass multiple paths to JJB using colons as a path separator on

View File

@ -26,8 +26,10 @@ import jenkins
import re
from pprint import pformat
import logging
import time
from jenkins_jobs.constants import MAGIC_MANAGE_STRING
from jenkins_jobs.parallel import parallelize
from jenkins_jobs.parser import YamlParser
from jenkins_jobs import utils
@ -135,6 +137,7 @@ class Jenkins(object):
self._job_list = set(job['name'] for job in self.jobs)
return self._job_list
@parallelize
def update_job(self, job_name, xml):
if self.is_job(job_name):
logger.info("Reconfiguring jenkins job {0}".format(job_name))
@ -303,6 +306,7 @@ class Builder(object):
self.jenkins.delete_job(job)
if self.cache.is_cached(job):
self.cache.set(job, '')
self.cache.save()
def delete_all_jobs(self):
jobs = self.jenkins.get_jobs()
@ -311,10 +315,23 @@ class Builder(object):
# Need to clear the JJB cache after deletion
self.cache.clear()
def update_job(self, input_fn, jobs_glob=None, output=None):
@parallelize
def changed(self, job):
md5 = job.md5()
changed = self.ignore_cache or self.cache.has_changed(job.name, md5)
if not changed:
logger.debug("'{0}' has not changed".format(job.name))
return changed
def update_jobs(self, input_fn, jobs_glob=None, output=None,
n_workers=None):
orig = time.time()
self.load_files(input_fn)
self.parser.expandYaml(jobs_glob)
self.parser.generateXML()
step = time.time()
logging.debug('%d XML files generated in %ss',
len(self.parser.jobs), str(step - orig))
logger.info("Number of jobs generated: %d", len(self.parser.xml_jobs))
self.parser.xml_jobs.sort(key=operator.attrgetter('name'))
@ -328,9 +345,8 @@ class Builder(object):
if not os.path.isdir(output):
raise
updated_jobs = 0
for job in self.parser.xml_jobs:
if output:
if output:
for job in self.parser.xml_jobs:
if hasattr(output, 'write'):
# `output` is a file-like object
logger.info("Job name: %s", job.name)
@ -351,17 +367,54 @@ class Builder(object):
logger.debug("Writing XML to '{0}'".format(output_fn))
with io.open(output_fn, 'w', encoding='utf-8') as f:
f.write(job.output().decode('utf-8'))
continue
md5 = job.md5()
if (self.jenkins.is_job(job.name)
and not self.cache.is_cached(job.name)):
old_md5 = self.jenkins.get_job_md5(job.name)
self.cache.set(job.name, old_md5)
return self.parser.xml_jobs, len(self.parser.xml_jobs)
if self.cache.has_changed(job.name, md5) or self.ignore_cache:
self.jenkins.update_job(job.name, job.output().decode('utf-8'))
updated_jobs += 1
self.cache.set(job.name, md5)
# Filter out the jobs that did not change
logging.debug('Filtering %d jobs for changed jobs',
len(self.parser.xml_jobs))
step = time.time()
jobs = [job for job in self.parser.xml_jobs
if self.changed(job)]
logging.debug("Filtered for changed jobs in %ss",
(time.time() - step))
if not jobs:
return [], 0
# Update the jobs
logging.debug('Updating jobs')
step = time.time()
p_params = [{'job': job} for job in jobs]
results = self.parallel_update_job(
n_workers=n_workers,
parallelize=p_params)
logging.debug("Parsing results")
# normalize result handling: with zero or one parallelize entries the
# decorated call returns a single result rather than a list, so wrap it
if len(p_params) in (1, 0):
results = [results]
for result in results:
if isinstance(result, Exception):
raise result
else:
logger.debug("'{0}' has not changed".format(job.name))
return self.parser.xml_jobs, updated_jobs
# update in-memory cache
j_name, j_md5 = result
self.cache.set(j_name, j_md5)
# write cache to disk
self.cache.save()
logging.debug("Updated %d jobs in %ss",
len(jobs),
time.time() - step)
logging.debug("Total run took %ss", (time.time() - orig))
return jobs, len(jobs)
@parallelize
def parallel_update_job(self, job):
self.jenkins.update_job(job.name, job.output().decode('utf-8'))
return (job.name, job.md5())
def update_job(self, input_fn, jobs_glob=None, output=None):
logging.warn('Current update_job function signature is deprecated and '
'will change in future versions to the signature of the '
'new parallel_update_job')
return self.update_jobs(input_fn, jobs_glob, output)
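The dispatch pattern above — build a list of per-call kwargs dicts (p_params) and hand it to a @parallelize-decorated method — can be tried in isolation. A minimal sketch, assuming the commit's jenkins_jobs.parallel module is importable; the Mirror class and fetch method are hypothetical, not part of the commit:

from jenkins_jobs.parallel import parallelize

class Mirror(object):
    @parallelize
    def fetch(self, item):
        # per-call work; must be thread safe, one invocation per kwargs dict
        return item.upper()

mirror = Mirror()
# one kwargs dict per call, mirroring p_params in update_jobs
p_params = [{'item': name} for name in ['a', 'b', 'c']]
results = mirror.fetch(n_workers=0, parallelize=p_params)
print(results)  # ['A', 'B', 'C'], in the same order as p_params

Note that with zero or one entries the decorator returns a bare result instead of a list, which is why update_jobs wraps results before iterating over them.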

View File

@ -104,6 +104,9 @@ def create_parser():
parser_update.add_argument('--delete-old', help='delete obsolete jobs',
action='store_true',
dest='delete_old', default=False,)
parser_update.add_argument('--workers', dest='n_workers', type=int,
default=1, help='number of workers to use, 0 '
'for autodetection and 1 for just one worker.')
# subparser: test
parser_test = subparser.add_parser('test', parents=[recursive_parser])
@ -325,17 +328,23 @@ def execute(options, config):
logger.info("Deleting all jobs")
builder.delete_all_jobs()
elif options.command == 'update':
if options.n_workers < 0:
raise JenkinsJobsException(
'Number of workers must be equal to or greater than 0')
logger.info("Updating jobs in {0} ({1})".format(
options.path, options.names))
jobs, num_updated_jobs = builder.update_job(options.path,
options.names)
jobs, num_updated_jobs = builder.update_jobs(
options.path, options.names,
n_workers=options.n_workers)
logger.info("Number of jobs updated: %d", num_updated_jobs)
if options.delete_old:
num_deleted_jobs = builder.delete_old_managed()
logger.info("Number of jobs deleted: %d", num_deleted_jobs)
elif options.command == 'test':
builder.update_job(options.path, options.name,
output=options.output_dir)
builder.update_jobs(options.path, options.name,
output=options.output_dir,
n_workers=1)
def version():
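The --workers flag and its guard can be reproduced standalone. A minimal sketch using plain argparse; the prog name and error handling are illustrative, not lifted from the commit:

import argparse

parser = argparse.ArgumentParser(prog='jenkins-jobs')
parser.add_argument('--workers', dest='n_workers', type=int, default=1,
                    help='number of workers to use; 0 autodetects cores')
options = parser.parse_args(['--workers', '0'])
if options.n_workers < 0:
    raise SystemExit('Number of workers must be equal to or greater than 0')
print(options.n_workers)  # 0 -> autodetect the core count at run time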

jenkins_jobs/parallel.py Normal file (151 lines)
View File

@ -0,0 +1,151 @@
#!/usr/bin/env python
# Copyright (C) 2015 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Parallel execution helper functions and classes
from functools import wraps
import logging
from multiprocessing import cpu_count
import threading
import traceback
try:
import Queue as queue
except ImportError:
import queue
logger = logging.getLogger(__name__)
class TaskFunc(dict):
"""
Simple class to wrap around the information needed to run a function.
"""
def __init__(self, n_ord, func, args=None, kwargs=None):
self['func'] = func
self['args'] = args or []
self['kwargs'] = kwargs or {}
self['ord'] = n_ord
class Worker(threading.Thread):
"""
Class that actually does the work: gets a TaskFunc through the queue,
runs its function with the passed parameters, and returns the result.
If the string 'done' is passed instead of a TaskFunc instance, the
thread will end.
"""
def __init__(self, in_queue, out_queue):
threading.Thread.__init__(self)
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
while True:
task = self.in_queue.get()
if task == 'done':
return
try:
res = task['func'](*task['args'],
**task['kwargs'])
except Exception as exc:
res = exc
traceback.print_exc()
self.out_queue.put((task['ord'], res))
def parallelize(func):
@wraps(func)
def parallelized(*args, **kwargs):
"""
This function will spawn workers and run the decorated function in
parallel on the workers. It does not ensure the thread safety of the
decorated function (the decorated function must be thread safe by
itself). It accepts two special parameters:
:arg list parallelize: list of kwargs dicts, one per run; the results
of the runs are returned in the same order.
:arg int n_workers: number of workers to use; by default, or if 0 is
passed, the number of cores is autodetected and used; if 1 is passed,
no workers are spawned and the function runs serially, as if it were
not parallelized at all.
Example:
> @parallelize
> def sample(param1, param2, param3):
> return param1 + param2 + param3
>
> sample('param1', param2='val2',
> parallelize=[
> {'param3': 'val3'},
> {'param3': 'val4'},
> {'param3': 'val5'},
> ])
>
['param1val2val3', 'param1val2val4', 'param1val2val5']
This will run the function `sample` 3 times, in parallel (depending
on the number of detected cores), and return a list with the results
of the executions in the same order the parameters were passed.
"""
n_workers = kwargs.pop('n_workers', 0)
p_kwargs = kwargs.pop('parallelize', [])
# if only one kwargs dict is passed in the parallelize list, run the
# original function as is, no need for a pool
if len(p_kwargs) == 1:
kwargs.update(p_kwargs[0])
if len(p_kwargs) in (1, 0):
return func(*args, **kwargs)
# prepare the workers
# If no number of workers passed or passed 0
if not n_workers:
n_workers = cpu_count()
logging.debug("Running parallel %d workers", n_workers)
worker_pool = []
in_queue = queue.Queue()
out_queue = queue.Queue()
for n_worker in range(n_workers):
new_worker = Worker(in_queue, out_queue)
new_worker.setDaemon(True)
logging.debug("Spawning worker %d", n_worker)
new_worker.start()
worker_pool.append(new_worker)
# Feed the workers
n_ord = 0
for f_kwargs in p_kwargs:
f_kwargs.update(kwargs)
in_queue.put(TaskFunc(n_ord, func, args, f_kwargs))
n_ord += 1
for _ in range(n_workers):
in_queue.put('done')
# Wait for the results
logging.debug("Waiting for workers to finish processing")
results = []
for _ in p_kwargs:
new_res = out_queue.get()
results.append(new_res)
# cleanup
for worker in worker_pool:
worker.join()
# Reorder the results
results = [r[1] for r in sorted(results)]
logging.debug("Parallel task finished")
return results
return parallelized
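The docstring example above, made runnable end to end; results come back in the order of the parallelize list regardless of which worker finishes first:

from jenkins_jobs.parallel import parallelize

@parallelize
def sample(param1, param2, param3):
    return param1 + param2 + param3

# three calls fan out to worker threads (cores autodetected by default)
print(sample('param1', param2='val2',
             parallelize=[{'param3': 'val3'},
                          {'param3': 'val4'},
                          {'param3': 'val5'}]))
# ['param1val2val3', 'param1val2val4', 'param1val2val5']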

View File

@ -85,8 +85,8 @@ class TestTests(CmdTestsBase):
args.output_dir = mock.Mock(wraps=io.BytesIO())
cmd.execute(args, self.config) # probably better to fail here
@mock.patch('jenkins_jobs.cmd.Builder.update_job')
def test_multi_path(self, update_job_mock):
@mock.patch('jenkins_jobs.cmd.Builder.update_jobs')
def test_multi_path(self, update_jobs_mock):
"""
Run test mode and pass multiple paths.
"""
@ -98,14 +98,15 @@ class TestTests(CmdTestsBase):
cmd.execute(args, self.config)
self.assertEqual(args.path, path_list)
update_job_mock.assert_called_with(path_list, [],
output=args.output_dir)
update_jobs_mock.assert_called_with(path_list, [],
output=args.output_dir,
n_workers=mock.ANY)
@mock.patch('jenkins_jobs.cmd.Builder.update_job')
@mock.patch('jenkins_jobs.cmd.Builder.update_jobs')
@mock.patch('jenkins_jobs.cmd.os.path.isdir')
@mock.patch('jenkins_jobs.cmd.os.walk')
def test_recursive_multi_path(self, os_walk_mock, isdir_mock,
update_job_mock):
update_jobs_mock):
"""
Run test mode and pass multiple paths with recursive path option.
"""
@ -125,20 +126,22 @@ class TestTests(CmdTestsBase):
cmd.execute(args, self.config)
update_job_mock.assert_called_with(paths, [], output=args.output_dir)
update_jobs_mock.assert_called_with(paths, [], output=args.output_dir,
n_workers=mock.ANY)
args = self.parser.parse_args(['test', multipath])
args.output_dir = mock.MagicMock()
self.config.set('job_builder', 'recursive', 'True')
cmd.execute(args, self.config)
update_job_mock.assert_called_with(paths, [], output=args.output_dir)
update_jobs_mock.assert_called_with(paths, [], output=args.output_dir,
n_workers=mock.ANY)
@mock.patch('jenkins_jobs.cmd.Builder.update_job')
@mock.patch('jenkins_jobs.cmd.Builder.update_jobs')
@mock.patch('jenkins_jobs.cmd.os.path.isdir')
@mock.patch('jenkins_jobs.cmd.os.walk')
def test_recursive_multi_path_with_excludes(self, os_walk_mock, isdir_mock,
update_job_mock):
update_jobs_mock):
"""
Run test mode and pass multiple paths with recursive path option.
"""
@ -160,7 +163,8 @@ class TestTests(CmdTestsBase):
cmd.execute(args, self.config)
update_job_mock.assert_called_with(paths, [], output=args.output_dir)
update_jobs_mock.assert_called_with(paths, [], output=args.output_dir,
n_workers=mock.ANY)
def test_console_output(self):
"""

View File

@ -26,19 +26,19 @@ from tests.cmd.test_cmd import CmdTestsBase
@mock.patch('jenkins_jobs.builder.Jenkins.get_plugins_info', mock.MagicMock)
class UpdateTests(CmdTestsBase):
@mock.patch('jenkins_jobs.cmd.Builder.update_job')
def test_update_jobs(self, update_job_mock):
@mock.patch('jenkins_jobs.cmd.Builder.update_jobs')
def test_update_jobs(self, update_jobs_mock):
"""
Test update_jobs is called
"""
# don't care about the value returned here
update_job_mock.return_value = ([], 0)
update_jobs_mock.return_value = ([], 0)
path = os.path.join(self.fixtures_path, 'cmd-002.yaml')
args = self.parser.parse_args(['update', path])
cmd.execute(args, self.config)
update_job_mock.assert_called_with([path], [])
update_jobs_mock.assert_called_with([path], [], n_workers=mock.ANY)
@mock.patch('jenkins_jobs.builder.Jenkins.is_job', return_value=True)
@mock.patch('jenkins_jobs.builder.Jenkins.get_jobs')
@ -88,7 +88,7 @@ class UpdateTests(CmdTestsBase):
# mocks to call real methods on the above test object.
b_inst = builder_mock.return_value
b_inst.plugins_list = builder_obj.plugins_list
b_inst.update_job.side_effect = builder_obj.update_job
b_inst.update_jobs.side_effect = builder_obj.update_jobs
b_inst.delete_old_managed.side_effect = builder_obj.delete_old_managed
def _get_jobs():

View File

View File

@ -0,0 +1,70 @@
# Copyright 2015 David Caro
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from multiprocessing import cpu_count
from testtools import matchers
from testtools import TestCase
from jenkins_jobs.parallel import parallelize
from tests.base import mock
class TestCaseParallel(TestCase):
def test_parallel_correct_order(self):
expected = list(range(10, 20))
@parallelize
def parallel_test(num_base, num_extra):
return num_base + num_extra
parallel_args = [{'num_extra': num} for num in range(10)]
result = parallel_test(10, parallelize=parallel_args)
self.assertThat(result, matchers.Equals(expected))
def test_parallel_time_less_than_serial(self):
@parallelize
def wait(secs):
time.sleep(secs)
before = time.time()
# ten threads to make it as fast as possible
wait(parallelize=[{'secs': 1} for _ in range(10)], n_workers=10)
after = time.time()
self.assertThat(after - before, matchers.LessThan(5))
def test_parallel_single_thread(self):
expected = list(range(10, 20))
@parallelize
def parallel_test(num_base, num_extra):
return num_base + num_extra
parallel_args = [{'num_extra': num} for num in range(10)]
result = parallel_test(10, parallelize=parallel_args, n_workers=1)
self.assertThat(result, matchers.Equals(expected))
@mock.patch('jenkins_jobs.parallel.cpu_count', wraps=cpu_count)
def test_use_auto_detect_cores(self, mockCpu_count):
@parallelize
def parallel_test():
return True
result = parallel_test(parallelize=[{} for _ in range(10)],
n_workers=0)
self.assertThat(result, matchers.Equals([True for _ in range(10)]))
mockCpu_count.assert_called_once_with()