diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 80eddfc4a7..30db44b629 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -490,9 +490,10 @@ class TestScheduler(ZuulTestCase): 'zuul.tenant.tenant-one.pipeline.gate.write_objects', 'zuul.tenant.tenant-one.pipeline.gate.read_znodes', 'zuul.tenant.tenant-one.pipeline.gate.write_znodes', - 'zuul.tenant.tenant-one.pipeline.gate.read_bytes', 'zuul.tenant.tenant-one.pipeline.gate.write_bytes', ]: + # 'zuul.tenant.tenant-one.pipeline.gate.read_bytes' is + # expected to be zero since it's initialized after reading val = self.assertReportedStat(key, kind='g') self.assertTrue(0.0 < float(val) < 60000.0) diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index 7e3c19dfe8..f5c1fd4c42 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -18,6 +18,7 @@ import json import queue import threading import uuid +from unittest import mock import testtools @@ -53,10 +54,12 @@ from tests.base import ( BaseTestCase, HoldableExecutorApi, HoldableMergerApi, iterate_timeout ) -from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext +from zuul.zk.zkobject import ( + ShardedZKObject, ZKObject, ZKContext +) from zuul.zk.locks import tenant_write_lock -from kazoo.exceptions import ZookeeperError, OperationTimeoutError +from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError class ZooKeeperBaseTestCase(BaseTestCase): @@ -2037,3 +2040,72 @@ class TestBlobStore(ZooKeeperBaseTestCase): with testtools.ExpectedException(KeyError): bs.get(path) + + +class TestPipelineInit(ZooKeeperBaseTestCase): + # Test the initialize-on-refresh code paths of various pipeline objects + + def test_pipeline_state_new_object(self): + # Test the initialize-on-refresh code path with no existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, layout.uuid, pipeline.state) + context = ZKContext(self.zk_client, None, None, self.log) + pipeline.state.refresh(context) + self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath())) + + def test_pipeline_state_existing_object(self): + # Test the initialize-on-refresh code path with a pre-existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.manager = mock.Mock() + pipeline.state = model.PipelineState.create( + pipeline, layout.uuid, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + # We refresh the change list here purely for the side effect + # of creating the pipeline state object with no data (the list + # is a subpath of the state object). + pipeline.change_list.refresh(context) + pipeline.state.refresh(context) + self.assertTrue( + self.zk_client.client.exists(pipeline.change_list.getPath())) + self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath())) + + def test_pipeline_change_list_new_object(self): + # Test the initialize-on-refresh code path with no existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, layout.uuid, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + pipeline.change_list.refresh(context) + self.assertTrue( + self.zk_client.client.exists(pipeline.change_list.getPath())) + + def test_pipeline_change_list_new_object_without_lock(self): + # Test the initialize-on-refresh code path if we don't have + # the lock. This should fail. + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, layout.uuid, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + with testtools.ExpectedException(NoNodeError): + pipeline.change_list.refresh(context, allow_init=False) + self.assertIsNone( + self.zk_client.client.exists(pipeline.change_list.getPath())) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 832be780ae..e85d5124e6 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -28,6 +28,8 @@ from zuul.model import ( ) from zuul.zk.change_cache import ChangeKey from zuul.zk.components import COMPONENT_REGISTRY +from zuul.zk.exceptions import LockException +from zuul.zk.locks import pipeline_lock from opentelemetry import trace @@ -95,21 +97,46 @@ class PipelineManager(metaclass=ABCMeta): def _postConfig(self): layout = self.pipeline.tenant.layout self.buildChangeQueues(layout) - with self.sched.createZKContext(None, self.log) as ctx,\ - self.currentContext(ctx): - # Make sure we have state and change list objects, and - # ensure that they exist in ZK. We don't hold the - # pipeline lock, but if they don't exist, that means they - # are new, so no one else will either, so the write on - # create is okay. If they do exist and we have an old - # object, we'll just reuse it. If it does exist and we - # don't have an old object, we'll get a new empty one. - # Regardless, these will not automatically refresh now, so - # they will be out of date until they are refreshed later. - self.pipeline.state = PipelineState.create( - self.pipeline, layout.uuid, self.pipeline.state) - self.pipeline.change_list = PipelineChangeList.create( - self.pipeline) + # Make sure we have state and change list objects. We + # don't actually ensure they exist in ZK here; these are + # just local objects until they are serialized the first + # time. Since we don't hold the pipeline lock, we can't + # reliably perform any read or write operations; we just + # need to ensure we have in-memory objects to work with + # and they will be initialized or loaded on the next + # refresh. + + # These will be out of date until they are refreshed later. + self.pipeline.state = PipelineState.create( + self.pipeline, layout.uuid, self.pipeline.state) + self.pipeline.change_list = PipelineChangeList.create( + self.pipeline) + + # Now, try to acquire a non-blocking pipeline lock and refresh + # them for the side effect of initializing them if necessary. + # In the case of a new pipeline, no one else should have a + # lock anyway, and this helps us avoid emitting a whole bunch + # of errors elsewhere on startup when these objects don't + # exist. If the pipeline already exists and we can't acquire + # the lock, that's fine, we're much less likely to encounter + # read errors elsewhere in that case anyway. + try: + with pipeline_lock( + self.sched.zk_client, self.pipeline.tenant.name, + self.pipeline.name, blocking=False) as lock,\ + self.sched.createZKContext(lock, self.log) as ctx,\ + self.currentContext(ctx): + if not self.pipeline.state.exists(ctx): + # We only do this if the pipeline doesn't exist in + # ZK because in that case, this process should be + # fast since it's empty. If it does exist, + # refreshing it may be slow and since other actors + # won't encounter errors due to its absence, we + # would rather defer the work to later. + self.pipeline.state.refresh(ctx) + self.pipeline.change_list.refresh(ctx) + except LockException: + pass def buildChangeQueues(self, layout): self.log.debug("Building relative_priority queues") diff --git a/zuul/model.py b/zuul/model.py index 8e0ae4eee9..3b6a497104 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -620,6 +620,18 @@ class PipelineState(zkobject.ZKObject): _read_only=False, ) + def _lateInitData(self): + # If we're initializing the object on our initial refresh, + # reset the data to this. + return dict( + state=Pipeline.STATE_NORMAL, + queues=[], + old_queues=[], + consecutive_failures=0, + disabled=False, + layout_uuid=self.pipeline.tenant.layout.uuid, + ) + @classmethod def fromZK(klass, context, path, pipeline, **kw): obj = klass() @@ -632,20 +644,22 @@ class PipelineState(zkobject.ZKObject): @classmethod def create(cls, pipeline, layout_uuid, old_state=None): - # If the object does not exist in ZK, create it with the - # default attributes and the supplied layout UUID. Otherwise, - # return an initialized object (or the old object for reuse) - # without loading any data so that data can be loaded on the - # next refresh. - ctx = pipeline.manager.current_context + # If we are resetting an existing pipeline, we will have an + # old_state, so just clean up the object references there and + # let the next refresh handle updating any data. + if old_state: + old_state._resetObjectRefs() + return old_state + + # Otherwise, we are initializing a pipeline that we haven't + # seen before. It still might exist in ZK, but since we + # haven't seen it, we don't have any object references to + # clean up. We can just start with a clean object, set the + # pipeline reference, and let the next refresh deal with + # whether there might be any data in ZK. state = cls() state._set(pipeline=pipeline) - if state.exists(ctx): - if old_state: - old_state._resetObjectRefs() - return old_state - return state - return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid) + return state def _resetObjectRefs(self): # Update the pipeline references on the queue objects. @@ -712,8 +726,34 @@ class PipelineState(zkobject.ZKObject): # This is so that we can refresh the object in circumstances # where we haven't verified that our local layout matches # what's in ZK. + + # Notably, this need not prevent us from performing the + # initialization below if necessary. The case of the object + # being brand new in ZK supercedes our worry that our old copy + # might be out of date since our old copy is, itself, brand + # new. self._set(_read_only=read_only) - return super().refresh(context) + try: + return super().refresh(context) + except NoNodeError: + # If the object doesn't exist we will receive a + # NoNodeError. This happens because the postConfig call + # creates this object without holding the pipeline lock, + # so it can't determine whether or not it exists in ZK. + # We do hold the pipeline lock here, so if we get this + # error, we know we're initializing the object, and we + # should write it to ZK. + + # Note that typically this code is not used since + # currently other objects end up creating the pipeline + # path in ZK first. It is included in case that ever + # changes. Currently the empty byte-string code path in + # deserialize() is used instead. + context.log.warning("Initializing pipeline state for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + self._set(**self._lateInitData()) + self.internalCreate(context) def deserialize(self, raw, context): # We may have old change objects in the pipeline cache, so @@ -721,6 +761,20 @@ class PipelineState(zkobject.ZKObject): # source change cache. self.pipeline.manager.clearCache() + # If the object doesn't exist we will get back an empty byte + # string. This happens because the postConfig call creates + # this object without holding the pipeline lock, so it can't + # determine whether or not it exists in ZK. We do hold the + # pipeline lock here, so if we get the empty byte string, we + # know we're initializing the object. In that case, we should + # initialize the layout id to the current layout. Nothing + # else needs to be set. + if raw == b'': + context.log.warning("Initializing pipeline state for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + return self._lateInitData() + data = super().deserialize(raw, context) if not self._read_only: @@ -898,9 +952,31 @@ class PipelineChangeList(zkobject.ShardedZKObject): _change_keys=[], ) - def refresh(self, context): - self._retry(context, super().refresh, - context, max_tries=5) + def refresh(self, context, allow_init=True): + # Set allow_init to false to indicate that we don't hold the + # lock and we should not try to initialize the object in ZK if + # it does not exist. + try: + self._retry(context, super().refresh, + context, max_tries=5) + except NoNodeError: + # If the object doesn't exist we will receive a + # NoNodeError. This happens because the postConfig call + # creates this object without holding the pipeline lock, + # so it can't determine whether or not it exists in ZK. + # We do hold the pipeline lock here, so if we get this + # error, we know we're initializing the object, and + # we should write it to ZK. + if allow_init: + context.log.warning( + "Initializing pipeline change list for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + self.internalCreate(context) + else: + # If we're called from a context where we can't + # initialize the change list, re-raise the exception. + raise def getPath(self): return self.getChangeListPath(self.pipeline) @@ -911,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject): return pipeline_path + '/change_list' @classmethod - def create(cls, pipeline, old_list=None): - # If the object does not exist in ZK, create it with the - # default attributes. Otherwise, return an initialized object - # (or the old object for reuse) without loading any data so - # that data can be loaded on the next refresh. - ctx = pipeline.manager.current_context + def create(cls, pipeline): + # This object may or may not exist in ZK, but we using any of + # that data here. We can just start with a clean object, set + # the pipeline reference, and let the next refresh deal with + # whether there might be any data in ZK. change_list = cls() change_list._set(pipeline=pipeline) - if change_list.exists(ctx): - if old_list: - return old_list - return change_list - return cls.new(ctx, pipeline=pipeline) + return change_list def serialize(self, context): data = { @@ -931,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject): } return json.dumps(data, sort_keys=True).encode("utf8") - def deserialize(self, data, context): - data = super().deserialize(data, context) + def deserialize(self, raw, context): + data = super().deserialize(raw, context) change_keys = [] # We must have a dictionary with a 'changes' key; otherwise we # may be reading immediately after truncating. Allow the diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 7f61f3fe4a..a546339c3e 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2354,7 +2354,9 @@ class Scheduler(threading.Thread): for pipeline in tenant.layout.pipelines.values(): self.log.debug("Gather relevant cache items for: %s %s", tenant.name, pipeline.name) - pipeline.change_list.refresh(ctx) + # This will raise an exception and abort the process if + # unable to refresh the change list. + pipeline.change_list.refresh(ctx, allow_init=False) change_keys = pipeline.change_list.getChangeKeys() relevant_changes = pipeline.manager.resolveChangeKeys( change_keys) @@ -2383,8 +2385,16 @@ class Scheduler(threading.Thread): # Update the pipeline changes ctx = self.createZKContext(None, self.log) for pipeline in tenant.layout.pipelines.values(): + # This will raise an exception if it is unable to + # refresh the change list. We will proceed anyway + # and use our data from the last time we did + # refresh in order to avoid stalling trigger + # processing. In this case we may not forward + # some events which are related to changes in the + # pipeline but don't match the pipeline trigger + # criteria. try: - pipeline.change_list.refresh(ctx) + pipeline.change_list.refresh(ctx, allow_init=False) except Exception: self.log.exception( "Unable to refresh pipeline change list for %s", diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index b228ecaa4a..87d76bca67 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -233,7 +233,18 @@ class ZKObject: obj._load(context, path=path) return obj + def internalCreate(self, context): + """Create the object in ZK from an existing ZKObject + + This should only be used in special circumstances: when we + know it's safe to start using a ZKObject before it's actually + created in ZK. Normally use .new() + """ + data = self._trySerialize(context) + self._save(context, data, create=True) + def refresh(self, context): + """Update data from ZK""" self._load(context)