Send job parent data + artifacts via build request

With job parents that supply data we might end up updating the (secret)
parent data and artifacts of a job multiple times in addition to also
storing duplicate data as most of this information is part of the
parent's build result.

Instead we will collect the parent data and artifacts before scheduling
a build request and send it as part of the request parameters.

If those parameters are part of the build request the executor will use
them, otherwise it falls back on using the data from the job for
backward compatibility.

This change affects the behavior of job deduplication in that input data
from parent jobs is no longer considered when deciding if a job can be
deduplicated or not.

Change-Id: Ic4a85a57983d38f033cf63947a3b276c1ecc70dc
This commit is contained in:
Simon Westphahl 2023-10-09 13:16:57 +02:00
parent 93b4f71d8e
commit 68d7a99cee
No known key found for this signature in database
11 changed files with 161 additions and 64 deletions

View File

@ -155,3 +155,10 @@ Version 19
:Prior Zuul version: 9.2.0
:Description: Changes the storage path of a frozen job to use the job's UUID
instead of the name as identifier.
Version 20
----------
:Prior Zuul version: 9.2.0
:Description: Send (secret) job parent and artifact data via build request
parameters instead of updating the job.
Affects schedulers and executors.

View File

@ -0,0 +1,5 @@
---
upgrade:
- |
Input data from a parent job (return data or artifacts) no longer
influences whether a child job can be deduplicated or not.

View File

@ -1246,8 +1246,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
vars_builds = [b for b in self.builds if b.name == "project-vars-job"]
self.assertEqual(len(vars_builds), 1)
self.assertEqual(vars_builds[0].job.combined_variables["test_var"],
"pass")
self.assertEqual(vars_builds[0].job.variables["test_var"], "pass")
self.executor_server.release()
self.waitUntilSettled()
@ -1261,8 +1260,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
vars_builds = [b for b in self.builds if b.name == "project-vars-job"]
self.assertEqual(len(vars_builds), 1)
self.assertEqual(vars_builds[0].job.combined_variables["test_var"],
"pass")
self.assertEqual(vars_builds[0].job.variables["test_var"], "pass")
self.executor_server.release()
self.waitUntilSettled()
@ -1276,8 +1274,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
vars_builds = [b for b in self.builds if b.name == "project-vars-job"]
self.assertEqual(len(vars_builds), 1)
self.assertEqual(vars_builds[0].job.combined_variables["test_var"],
"pass")
self.assertEqual(vars_builds[0].job.variables["test_var"], "pass")
self.executor_server.release()
self.waitUntilSettled()
@ -1298,8 +1295,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
vars_builds = [b for b in self.builds if b.name == "project-vars-job"]
self.assertEqual(len(vars_builds), 3)
for build in vars_builds:
self.assertEqual(build.job.combined_variables["test_var"],
"pass")
self.assertEqual(build.job.variables["test_var"], "pass")
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -1857,7 +1853,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
def test_job_deduplication_child_of_diff_parent_diff_data(self):
# This is the more realistic test of the above, where we
# return different data from the non-deduplicated parent job,
# which causes the child job not to be deduplicated.
# which should still cause the child job to be deduplicated.
# The child job uses auto deduplication.
self.executor_server.returnData(
@ -1876,8 +1872,6 @@ class TestGerritCircularDependencies(ZuulTestCase):
ref='refs/changes/01/1/1'),
dict(name="child-job", result="SUCCESS", changes="2,1 1,1",
ref='refs/changes/02/2/1'),
dict(name="child-job", result="SUCCESS", changes="2,1 1,1",
ref='refs/changes/01/1/1'),
], ordered=False)
@simple_layout('layouts/job-dedup-paused-parent.yaml')
@ -2582,7 +2576,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
def test_job_deduplication_check_child_of_diff_parent_diff_data(self):
# This is the more realistic test of the above, where we
# return different data from the non-deduplicated parent job,
# which causes the child job not to be deduplicated.
# which should still cause the child job to be deduplicated.
# The child job uses auto deduplication.
self.executor_server.returnData(
@ -2601,8 +2595,6 @@ class TestGerritCircularDependencies(ZuulTestCase):
ref='refs/changes/02/2/1'),
dict(name="child-job", result="SUCCESS", changes="2,1 1,1",
ref='refs/changes/01/1/1'),
dict(name="child-job", result="SUCCESS", changes="1,1 2,1",
ref='refs/changes/02/2/1'),
], ordered=False)
self._assert_job_deduplication_check()

