Add a model API version

This is a framework for making upgrades to the ZooKeeper data model
in a manner that can support a rolling Zuul system upgrade.

Change-Id: Iff09c95878420e19234908c2a937e9444832a6ec
This commit is contained in:
James E. Blair 2022-01-06 16:03:09 -08:00
parent 4c0fef8c1e
commit 29fbee7375
25 changed files with 338 additions and 59 deletions

View File

@ -19,4 +19,5 @@ Zuul, though advanced users may find it interesting.
javascript
specs/index
zookeeper
model-changelog
releasenotes

View File

@ -0,0 +1,33 @@
Data Model Changelog
====================
Record changes to the ZooKeeper data model which require API version
increases here.
When making a model change:
* Increment the value of ``MODEL_API`` in ``model.py``.
* Update code to use the new API by default and add
backwards-compatibility handling for older versions. This makes it
easier to clean up backwards-compatibility handling in the future.
* Make sure code that special cases model versions either references a
``model_api`` variable or has a comment like `MODEL_API: >
{version}` so that we can grep for that and clean up compatability
code that is no longer needed.
* Add a test to ``test_model_upgrade.py``.
* Add an entry to this log so we can decide when to remove
backwards-compatibility handlers.
Version 0
---------
:Prior Zuul version: 4.11.0
:Description: This is an implied version as of Zuul 4.12.0 to
initialize the series.
Version 1
---------
:Prior Zuul version: 4.11.0
:Description: No change since Version 0. This explicitly records the
component versions in ZooKeeper.

View File

@ -34,6 +34,7 @@ from collections import defaultdict, namedtuple
from queue import Queue
from typing import Callable, Optional, Any, Iterable, Generator, List, Dict
from unittest.case import skipIf
import zlib
import requests
import select
@ -91,6 +92,7 @@ from zuul.driver.elasticsearch import ElasticsearchDriver
from zuul.lib.collections import DefaultKeyDict
from zuul.lib.connections import ConnectionRegistry
from zuul.zk import zkobject, ZooKeeperClient
from zuul.zk.components import SchedulerComponent
from zuul.zk.event_queues import ConnectionEventQueue
from zuul.zk.executor import ExecutorApi
from zuul.zk.locks import tenant_read_lock, pipeline_lock, SessionAwareLock
@ -3900,7 +3902,7 @@ class ZuulWebFixture(fixtures.Fixture):
info=self.info,
connections=self.connections,
authenticators=self.authenticators)
self.connections.load(self.web.zk_client)
self.connections.load(self.web.zk_client, self.web.component_registry)
self.web.start()
self.addCleanup(self.stop)
@ -4171,6 +4173,15 @@ class BaseTestCase(testtools.TestCase):
def getZKPaths(self, path):
return list(self.getZKTree(path).keys())
def getZKObject(self, path):
compressed_data, zstat = self.zk_client.client.get(path)
try:
data = zlib.decompress(compressed_data)
except zlib.error:
# Fallback for old, uncompressed data
data = compressed_data
return data
class SymLink(object):
def __init__(self, target):
@ -4205,7 +4216,8 @@ class SchedulerTestApp:
if validate_tenants is None:
self.connections.registerScheduler(self.sched)
self.connections.load(self.sched.zk_client)
self.connections.load(self.sched.zk_client,
self.sched.component_registry)
# TODO (swestphahl): Can be removed when we no longer use global
# management events.
@ -4393,6 +4405,18 @@ class ZuulTestCase(BaseTestCase):
)
self.merge_server.start()
def _setupModelPin(self):
# Add a fake scheduler to the system that is on the old model
# version.
test_name = self.id().split('.')[-1]
test = getattr(self, test_name)
if hasattr(test, '__model_version__'):
version = getattr(test, '__model_version__')
self.model_test_component_info = SchedulerComponent(
self.zk_client, 'test_component')
self.model_test_component_info.model_api = version
self.model_test_component_info.register()
def setUp(self):
super(ZuulTestCase, self).setUp()
@ -4488,6 +4512,8 @@ class ZuulTestCase(BaseTestCase):
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self._setupModelPin()
self._context_lock = SessionAwareLock(
self.zk_client.client, f"/test/{uuid.uuid4().hex}")
@ -4549,7 +4575,8 @@ class ZuulTestCase(BaseTestCase):
self._context_lock.acquire(blocking=False)
lock = self._context_lock
return zkobject.ZKContext(self.zk_client, lock,
stop_event=None, log=self.log)
None, self.log,
self.scheds.first.sched.component_registry)
def __event_queues(self, matcher) -> List[Queue]:
# TODO (swestphahl): Can be removed when we no longer use global

70
tests/fixtures/layouts/simple.yaml vendored Normal file
View File

@ -0,0 +1,70 @@
- pipeline:
name: check
manager: independent
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
Verified: 1
failure:
gerrit:
Verified: -1
- pipeline:
name: gate
manager: dependent
success-message: Build succeeded (gate).
trigger:
gerrit:
- event: comment-added
approval:
- Approved: 1
success:
gerrit:
Verified: 2
submit: true
failure:
gerrit:
Verified: -2
start:
gerrit:
Verified: 0
precedence: high
- pipeline:
name: post
manager: independent
trigger:
gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
- job:
name: base
parent: null
run: playbooks/base.yaml
nodeset:
nodes:
- label: ubuntu-xenial
name: controller
- job:
name: check-job
run: playbooks/check.yaml
- job:
name: post-job
run: playbooks/post.yaml
- project:
name: org/project
check:
jobs:
- check-job
gate:
jobs:
- check-job
post:
jobs:
- post-job

View File

@ -0,0 +1,55 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from tests.base import ZuulTestCase, simple_layout
def model_version(version):
"""Specify a model version for a model upgrade test
This creates a dummy scheduler component with the specified model
API version. The component is created before any other, so it
will appear to Zuul that it is joining an existing cluster with
data at the old version.
"""
def decorator(test):
test.__model_version__ = version
return test
return decorator
class TestModelUpgrade(ZuulTestCase):
tenant_config_file = "config/single-tenant/main.yaml"
scheduler_count = 1
def getJobData(self, tenant, pipeline):
item_path = f'/zuul/tenant/{tenant}/pipeline/{pipeline}/item'
count = 0
for item in self.zk_client.client.get_children(item_path):
bs_path = f'{item_path}/{item}/buildset'
for buildset in self.zk_client.client.get_children(bs_path):
data = json.loads(self.getZKObject(
f'{bs_path}/{buildset}/job/check-job'))
count += 1
yield data
if not count:
raise Exception("No job data found")
@model_version(0)
@simple_layout('layouts/simple.yaml')
def test_model_upgrade_0_1(self):
pass

View File

@ -70,6 +70,7 @@ class ZooKeeperBaseTestCase(BaseTestCase):
tls_ca=self.zk_chroot_fixture.zookeeper_ca)
self.addCleanup(self.zk_client.disconnect)
self.zk_client.connect()
self.component_registry = ComponentRegistry(self.zk_client)
class TestZookeeperClient(ZooKeeperBaseTestCase):
@ -295,7 +296,8 @@ class TestComponentRegistry(ZooKeeperBaseTestCase):
)
self.addCleanup(self.second_zk_client.disconnect)
self.second_zk_client.connect()
self.component_registry = ComponentRegistry(self.second_zk_client)
self.second_component_registry = ComponentRegistry(
self.second_zk_client)
def assertComponentAttr(self, component_name, attr_name,
attr_value, timeout=10):
@ -303,7 +305,8 @@ class TestComponentRegistry(ZooKeeperBaseTestCase):
timeout,
f"{component_name} in cache has {attr_name} set to {attr_value}",
):
components = list(self.component_registry.all(component_name))
components = list(self.second_component_registry.all(
component_name))
if (
len(components) > 0 and
getattr(components[0], attr_name) == attr_value
@ -319,7 +322,8 @@ class TestComponentRegistry(ZooKeeperBaseTestCase):
for _ in iterate_timeout(
timeout, f"{component_name} in cache is stopped"
):
components = list(self.component_registry.all(component_name))
components = list(self.second_component_registry.all(
component_name))
if len(components) == 0:
break
@ -346,7 +350,7 @@ class TestComponentRegistry(ZooKeeperBaseTestCase):
# Make sure the registry didn't create any read/write
# component objects that re-registered themselves.
components = list(self.component_registry.all('executor'))
components = list(self.second_component_registry.all('executor'))
self.assertEqual(len(components), 1)
self.component_info.state = self.component_info.RUNNING
@ -1476,7 +1480,7 @@ class DummyZKObjectMixin:
def getPath(self):
return f'/zuul/pipeline/{self.name}'
def serialize(self):
def serialize(self, context):
d = {'name': self.name,
'foo': self.foo}
return json.dumps(d).encode('utf-8')
@ -1497,7 +1501,8 @@ class TestZKObject(ZooKeeperBaseTestCase):
# Create a new object
tenant_name = 'fake_tenant'
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
context = ZKContext(self.zk_client, lock, stop_event, self.log,
self.component_registry)
pipeline1 = zkobject_class.new(context,
name=tenant_name,
foo='bar')
@ -1505,7 +1510,8 @@ class TestZKObject(ZooKeeperBaseTestCase):
# Load an object from ZK (that we don't already have)
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
context = ZKContext(self.zk_client, lock, stop_event, self.log,
self.component_registry)
pipeline2 = zkobject_class.fromZK(context,
'/zuul/pipeline/fake_tenant')
self.assertEqual(pipeline2.foo, 'bar')
@ -1516,7 +1522,8 @@ class TestZKObject(ZooKeeperBaseTestCase):
# Update an object
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
context = ZKContext(self.zk_client, lock, stop_event, self.log,
self.component_registry)
ltime1 = get_ltime(pipeline1)
pipeline1.updateAttributes(context, foo='qux')
self.assertEqual(pipeline1.foo, 'qux')
@ -1530,7 +1537,8 @@ class TestZKObject(ZooKeeperBaseTestCase):
# Update an object using an active context
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
context = ZKContext(self.zk_client, lock, stop_event, self.log,
self.component_registry)
ltime1 = get_ltime(pipeline1)
with pipeline1.activeContext(context):
pipeline1.foo = 'baz'
@ -1551,13 +1559,15 @@ class TestZKObject(ZooKeeperBaseTestCase):
# Refresh an existing object
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
context = ZKContext(self.zk_client, lock, stop_event, self.log,
self.component_registry)
pipeline2.refresh(context)
self.assertEqual(pipeline2.foo, 'baz')
# Delete an object
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
context = ZKContext(self.zk_client, lock, stop_event, self.log,
self.component_registry)
self.assertIsNotNone(self.zk_client.client.exists(
'/zuul/pipeline/fake_tenant'))
pipeline2.delete(context)
@ -1600,7 +1610,8 @@ class TestZKObject(ZooKeeperBaseTestCase):
# Fail an update
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
context = ZKContext(self.zk_client, lock, stop_event, self.log,
self.component_registry)
pipeline1 = zkobject_class.new(context,
name=tenant_name,
foo='one')
@ -1649,7 +1660,7 @@ class TestZKObject(ZooKeeperBaseTestCase):
class TestBranchCache(ZooKeeperBaseTestCase):
def test_branch_cache_protected_then_all(self):
conn = DummyConnection()
cache = BranchCache(self.zk_client, conn)
cache = BranchCache(self.zk_client, conn, self.component_registry)
test_data = {
'project1': {
@ -1684,7 +1695,7 @@ class TestBranchCache(ZooKeeperBaseTestCase):
def test_branch_cache_all_then_protected(self):
conn = DummyConnection()
cache = BranchCache(self.zk_client, conn)
cache = BranchCache(self.zk_client, conn, self.component_registry)
test_data = {
'project1': {
@ -1728,7 +1739,7 @@ class TestBranchCache(ZooKeeperBaseTestCase):
def test_branch_cache_change_protected(self):
conn = DummyConnection()
cache = BranchCache(self.zk_client, conn)
cache = BranchCache(self.zk_client, conn, self.component_registry)
data1 = {
'project1': {
@ -1794,7 +1805,8 @@ class TestConfigurationErrorList(ZooKeeperBaseTestCase):
# Create a new object
with tenant_write_lock(self.zk_client, 'test') as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
context = ZKContext(self.zk_client, lock, stop_event, self.log,
self.component_registry)
pipeline = DummyZKObject.new(context, name="test", foo="bar")
e1 = model.ConfigurationError(
source_context, start_mark, "Test error1")

View File

@ -37,6 +37,7 @@ from zuul.lib.keystorage import KeyStorage
from zuul.zk.locks import tenant_write_lock
from zuul.zk.zkobject import ZKContext
from zuul.zk.layout import LayoutState, LayoutStateStore
from zuul.zk.components import ComponentRegistry
# todo This should probably live somewhere else
@ -962,13 +963,15 @@ class Client(zuul.cmd.ZuulApp):
args = self.args
safe_tenant = urllib.parse.quote_plus(args.tenant)
safe_pipeline = urllib.parse.quote_plus(args.pipeline)
component_registry = ComponentRegistry(zk_client)
with tenant_write_lock(zk_client, args.tenant) as lock:
path = f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}'
layout_uuid = None
zk_client.client.delete(
f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}',
recursive=True)
context = ZKContext(zk_client, lock, None, self.log)
context = ZKContext(zk_client, lock, None, self.log,
component_registry)
ps = PipelineState.new(context, _path=path,
layout_uuid=layout_uuid)
# Force everyone to make a new layout for this tenant in

View File

@ -84,7 +84,8 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.connections, self)
if self.args.validate_tenants is None:
self.connections.registerScheduler(self.sched)
self.connections.load(self.sched.zk_client)
self.connections.load(self.sched.zk_client,
self.sched.component_registry)
self.log.info('Starting scheduler')
try:

View File

@ -61,7 +61,8 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
connections=self.connections,
authenticators=self.authenticators,
)
self.connections.load(self.web.zk_client)
self.connections.load(self.web.zk_client,
self.web.component_registry)
except Exception:
self.log.exception("Error creating ZuulWeb:")
sys.exit(1)

View File

@ -68,7 +68,7 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
except Exception:
self.log.exception("Exception reporting event stats")
def onLoad(self, zk_client):
def onLoad(self, zk_client, component_registry):
pass
def onStop(self):

View File

@ -1586,7 +1586,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
}
self.addEvent(event)
def onLoad(self, zk_client):
def onLoad(self, zk_client, component_registry):
self.log.debug("Starting Gerrit Connection/Watchers")
try:
if self.session:
@ -1599,7 +1599,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.read_only = not self.sched
self.log.debug('Creating Zookeeper branch cache')
self._branch_cache = BranchCache(zk_client, self)
self._branch_cache = BranchCache(zk_client, self,
component_registry)
self.log.info("Creating Zookeeper event queue")
self.event_queue = ConnectionEventQueue(

View File

@ -166,7 +166,7 @@ class GitConnection(ZKChangeCacheMixin, BaseConnection):
# Pass the event to the scheduler
self.sched.addTriggerEvent(self.driver_name, event)
def onLoad(self, zk_client):
def onLoad(self, zk_client, component_registry):
self.log.debug("Creating Zookeeper change cache")
self._change_cache = GitChangeCache(zk_client, self)

View File

@ -1234,7 +1234,7 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
})
return d
def onLoad(self, zk_client):
def onLoad(self, zk_client, component_registry):
self.log.info('Starting GitHub connection: %s', self.connection_name)
self._github_client_manager.initialize()
@ -1243,7 +1243,7 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.read_only = not self.sched
self.log.debug('Creating Zookeeper branch cache')
self._branch_cache = BranchCache(zk_client, self)
self._branch_cache = BranchCache(zk_client, self, component_registry)
self.log.debug('Creating Zookeeper event queue')
self.event_queue = ConnectionEventQueue(

View File

@ -484,7 +484,7 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.gitlab_event_connector.stop()
self.gitlab_event_connector.join()
def onLoad(self, zk_client):
def onLoad(self, zk_client, component_registry):
self.log.info('Starting Gitlab connection: %s', self.connection_name)
# Set the project branch cache to read only if no scheduler is
@ -492,7 +492,7 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.read_only = not self.sched
self.log.debug('Creating Zookeeper branch cache')
self._branch_cache = BranchCache(zk_client, self)
self._branch_cache = BranchCache(zk_client, self, component_registry)
self.log.info('Creating Zookeeper event queue')
self.event_queue = ConnectionEventQueue(

View File

@ -62,7 +62,7 @@ class MQTTConnection(BaseConnection):
def _on_disconnect(self, client, userdata, rc):
self.connected = False
def onLoad(self, zk_client):
def onLoad(self, zk_client, component_registry):
self.log.debug("Starting MQTT Connection")
try:
self.client.connect(

View File

@ -497,7 +497,7 @@ class PagureConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
r"^\*\*Metadata Update", re.MULTILINE)
self.sched = None
def onLoad(self, zk_client):
def onLoad(self, zk_client, component_registry):
self.log.info('Starting Pagure connection: %s', self.connection_name)
# Set the project branch cache to read only if no scheduler is
@ -505,7 +505,7 @@ class PagureConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.read_only = not self.sched
self.log.debug('Creating Zookeeper branch cache')
self._branch_cache = BranchCache(zk_client, self)
self._branch_cache = BranchCache(zk_client, self, component_registry)
self.log.info('Creating Zookeeper event queue')
self.event_queue = ConnectionEventQueue(

View File

@ -301,7 +301,7 @@ class SQLConnection(BaseConnection):
else:
alembic.command.upgrade(config, 'head', tag=tag)
def onLoad(self, zk_client):
def onLoad(self, zk_client, component_registry=None):
safe_connection = quote_plus(self.connection_name)
while True:
try:

View File

@ -72,7 +72,7 @@ import zuul.model
from zuul.nodepool import Nodepool
from zuul.version import get_version_string
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.components import ExecutorComponent
from zuul.zk.components import ExecutorComponent, ComponentRegistry
from zuul.zk.exceptions import JobRequestNotFound
from zuul.zk.executor import ExecutorApi
from zuul.zk.job_request_queue import JobRequestEvent
@ -3154,7 +3154,6 @@ class ExecutorServer(BaseMergeServer):
self.keystore = KeyStorage(
self.zk_client,
password=self._get_key_store_password())
self.zk_context = ZKContext(self.zk_client, None, None, self.log)
self._running = False
self._command_running = False
# TODOv3(mordred): make the executor name more unique --
@ -3164,6 +3163,9 @@ class ExecutorServer(BaseMergeServer):
self.component_info = ExecutorComponent(
self.zk_client, self.hostname, version=get_version_string())
self.component_info.register()
self.component_registry = ComponentRegistry(self.zk_client)
self.zk_context = ZKContext(self.zk_client, None, None, self.log,
self.component_registry)
self.monitoring_server = MonitoringServer(self.config, 'executor',
self.component_info)
self.monitoring_server.start()

View File

@ -75,9 +75,9 @@ class ConnectionRegistry(object):
for connection_name, connection in self.connections.items():
connection.registerScheduler(sched)
def load(self, zk_client):
def load(self, zk_client, component_registry):
for connection in self.connections.values():
connection.onLoad(zk_client)
connection.onLoad(zk_client, component_registry)
def reconfigureDrivers(self, tenant):
for driver in self.drivers.values():

View File

@ -47,6 +47,10 @@ from zuul.lib.jsonutil import json_dumps
from zuul.zk import zkobject
from zuul.zk.change_cache import ChangeKey
# When making ZK schema changes, increment this and add a record to
# docs/developer/model-changelog.rst
MODEL_API = 1
MERGER_MERGE = 1 # "git merge"
MERGER_MERGE_RESOLVE = 2 # "git merge -s resolve"
MERGER_CHERRY_PICK = 3 # "git cherry-pick"
@ -270,7 +274,7 @@ class ConfigurationErrorList(zkobject.ShardedZKObject):
def getPath(self):
return self._path
def serialize(self):
def serialize(self, context):
data = {
"errors": [e.serialize() for e in self.errors],
}
@ -652,7 +656,7 @@ class PipelineState(zkobject.ZKObject):
with self.activeContext(context):
self.old_queues.remove(queue)
def serialize(self):
def serialize(self, context):
data = {
"state": self.state,
"consecutive_failures": self.consecutive_failures,
@ -805,7 +809,7 @@ class PipelineChangeList(zkobject.ShardedZKObject):
except NoNodeError:
return cls.new(ctx, pipeline=pipeline)
def serialize(self):
def serialize(self, context):
data = {
"changes": self.changes,
}
@ -852,7 +856,7 @@ class PipelineSummary(zkobject.ShardedZKObject):
status = self.pipeline.formatStatusJSON(zuul_globals.websocket_url)
self.updateAttributes(context, status=status)
def serialize(self):
def serialize(self, context):
data = {
"status": self.status,
}
@ -908,7 +912,7 @@ class ChangeQueue(zkobject.ZKObject):
dynamic=False,
)
def serialize(self):
def serialize(self, context):
data = {
"uuid": self.uuid,
"name": self.name,
@ -1913,7 +1917,7 @@ class JobData(zkobject.ShardedZKObject):
hasher.update(json_dumps(data, sort_keys=True).encode('utf8'))
return hasher.hexdigest()
def serialize(self):
def serialize(self, context):
data = {
"data": self.data,
"hash": self.hash,
@ -2019,7 +2023,7 @@ class FrozenJob(zkobject.ZKObject):
def getPath(self):
return self.jobPath(self.name, self.buildset.getPath())
def serialize(self):
def serialize(self, context):
data = {}
for k in self.attributes:
# TODO: Backwards compat handling, remove after 5.0
@ -3329,7 +3333,7 @@ class ResultData(zkobject.ShardedZKObject):
def getPath(self):
return self._path
def serialize(self):
def serialize(self, context):
data = {
"data": self.data,
"_path": self._path,
@ -3370,7 +3374,7 @@ class Build(zkobject.ZKObject):
build_request_ref=None,
)
def serialize(self):
def serialize(self, context):
data = {
"uuid": self.uuid,
"url": self.url,
@ -3494,7 +3498,7 @@ class RepoFiles(zkobject.ShardedZKObject):
def getPath(self):
return f"{self._buildset_path}/files"
def serialize(self):
def serialize(self, context):
data = {
"connections": self.connections,
"_buildset_path": self._buildset_path,
@ -3530,7 +3534,7 @@ class BaseRepoState(zkobject.ShardedZKObject):
super().__init__()
self._set(state={})
def serialize(self):
def serialize(self, context):
data = {
"state": self.state,
"_buildset_path": self._buildset_path,
@ -3675,7 +3679,7 @@ class BuildSet(zkobject.ZKObject):
def getPath(self):
return f"{self.item.getPath()}/buildset/{self.uuid}"
def serialize(self):
def serialize(self, context):
data = {
# "item": self.item,
"builds": {j: b.getPath() for j, b in self.builds.items()},
@ -4035,7 +4039,7 @@ class QueueItem(zkobject.ZKObject):
def itemPath(cls, pipeline_path, item_uuid):
return f"{pipeline_path}/item/{item_uuid}"
def serialize(self):
def serialize(self, context):
if isinstance(self.event, TriggerEvent):
event_type = "TriggerEvent"
else:

View File

@ -2455,4 +2455,5 @@ class Scheduler(threading.Thread):
tenant.semaphore_handler.release(item, job)
def createZKContext(self, lock, log):
return ZKContext(self.zk_client, lock, self.stop_event, log)
return ZKContext(self.zk_client, lock, self.stop_event, log,
self.component_registry)

View File

@ -1680,7 +1680,8 @@ class ZuulWeb(object):
self.zk_client
)
self.zk_context = ZKContext(self.zk_client, None, None, self.log)
self.zk_context = ZKContext(self.zk_client, None, None, self.log,
self.component_registry)
command_socket = get_default(
self.config, 'web', 'command_socket',

View File

@ -59,7 +59,7 @@ class BranchCacheZKObject(ShardedZKObject):
self._set(protected={},
remainder={})
def serialize(self):
def serialize(self, context):
data = {
"protected": self.protected,
"remainder": self.remainder,
@ -78,7 +78,7 @@ class BranchCacheZKObject(ShardedZKObject):
class BranchCache:
def __init__(self, zk_client, connection):
def __init__(self, zk_client, connection, component_registry):
self.log = logging.getLogger(
f"zuul.BranchCache.{connection.connection_name}")
@ -94,7 +94,8 @@ class BranchCache:
# TODO: standardize on a stop event for connections and add it
# to the context.
self.zk_context = ZKContext(zk_client, self.wlock, None, self.log)
self.zk_context = ZKContext(zk_client, self.wlock, None, self.log,
component_registry)
with locked(self.wlock):
try:

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import json
import logging
import threading
@ -20,6 +21,7 @@ from kazoo.exceptions import NoNodeError
from kazoo.protocol.states import EventType
from zuul.zk import ZooKeeperBase
from zuul import model
COMPONENTS_ROOT = "/zuul/components"
@ -60,6 +62,7 @@ class BaseComponent(ZooKeeperBase):
"state": self.STOPPED,
"kind": self.kind,
"version": version,
"model_api": model.MODEL_API,
}
super().__init__(client)
@ -206,6 +209,7 @@ class ComponentRegistry(ZooKeeperBase):
# kind -> hostname -> component
self._cached_components = defaultdict(dict)
self.model_api = None
# If we are already connected when the class is instantiated, directly
# call the onConnect callback.
if self.client.connected:
@ -223,6 +227,7 @@ class ComponentRegistry(ZooKeeperBase):
self.kazoo_client.ensure_path(root)
self.kazoo_client.ChildrenWatch(
root, self._makeComponentRootWatcher(kind))
self._updateMinimumModelApi()
def _makeComponentRootWatcher(self, kind):
def watch(children, event=None):
@ -280,6 +285,7 @@ class ComponentRegistry(ZooKeeperBase):
component._zstat = stat
self._cached_components[kind][hostname] = component
self._updateMinimumModelApi()
elif (etype == EventType.DELETED or data is None):
self.log.info(
"Noticed %s component %s disappeared",
@ -289,6 +295,7 @@ class ComponentRegistry(ZooKeeperBase):
except KeyError:
# If it's already gone, don't care
pass
self._updateMinimumModelApi()
# Return False to stop the datawatch
return False
@ -308,3 +315,51 @@ class ComponentRegistry(ZooKeeperBase):
# Filter the cached components for the given kind
return self._cached_components.get(kind, {}).values()
def getMinimumModelApi(self):
"""Get the minimum model API version of all currently connected
components"""
# Start with our own version in case we're the only component
# and we haven't registered.
version = model.MODEL_API
for kind, components in self.all():
for component in components:
version = min(version, component.content.get('model_api', 0))
return version
def _updateMinimumModelApi(self):
version = self.getMinimumModelApi()
if version != self.model_api:
self.log.info(f"System minimum data model version {version}; "
f"this component {model.MODEL_API}")
if self.model_api is None:
if version < model.MODEL_API:
self.log.info("The data model version of this component is "
"newer than the rest of the system; this "
"component will operate in compatability mode "
"until the system is upgraded")
elif version > model.MODEL_API:
self.log.error("The data model version of this component is "
"older than the rest of the system; "
"exiting to prevent data corruption")
sys.exit(1)
else:
if version > self.model_api:
if version > model.MODEL_API:
self.log.info("The data model version of this component "
"is older than other components in the "
"system, so other components will operate "
"in a compability mode; upgrade this "
"component as soon as possible to complete "
"the system upgrade")
elif version == model.MODEL_API:
self.log.info("The rest of the system has been upgraded "
"to the data model version of this "
"component")
elif version < self.model_api:
self.log.error("A component with a data model version older "
"than the rest of the system has been started; "
"data corruption is very likely to occur.")
# Should we exit here as well?
self.model_api = version

View File

@ -22,19 +22,25 @@ from kazoo.exceptions import (
from zuul.zk import sharding
from zuul.zk.exceptions import InvalidObjectError
from zuul import model
class ZKContext:
def __init__(self, zk_client, lock, stop_event, log):
def __init__(self, zk_client, lock, stop_event, log, registry):
self.client = zk_client.client
self.lock = lock
self.stop_event = stop_event
self.log = log
self.registry = registry
def sessionIsValid(self):
return ((not self.lock or self.lock.is_still_valid()) and
(not self.stop_event or not self.stop_event.is_set()))
@property
def model_api(self):
return self.registry.model_api
class LocalZKContext:
"""A Local ZKContext that means don't actually write anything to ZK"""
@ -44,10 +50,15 @@ class LocalZKContext:
self.lock = None
self.stop_event = None
self.log = log
self.registry = None
def sessionIsValid(self):
return True
@property
def model_api(self):
return model.MODEL_API
class ZKObject:
_retry_interval = 5
@ -60,7 +71,7 @@ class ZKObject:
"""
raise NotImplementedError()
def serialize(self):
def serialize(self, context):
"""Implement this method to return the data to save in ZK.
:returns: A byte string
@ -154,7 +165,7 @@ class ZKObject:
if isinstance(context, LocalZKContext):
return b''
try:
return self.serialize()
return self.serialize(context)
except Exception:
# A higher level must handle this exception, but log
# ourself here so we know what object triggered it.