Fix issue with reopened PR dependencies

Given two PRs with B depending on A which are enqueued in gate, A is
closed and then immediately reopened.

This sequence of events will currently dequeue A and then immediately
enqueue it behind B. Since the check for whether a dependency is already
in the queue doesn't care if it's ahead or behind the current change,
we'll not dequeue B and the content of builds executed by B will not
include A.

This change updates the check to determine if a change is already in
the queue to only check for changes ahead of it.  This causes B to
be correctly dequeued in the next pipeline pass.

This behavior is correct, but isn't always intuitive or consistent.
If the time between closing and reopening a change is long enough for
a pipeline process, then both changes will be enqueued by the reopening
(because we check for changes needing enqueued changes and enqueue them
behind).  But if both events are processed in a single pipeline run,
then the removal of B happens after the re-enqueue of A which means that
it won't be re-added.

To correct this, whenever we remove abandoned changes, we will also remove
changes behind them that depend on the removed abandoned changes at the
same time.  This means that in our scenario above, the re-enqueue happens
under the same conditions as the original enqueue, and both A and B are
re-enqueued.

Co-Authored-By: James E. Blair <jim@acmegating.com>
Change-Id: Ia1d79bccb9ea39e486483283611601aa23903000
This commit is contained in:
Simon Westphahl 2024-04-25 16:25:39 +02:00 committed by James E. Blair
parent 2c2a2d61a5
commit 516780f9e7
5 changed files with 126 additions and 24 deletions

View File

@ -19,6 +19,8 @@
github:
- event: pull_request
action: edited
- event: pull_request
action: reopened
start:
github: {}
success:

View File

@ -215,3 +215,45 @@ class TestGithubCrossRepoDeps(ZuulTestCase):
# The job should be aborted since B was updated while enqueued.
self.assertHistory([dict(name='project3-test', result='ABORTED')])
@simple_layout('layouts/crd-github.yaml', driver='github')
def test_crd_dependent_close_reopen(self):
"""
Test that closing and reopening PR correctly re-enqueues the
dependent change in the correct order.
"""
self.executor_server.hold_jobs_in_build = True
A = self.fake_github.openFakePullRequest('org/project3', 'master', 'A')
B = self.fake_github.openFakePullRequest(
'org/project4', 'master', 'B',
body=f"Depends-On: https://github.com/org/project3/pull/{A.number}"
)
self.fake_github.emitEvent(B.getPullRequestEditedEvent())
self.waitUntilSettled()
# Close and reopen PR A
self.fake_github.emitEvent(A.getPullRequestClosedEvent())
self.fake_github.emitEvent(A.getPullRequestReopenedEvent())
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
# The changes for the job from project4 should include the project3
# PR content
self.assertHistory([
dict(name='project3-test', result='ABORTED',
changes=f"{A.number},{A.head_sha}"),
dict(name='project4-test', result='ABORTED',
changes=f"{A.number},{A.head_sha} {B.number},{B.head_sha}"),
dict(name='project3-test', result='SUCCESS',
changes=f"{A.number},{A.head_sha}"),
dict(name='project4-test', result='SUCCESS',
changes=f"{A.number},{A.head_sha} {B.number},{B.head_sha}"),
], ordered=False)
self.assertTrue(A.is_merged)
self.assertTrue(B.is_merged)

View File

