Add global semaphore support

This adds support for global semaphores which can be used by multiple
tenants.  This supports the use case where they represent real-world
resources which operate independently of Zuul tenants.

This implements and removes the spec describing the feature.  One
change from the spec is that the configuration object in the tenant
config file is "global-semaphore" rather than "semaphore".  This makes
it easier to distinguish them in documentation (facilitating easier
cross-references and deep links), and may also make it easier for
users to understand that they have distinct behaviors.

Change-Id: I5f2225a700d8f9bef0399189017f23b3f4caad17
Parent: 7842e3fcf1
Commit: 7fc94effe7
@@ -219,14 +219,14 @@ Here is an example of two job definitions:

    .. attr:: semaphores

-      The name of a :ref:`semaphore` (or list of them) which should be
-      acquired and released when the job begins and ends.  If the
-      semaphore is at maximum capacity, then Zuul will wait until it
-      can be acquired before starting the job.  The format is either a
-      string, a dictionary, or a list of either of those in the case
-      of multiple semaphores.  If it's a string it references a
-      semaphore using the default value for
-      :attr:`job.semaphores.resources-first`.
+      The name of a :ref:`semaphore` (or list of them) or
+      :ref:`global_semaphore` which should be acquired and released
+      when the job begins and ends.  If the semaphore is at maximum
+      capacity, then Zuul will wait until it can be acquired before
+      starting the job.  The format is either a string, a dictionary,
+      or a list of either of those in the case of multiple
+      semaphores.  If it's a string it references a semaphore using
+      the default value for :attr:`job.semaphores.resources-first`.

       If multiple semaphores are requested, the job will not start
       until all have been acquired, and Zuul will wait until all are
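For illustration, the string and dictionary forms described above might look like this in a job definition (the job names here are hypothetical, and `licensed-software` is just an example semaphore name; `resources-first` is the attribute referenced in the documentation above):

```yaml
# String form: reference the semaphore by name, using the default
# value for resources-first.
- job:
    name: example-licensed-job
    semaphores: licensed-software

# Dictionary form: the same reference with resources-first set
# explicitly.
- job:
    name: example-licensed-job-explicit
    semaphores:
      - name: licensed-software
        resources-first: true
```

Either form may also appear as a list entry when a job uses multiple semaphores.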
@@ -15,6 +15,10 @@ project as long as the value is the same.  This is to aid in branch
 maintenance, so that creating a new branch based on an existing branch
 will not immediately produce a configuration error.

+Zuul also supports global semaphores (see :ref:`global_semaphore`)
+which may only be created by the Zuul administrator, but can be used
+to coordinate resources across multiple tenants.
+
 Semaphores are never subject to dynamic reconfiguration.  If the value
 of a semaphore is changed, it will take effect only when the change
 where it is updated is merged.  However, Zuul will attempt to validate
@@ -1,127 +0,0 @@
Global Semaphores
=================

.. warning:: This is not authoritative documentation.  These features
   are not currently available in Zuul.  They may change significantly
   before final implementation, or may never be fully completed.

Semaphores are useful for limiting access to resources, but their
implementation as a per-tenant configuration construct may be limiting
if they are used for real-world resources that span tenants.

This is a proposal to address that by adding global semaphores.

Background
----------

Semaphores may be used for a variety of purposes.  One of these is to
limit access to constrained resources.  Doing so allows Zuul to avoid
requesting nodes and scheduling jobs until these resources are
available.  This makes the overall system more efficient as jobs don't
need to wait for resources during their run phase (where they may be
idling test nodes which could otherwise be put to better use).

A concrete example of this is software licenses.  If a job requires
software which uses a license server to ensure that the number of
in-use seats does not exceed the available seats, a semaphore with a
max value equal to the number of available seats can be used to help
Zuul avoid starting jobs which would otherwise need to wait for a
license.

If only one Zuul tenant uses this piece of software, the existing
implementation of semaphores in Zuul is satisfactory.  But if the
software licenses are shared across Zuul tenants, then a Zuul
semaphore can't be used in this way since semaphores are per-tenant
constructs.

The general solution to sharing Zuul configuration objects across
tenants is to define them in a single git repo and include that git
repo in multiple tenants.  That works as expected for Jobs, Project
Templates, etc.  But semaphores have a definition as well as a
run-time state (whether they are acquired and by whom).  Including a
semaphore in multiple tenants essentially makes copies of that
semaphore, each with its own distinct set of holders.

Proposed Change
---------------

A new global semaphore configuration would be added to the tenant
configuration file.  Note this is the global configuration file (where
tenants are defined); not in-repo configuration where semaphores are
currently defined.

The definition would be identical to the current in-repo semaphore
configuration.  In order to grant access to only certain tenants, each
tenant will also need to specify whether that semaphore should be
available to the tenant.  This scheme is similar to the way that
authorization rules are defined in this file and then attached to
tenants.

For example:

.. code-block:: yaml

   - semaphore:
       name: licensed-software
       max: 8

   - tenant:
       name: example-tenant
       semaphores:
         - licensed-software
       source:
         gerrit:
           config-projects:
             ...

The existing in-repo semaphores will remain as they are today -- they
will not be deprecated (they are still very useful on their own for
most other use cases).

If an in-repo semaphore is defined with the same name as a global
semaphore, that will become a configuration error.  The global
semaphore will take precedence.

Implementation
--------------

The user-visible configuration is described above.

Current semaphores are stored in the ZooKeeper path
``/zuul/semaphores/<tenant>/<semaphore>``.  Global semaphores will use
a similar scheme without the tenant name:
``/zuul/global-semaphores/<semaphore>``.
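The two path schemes above can be sketched as a small helper.  This is an illustrative sketch of the naming convention only, not Zuul's actual implementation:

```python
def semaphore_znode_path(semaphore: str, tenant: str = None) -> str:
    """Return the ZooKeeper path for a semaphore.

    Tenant-scoped semaphores are namespaced by tenant; global
    semaphores live in a single shared namespace.
    """
    if tenant is None:
        return f"/zuul/global-semaphores/{semaphore}"
    return f"/zuul/semaphores/{tenant}/{semaphore}"
```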
Locking, releasing, and leak cleanup will all behave similarly to the
current per-tenant semaphores.  On release, a per-tenant semaphore
broadcasts a PipelineSemaphoreReleaseEvent to all pipelines in order
to trigger a pipeline run and start any jobs which may be waiting on
the semaphore.  A global semaphore will do the same, but for every
pipeline of every tenant which includes the semaphore.
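The broadcast behavior described above can be sketched roughly as follows.  The tenant and event structures here are simplified stand-ins for Zuul's internal objects, not its real API:

```python
class PipelineSemaphoreReleaseEvent:
    """Simplified stand-in for the event named above."""
    def __init__(self, tenant_name, pipeline_name):
        self.tenant_name = tenant_name
        self.pipeline_name = pipeline_name


def release_events(tenants, semaphore_name):
    # For a global semaphore, notify every pipeline of every tenant
    # which has been granted access to the semaphore.
    events = []
    for tenant in tenants:
        if semaphore_name not in tenant["global_semaphores"]:
            continue
        for pipeline in tenant["pipelines"]:
            events.append(
                PipelineSemaphoreReleaseEvent(tenant["name"], pipeline))
    return events
```

A per-tenant semaphore would do the same thing for a single tenant's pipelines; the only difference for global semaphores is the wider fan-out.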
Alternatives
------------

We could add a field to the in-repo definitions of semaphores which
indicates that the semaphore should be global.  As this has the
ability to affect other tenants, we would need to restrict this to
config-projects only.  However, that still opens the possibility of
one tenant affecting another via the contents of a config-project.
Some method of allowing the administrator to control this via the
tenant config file would still likely be necessary.  As long as that's
the case, it seems simpler to just define the semaphores there too.

We could outsource this to Nodepool.  In fact, having nodepool manage
resources like this seems like a natural fit.  However, the current
Nodepool implementation doesn't permit more than one provider to
satisfy a node request, so a hypothetical semaphore provider wouldn't
be able to be combined with a provider of actual test nodes.
Addressing this is in-scope and a worthwhile change for Nodepool, but
it is potentially a large and complex change.  Additionally, the idea
of waiting for a semaphore before submitting requests for real
resources adds a new dimension to even that idea -- Nodepool would
need to know whether to run the semaphore provider first or last
depending on the desired resource acquisition order.  Meanwhile, since
Zuul does have the concept of semaphores already and they almost fit
this use case, this seems like a reasonable change to make in Zuul
regardless of any potential Nodepool changes.
@@ -23,4 +23,3 @@ documentation instead.
 enhanced-regional-executors
 tenant-resource-quota
 community-matrix
-global-semaphores
@@ -414,6 +414,13 @@ This is a reference for object layout in Zookeeper.
    An election to decide which scheduler will report system-wide stats
    (such as total node requests).

+.. path:: zuul/global-semaphores/<semaphore>
+   :type: SemaphoreHandler
+
+   Represents a global semaphore (shared by multiple tenants).
+   Information about which builds hold the semaphore is stored in the
+   znode data.
+
 .. path:: zuul/semaphores/<tenant>/<semaphore>
    :type: SemaphoreHandler
@@ -384,6 +384,57 @@ configuration.  Some examples of tenant definitions are:
       to add finer filtering to admin rules, for example filtering by the ``iss``
       claim (generally equal to the issuer ID).

+   .. attr:: semaphores
+
+      A list of names of :attr:`global-semaphore` objects to allow
+      jobs in this tenant to access.
+
+.. _global_semaphore:
+
+Global Semaphore
+----------------
+
+Semaphores are normally defined in in-repo configuration (see
+:ref:`semaphore`), however to support use-cases where semaphores are
+used to represent constrained global resources that may be used by
+multiple Zuul tenants, semaphores may be defined within the main
+tenant configuration file.
+
+In order for a job to use a global semaphore, the semaphore must first
+be defined in the tenant configuration file with
+:attr:`global-semaphore` and then added to each tenant which should
+have access to it with :attr:`tenant.semaphores`.  Once that is done,
+Zuul jobs may use that semaphore in the same way they would use a
+normal tenant-scoped semaphore.
+
+If any tenant which is granted access to a global semaphore also has a
+tenant-scoped semaphore defined with the same name, that definition
+will be treated as a configuration error and subsequently ignored in
+favor of the global semaphore.
+
+An example definition looks similar to the normal semaphore object:
+
+.. code-block:: yaml
+
+   - global-semaphore:
+       name: global-semaphore-foo
+       max: 5
+
+.. attr:: global-semaphore
+
+   The following attributes are available:
+
+   .. attr:: name
+      :required:
+
+      The name of the semaphore, referenced by jobs.
+
+   .. attr:: max
+      :default: 1
+
+      The maximum number of running jobs which can use this semaphore.
+
+
 .. _admin_rule_definition:

 Access Rule
@@ -0,0 +1,5 @@
---
features:
  - |
    Support for global (cross-tenant) semaphores has been added.  See
    :ref:`global_semaphore`.
@@ -0,0 +1,10 @@
- tenant:
    name: tenant-two
    semaphores:
      - global-semaphore
    source:
      gerrit:
        config-projects:
          - common-config
        untrusted-projects:
          - org/project2
tests/fixtures/config/global-semaphores-config/git/common-config/playbooks/run.yaml (new file, 1 line)
@@ -0,0 +1 @@
---
@@ -0,0 +1,52 @@
- pipeline:
    name: check
    manager: independent
    trigger:
      gerrit:
        - event: patchset-created
        - event: comment-added
          comment: '^(Patch Set [0-9]+:\n\n)?(?i:recheck)$'
    success:
      gerrit:
        Verified: 1
    failure:
      gerrit:
        Verified: -1

- pipeline:
    name: gate
    manager: dependent
    success-message: Build succeeded (gate).
    trigger:
      gerrit:
        - event: comment-added
          approval:
            - Approved: 1
    success:
      gerrit:
        Verified: 2
        submit: true
    failure:
      gerrit:
        Verified: -2
    start:
      gerrit:
        Verified: 0
    precedence: high

- job:
    name: base
    parent: null
    run: playbooks/run.yaml

- semaphore:
    name: common-semaphore
    max: 10

- job:
    name: test-global-semaphore
    semaphores: global-semaphore

- job:
    name: test-common-semaphore
    semaphores: common-semaphore
@@ -0,0 +1 @@
test
@@ -0,0 +1,19 @@
# Not actually the global semaphore -- this will be overridden
- semaphore:
    name: global-semaphore
    max: 2

- semaphore:
    name: project1-semaphore
    max: 11

- job:
    name: test-project1-semaphore
    semaphores: project1-semaphore

- project:
    check:
      jobs:
        - test-global-semaphore
        - test-common-semaphore
        - test-project1-semaphore
@@ -0,0 +1 @@
test
@@ -0,0 +1,14 @@
- semaphore:
    name: project2-semaphore
    max: 12

- job:
    name: test-project2-semaphore
    semaphores: project2-semaphore

- project:
    check:
      jobs:
        - test-global-semaphore
        - test-common-semaphore
        - test-project2-semaphore
@@ -0,0 +1 @@
test
@@ -0,0 +1,20 @@
# Not actually the global semaphore -- this tenant doesn't have it, so
# this semaphore will be used.
- semaphore:
    name: global-semaphore
    max: 999

- semaphore:
    name: project3-semaphore
    max: 13

- job:
    name: test-project3-semaphore
    semaphores: project3-semaphore

- project:
    check:
      jobs:
        - test-global-semaphore
        - test-common-semaphore
        - test-project3-semaphore
@@ -0,0 +1,34 @@
- global-semaphore:
    name: global-semaphore
    max: 100

- tenant:
    name: tenant-one
    semaphores:
      - global-semaphore
    source:
      gerrit:
        config-projects:
          - common-config
        untrusted-projects:
          - org/project1

- tenant:
    name: tenant-two
    semaphores:
      - global-semaphore
    source:
      gerrit:
        config-projects:
          - common-config
        untrusted-projects:
          - org/project2

- tenant:
    name: tenant-three
    source:
      gerrit:
        config-projects:
          - common-config
        untrusted-projects:
          - org/project3
@@ -0,0 +1 @@
---
@@ -0,0 +1,44 @@
- pipeline:
    name: check
    manager: independent
    trigger:
      gerrit:
        - event: patchset-created
        - event: comment-added
          comment: '^(Patch Set [0-9]+:\n\n)?(?i:recheck)$'
    success:
      gerrit:
        Verified: 1
    failure:
      gerrit:
        Verified: -1

- pipeline:
    name: gate
    manager: dependent
    success-message: Build succeeded (gate).
    trigger:
      gerrit:
        - event: comment-added
          approval:
            - Approved: 1
    success:
      gerrit:
        Verified: 2
        submit: true
    failure:
      gerrit:
        Verified: -2
    start:
      gerrit:
        Verified: 0
    precedence: high

- job:
    name: base
    parent: null
    run: playbooks/run.yaml

- job:
    name: test-global-semaphore
    semaphores: global-semaphore
@@ -0,0 +1 @@
test
@@ -0,0 +1,4 @@
- project:
    check:
      jobs:
        - test-global-semaphore
@@ -0,0 +1 @@
test
@@ -0,0 +1,4 @@
- project:
    check:
      jobs:
        - test-global-semaphore
@@ -0,0 +1,25 @@
- global-semaphore:
    name: global-semaphore
    max: 1

- tenant:
    name: tenant-one
    semaphores:
      - global-semaphore
    source:
      gerrit:
        config-projects:
          - common-config
        untrusted-projects:
          - org/project1

- tenant:
    name: tenant-two
    semaphores:
      - global-semaphore
    source:
      gerrit:
        config-projects:
          - common-config
        untrusted-projects:
          - org/project2
@@ -0,0 +1,169 @@
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.  You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
# License for the specific language governing permissions and limitations
# under the License.

import zuul.configloader

from tests.base import ZuulTestCase


class TestGlobalSemaphoresConfig(ZuulTestCase):
    tenant_config_file = 'config/global-semaphores-config/main.yaml'

    def assertSemaphores(self, tenant, semaphores):
        for k, v in semaphores.items():
            self.assertEqual(
                len(tenant.semaphore_handler.semaphoreHolders(k)),
                v, k)

    def assertSemaphoresMax(self, tenant, semaphores):
        for k, v in semaphores.items():
            abide = tenant.semaphore_handler.abide
            semaphore = tenant.layout.getSemaphore(abide, k)
            self.assertEqual(semaphore.max, v, k)

    def test_semaphore_scope(self):
        # This tests global and tenant semaphore scope
        self.executor_server.hold_jobs_in_build = True
        tenant1 = self.scheds.first.sched.abide.tenants.get('tenant-one')
        tenant2 = self.scheds.first.sched.abide.tenants.get('tenant-two')
        tenant3 = self.scheds.first.sched.abide.tenants.get('tenant-three')

        # The different max values will tell us that we have the right
        # semaphore objects.  Each tenant has one tenant-scope
        # semaphore in a tenant-specific project, and one tenant-scope
        # semaphore with a common definition.  Tenants 1 and 2 share a
        # global-scope semaphore, and tenant 3 has a tenant-scope
        # semaphore with the same name.

        # Here is what is defined in each tenant:
        # Tenant-one:
        #   * global-semaphore: scope:global max:100 definition:main.yaml
        #   * common-semaphore: scope:tenant max:10 definition:common-config
        #   * project1-semaphore: scope:tenant max:11 definition:project1
        #   * (global-semaphore): scope:tenant max:2 definition:project1
        #     [unused since it shadows the actual global-semaphore]
        # Tenant-two:
        #   * global-semaphore: scope:global max:100 definition:main.yaml
        #   * common-semaphore: scope:tenant max:10 definition:common-config
        #   * project2-semaphore: scope:tenant max:12 definition:project2
        # Tenant-three:
        #   * global-semaphore: scope:global max:999 definition:project3
        #   * common-semaphore: scope:tenant max:10 definition:common-config
        #   * project3-semaphore: scope:tenant max:13 definition:project3
        self.assertSemaphoresMax(tenant1, {'global-semaphore': 100,
                                           'common-semaphore': 10,
                                           'project1-semaphore': 11,
                                           'project2-semaphore': 1,
                                           'project3-semaphore': 1})
        self.assertSemaphoresMax(tenant2, {'global-semaphore': 100,
                                           'common-semaphore': 10,
                                           'project1-semaphore': 1,
                                           'project2-semaphore': 12,
                                           'project3-semaphore': 1})
        # This "global" semaphore is really tenant-scoped, it just has
        # the same name.
        self.assertSemaphoresMax(tenant3, {'global-semaphore': 999,
                                           'common-semaphore': 10,
                                           'project1-semaphore': 1,
                                           'project2-semaphore': 1,
                                           'project3-semaphore': 13})

        # We should have a config error in tenant1 due to the
        # redefinition.
        self.assertEquals(len(tenant1.layout.loading_errors), 1)
        self.assertEquals(len(tenant2.layout.loading_errors), 0)
        self.assertEquals(len(tenant3.layout.loading_errors), 0)

        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
        B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
        C = self.fake_gerrit.addFakeChange('org/project3', 'master', 'C')
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
        self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
        self.waitUntilSettled()

        # Checking the number of holders tells us whether we are
        # using global or tenant-scoped semaphores.  Each in-use
        # semaphore in a tenant should have only one holder except the
        # global-scope semaphore shared between tenants 1 and 2.
        self.assertSemaphores(tenant1, {'global-semaphore': 2,
                                        'common-semaphore': 1,
                                        'project1-semaphore': 1,
                                        'project2-semaphore': 0,
                                        'project3-semaphore': 0})
        self.assertSemaphores(tenant2, {'global-semaphore': 2,
                                        'common-semaphore': 1,
                                        'project1-semaphore': 0,
                                        'project2-semaphore': 1,
                                        'project3-semaphore': 0})
        self.assertSemaphores(tenant3, {'global-semaphore': 1,
                                        'common-semaphore': 1,
                                        'project1-semaphore': 0,
                                        'project2-semaphore': 0,
                                        'project3-semaphore': 1})

        self.executor_server.hold_jobs_in_build = False
        self.executor_server.release()
        self.waitUntilSettled()


class TestGlobalSemaphoresBroken(ZuulTestCase):
    validate_tenants = []
    tenant_config_file = 'config/global-semaphores-config/broken.yaml'
    # This test raises a config error during the startup of the test
    # case which makes the first scheduler fail during its startup.
    # The second (or any additional) scheduler won't even run as the
    # startup is serialized in tests/base.py.
    # Thus it doesn't make sense to execute this test with multiple
    # schedulers.
    scheduler_count = 1

    def setUp(self):
        self.assertRaises(zuul.configloader.GlobalSemaphoreNotFoundError,
                          super().setUp)

    def test_broken_global_semaphore_config(self):
        pass


class TestGlobalSemaphores(ZuulTestCase):
    tenant_config_file = 'config/global-semaphores/main.yaml'

    def test_global_semaphores(self):
        # This tests that a job finishing in one tenant will correctly
        # start a job in another tenant waiting on the semaphore.
        self.executor_server.hold_jobs_in_build = True
        A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
        self.waitUntilSettled()

        B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
        self.waitUntilSettled()

        self.assertHistory([])
        self.assertBuilds([
            dict(name='test-global-semaphore', changes='1,1'),
        ])

        self.executor_server.hold_jobs_in_build = False
        self.executor_server.release()
        self.waitUntilSettled()

        self.assertHistory([
            dict(name='test-global-semaphore',
                 result='SUCCESS', changes='1,1'),
            dict(name='test-global-semaphore',
                 result='SUCCESS', changes='2,1'),
        ], ordered=False)
@@ -8173,8 +8173,8 @@ class TestSemaphoreInRepo(ZuulTestCase):
         item_dynamic_layout = pipeline.manager._layout_cache.get(
             queue_item.layout_uuid)
         self.assertIsNotNone(item_dynamic_layout)
-        dynamic_test_semaphore = \
-            item_dynamic_layout.semaphores.get('test-semaphore')
+        dynamic_test_semaphore = item_dynamic_layout.getSemaphore(
+            self.scheds.first.sched.abide, 'test-semaphore')
         self.assertEqual(dynamic_test_semaphore.max, 1)

         # one build must be in queue, one semaphores acquired
@@ -8197,7 +8197,8 @@ class TestSemaphoreInRepo(ZuulTestCase):

         # now that change A was merged, the new semaphore max must be effective
         tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
-        self.assertEqual(tenant.layout.semaphores.get('test-semaphore').max, 2)
+        self.assertEqual(tenant.layout.getSemaphore(
+            self.scheds.first.sched.abide, 'test-semaphore').max, 2)

         # two builds must be in queue, two semaphores acquired
         self.assertEqual(len(self.builds), 2)
@@ -190,6 +190,16 @@ class ProjectNotPermittedError(Exception):
         super(ProjectNotPermittedError, self).__init__(message)


+class GlobalSemaphoreNotFoundError(Exception):
+    def __init__(self, semaphore):
+        message = textwrap.dedent("""\
+        The global semaphore "{semaphore}" was not found.  All
+        global semaphores must be added to the main configuration
+        file by the Zuul administrator.""")
+        message = textwrap.fill(message.format(semaphore=semaphore))
+        super(GlobalSemaphoreNotFoundError, self).__init__(message)
+
+
 class YAMLDuplicateKeyError(ConfigurationSyntaxError):
     def __init__(self, key, node, context, start_mark):
         intro = textwrap.fill(textwrap.dedent("""\
@@ -1454,6 +1464,26 @@ class AuthorizationRuleParser(object):
         return a


+class GlobalSemaphoreParser(object):
+    def __init__(self):
+        self.log = logging.getLogger("zuul.GlobalSemaphoreParser")
+        self.schema = self.getSchema()
+
+    def getSchema(self):
+        semaphore = {vs.Required('name'): str,
+                     'max': int,
+                     }
+
+        return vs.Schema(semaphore)
+
+    def fromYaml(self, conf):
+        self.schema(conf)
+        semaphore = model.Semaphore(conf['name'], conf.get('max', 1),
+                                    global_scope=True)
+        semaphore.freeze()
+        return semaphore
+
+
 class ParseContext(object):
     """Hold information about a particular run of the parser"""
@@ -1569,6 +1599,7 @@ class TenantParser(object):
                   'default-parent': str,
                   'default-ansible-version': vs.Any(str, float),
                   'admin-rules': to_list(str),
+                  'semaphores': to_list(str),
                   'authentication-realm': str,
                   # TODO: Ignored, allowed for backwards compat, remove for v5.
                   'report-build-page': bool,
@@ -1601,6 +1632,11 @@ class TenantParser(object):
             tenant.authorization_rules = conf['admin-rules']
         if conf.get('authentication-realm') is not None:
             tenant.default_auth_realm = conf['authentication-realm']
+        if conf.get('semaphores') is not None:
+            tenant.global_semaphores = set(as_list(conf['semaphores']))
+            for semaphore_name in tenant.global_semaphores:
+                if semaphore_name not in abide.semaphores:
+                    raise GlobalSemaphoreNotFoundError(semaphore_name)
         tenant.web_root = conf.get('web-root', self.globals.web_root)
         if tenant.web_root and not tenant.web_root.endswith('/'):
             tenant.web_root += '/'
@@ -1673,7 +1709,7 @@ class TenantParser(object):

         if self.scheduler:
             tenant.semaphore_handler = SemaphoreHandler(
-                self.zk_client, self.statsd, tenant.name, tenant.layout
+                self.zk_client, self.statsd, tenant.name, tenant.layout, abide
             )
             # Only call the postConfig hook if we have a scheduler as this will
             # change data in ZooKeeper.  In case we are in a zuul-web context,
@@ -2410,6 +2446,7 @@ class ConfigLoader(object):
             connections, zk_client, scheduler, merger, keystorage,
             zuul_globals, statsd)
         self.admin_rule_parser = AuthorizationRuleParser()
+        self.global_semaphore_parser = GlobalSemaphoreParser()

     def expandConfigPath(self, config_path):
         if config_path:
@@ -2466,6 +2503,12 @@ class ConfigLoader(object):
             admin_rule = self.admin_rule_parser.fromYaml(conf_admin_rule)
             abide.admin_rules[admin_rule.name] = admin_rule

+    def loadSemaphores(self, abide, unparsed_abide):
+        abide.semaphores.clear()
+        for conf_semaphore in unparsed_abide.semaphores:
+            semaphore = self.global_semaphore_parser.fromYaml(conf_semaphore)
+            abide.semaphores[semaphore.name] = semaphore
+
     def loadTPCs(self, abide, unparsed_abide, tenants=None):
         if tenants:
             tenants_to_load = {t: unparsed_abide.tenants[t] for t in tenants
@@ -6831,11 +6831,13 @@ class UnparsedAbideConfig(object):
         self.ltime = -1
         self.tenants = {}
         self.admin_rules = []
+        self.semaphores = []

     def extend(self, conf):
         if isinstance(conf, UnparsedAbideConfig):
             self.tenants.update(conf.tenants)
             self.admin_rules.extend(conf.admin_rules)
+            self.semaphores.extend(conf.semaphores)
             return

         if not isinstance(conf, list):
@@ -6854,6 +6856,8 @@ class UnparsedAbideConfig(object):
                 self.tenants[value["name"]] = value
             elif key == 'admin-rule':
                 self.admin_rules.append(value)
+            elif key == 'global-semaphore':
+                self.semaphores.append(value)
             else:
                 raise ConfigItemUnknownError(item)
@@ -6862,6 +6866,7 @@ class UnparsedAbideConfig(object):
             "uuid": self.uuid,
             "tenants": self.tenants,
             "admin_rules": self.admin_rules,
+            "semaphores": self.semaphores,
         }

     @classmethod
@@ -6871,6 +6876,7 @@ class UnparsedAbideConfig(object):
         unparsed_abide.ltime = ltime
         unparsed_abide.tenants = data["tenants"]
         unparsed_abide.admin_rules = data["admin_rules"]
+        unparsed_abide.semaphores = data.get("semaphores", [])
         return unparsed_abide
@@ -7142,6 +7148,9 @@ class Layout(object):
         # It's ok to have a duplicate semaphore definition, but only if
         # they are in different branches of the same repo, and have
         # the same values.
+        if semaphore.name in self.tenant.global_semaphores:
+            raise Exception("Semaphore %s shadows a global semaphore and "
+                            "will be ignored" % (semaphore.name))
         other = self.semaphores.get(semaphore.name)
         if other is not None:
             if not semaphore.source_context.isSameProject(
@ -7161,6 +7170,19 @@ class Layout(object):
|
|||
return
|
||||
self.semaphores[semaphore.name] = semaphore
|
||||
|
||||
def getSemaphore(self, abide, semaphore_name):
|
||||
if semaphore_name in self.tenant.global_semaphores:
|
||||
return abide.semaphores[semaphore_name]
|
||||
semaphore = self.semaphores.get(semaphore_name)
|
||||
if semaphore:
|
||||
return semaphore
|
||||
# Return an implied semaphore with max=1
|
||||
# TODO: consider deprecating implied semaphores to avoid typo
|
||||
# config errors
|
||||
semaphore = Semaphore(semaphore_name)
|
||||
semaphore.freeze()
|
||||
return semaphore
|
||||
|
||||
def addQueue(self, queue):
|
||||
# Change queues must be unique and cannot be overridden.
|
||||
if queue.name in self.queues:
|
||||
|
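The lookup order introduced by `Layout.getSemaphore` above — tenant-opted global semaphores first, then tenant-local definitions, then an implied `max=1` semaphore — can be sketched in isolation. The classes and names below are simplified stand-ins for illustration, not the real Zuul objects:

```python
class FakeSemaphore:
    """Stand-in for zuul.model.Semaphore (illustrative only)."""
    def __init__(self, name, max=1, global_scope=False):
        self.name = name
        self.max = int(max)
        self.global_scope = global_scope


def get_semaphore(abide_semaphores, tenant_globals, layout_semaphores, name):
    # Global semaphores shadow tenant-local ones for tenants that opt in.
    if name in tenant_globals:
        return abide_semaphores[name]
    sem = layout_semaphores.get(name)
    if sem:
        return sem
    # Fall back to an implied semaphore with max=1, matching the existing
    # behavior for semaphore names that were never explicitly defined.
    return FakeSemaphore(name)
```

This mirrors why `addSemaphore` now raises on shadowing: a tenant-local definition with the same name as an opted-in global semaphore would otherwise be silently unreachable.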
@@ -7545,9 +7567,10 @@ class Layout(object):

 class Semaphore(ConfigObject):
-    def __init__(self, name, max=1):
+    def __init__(self, name, max=1, global_scope=False):
         super(Semaphore, self).__init__()
         self.name = name
+        self.global_scope = global_scope
         self.max = int(max)

     def __ne__(self, other):

@@ -7623,6 +7646,7 @@ class Tenant(object):
         self.authorization_rules = []
         self.default_auth_realm = None
+        self.global_semaphores = set()

     def __repr__(self):
         return f"<Tenant {self.name}>"

@@ -7826,6 +7850,7 @@ class UnparsedBranchCache(object):
 class Abide(object):
     def __init__(self):
         self.admin_rules = {}
+        self.semaphores = {}
         self.tenants = {}
         # tenant -> project -> list(tpcs)
         # The project TPCs are stored as a list as we don't check for

@@ -1296,6 +1296,7 @@ class Scheduler(threading.Thread):
         try:
             abide = Abide()
             loader.loadAdminRules(abide, unparsed_abide)
+            loader.loadSemaphores(abide, unparsed_abide)
             loader.loadTPCs(abide, unparsed_abide)
             for tenant_name in tenants_to_load:
                 loader.loadTenant(abide, tenant_name, self.ansible_manager,

@@ -1353,8 +1354,9 @@ class Scheduler(threading.Thread):
             for tenant_name in deleted_tenants:
                 self.abide.clearTPCs(tenant_name)

-            loader.loadTPCs(self.abide, self.unparsed_abide)
             loader.loadAdminRules(self.abide, self.unparsed_abide)
+            loader.loadSemaphores(self.abide, self.unparsed_abide)
+            loader.loadTPCs(self.abide, self.unparsed_abide)

             if event.smart:
                 # Consider caches always valid

@@ -1999,8 +2001,9 @@ class Scheduler(threading.Thread):
                     tenant_config, from_script=script)
         self.system_config_cache.set(self.unparsed_abide, self.globals)

-        loader.loadTPCs(self.abide, self.unparsed_abide)
         loader.loadAdminRules(self.abide, self.unparsed_abide)
+        loader.loadSemaphores(self.abide, self.unparsed_abide)
+        loader.loadTPCs(self.abide, self.unparsed_abide)

     def updateSystemConfig(self):
         with self.layout_lock:

@@ -2019,8 +2022,9 @@ class Scheduler(threading.Thread):
         for tenant_name in deleted_tenants:
             self.abide.clearTPCs(tenant_name)

-        loader.loadTPCs(self.abide, self.unparsed_abide)
         loader.loadAdminRules(self.abide, self.unparsed_abide)
+        loader.loadSemaphores(self.abide, self.unparsed_abide)
+        loader.loadTPCs(self.abide, self.unparsed_abide)

     def process_pipelines(self, tenant, tenant_lock):
         for pipeline in tenant.layout.pipelines.values():

@@ -2074,8 +2074,9 @@ class ZuulWeb(object):
         for tenant_name in deleted_tenants:
             self.abide.clearTPCs(tenant_name)

-        loader.loadTPCs(self.abide, self.unparsed_abide)
         loader.loadAdminRules(self.abide, self.unparsed_abide)
+        loader.loadSemaphores(self.abide, self.unparsed_abide)
+        loader.loadTPCs(self.abide, self.unparsed_abide)

     def updateLayout(self):
         self.log.debug("Updating layout state")

@@ -39,14 +39,23 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
     log = logging.getLogger("zuul.zk.SemaphoreHandler")

     semaphore_root = "/zuul/semaphores"
+    global_semaphore_root = "/zuul/global-semaphores"

-    def __init__(self, client, statsd, tenant_name, layout):
+    def __init__(self, client, statsd, tenant_name, layout, abide):
         super().__init__(client)
+        self.abide = abide
         self.layout = layout
         self.statsd = statsd
         self.tenant_name = tenant_name
         self.tenant_root = f"{self.semaphore_root}/{tenant_name}"

+    def _makePath(self, semaphore):
+        semaphore_key = quote_plus(semaphore.name)
+        if semaphore.global_scope:
+            return f"{self.global_semaphore_root}/{semaphore_key}"
+        else:
+            return f"{self.tenant_root}/{semaphore_key}"
+
     def _emitStats(self, semaphore_path, num_holders):
         if self.statsd is None:
             return
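The ZooKeeper layout implied by `_makePath` above: global semaphores live under one shared root, while tenant-scoped semaphores stay under a per-tenant root, with names escaped via `quote_plus` so they remain single ZooKeeper node names. A minimal standalone sketch (the function name here is illustrative):

```python
from urllib.parse import quote_plus

GLOBAL_SEMAPHORE_ROOT = "/zuul/global-semaphores"
SEMAPHORE_ROOT = "/zuul/semaphores"


def make_path(name, tenant_name, global_scope):
    # quote_plus keeps characters like "/" from creating nested znodes
    key = quote_plus(name)
    if global_scope:
        return f"{GLOBAL_SEMAPHORE_ROOT}/{key}"
    return f"{SEMAPHORE_ROOT}/{tenant_name}/{key}"
```

Because the global path omits the tenant name, every tenant that opts into a global semaphore contends on the same znode, which is what makes the semaphore span tenants.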
@@ -80,8 +89,8 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
             return False
         return True

-    def _acquire_one(self, log, item, job, request_resources, semaphore):
-        if semaphore.resources_first and request_resources:
+    def _acquire_one(self, log, item, job, request_resources, job_semaphore):
+        if job_semaphore.resources_first and request_resources:
             # We're currently in the resource request phase and want to get the
             # resources before locking. So we don't need to do anything here.
             return True

@@ -92,8 +101,8 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
             # the resources phase.
             pass

-        semaphore_key = quote_plus(semaphore.name)
-        semaphore_path = f"{self.tenant_root}/{semaphore_key}"
+        semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name)
+        semaphore_path = self._makePath(semaphore)
         semaphore_handle = {
             "buildset_path": item.current_build_set.getPath(),
             "job_name": job.name,

@@ -139,10 +148,13 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
         return holdersFromData(data), zstat

     def getSemaphores(self):
-        try:
-            return self.kazoo_client.get_children(self.tenant_root)
-        except NoNodeError:
-            return []
+        ret = []
+        for root in (self.global_semaphore_root, self.tenant_root):
+            try:
+                ret.extend(self.kazoo_client.get_children(root))
+            except NoNodeError:
+                pass
+        return ret

     def _release(self, log, semaphore_path, semaphore_handle, quiet,
                  legacy_handle=None):

@@ -183,23 +195,31 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
         log = get_annotated_logger(self.log, item.event)

-        for semaphore in job.semaphores:
-            self._release_one(log, item, job, semaphore, quiet)
+        for job_semaphore in job.semaphores:
+            self._release_one(log, item, job, job_semaphore, quiet)

         # If a scheduler has been provided (which it is except in the
         # case of a rollback from acquire in this class), broadcast an
         # event to trigger pipeline runs.
         if sched is None:
             return
-        for pipeline_name in self.layout.pipelines.keys():
-            event = PipelineSemaphoreReleaseEvent()
-            sched.pipeline_management_events[
-                self.tenant_name][pipeline_name].put(
-                    event, needs_result=False)
-
-    def _release_one(self, log, item, job, semaphore, quiet):
-        semaphore_key = quote_plus(semaphore.name)
-        semaphore_path = f"{self.tenant_root}/{semaphore_key}"
+        semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name)
+        if semaphore.global_scope:
+            tenants = [t for t in self.abide.tenants.values()
+                       if job_semaphore.name in t.global_semaphores]
+        else:
+            tenants = [self.abide.tenants[self.tenant_name]]
+        for tenant in tenants:
+            for pipeline_name in tenant.layout.pipelines.keys():
+                event = PipelineSemaphoreReleaseEvent()
+                sched.pipeline_management_events[
+                    tenant.name][pipeline_name].put(
+                        event, needs_result=False)
+
+    def _release_one(self, log, item, job, job_semaphore, quiet):
+        semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name)
+        semaphore_path = self._makePath(semaphore)
         semaphore_handle = {
             "buildset_path": item.current_build_set.getPath(),
             "job_name": job.name,

@@ -209,16 +229,16 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
                       legacy_handle)

     def semaphoreHolders(self, semaphore_name):
-        semaphore_key = quote_plus(semaphore_name)
-        semaphore_path = f"{self.tenant_root}/{semaphore_key}"
+        semaphore = self.layout.getSemaphore(self.abide, semaphore_name)
+        semaphore_path = self._makePath(semaphore)
         try:
             holders, _ = self.getHolders(semaphore_path)
         except NoNodeError:
             holders = []
         return holders

-    def _max_count(self, semaphore_name: str) -> int:
-        semaphore = self.layout.semaphores.get(semaphore_name)
+    def _max_count(self, semaphore_name):
+        semaphore = self.layout.getSemaphore(self.abide, semaphore_name)
         return 1 if semaphore is None else semaphore.max

     def cleanupLeaks(self):

@@ -240,8 +260,9 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
                     is not None):
                 continue

-            semaphore_key = quote_plus(semaphore_name)
-            semaphore_path = f"{self.tenant_root}/{semaphore_key}"
+            semaphore = self.layout.getSemaphore(
+                self.abide, semaphore_name)
+            semaphore_path = self._makePath(semaphore)
             self.log.error("Releasing leaked semaphore %s held by %s",
                            semaphore_path, holder)
             self._release(self.log, semaphore_path, holder, quiet=False)
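Per the commit message, global semaphores are defined as `global-semaphore` objects in the tenant configuration file (rather than `semaphore`, to keep them distinguishable in documentation) and opted into per tenant. A plausible configuration sketch — the tenant `semaphores` attribute name is an assumption based on the `tenant.global_semaphores` set added in this change:

```yaml
# Hypothetical tenant config: a global semaphore representing a
# real-world resource shared by two tenants.
- global-semaphore:
    name: licensed-resource
    max: 2

- tenant:
    name: tenant-one
    semaphores:
      - licensed-resource

- tenant:
    name: tenant-two
    semaphores:
      - licensed-resource
```

With this layout, jobs in either tenant referencing `licensed-resource` contend on the same `/zuul/global-semaphores/licensed-resource` znode.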