View File

@ -19,7 +19,12 @@ from zuul import model
from zuul.lib.re2util import ZuulRegex
from zuul.zk.components import ComponentRegistry
from tests.base import ZuulTestCase, simple_layout, iterate_timeout
from tests.base import (
AnsibleZuulTestCase,
ZuulTestCase,
simple_layout,
iterate_timeout,
)
from tests.base import ZuulWebFixture
@ -636,3 +641,23 @@ class TestDeduplication(ZuulTestCase):
dict(name="common-job", result="SUCCESS", changes="2,1 1,1"),
], ordered=False)
self.assertEqual(len(self.fake_nodepool.history), 4)
class TestDataReturn(AnsibleZuulTestCase):
    """Exercise data-return jobs pinned to an older model API version.

    Verifies the backward-compatibility path: with model API 19 the
    executor must still fall back to reading parent data from the job
    instead of the build request parameters.
    """
    tenant_config_file = 'config/data-return/main.yaml'

    @model_version(19)
    def test_data_return(self):
        # Test backward compatibility handling
        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
        self.waitUntilSettled()
        # All three jobs (including the child consuming returned data)
        # should succeed despite the older model API.
        self.assertHistory([
            dict(name='data-return', result='SUCCESS', changes='1,1'),
            dict(name='data-return-relative', result='SUCCESS', changes='1,1'),
            dict(name='child', result='SUCCESS', changes='1,1'),
        ], ordered=False)
        # The returned log URLs should appear in the Gerrit report.
        self.assertIn('- data-return https://zuul.example.com/',
                      A.messages[-1])
        self.assertIn('- data-return-relative https://zuul.example.com',
                      A.messages[-1])

View File

@ -3527,7 +3527,7 @@ class TestScheduler(ZuulTestCase):
], ordered=False)
j = self.getJobFromHistory('parentjob')
rp = set([p['name'] for p in j.parameters['projects']])
job_vars = j.job.combined_variables
job_vars = j.job.variables
self.assertEqual(job_vars['project_var'], 'set_in_project')
self.assertEqual(job_vars['template_var1'], 'set_in_template1')
self.assertEqual(job_vars['template_var2'], 'set_in_template2')
@ -3542,7 +3542,7 @@ class TestScheduler(ZuulTestCase):
'org/project0']))
j = self.getJobFromHistory('child1')
rp = set([p['name'] for p in j.parameters['projects']])
job_vars = j.job.combined_variables
job_vars = j.job.variables
self.assertEqual(job_vars['project_var'], 'set_in_project')
self.assertEqual(job_vars['override'], 1)
self.assertEqual(job_vars['child1override'], 1)
@ -3554,7 +3554,7 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(rp, set(['org/project', 'org/project0',
'org/project1']))
j = self.getJobFromHistory('child2')
job_vars = j.job.combined_variables
job_vars = j.job.variables
self.assertEqual(job_vars['project_var'], 'set_in_project')
rp = set([p['name'] for p in j.parameters['projects']])
self.assertEqual(job_vars['override'], 2)
@ -3567,7 +3567,7 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(rp, set(['org/project', 'org/project0',
'org/project2']))
j = self.getJobFromHistory('child3')
job_vars = j.job.combined_variables
job_vars = j.job.variables
self.assertEqual(job_vars['project_var'], 'set_in_project')
rp = set([p['name'] for p in j.parameters['projects']])
self.assertEqual(job_vars['override'], 3)
@ -3580,7 +3580,7 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(rp, set(['org/project', 'org/project0',
'org/project3']))
j = self.getJobFromHistory('override_project_var')
job_vars = j.job.combined_variables
job_vars = j.job.variables
self.assertEqual(job_vars['project_var'], 'override_in_job')
@simple_layout('layouts/job-variants.yaml')

View File

@ -954,7 +954,7 @@ class TestBranchMismatch(ZuulTestCase):
self.assertEqual(
{'testbranch': True, 'this_branch': 'testbranch'},
self.builds[0].job.combined_variables)
self.builds[0].job.variables)
self.executor_server.release()
self.waitUntilSettled()
@ -968,7 +968,7 @@ class TestBranchMismatch(ZuulTestCase):
# testbranch2 should not pick up vars from testbranch.
self.assertEqual(
{'testbranch2': True, 'this_branch': 'testbranch2'},
self.builds[0].job.combined_variables)
self.builds[0].job.variables)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -1038,14 +1038,14 @@ class TestBranchMismatch(ZuulTestCase):
self.assertEqual(
{'testbranch': True, 'this_branch': 'testbranch'},
self.builds[0].job.combined_variables)
self.builds[0].job.variables)
# The two jobs should have distinct variables (notably, the
# variant on testbranch2 should not pick up vars from
# testbranch.
self.assertEqual(
{'testbranch2': True, 'this_branch': 'testbranch2'},
self.builds[1].job.combined_variables)
self.builds[1].job.variables)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()

View File

@ -15,6 +15,7 @@
import os
from zuul.lib import strings
from zuul.zk.components import COMPONENT_REGISTRY
def construct_build_params(uuid, connections, job, item, pipeline,
@ -86,6 +87,15 @@ def construct_build_params(uuid, connections, job, item, pipeline,
getDirectDependentJobs(job.name))
params = dict()
if COMPONENT_REGISTRY.model_api >= 19:
(
params["parent_data"],
params["secret_parent_data"],
artifact_data
) = item.getJobParentData(job)
if artifact_data:
zuul_params['artifacts'] = artifact_data
params['job_ref'] = job.getPath()
params['items'] = merger_items
params['projects'] = []

View File

@ -70,6 +70,7 @@ from zuul.model import (
BuildStatusEvent,
ExtraRepoState,
FrozenJob,
Job,
MergeRepoState,
)
import zuul.model
@ -977,7 +978,13 @@ class AnsibleJob(object):
self.arguments = arguments
with executor_server.zk_context as ctx:
self.job = FrozenJob.fromZK(ctx, arguments["job_ref"])
self.arguments["zuul"].update(zuul_params_from_job(self.job))
job_zuul_params = zuul_params_from_job(self.job)
# MODEL_API < 20
job_zuul_params["artifacts"] = self.arguments["zuul"].get(
"artifacts", job_zuul_params.get("artifacts"))
if job_zuul_params["artifacts"] is None:
del job_zuul_params["artifacts"]
self.arguments["zuul"].update(job_zuul_params)
if self.job.failure_output:
self.failure_output = json.dumps(self.job.failure_output)
else:
@ -1068,6 +1075,19 @@ class AnsibleJob(object):
max_attempts = self.arguments["max_attempts"]
self.retry_limit = self.arguments["zuul"]["attempts"] >= max_attempts
try:
parent_data = self.arguments["parent_data"]
except KeyError:
# MODEL_API < 20
parent_data = self.job.parent_data or {}
self.normal_vars = Job._deepUpdate(parent_data.copy(),
self.job.variables)
try:
self.secret_vars = self.arguments["secret_parent_data"]
except KeyError:
# MODEL_API < 20
self.secret_vars = self.job.secret_parent_data or {}
def run(self):
self.running = True
self.thread = threading.Thread(target=self.execute,
@ -1992,7 +2012,7 @@ class AnsibleJob(object):
# var or all-var, then don't do anything here; let the
# user control.
api = 'ansible_python_interpreter'
if (api not in self.job.combined_variables and
if (api not in self.normal_vars and
not is_group_var_set(api, name, self.nodeset, self.job)):
python = getattr(node, 'python_path', 'auto')
host_vars.setdefault(api, python)
@ -2261,7 +2281,7 @@ class AnsibleJob(object):
:arg secrets dict: Actual Zuul secrets.
'''
secret_vars = self.job.secret_parent_data or {}
secret_vars = self.secret_vars
# We need to handle secret vars specially. We want to pass
# them to Ansible as we do secrets, but we want them to have
@ -2270,7 +2290,7 @@ class AnsibleJob(object):
# anything above it in precedence.
other_vars = set()
other_vars.update(self.job.combined_variables.keys())
other_vars.update(self.normal_vars.keys())
for group_vars in self.job.group_variables.values():
other_vars.update(group_vars.keys())
for host_vars in self.job.host_variables.values():
@ -2495,8 +2515,8 @@ class AnsibleJob(object):
return zuul_resources
def prepareVars(self, args, zuul_resources):
all_vars = self.job.combined_variables.copy()
check_varnames(all_vars)
normal_vars = self.normal_vars.copy()
check_varnames(normal_vars)
# Check the group and extra var names for safety; they'll get
# merged later
@ -2564,7 +2584,7 @@ class AnsibleJob(object):
}
host_list = self.host_list + [localhost]
self.original_hostvars = squash_variables(
host_list, self.nodeset, all_vars,
host_list, self.nodeset, normal_vars,
self.job.group_variables, self.job.extra_variables)
def loadFrozenHostvars(self):

View File

@ -2634,14 +2634,6 @@ class FrozenJob(zkobject.ZKObject):
def affected_projects(self):
return self._getJobData('_affected_projects')
@property
def combined_variables(self):
"""
Combines the data that has been returned by parent jobs with the
job variables where job variables have priority over parent data.
"""
return Job._deepUpdate(self.parent_data or {}, self.variables)
def getSafeAttributes(self):
return Attributes(name=self.name)
@ -5468,9 +5460,9 @@ class QueueItem(zkobject.ZKObject):
self.log.debug("Found artifacts in DB: %s", repr(data))
return data
def providesRequirements(self, job, data, recurse=True):
# Mutates data and returns true/false if requirements
# satisfied.
def providesRequirements(self, job, data=None, recurse=True):
# Returns true/false if requirements are satisfied and updates
# the 'data' list if provided.
requirements = job.requires
if not requirements:
return True
@ -5485,12 +5477,16 @@ class QueueItem(zkobject.ZKObject):
found = True
break
if found:
if not item.providesRequirements(job, data,
if not item.providesRequirements(job, data=data,
recurse=False):
return False
else:
# Look for this item in the SQL DB.
data += self._getRequirementsResultFromSQL(job)
# Try to get the requirements from the database for
# the side-effect of raising an exception when the
# found build failed.
artifacts = self._getRequirementsResultFromSQL(job)
if data is not None:
data.extend(artifacts)
if self.hasJobGraph():
for _job in self.getJobs():
if _job.provides.intersection(requirements):
@ -5501,31 +5497,36 @@ class QueueItem(zkobject.ZKObject):
return False
if not build.result and not build.paused:
return False
artifacts = get_artifacts_from_result_data(
build.result_data,
logger=self.log)
for a in artifacts:
a.update({'project': self.change.project.name,
'change': self.change.number,
'patchset': self.change.patchset,
'job': build.job.name})
self.log.debug("Found live artifacts: %s", repr(artifacts))
data += artifacts
if data is not None:
artifacts = get_artifacts_from_result_data(
build.result_data,
logger=self.log)
for a in artifacts:
a.update({'project': self.change.project.name,
'change': self.change.number,
'patchset': self.change.patchset,
'job': build.job.name})
self.log.debug(
"Found live artifacts: %s", repr(artifacts))
data.extend(artifacts)
if not self.item_ahead:
return True
if not recurse:
return True
return self.item_ahead.providesRequirements(job, data)
return self.item_ahead.providesRequirements(job, data=data)
def jobRequirementsReady(self, job):
if not self.item_ahead:
return True
try:
data = []
ret = self.item_ahead.providesRequirements(job, data)
data.reverse()
data = None
if COMPONENT_REGISTRY.model_api < 20:
data = []
ret = self.item_ahead.providesRequirements(job, data=data)
if data:
data.reverse()
job.setArtifactData(data)
return ret
except RequirementsError as e:
self.log.info(str(e))
fakebuild = Build.new(self.pipeline.manager.current_context,
@ -5537,8 +5538,7 @@ class QueueItem(zkobject.ZKObject):
tenant=self.pipeline.tenant.name,
final=True)
self.setResult(fakebuild)
ret = False
return ret
return False
def findDuplicateBundles(self):
"""
@ -5667,7 +5667,8 @@ class QueueItem(zkobject.ZKObject):
# Iterate over all jobs of the graph (which is
# in sorted config order) and apply parent data of the jobs we
# already found.
if len(parent_builds_with_data) > 0:
if (parent_builds_with_data
and COMPONENT_REGISTRY.model_api < 20):
# We have all of the parent data here, so we can
# start from scratch each time.
new_parent_data = {}
@ -5692,6 +5693,41 @@ class QueueItem(zkobject.ZKObject):
new_artifact_data)
job._set(_ready_to_run=True)
def getArtifactData(self, job):
    """Return the artifact data satisfying *job*'s requirements.

    Collects artifacts via providesRequirements (which walks items
    ahead and may consult the database) and returns them with the
    collection order reversed.
    """
    collected = []
    self.providesRequirements(job, collected)
    return list(reversed(collected))
def getJobParentData(self, job):
    """Collect the parent data to send with a build request for *job*.

    :arg FrozenJob job: The job whose (recursive) parents are consulted.

    :returns: A tuple of ``(parent_data, secret_parent_data,
        artifact_data)`` combining the result data of all parent builds
        that have completed with data.
    """
    job_graph = self.current_build_set.job_graph
    # Map of parent job name -> build, restricted to parents that
    # actually produced result data.
    parent_builds_with_data = {}
    for parent_job in job_graph.getParentJobsRecursively(job.name):
        parent_build = self.current_build_set.getBuild(parent_job.name)
        if parent_build and parent_build.result_data:
            parent_builds_with_data[parent_job.name] = parent_build
    parent_data = {}
    secret_parent_data = {}
    # We may have artifact data from
    # jobRequirementsReady, so we preserve it.
    # updateParentData de-duplicates it.
    artifact_data = job.artifact_data or self.getArtifactData(job)
    # Iterate over all jobs of the graph (which is
    # in sorted config order) and apply parent data of the jobs we
    # already found.  Iterating the full graph (rather than the dict)
    # keeps the merge order deterministic.
    for parent_job in job_graph.getJobs():
        parent_build = parent_builds_with_data.get(parent_job.name)
        if not parent_build:
            continue
        (parent_data, secret_parent_data, artifact_data
         ) = FrozenJob.updateParentData(
            parent_data,
            secret_parent_data,
            artifact_data,
            parent_build)
    return parent_data, secret_parent_data, artifact_data
def deduplicateJobs(self, log):
"""Sync node request and build info with deduplicated jobs

View File

@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
MODEL_API = 19
MODEL_API = 20

View File

@ -1753,6 +1753,8 @@ class ZuulWebAPI(object):
uuid, self.zuulweb.connections, job, item, item.pipeline)
params['zuul'].update(zuul.executor.common.zuul_params_from_job(job))
del params['job_ref']
del params['parent_data']
del params['secret_parent_data']
params['job'] = job.name
params['zuul']['buildset'] = None
params['timeout'] = job.timeout
@ -1768,7 +1770,7 @@ class ZuulWebAPI(object):
params['post_playbooks'] = job.post_run
params['cleanup_playbooks'] = job.cleanup_run
params["nodeset"] = job.nodeset.toDict()
params['vars'] = job.combined_variables
params['vars'] = job.variables
params['extra_vars'] = job.extra_variables
params['host_vars'] = job.host_variables
params['group_vars'] = job.group_variables