Cleanup deleted pipelines and event queues
When a pipeline is removed during a reconfiguration Zuul will cancel active builds and node requests. However, since we no longer refresh the pipeline state during a reconfig we can run into errors when Zuul tries to cancel builds and node requests based on a possibly outdated pipeline state. 2023-01-17 10:41:32,223 ERROR zuul.Scheduler: Exception in run handler: Traceback (most recent call last): File "/opt/zuul/lib/python3.10/site-packages/zuul/scheduler.py", line 2007, in run self.process_tenant_management_queue(tenant) File "/opt/zuul/lib/python3.10/site-packages/zuul/scheduler.py", line 2452, in process_tenant_management_queue self._process_tenant_management_queue(tenant) File "/opt/zuul/lib/python3.10/site-packages/zuul/scheduler.py", line 2462, in _process_tenant_management_queue self._doTenantReconfigureEvent(event) File "/opt/zuul/lib/python3.10/site-packages/zuul/scheduler.py", line 1533, in _doTenantReconfigureEvent self._reconfigureTenant(ctx, min_ltimes, File "/opt/zuul/lib/python3.10/site-packages/zuul/scheduler.py", line 1699, in _reconfigureTenant self._reconfigureDeletePipeline(old_pipeline) File "/opt/zuul/lib/python3.10/site-packages/zuul/scheduler.py", line 1804, in _reconfigureDeletePipeline self.cancelJob(build.build_set, build.job, File "/opt/zuul/lib/python3.10/site-packages/zuul/scheduler.py", line 2930, in cancelJob build.updateAttributes( File "/opt/zuul/lib/python3.10/site-packages/zuul/zk/zkobject.py", line 193, in updateAttributes self._save(context, serial) File "/opt/zuul/lib/python3.10/site-packages/zuul/zk/zkobject.py", line 392, in _save zstat = self._retry(context, self._retryableSave, File "/opt/zuul/lib/python3.10/site-packages/zuul/zk/zkobject.py", line 314, in _retry return kazoo_retry(func, *args, **kw) File "/opt/zuul/lib/python3.10/site-packages/kazoo/retry.py", line 126, in __call__ return func(*args, **kwargs) File "/opt/zuul/lib/python3.10/site-packages/zuul/zk/zkobject.py", line 371, in _retryableSave zstat = 
context.client.set(path, compressed_data, File "/opt/zuul/lib/python3.10/site-packages/kazoo/client.py", line 1359, in set return self.set_async(path, value, version).get() File "/opt/zuul/lib/python3.10/site-packages/kazoo/handlers/utils.py", line 86, in get raise self._exception kazoo.exceptions.BadVersionError To fix this we need to refresh the pipeline state prior to canceling those active builds and node requests. We will also take care of removing the pipeline state and the event queues from Zookeeper if possible. Errors will be ignored as the periodic cleanup task takes care of removing leaked pipelines. Change-Id: I2986419636d8c6557d68d65fb6aff589aa4a680e
This commit is contained in:
parent
a2b114e1a3
commit
9048706d93
|
@ -5622,6 +5622,7 @@ class ZuulTestCase(BaseTestCase):
|
|||
time.sleep(0.1)
|
||||
|
||||
def refreshPipelines(self, sched):
|
||||
ctx = None
|
||||
for tenant in sched.abide.tenants.values():
|
||||
with tenant_read_lock(self.zk_client, tenant.name):
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
|
|
|
@ -244,6 +244,79 @@ class TestScaleOutScheduler(ZuulTestCase):
|
|||
self.assertTrue(all(l == new.uuid for l in layout_uuids))
|
||||
self.waitUntilSettled()
|
||||
|
||||
def test_live_reconfiguration_del_pipeline(self):
|
||||
# Test pipeline deletion while changes are enqueued
|
||||
|
||||
# Create a second scheduler instance
|
||||
app = self.createScheduler()
|
||||
app.start()
|
||||
self.assertEqual(len(self.scheds), 2)
|
||||
|
||||
for _ in iterate_timeout(10, "Wait until priming is complete"):
|
||||
old = self.scheds.first.sched.tenant_layout_state.get("tenant-one")
|
||||
if old is not None:
|
||||
break
|
||||
|
||||
for _ in iterate_timeout(
|
||||
10, "Wait for all schedulers to have the same layout state"):
|
||||
layout_states = [a.sched.local_layout_state.get("tenant-one")
|
||||
for a in self.scheds.instances]
|
||||
if all(l == old for l in layout_states):
|
||||
break
|
||||
|
||||
pipeline_zk_path = app.sched.abide.tenants[
|
||||
"tenant-one"].layout.pipelines["check"].state.getPath()
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
|
||||
# Let the first scheduler enqueue the change into the pipeline that
|
||||
# will be removed later on.
|
||||
with app.sched.run_handler_lock:
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled(matcher=[self.scheds.first])
|
||||
|
||||
# Process item only on second scheduler so the first scheduler has
|
||||
# an outdated pipeline state.
|
||||
with self.scheds.first.sched.run_handler_lock:
|
||||
self.executor_server.release('.*-merge')
|
||||
self.waitUntilSettled(matcher=[app])
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
|
||||
self.commitConfigUpdate(
|
||||
'common-config',
|
||||
'layouts/live-reconfiguration-del-pipeline.yaml')
|
||||
# Trigger a reconfiguration on the first scheduler with the outdated
|
||||
# pipeline state of the pipeline that will be removed.
|
||||
self.scheds.execute(lambda a: a.sched.reconfigure(a.config),
|
||||
matcher=[self.scheds.first])
|
||||
|
||||
new = self.scheds.first.sched.tenant_layout_state.get("tenant-one")
|
||||
for _ in iterate_timeout(
|
||||
10, "Wait for all schedulers to have the same layout state"):
|
||||
layout_states = [a.sched.local_layout_state.get("tenant-one")
|
||||
for a in self.scheds.instances]
|
||||
if all(l == new for l in layout_states):
|
||||
break
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(A.data['status'], 'NEW')
|
||||
self.assertEqual(A.reported, 0)
|
||||
|
||||
self.assertHistory([
|
||||
dict(name='project-merge', result='SUCCESS', changes='1,1'),
|
||||
dict(name='project-test1', result='ABORTED', changes='1,1'),
|
||||
dict(name='project-test2', result='ABORTED', changes='1,1'),
|
||||
], ordered=False)
|
||||
|
||||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||
self.assertEqual(len(tenant.layout.pipelines), 0)
|
||||
stat = self.zk_client.client.exists(pipeline_zk_path)
|
||||
self.assertIsNone(stat)
|
||||
|
||||
def test_change_cache(self):
|
||||
# Test re-using a change from the change cache.
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
|
|
|
@ -1758,7 +1758,12 @@ class Scheduler(threading.Thread):
|
|||
new_pipeline = tenant.layout.pipelines.get(name)
|
||||
if not new_pipeline:
|
||||
with old_pipeline.manager.currentContext(context):
|
||||
self._reconfigureDeletePipeline(old_pipeline)
|
||||
try:
|
||||
self._reconfigureDeletePipeline(old_pipeline)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Failed to cleanup deleted pipeline %s:",
|
||||
old_pipeline)
|
||||
|
||||
self.management_events[tenant.name].initialize()
|
||||
self.trigger_events[tenant.name].initialize()
|
||||
|
@ -1829,7 +1834,11 @@ class Scheduler(threading.Thread):
|
|||
(tenant,))
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
with pipeline.manager.currentContext(context):
|
||||
self._reconfigureDeletePipeline(pipeline)
|
||||
try:
|
||||
self._reconfigureDeletePipeline(pipeline)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Failed to cleanup deleted pipeline %s:", pipeline)
|
||||
|
||||
# Delete the tenant root path for this tenant in ZooKeeper to remove
|
||||
# all tenant specific event queues
|
||||
|
@ -1845,45 +1854,80 @@ class Scheduler(threading.Thread):
|
|||
def _reconfigureDeletePipeline(self, pipeline):
    """Remove a pipeline during reconfiguration.

    Refreshes the pipeline state from ZooKeeper first so that builds
    and node requests are canceled based on up-to-date data (avoids
    BadVersionError on stale zkobjects), then cancels everything in
    the pipeline and removes its event queues and state from
    ZooKeeper on a best-effort basis.  Cleanup errors are logged and
    ignored; the periodic cleanup task handles leaked pipelines.
    """
    self.log.info("Removing pipeline %s during reconfiguration" %
                  (pipeline,))

    # Refresh the pipeline state so we don't act on an outdated view
    # (another scheduler may have modified it since our last refresh).
    ctx = pipeline.manager.current_context
    pipeline.state.refresh(ctx)

    builds_to_cancel = []
    requests_to_cancel = []
    for item in pipeline.getAllItems():
        with item.activeContext(pipeline.manager.current_context):
            item.item_ahead = None
            item.items_behind = []
        self.log.info(
            "Removing item %s during reconfiguration" % (item,))
        for build in item.current_build_set.getBuilds():
            builds_to_cancel.append(build)
        for request_job, request in \
                item.current_build_set.getNodeRequests():
            requests_to_cancel.append(
                (
                    item.current_build_set,
                    request,
                    item.getJob(request_job),
                )
            )
        try:
            self.sql.reportBuildsetEnd(
                item.current_build_set, 'dequeue',
                final=False, result='DEQUEUED')
        except Exception:
            self.log.exception(
                "Error reporting buildset completion to DB:")

    for build in builds_to_cancel:
        self.log.info(
            "Canceling build %s during reconfiguration", build)
        try:
            self.cancelJob(build.build_set, build.job,
                           build=build, force=True)
        except Exception:
            self.log.exception(
                "Error canceling build %s during reconfiguration", build)
    for build_set, request, request_job in requests_to_cancel:
        self.log.info(
            "Canceling node request %s during reconfiguration", request)
        try:
            self.cancelJob(build_set, request_job, force=True)
        except Exception:
            self.log.exception(
                "Error canceling node request %s during reconfiguration",
                request)

    # Delete the pipeline event root path in ZooKeeper to remove
    # all pipeline specific event queues.
    try:
        self.zk_client.client.delete(
            PIPELINE_NAME_ROOT.format(
                tenant=pipeline.tenant.name,
                pipeline=pipeline.name),
            recursive=True)
    except Exception:
        # In case a pipeline event has been submitted during
        # reconfiguration this cleanup will fail.
        self.log.exception(
            "Error removing event queues for deleted pipeline %s in "
            "tenant %s", pipeline.name, pipeline.tenant.name)

    # Delete the pipeline root path in ZooKeeper to remove all pipeline
    # state.
    try:
        self.zk_client.client.delete(pipeline.state.getPath(),
                                     recursive=True)
    except Exception:
        self.log.exception(
            "Error removing state for deleted pipeline %s in tenant %s",
            pipeline.name, pipeline.tenant.name)
|
||||
|
||||
def _doPromoteEvent(self, event):
|
||||
tenant = self.abide.tenants.get(event.tenant_name)
|
||||
|
|
Loading…
Reference in New Issue