Merge "Match events to pipelines based on topic deps"

This commit is contained in:
Zuul 2023-02-27 21:49:42 +00:00 committed by Gerrit Code Review
commit bb648c8b7d
4 changed files with 119 additions and 17 deletions

View File

@@ -47,24 +47,27 @@
run: playbooks/run.yaml
- job:
name: test-job
name: check-job
- job:
name: gate-job
- project:
name: org/project1
queue: integrated
check:
jobs:
- test-job
- check-job
gate:
jobs:
- test-job
- gate-job
- project:
name: org/project2
queue: integrated
check:
jobs:
- test-job
- check-job
gate:
jobs:
- test-job
- gate-job

View File

@@ -2267,8 +2267,8 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(B.patchsets[-1]["approvals"][0]["value"], "1")
self.assertHistory([
dict(name="test-job", result="SUCCESS", changes="2,1 1,1"),
dict(name="test-job", result="SUCCESS", changes="1,1 2,1"),
dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
], ordered=False)
A.addPatchset()
@@ -2277,10 +2277,10 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertHistory([
# Original check run
dict(name="test-job", result="SUCCESS", changes="2,1 1,1"),
dict(name="test-job", result="SUCCESS", changes="1,1 2,1"),
dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
# Second check run
dict(name="test-job", result="SUCCESS", changes="2,1 1,2"),
dict(name="check-job", result="SUCCESS", changes="2,1 1,2"),
], ordered=False)
def test_deps_by_topic_multi_tenant(self):
@@ -2378,6 +2378,81 @@ class TestGerritCircularDependencies(ZuulTestCase):
dict(name="project-job", result="SUCCESS", changes="2,1 1,2"),
], ordered=False)
@simple_layout('layouts/deps-by-topic.yaml')
def test_dependency_refresh_by_topic_check(self):
# Test that when two changes are put into a cycle, the
# dependencies are refreshed and items already in pipelines
# are updated.
self.executor_server.hold_jobs_in_build = True
# This simulates the typical workflow where a developer
# uploads changes one at a time.
# The first change:
A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
topic='test-topic')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# Now that it has been uploaded, upload the second change
# in the same topic.
B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
topic='test-topic')
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
# A quirk: at the end of this process, the second change in
# Gerrit has a complete run because only at that point is the
# topic complete; the first is aborted once the second is
# uploaded.
self.assertHistory([
dict(name="check-job", result="ABORTED", changes="1,1"),
dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
], ordered=False)
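The aborted first build is easier to follow with a toy model of dependencies-by-topic. A minimal standalone sketch, assuming a simplified rule in which every open change in a topic needs every other open change in that topic (topic_needs and the change ids are illustrative, not Zuul's API):

from collections import defaultdict

def topic_needs(changes):
    # changes: list of (change_id, topic) pairs
    by_topic = defaultdict(list)
    for change_id, topic in changes:
        by_topic[topic].append(change_id)
    return {cid: [other for other in by_topic[topic] if other != cid]
            for cid, topic in changes}

# With only change 1 uploaded it has no dependencies, so its item runs
# alone; once change 2 arrives in the same topic, each change needs the
# other, the cycle is complete, and the original single-change item is
# aborted in favour of the "1,1 2,1" run.
print(topic_needs([("1,1", "test-topic")]))
print(topic_needs([("1,1", "test-topic"), ("2,1", "test-topic")]))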
@simple_layout('layouts/deps-by-topic.yaml')
def test_dependency_refresh_by_topic_gate(self):
# Test that when two changes are put into a cycle, the
# dependencies are refreshed and items already in pipelines
# are updated.
self.executor_server.hold_jobs_in_build = True
# This simulates a workflow where a developer adds a change to
# a cycle already in gate.
A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
topic='test-topic')
B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
topic='test-topic')
A.addApproval("Code-Review", 2)
B.addApproval("Code-Review", 2)
A.addApproval("Approved", 1)
self.fake_gerrit.addEvent(B.addApproval("Approved", 1))
self.waitUntilSettled()
# Add a new change to the cycle.
C = self.fake_gerrit.addFakeChange('org/project1', "master", "C",
topic='test-topic')
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
# At the end of this process, the gate jobs should be aborted
# because the new dependency showed up.
self.assertEqual(A.data["status"], "NEW")
self.assertEqual(B.data["status"], "NEW")
self.assertEqual(C.data["status"], "NEW")
self.assertHistory([
dict(name="gate-job", result="ABORTED", changes="1,1 2,1"),
dict(name="gate-job", result="ABORTED", changes="1,1 2,1"),
dict(name="check-job", result="SUCCESS", changes="2,1 1,1 3,1"),
], ordered=False)
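Put differently, the running gate builds belong to the two-change cycle while the topic now describes a three-change cycle. A small sketch of that comparison, with change ids as plain strings and a set comparison standing in for Zuul's actual dequeue logic (illustrative only):

enqueued_cycle = {"1,1", "2,1"}        # cycle the running gate builds were enqueued with
current_cycle = {"1,1", "2,1", "3,1"}  # topic cycle after C is uploaded
if enqueued_cycle != current_cycle:
    # The stale items leave the gate pipeline, their builds are reported
    # as ABORTED, and the full cycle is tested again in check.
    print("abort gate items for", sorted(enqueued_cycle))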
class TestGithubCircularDependencies(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"

View File

@@ -243,7 +243,7 @@ class PipelineManager(metaclass=ABCMeta):
and self.useDependenciesByTopic(change.project))
if (update_commit_dependencies
or update_topic_dependencies):
self.updateCommitDependencies(change, None, event=None)
self.updateCommitDependencies(change, event=None)
self._change_cache[change.cache_key] = change
resolved_changes.append(change)
return resolved_changes
@@ -285,11 +285,18 @@ class PipelineManager(metaclass=ABCMeta):
return True
return False
def isAnyVersionOfChangeInPipeline(self, change):
# Checks any items in the pipeline
def isChangeRelevantToPipeline(self, change):
# Checks if any version of the change or its deps matches any
# item in the pipeline.
for change_key in self.pipeline.change_list.getChangeKeys():
if change.cache_stat.key.isSameChange(change_key):
return True
if isinstance(change, model.Change):
for dep_change_ref in change.getNeedsChanges(
self.useDependenciesByTopic(change.project)):
dep_change_key = ChangeKey.fromReference(dep_change_ref)
if change.cache_stat.key.isSameChange(dep_change_key):
return True
return False
def isChangeAlreadyInQueue(self, change, change_queue):
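The relevance check above can be summarized outside Zuul's data model. A minimal sketch, assuming plain string change keys instead of ChangeKey objects and ignoring the change cache (is_change_relevant is a hypothetical helper, not Zuul's API):

def is_change_relevant(pipeline_change_keys, change_key, needed_change_keys):
    # Relevant if any version of the change is already in the pipeline...
    if change_key in pipeline_change_keys:
        return True
    # ...or if anything it depends on (including topic dependencies) is.
    return any(dep in pipeline_change_keys for dep in needed_change_keys)

# Change 3 has never been enqueued, but it depends (by topic) on changes 1
# and 2 which are in the pipeline, so an event for change 3 must still be
# handed to this pipeline's manager.
assert is_change_relevant({"1,1", "2,1"}, "3,1", {"1,1", "2,1"})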
@@ -315,7 +322,7 @@ class PipelineManager(metaclass=ABCMeta):
to_refresh.add(item.change)
for existing_change in to_refresh:
self.updateCommitDependencies(existing_change, None, event)
self.updateCommitDependencies(existing_change, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
@@ -564,7 +571,7 @@ class PipelineManager(metaclass=ABCMeta):
# to date and this is a noop; otherwise, we need to refresh
# them anyway.
if isinstance(change, model.Change):
self.updateCommitDependencies(change, None, event)
self.updateCommitDependencies(change, event)
with self.getChangeQueue(change, event, change_queue) as change_queue:
if not change_queue:
@@ -857,7 +864,7 @@ class PipelineManager(metaclass=ABCMeta):
self.pipeline.tenant.name][other_pipeline.name].put(
event, needs_result=False)
def updateCommitDependencies(self, change, change_queue, event):
def updateCommitDependencies(self, change, event):
log = get_annotated_logger(self.log, event)
must_update_commit_deps = (

View File

@@ -2519,9 +2519,26 @@ class Scheduler(threading.Thread):
event.span_context = tracing.getSpanContext(span)
for pipeline in tenant.layout.pipelines.values():
# For most kinds of dependencies, it's sufficient to check
# if this change is already in the pipeline, because the
# only way to update a dependency cycle is to update one
# of the changes in it. However, dependencies-by-topic
# can have changes added to the cycle without updating any
# of the existing changes in the cycle. That means in
# order to detect whether a new change is added to an
# existing cycle in the pipeline, we need to know all of
# the dependencies of the new change, and check if *they*
# are in the pipeline. Therefore, go ahead and update our
# dependencies here so they are available for comparison
# against the pipeline contents. This front-loads some
# work that otherwise would happen in the pipeline
# manager, but the result of the work goes into the change
# cache, so it's not wasted; it's just less parallelized.
if isinstance(change, Change):
pipeline.manager.updateCommitDependencies(change, event)
if (
pipeline.manager.eventMatches(event, change)
or pipeline.manager.isAnyVersionOfChangeInPipeline(change)
or pipeline.manager.isChangeRelevantToPipeline(change)
):
self.pipeline_trigger_events[tenant.name][
pipeline.name
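The comment above boils down to an ordering requirement: refresh the change's dependencies first, then test it against each pipeline, so that a change which silently joined a topic cycle is still forwarded to the pipeline holding its dependencies. A standalone simulation of that ordering with toy objects (ToyPipeline and forward_event are illustrative, not Zuul's classes):

class ToyPipeline:
    def __init__(self, name, change_keys):
        self.name = name
        self.change_keys = set(change_keys)

    def event_matches(self, change):
        # e.g. a gate pipeline does not react to patchset-created events.
        return False

    def is_change_relevant(self, change, deps):
        return change in self.change_keys or bool(deps & self.change_keys)

def forward_event(change, topic_changes, pipelines):
    forwarded = []
    for pipeline in pipelines:
        # Refresh dependencies first: here, every other change in the topic.
        deps = topic_changes - {change}
        if pipeline.event_matches(change) or pipeline.is_change_relevant(change, deps):
            forwarded.append(pipeline.name)
    return forwarded

gate = ToyPipeline("gate", {"1,1", "2,1"})                   # cycle already in gate
print(forward_event("3,1", {"1,1", "2,1", "3,1"}, [gate]))   # -> ['gate']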