@ -312,10 +312,15 @@ class PipelineManager(metaclass=ABCMeta):
return True
return False
def isChangeAlreadyInQueue(self, change, change_queue):
def isChangeAlreadyInQueue(self, change, change_queue, item=None):
# Checks any item in the specified change queue
for item in change_queue.queue:
for c in item.changes:
# If item is supplied, only consider items ahead of the
# supplied item (ie, is the change already in the queue ahead
# of this item?).
for queue_item in change_queue.queue:
if queue_item is item:
break
for c in queue_item.changes:
if change.equals(c):
return True
return False
@ -419,13 +424,18 @@ class PipelineManager(metaclass=ABCMeta):
return True
def getMissingNeededChanges(self, changes, change_queue, event,
dependency_graph=None):
dependency_graph=None, item=None):
"""Check that all needed changes are ahead in the queue.
Return a list of any that are missing. If it is not possible
to correct the missing changes, "abort" will be true.
If item is supplied, only consider items ahead of the supplied
item when determining whether needed changes are already ahead
in the queue.
:returns: (abort, needed_changes)
"""
return False, []
@ -472,23 +482,69 @@ class PipelineManager(metaclass=ABCMeta):
def removeAbandonedChange(self, change, event):
log = get_annotated_logger(self.log, event)
log.debug("Change %s abandoned, removing", change)
for item in self.pipeline.getAllItems():
if not item.live:
continue
for item_change in item.changes:
if item_change.equals(change):
if len(item.changes) > 1:
# We need to cancel all jobs here as setting
# dequeued needing change will skip all jobs.
self.cancelJobs(item)
msg = ("Dependency cycle change "
f"{change.url} abandoned.")
for queue in self.pipeline.queues:
# Below we need to remove dependent changes of abandoned
# changes we remove here, but only if both are live.
# Therefore, track the changes we remove independently for
# each shared queue (since different queues may have
# different live/non-live changes).
changes_removed = set()
for item in queue.queue:
if not item.live:
continue
self._removeAbandonedChangeExamineItem(
change, item, changes_removed)
# Abbreviated version of dependency check; we're not going
# to check everything (processOneItem will take care of
# that), but we will remove any changes that depend on any
# changes we just removed; we can do that without
# recreating the full dependency graphs.
for item in queue.queue:
if not item.live:
continue
self._removeAbandonedChangeDependentsExamineItem(
log, item, changes_removed)
def _removeAbandonedChangeExamineItem(self, change, item, changes_removed):
# The inner loop from the first part of removeAbandonedChange.
for item_change in item.changes:
if item_change.equals(change):
if len(item.changes) > 1:
# We need to cancel all jobs here as setting
# dequeued needing change will skip all jobs.
self.cancelJobs(item)
msg = ("Dependency cycle change "
f"{change.url} abandoned.")
item.setDequeuedNeedingChange(msg)
try:
self.reportItem(item)
except exceptions.MergeFailure:
pass
self.removeItem(item)
changes_removed.update(item.changes)
return
def _removeAbandonedChangeDependentsExamineItem(
self, log, item, changes_removed):
# The inner loop from the second part of removeAbandonedChange.
for item_change in item.changes:
for needed_change in self.resolveChangeReferences(
item_change.getNeedsChanges(
self.useDependenciesByTopic(item_change.project))):
for removed_change in changes_removed:
if needed_change.equals(removed_change):
log.debug("Change %s depends on abandoned change %s, "
"removing", item_change, removed_change)
msg = f'Change {removed_change} is needed.'
item.setDequeuedNeedingChange(msg)
try:
self.reportItem(item)
except exceptions.MergeFailure:
pass
self.removeItem(item)
if item.live:
try:
self.reportItem(item)
except exceptions.MergeFailure:
pass
self.dequeueItem(item)
changes_removed.update(item.changes)
return
def reEnqueueChanges(self, item, changes):
for change in changes:
@ -1659,7 +1715,8 @@ class PipelineManager(metaclass=ABCMeta):
abort, needs_changes = self.getMissingNeededChanges(
item.changes, change_queue, item.event,
dependency_graph=dependency_graph)
dependency_graph=dependency_graph,
item=item)
if not (meets_reqs and not needs_changes):
# It's not okay to enqueue this change, we should remove it.
log.info("Dequeuing %s because "

View File

@ -183,7 +183,8 @@ class DependentPipelineManager(SharedQueuePipelineManager):
return True
def getMissingNeededChanges(self, changes, change_queue, event,
dependency_graph=None, warnings=None):
dependency_graph=None, warnings=None,
item=None):
log = get_annotated_logger(self.log, event)
changes_needed = []
abort = False
@ -231,7 +232,7 @@ class DependentPipelineManager(SharedQueuePipelineManager):
log.debug(" Needed change is in cycle")
continue
if self.isChangeAlreadyInQueue(
needed_change, change_queue):
needed_change, change_queue, item):
log.debug(" Needed change is already "
"ahead in the queue")
continue

View File

@ -80,7 +80,7 @@ class IndependentPipelineManager(PipelineManager):
return True
def getMissingNeededChanges(self, changes, change_queue, event,
dependency_graph=None):
dependency_graph=None, item=None):
log = get_annotated_logger(self.log, event)
if self.pipeline.ignore_dependencies: