Store frozen jobs using UUID instead of name

Change the frozen job storage in ZK from being identified by name to
UUID. This allows us to handle multiple frozen jobs in a buildset with
the same name.

The job graph will get a new field that is a sorted list of job UUIDs
with the index being the same as for the job name list. Jobs that are
created with the old name-based path will have their UUID set to None.

This is done in preparation of the circular dependency refactoring as
detailed in I8c754689ef73ae20fd97ac34ffc75c983d4797b0.

Change-Id: Ic4df16e8e1ec6908234ecdf91fe08408182d05bb
This commit is contained in:
Simon Westphahl 2023-09-27 10:52:24 +02:00
parent 11c06b5939
commit 93b4f71d8e
No known key found for this signature in database
7 changed files with 115 additions and 37 deletions

View File

@ -149,3 +149,9 @@ Version 18
:Prior Zuul version: 9.2.0
:Description: Adds new merge modes 'recursive' and 'ort' for the Github
driver.
Version 19
----------
:Prior Zuul version: 9.2.0
:Description: Changes the storage path of a frozen job to use the job's UUID
instead of the name as identifier.

View File

@ -3531,13 +3531,9 @@ class TestingExecutorApi(HoldableExecutorApi):
self._test_build_request_job_map = {}
if build_request.uuid in self._test_build_request_job_map:
return self._test_build_request_job_map[build_request.uuid]
d = self.getParams(build_request)
if d:
data = d.get('job_ref', '').split('/')[-1]
else:
data = ''
self._test_build_request_job_map[build_request.uuid] = data
return data
job_name = build_request.job_name
self._test_build_request_job_map[build_request.uuid] = job_name
return build_request.job_name
def release(self, what=None):
"""

View File

@ -19,6 +19,7 @@ import collections
import os
import random
import types
import uuid
from unittest import mock
import fixtures
@ -476,11 +477,18 @@ class TestJob(BaseTestCase):
self.assertEqual(run_secret, secret2_data)
class FakeFrozenJob(model.Job):
def __init__(self, name):
super().__init__(name)
self.uuid = uuid.uuid4().hex
class TestGraph(BaseTestCase):
def test_job_graph_disallows_multiple_jobs_with_same_name(self):
graph = model.JobGraph({})
job1 = model.Job('job')
job2 = model.Job('job')
job1 = FakeFrozenJob('job')
job2 = FakeFrozenJob('job')
graph.addJob(job1)
with testtools.ExpectedException(Exception,
"Job job already added"):
@ -488,7 +496,7 @@ class TestGraph(BaseTestCase):
def test_job_graph_disallows_circular_dependencies(self):
graph = model.JobGraph({})
jobs = [model.Job('job%d' % i) for i in range(0, 10)]
jobs = [FakeFrozenJob('job%d' % i) for i in range(0, 10)]
prevjob = None
for j in jobs[:3]:
if prevjob:
@ -502,7 +510,7 @@ class TestGraph(BaseTestCase):
with testtools.ExpectedException(
Exception,
"Dependency cycle detected in job jobX"):
j = model.Job('jobX')
j = FakeFrozenJob('jobX')
j.dependencies = frozenset([model.JobDependency(j.name)])
graph.addJob(j)
@ -535,8 +543,8 @@ class TestGraph(BaseTestCase):
graph.addJob(jobs[6])
def test_job_graph_allows_soft_dependencies(self):
parent = model.Job('parent')
child = model.Job('child')
parent = FakeFrozenJob('parent')
child = FakeFrozenJob('child')
child.dependencies = frozenset([
model.JobDependency(parent.name, True)])
@ -554,8 +562,8 @@ class TestGraph(BaseTestCase):
def test_job_graph_allows_soft_dependencies4(self):
# A more complex scenario with multiple parents at each level
parents = [model.Job('parent%i' % i) for i in range(6)]
child = model.Job('child')
parents = [FakeFrozenJob('parent%i' % i) for i in range(6)]
child = FakeFrozenJob('child')
child.dependencies = frozenset([
model.JobDependency(parents[0].name, True),
model.JobDependency(parents[1].name)])

View File

@ -350,6 +350,52 @@ class TestModelUpgrade(ZuulTestCase):
matcher2 = change_matcher.BranchMatcher.deserialize(ser)
self.assertEqual(matcher, matcher2)
@model_version(18)
def test_model_18(self):
# Test backward compatibility with old name-based job storage
# when not all components have updated yet.
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
], ordered=False)
@model_version(18)
def test_model_18_19(self):
# Test backward compatibility with existing job graphs that
# still use the name-based job storage.
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
# Upgrade our component
self.model_test_component_info.model_api = 19
component_registry = ComponentRegistry(self.zk_client)
for _ in iterate_timeout(30, "model api to update"):
if component_registry.model_api == 19:
break
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
], ordered=False)
class TestGithubModelUpgrade(ZuulTestCase):
config_file = 'zuul-github-driver.conf'

View File

@ -1087,9 +1087,6 @@ class TestScheduler(ZuulTestCase):
def test_failed_change_at_head_with_queue(self):
"Test that if a change at the head fails, queued jobs are canceled"
def get_name(params):
return params.get('job_ref', '').split('/')[-1]
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
@ -1110,7 +1107,7 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(len(queue), 1)
self.assertEqual(queue[0].zone, None)
params = self.executor_server.executor_api.getParams(queue[0])
self.assertEqual(get_name(params), 'project-merge')
self.assertEqual(queue[0].job_name, 'project-merge')
self.assertEqual(params['items'][0]['number'], '%d' % A.number)
self.executor_api.release('.*-merge')
@ -1119,19 +1116,17 @@ class TestScheduler(ZuulTestCase):
self.waitUntilSettled()
self.executor_api.release('.*-merge')
self.waitUntilSettled()
queue = list(self.executor_api.queued())
params = [self.executor_server.executor_api.getParams(item)
for item in queue]
queue = list(self.executor_api.queued())
self.assertEqual(len(self.builds), 0)
self.assertEqual(len(queue), 6)
self.assertEqual(get_name(params[0]), 'project-test1')
self.assertEqual(get_name(params[1]), 'project-test2')
self.assertEqual(get_name(params[2]), 'project-test1')
self.assertEqual(get_name(params[3]), 'project-test2')
self.assertEqual(get_name(params[4]), 'project-test1')
self.assertEqual(get_name(params[5]), 'project-test2')
self.assertEqual(queue[0].job_name, 'project-test1')
self.assertEqual(queue[1].job_name, 'project-test2')
self.assertEqual(queue[2].job_name, 'project-test1')
self.assertEqual(queue[3].job_name, 'project-test2')
self.assertEqual(queue[4].job_name, 'project-test1')
self.assertEqual(queue[5].job_name, 'project-test2')
self.executor_api.release(queue[0])
self.waitUntilSettled()

View File

@ -2403,6 +2403,10 @@ class FrozenJob(zkobject.ZKObject):
@classmethod
def new(klass, context, **kw):
obj = klass()
if (COMPONENT_REGISTRY.model_api < 19):
obj._set(uuid=None)
else:
obj._set(uuid=uuid4().hex)
# Convert these to JobData after creation.
job_data_vars = {}
@ -2432,18 +2436,24 @@ class FrozenJob(zkobject.ZKObject):
return self.parent is None
@classmethod
def jobPath(cls, job_name, parent_path):
safe_job = urllib.parse.quote_plus(job_name)
return f"{parent_path}/job/{safe_job}"
def jobPath(cls, job_id, parent_path):
# MODEL_API < 19
# Job name is used as path component and needs to be quoted.
safe_id = urllib.parse.quote_plus(job_id)
return f"{parent_path}/job/{safe_id}"
def getPath(self):
return self.jobPath(self.name, self.buildset.getPath())
# MODEL_API < 19
job_id = self.uuid or self.name
return self.jobPath(job_id, self.buildset.getPath())
def serialize(self, context):
# Ensure that any special handling in this method is matched
# in Job.freezeJob so that FrozenJobs are identical regardless
# of whether they have been deserialized.
data = {}
data = {
"uuid": self.uuid,
}
for k in self.attributes:
# TODO: Backwards compat handling, remove after 5.0
if k == 'config_hash':
@ -2504,6 +2514,9 @@ class FrozenJob(zkobject.ZKObject):
if 'failure_output' not in data:
data['failure_output'] = []
# MODEL_API < 19
data.setdefault("uuid", None)
if hasattr(self, 'nodeset_alternatives'):
alts = self.nodeset_alternatives
else:
@ -3623,6 +3636,8 @@ class JobGraph(object):
self._job_map = job_map
# An ordered list of jobs
self.jobs = []
# An ordered list of job UUIDs
self.job_uuids = []
# dependent_job_name -> dict(parent_job_name -> soft)
self._dependencies = {}
self.project_metadata = {}
@ -3633,6 +3648,7 @@ class JobGraph(object):
def toDict(self):
data = {
"jobs": self.jobs,
"job_uuids": self.job_uuids,
"dependencies": self._dependencies,
"project_metadata": {
k: v.toDict() for (k, v) in self.project_metadata.items()
@ -3644,6 +3660,9 @@ class JobGraph(object):
def fromDict(klass, data, job_map):
self = klass(job_map)
self.jobs = data['jobs']
# MODEL_API < 19: if job uuids is not set, we default the
# UUID for all jobs to None.
self.job_uuids = data.get('job_uuids', [None] * len(self.jobs))
self._dependencies = data['dependencies']
self.project_metadata = {
k: ProjectMetadata.fromDict(v)
@ -3658,6 +3677,7 @@ class JobGraph(object):
raise Exception("Job %s already added" % (job.name,))
self._job_map[job.name] = job
self.jobs.append(job.name)
self.job_uuids.append(job.uuid)
# Append the dependency information
self._dependencies.setdefault(job.name, {})
try:
@ -3674,6 +3694,7 @@ class JobGraph(object):
except Exception:
del self._job_map[job.name]
self.jobs.pop()
self.job_uuids.pop()
del self._dependencies[job.name]
raise
@ -3681,6 +3702,9 @@ class JobGraph(object):
# Report in the order of layout cfg
return list([self._job_map[x] for x in self.jobs])
def getUuidForJob(self, job_name):
return self.job_uuids[self.jobs.index(job_name)]
def getDirectDependentJobs(self, parent_job, skip_soft=False):
ret = set()
for dependent_name, parents in self._dependencies.items():
@ -4545,8 +4569,8 @@ class BuildSet(zkobject.ZKObject):
job_versions = data.get('job_versions', {})
build_versions = data.get('build_versions', {})
# jobs (deserialize as separate objects)
if data['job_graph']:
for job_name in data['job_graph'].jobs:
if job_graph := data['job_graph']:
for job_name in job_graph.jobs:
# If we have a current build before refreshing, we may
# be able to skip refreshing some items since they
# will not have changed.
@ -4562,7 +4586,10 @@ class BuildSet(zkobject.ZKObject):
tpe_jobs.append((None, job_name,
tpe.submit(job.refresh, context)))
else:
job_path = FrozenJob.jobPath(job_name, self.getPath())
job_uuid = job_graph.getUuidForJob(job_name)
# MODEL_API < 19; use job_name if job_uuid is None
job_path = FrozenJob.jobPath(
job_uuid or job_name, self.getPath())
tpe_jobs.append(('job', job_name, tpe.submit(
FrozenJob.fromZK, context, job_path, buildset=self)))

View File

@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
MODEL_API = 18
MODEL_API = 19