Include job_uuid in NodeRequests

This is part of the circular dependency refactor.  It updates the
NodeRequest object to include the job_uuid in addition to the job_name
(which is temporarily kept for backwards compatibility).  When node
requests are completed, we now look up the job by uuid if supplied.

Change-Id: I57d4ab6c241b03f76f80346b5567600e1692947a
This commit is contained in:
James E. Blair 2023-12-20 09:52:40 -08:00
parent 9201f9ee28
commit 7262ef7f6f
8 changed files with 86 additions and 17 deletions

View File

@ -180,3 +180,9 @@ Version 23
:Prior Zuul version: 9.3.0
:Description: Add model_version field to build sets.
Affects schedulers.
Version 24
----------
:Prior Zuul version: 9.3.0
:Description: Add job_uuid to NodeRequests.
Affects schedulers.

View File

@ -758,6 +758,27 @@ class TestDataReturn(AnsibleZuulTestCase):
def test_model_22_23(self):
self._test_circ_dep_refactor(23)
# To 24
@model_version(18)
def test_model_18_24(self):
self._test_circ_dep_refactor(24)
@model_version(20)
def test_model_20_24(self):
self._test_circ_dep_refactor(24)
@model_version(21)
def test_model_21_24(self):
self._test_circ_dep_refactor(24)
@model_version(22)
def test_model_22_24(self):
self._test_circ_dep_refactor(24)
@model_version(23)
def test_model_23_24(self):
self._test_circ_dep_refactor(24)
def _test_circ_dep_refactor(self, final_model_api):
# Test backwards compat for job graph dependency freezing.
# First test the entire lifecycle under the old api.

View File

@ -12,12 +12,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from zuul import model
import zuul.nodepool
from tests.base import BaseTestCase, FakeNodepool, iterate_timeout
from zuul.zk import ZooKeeperClient
from zuul.zk.nodepool import ZooKeeperNodepool
from zuul.zk.components import COMPONENT_REGISTRY
from zuul.model_api import MODEL_API
class NodepoolWithCallback(zuul.nodepool.Nodepool):
@ -31,6 +35,12 @@ class NodepoolWithCallback(zuul.nodepool.Nodepool):
self.provisioned_requests.append(request)
class Dummy(object):
def __init__(self, **kw):
for k, v in kw.items():
setattr(self, k, v)
class TestNodepoolBase(BaseTestCase):
# Tests the Nodepool interface class using a fake nodepool and
# scheduler.
@ -38,6 +48,9 @@ class TestNodepoolBase(BaseTestCase):
def setUp(self):
super().setUp()
COMPONENT_REGISTRY.registry = Dummy()
COMPONENT_REGISTRY.registry.model_api = MODEL_API
self.statsd = None
self.setupZK()
@ -58,6 +71,18 @@ class TestNodepoolBase(BaseTestCase):
self.addCleanup(self.fake_nodepool.stop)
class FakeFrozenJob(model.Job):
def __init__(self, name):
super().__init__(name)
self.uuid = uuid.uuid4().hex
self.ref = 'fake reference'
# MODEL_API < 19
@property
def _job_id(self):
return self.uuid or self.name
class TestNodepool(TestNodepoolBase):
def test_node_request(self):
# Test a simple node request
@ -65,7 +90,7 @@ class TestNodepool(TestNodepoolBase):
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job = FakeFrozenJob('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(
"test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
@ -110,7 +135,7 @@ class TestNodepool(TestNodepoolBase):
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job = FakeFrozenJob('testjob')
job.nodeset = nodeset
self.fake_nodepool.pause()
request = self.nodepool.requestNodes(
@ -136,7 +161,7 @@ class TestNodepool(TestNodepoolBase):
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job = FakeFrozenJob('testjob')
job.nodeset = nodeset
self.fake_nodepool.pause()
request1 = self.nodepool.requestNodes(

View File

@ -1010,7 +1010,7 @@ class PipelineManager(metaclass=ABCMeta):
build_set.setJobNodeRequestID(job, req.id)
if req.fulfilled:
nodeset = self.sched.nodepool.getNodeSet(req, job.nodeset)
job = build_set.item.getJob(req.job_name)
job = build_set.item.getJob(req._job_id)
build_set.jobNodeRequestComplete(job, nodeset)
else:
job.setWaitingStatus(f'node request: {req.id}')
@ -2169,11 +2169,11 @@ class PipelineManager(metaclass=ABCMeta):
log = get_annotated_logger(self.log, request.event_id)
self.reportPipelineTiming('node_request_time', request.created_time)
job = build_set.item.getJob(request.job_name)
job = build_set.item.getJob(request._job_id)
# First see if we need to retry the request
if not request.fulfilled:
log.info("Node request %s: failure for %s",
request, request.job_name)
request, job.name)
if self._handleNodeRequestFallback(log, build_set, job, request):
return
# No more fallbacks -- tell the buildset the request is complete
@ -2193,7 +2193,7 @@ class PipelineManager(metaclass=ABCMeta):
log.info("Completed node request %s for job %s of item %s "
"with nodes %s",
request, request.job_name, build_set.item, request.nodes)
request, job.name, build_set.item, request.nodes)
def reportItem(self, item, phase1=True, phase2=True):
log = get_annotated_logger(self.log, item.event)

View File

@ -1746,13 +1746,15 @@ class NodeRequest(object):
"""A request for a set of nodes."""
def __init__(self, requestor, build_set_uuid, tenant_name, pipeline_name,
job_name, labels, provider, relative_priority,
job_name, job_uuid, labels, provider, relative_priority,
event_id=None, span_info=None):
self.requestor = requestor
self.build_set_uuid = build_set_uuid
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
# MODEL_API < 24
self.job_name = job_name
self.job_uuid = job_uuid
self.labels = labels
self.nodes = []
self._state = STATE_REQUESTED
@ -1797,6 +1799,12 @@ class NodeRequest(object):
self._state = value
self.state_time = time.time()
@property
def _job_id(self):
# MODEL_API < 24
# Remove this after circular dep refactor
return self.job_uuid or self.job_name
def __repr__(self):
return '<NodeRequest %s %s>' % (self.id, self.labels)
@ -1818,6 +1826,8 @@ class NodeRequest(object):
"job_name": self.job_name,
"span_info": self.span_info,
}
if (COMPONENT_REGISTRY.model_api >= 24):
d["requestor_data"]['job_uuid'] = self.job_uuid
d.setdefault('node_types', self.labels)
d.setdefault('requestor', self.requestor)
d.setdefault('created_time', self.created_time)
@ -1862,6 +1872,7 @@ class NodeRequest(object):
tenant_name=requestor_data.get("tenant_name"),
pipeline_name=requestor_data.get("pipeline_name"),
job_name=requestor_data.get("job_name"),
job_uuid=requestor_data.get("job_uuid"),
labels=data["node_types"],
provider=data["provider"],
relative_priority=data.get("relative_priority", 0),
@ -5470,8 +5481,15 @@ class QueueItem(zkobject.ZKObject):
return []
return self.current_build_set.job_graph.getJobs()
def getJob(self, name):
return self.current_build_set.job_graph.getJobFromName(name)
def getJob(self, job_id):
# MODEL_API < 24
job_graph = self.current_build_set.job_graph
try:
job = job_graph.getJobFromUuid(job_id)
if job is not None:
return job
except (KeyError, ValueError):
return job_graph.getJobFromName(job_id)
@property
def items_ahead(self):

View File

@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
MODEL_API = 23
MODEL_API = 24

View File

@ -191,8 +191,8 @@ class Nodepool(object):
else:
event_id = None
req = model.NodeRequest(self.system_id, build_set_uuid, tenant_name,
pipeline_name, job.name, labels, provider,
relative_priority, event_id)
pipeline_name, job.name, job._job_id, labels,
provider, relative_priority, event_id)
if job.nodeset.nodes:
self.zk_nodepool.submitNodeRequest(req, priority)

View File

@ -3047,12 +3047,11 @@ class Scheduler(threading.Thread):
return
log = get_annotated_logger(self.log, request.event_id)
job = build_set.item.getJob(request.job_name)
job = build_set.item.getJob(request._job_id)
if job is None:
log.warning("Item %s does not contain job %s "
"for node request %s",
build_set.item, request.job_name, request)
build_set.removeJobNodeRequestID(request.job_name)
build_set.item, request._job_id, request)
return
# If the request failed, we must directly delete it as the nodes will
@ -3063,7 +3062,7 @@ class Scheduler(threading.Thread):
nodeset = self.nodepool.getNodeSet(request, job.nodeset)
job = build_set.item.getJob(request.job_name)
job = build_set.item.getJob(request._job_id)
if build_set.getJobNodeSetInfo(job) is None:
pipeline.manager.onNodesProvisioned(request, nodeset, build_set)
else: