Perform live reconfiguration

Change-Id: I69563ee47dd6f3777a52b67999ff1a03247f1e1e
Reviewed-on: https://review.openstack.org/35324
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Reviewed-by: Clark Boylan <clark.boylan@gmail.com>
Approved: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
This commit is contained in:
James E. Blair 2013-07-01 12:10:22 -07:00 committed by Jenkins
parent eff881637f
commit cdccd976f7
4 changed files with 106 additions and 16 deletions

View File

@ -2281,3 +2281,23 @@ class TestScheduler(testtools.TestCase):
assert self.getJobFromHistory('node-project-merge').node is None
assert self.getJobFromHistory('node-project-test1').node == 'debian'
assert self.getJobFromHistory('node-project-test2').node is None
def test_live_reconfiguration(self):
"Test that live reconfiguration works"
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
self.sched.reconfigure(self.config)
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
assert self.getJobFromHistory('project-merge').result == 'SUCCESS'
assert self.getJobFromHistory('project-test1').result == 'SUCCESS'
assert self.getJobFromHistory('project-test2').result == 'SUCCESS'
assert A.data['status'] == 'MERGED'
assert A.reported == 2
self.assertEmptyQueues()

View File

@ -306,12 +306,15 @@ class ChangeQueue(object):
def enqueueChange(self, change):
item = QueueItem(self.pipeline, change)
self.enqueueItem(item)
item.enqueue_time = time.time()
return item
def enqueueItem(self, item):
if self.dependent and self.queue:
item.item_ahead = self.queue[-1]
item.item_ahead.item_behind = item
self.queue.append(item)
item.enqueue_time = time.time()
return item
def dequeueItem(self, item):
if item in self.queue:

View File

@ -67,6 +67,7 @@ class Scheduler(threading.Thread):
threading.Thread.__init__(self)
self.daemon = True
self.wake_event = threading.Event()
self.layout_lock = threading.Lock()
self.reconfigure_complete_event = threading.Event()
self._pause = False
self._reconfigure = False
@ -78,9 +79,6 @@ class Scheduler(threading.Thread):
self.trigger_event_queue = Queue.Queue()
self.result_event_queue = Queue.Queue()
self._init()
def _init(self):
self.layout = model.Layout()
def stop(self):
@ -88,7 +86,6 @@ class Scheduler(threading.Thread):
self.wake_event.set()
def testConfig(self, config_path):
self._init()
self._parseConfig(config_path)
def _parseConfig(self, config_path):
@ -283,6 +280,15 @@ class Scheduler(threading.Thread):
def setTrigger(self, trigger):
self.trigger = trigger
def getProject(self, name):
self.layout_lock.acquire()
p = None
try:
p = self.layout.projects.get(name)
finally:
self.layout_lock.release()
return p
def addEvent(self, event):
self.log.debug("Adding trigger event: %s" % event)
try:
@ -320,7 +326,6 @@ class Scheduler(threading.Thread):
def reconfigure(self, config):
self.log.debug("Prepare to reconfigure")
self.config = config
self._pause = True
self._reconfigure = True
self.wake_event.set()
self.log.debug("Waiting for reconfiguration")
@ -387,15 +392,62 @@ class Scheduler(threading.Thread):
self.log.debug("Exiting")
self._save_queue()
os._exit(0)
if self._reconfigure:
def _doReconfigureEvent(self):
# This is called in the scheduler loop after another thread sets
# the reconfigure flag
self.layout_lock.acquire()
try:
self.log.debug("Performing reconfiguration")
self._init()
self.layout = self._parseConfig(
layout = self._parseConfig(
self.config.get('zuul', 'layout_config'))
for name, new_pipeline in layout.pipelines.items():
old_pipeline = self.layout.pipelines.get(name)
if not old_pipeline:
if self.layout.pipelines:
# Don't emit this warning on startup
self.log.warning("No old pipeline matching %s found "
"when reconfiguring" % name)
continue
self.log.debug("Re-enqueueing changes for pipeline %s" %
name)
items_to_remove = []
for shared_queue in old_pipeline.queues:
for item in (shared_queue.queue +
shared_queue.severed_heads):
item.item_ahead = None
item.item_behind = None
item.pipeline = None
project = layout.projects.get(item.change.project.name)
if not project:
self.log.warning("Unable to find project for "
"change %s while reenqueueing" %
item.change)
item.change.project = None
items_to_remove.append(item)
continue
item.change.project = project
severed = item in shared_queue.severed_heads
if not new_pipeline.manager.reEnqueueItem(item,
severed=severed):
items_to_remove.append(item)
builds_to_remove = []
for build, item in old_pipeline.manager.building_jobs.items():
if item in items_to_remove:
builds_to_remove.append(build)
self.log.warning("Deleting running build %s for "
"change %s while reenqueueing" % (
build, item.change))
for build in builds_to_remove:
del old_pipeline.manager.building_jobs[build]
new_pipeline.manager.building_jobs = \
old_pipeline.manager.building_jobs
self.layout = layout
self._setupMerger()
self._pause = False
self._reconfigure = False
self.reconfigure_complete_event.set()
finally:
self.layout_lock.release()
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
@ -424,6 +476,9 @@ class Scheduler(threading.Thread):
return
self.log.debug("Run handler awake")
try:
if self._reconfigure:
self._doReconfigureEvent()
# Give result events priority -- they let us stop builds,
# whereas trigger evensts cause us to launch builds.
if not self.result_event_queue.empty():
@ -497,8 +552,6 @@ class Scheduler(threading.Thread):
ret = '<html><pre>'
if self._pause:
ret += '<p><b>Queue only mode:</b> preparing to '
if self._reconfigure:
ret += 'reconfigure'
if self._exit:
ret += 'exit'
ret += ', queue length: %s' % self.trigger_event_queue.qsize()
@ -520,8 +573,6 @@ class Scheduler(threading.Thread):
data = {}
if self._pause:
ret = '<p><b>Queue only mode:</b> preparing to '
if self._reconfigure:
ret += 'reconfigure'
if self._exit:
ret += 'exit'
ret += ', queue length: %s' % self.trigger_event_queue.qsize()
@ -683,6 +734,22 @@ class BasePipelineManager(object):
(change, old_change, old_change))
self.removeChange(old_change)
def reEnqueueItem(self, item, severed=False):
change_queue = self.pipeline.getQueue(item.change.project)
if change_queue:
self.log.debug("Re-enqueing change %s in queue %s" %
(item.change, change_queue))
if severed:
change_queue.addSeveredHead(item)
else:
change_queue.enqueueItem(item)
self.reportStats(item)
return True
else:
self.log.error("Unable to find change queue for project %s" %
item.change.project)
return False
def addChange(self, change):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):

View File

@ -324,7 +324,7 @@ class Gerrit(object):
if change.patchset is None:
change.patchset = data['currentPatchSet']['number']
change.project = self.sched.layout.projects[data['project']]
change.project = self.sched.getProject(data['project'])
change.branch = data['branch']
change.url = data['url']
max_ps = 0