Fix race condition in pipeline change list init

Simon Westphahl describes the race condition:

> [The race condition] can occur after a reconfiguration while
> some schedulers are updating their local layout and some
> already start processing pipelines in the same tenant.
>
> In this case the pipeline manager's `_postConfig()` method that
> calls `PipelineChangeList.create(...)` races with the pipeline
> processor updating the change keys.
>
> This leads to two change lists being written as separate
> shards, that can't be correctly loaded, as all shards combined
> are expected to form a single JSON object.
>
> The sequence of events seems to be the following:
> 1. S1: pipeline processor needs to update the change keys
> 2. S1 the shard writer will delete the `change_list` key with the old
>    shards
> 3. S2: configloader calls the `_postConfig()` method
> 4. S2: `PipelineChangeList.create()` notices that the `change_list` node
>    doesn't exists in Zookeeper:
>    https://opendev.org/zuul/zuul/src/branch/master/zuul/model.py#L921
> 6. S2: the shard writer creates the first shard `0000000000`
> 7. S1: the shard writer creates the second shared `0000000001`
>
> The race condition seems to be introduced with
> Ib1e467b5adb907f93bab0de61da84d2efc22e2a7

That change updated the pipeline manager _postConfig method so
that it no longer acquires the pipeline lock when initalizing the
pipeline state and change lists.  This greatly reduces potential
delays during reconfiguration, but has, perhaps predictably, lead
to the race condition above.

In the commit message for that change, I noted that we might be
able to avoid even more work if we accept some caveats related to
error reporting.  Avoiding that work mean avoiding performing any
writes during _postConfig which addresses the root cause of the
race condition (steps 3-6 above.  Ed. note: there is no step 5).

From the commit message:

> We still need to attach new state and change list objects to
> the pipeline in _postConfig (since our pipeline object is new).
> We also should make sure that the objects exist in ZK before we
> leave that method, so that if a new pipeline is created, other
> schedulers will be able to load the (potentially still empty)
> objects from ZK.  As an alternative, we could avoid even this
> work in _postConfig, but then we would have to handle missing
> objects on refresh, and it would not be possible to tell if the
> object was missing due to it being new or due to an error.  To
> avoid masking errors, we keep the current expectation that we
> will create these objects in ZK on the initial reconfiguration.

The current change does exactly that.  We no longer perform any
ZK write operations on the state and change list objects in
_postConfig.  Instead, inside of the refresh methods, we detect
the cases where they should be newly created and do so at that
time.  This happens with the pipeline lock, so is safe against
any simultaneous operation from other components.

There will be "ERROR" level log messages indicating that reading
the state from ZK has failed when these objects are first
initialized.  To indicate that this is probably okay, they will
now be immediately followed by "WARNING" level messages explaining
that.

Strictly speaking, this particular race should only occur for the
change list object, not the pipeline state, since the race
condition above requires a sharded object and of the two, only
the change list is sharded.  However, to keep the lifecycle of
these two objects matched (and to simplify _postConfig) the same
treatment is applied to both.

Note that change I7fa99cd83a857216321f8d946fd42abd9ec427a3 merged
after Ib1e467b and changed the behavior slightly, introducing the
old_state and old_list arguments.  Curiously, the old_list
argument is effectively unused, so it is removed entirely in this
change.  Old_state still has a purpose and is retained.

Change-Id: I519348e7d5d74e675808e990920480fb6e1fb981
This commit is contained in:
James E. Blair 2023-02-01 16:13:32 -08:00
parent c3334743f6
commit 98dcd51d90
6 changed files with 241 additions and 49 deletions

View File

@ -490,9 +490,10 @@ class TestScheduler(ZuulTestCase):
'zuul.tenant.tenant-one.pipeline.gate.write_objects',
'zuul.tenant.tenant-one.pipeline.gate.read_znodes',
'zuul.tenant.tenant-one.pipeline.gate.write_znodes',
'zuul.tenant.tenant-one.pipeline.gate.read_bytes',
'zuul.tenant.tenant-one.pipeline.gate.write_bytes',
]:
# 'zuul.tenant.tenant-one.pipeline.gate.read_bytes' is
# expected to be zero since it's initialized after reading
val = self.assertReportedStat(key, kind='g')
self.assertTrue(0.0 < float(val) < 60000.0)

View File

@ -18,6 +18,7 @@ import json
import queue
import threading
import uuid
from unittest import mock
import testtools
@ -53,10 +54,12 @@ from tests.base import (
BaseTestCase, HoldableExecutorApi, HoldableMergerApi,
iterate_timeout
)
from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext
from zuul.zk.zkobject import (
ShardedZKObject, ZKObject, ZKContext
)
from zuul.zk.locks import tenant_write_lock
from kazoo.exceptions import ZookeeperError, OperationTimeoutError
from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError
class ZooKeeperBaseTestCase(BaseTestCase):
@ -2037,3 +2040,72 @@ class TestBlobStore(ZooKeeperBaseTestCase):
with testtools.ExpectedException(KeyError):
bs.get(path)
class TestPipelineInit(ZooKeeperBaseTestCase):
# Test the initialize-on-refresh code paths of various pipeline objects
def test_pipeline_state_new_object(self):
# Test the initialize-on-refresh code path with no existing object
tenant = model.Tenant('tenant')
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.state = model.PipelineState.create(
pipeline, layout.uuid, pipeline.state)
context = ZKContext(self.zk_client, None, None, self.log)
pipeline.state.refresh(context)
self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
def test_pipeline_state_existing_object(self):
# Test the initialize-on-refresh code path with a pre-existing object
tenant = model.Tenant('tenant')
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.manager = mock.Mock()
pipeline.state = model.PipelineState.create(
pipeline, layout.uuid, pipeline.state)
pipeline.change_list = model.PipelineChangeList.create(
pipeline)
context = ZKContext(self.zk_client, None, None, self.log)
# We refresh the change list here purely for the side effect
# of creating the pipeline state object with no data (the list
# is a subpath of the state object).
pipeline.change_list.refresh(context)
pipeline.state.refresh(context)
self.assertTrue(
self.zk_client.client.exists(pipeline.change_list.getPath()))
self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
def test_pipeline_change_list_new_object(self):
# Test the initialize-on-refresh code path with no existing object
tenant = model.Tenant('tenant')
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.state = model.PipelineState.create(
pipeline, layout.uuid, pipeline.state)
pipeline.change_list = model.PipelineChangeList.create(
pipeline)
context = ZKContext(self.zk_client, None, None, self.log)
pipeline.change_list.refresh(context)
self.assertTrue(
self.zk_client.client.exists(pipeline.change_list.getPath()))
def test_pipeline_change_list_new_object_without_lock(self):
# Test the initialize-on-refresh code path if we don't have
# the lock. This should fail.
tenant = model.Tenant('tenant')
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.state = model.PipelineState.create(
pipeline, layout.uuid, pipeline.state)
pipeline.change_list = model.PipelineChangeList.create(
pipeline)
context = ZKContext(self.zk_client, None, None, self.log)
with testtools.ExpectedException(NoNodeError):
pipeline.change_list.refresh(context, allow_init=False)
self.assertIsNone(
self.zk_client.client.exists(pipeline.change_list.getPath()))

View File

@ -28,6 +28,8 @@ from zuul.model import (
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.components import COMPONENT_REGISTRY
from zuul.zk.exceptions import LockException
from zuul.zk.locks import pipeline_lock
from opentelemetry import trace
@ -95,21 +97,46 @@ class PipelineManager(metaclass=ABCMeta):
def _postConfig(self):
layout = self.pipeline.tenant.layout
self.buildChangeQueues(layout)
with self.sched.createZKContext(None, self.log) as ctx,\
self.currentContext(ctx):
# Make sure we have state and change list objects, and
# ensure that they exist in ZK. We don't hold the
# pipeline lock, but if they don't exist, that means they
# are new, so no one else will either, so the write on
# create is okay. If they do exist and we have an old
# object, we'll just reuse it. If it does exist and we
# don't have an old object, we'll get a new empty one.
# Regardless, these will not automatically refresh now, so
# they will be out of date until they are refreshed later.
self.pipeline.state = PipelineState.create(
self.pipeline, layout.uuid, self.pipeline.state)
self.pipeline.change_list = PipelineChangeList.create(
self.pipeline)
# Make sure we have state and change list objects. We
# don't actually ensure they exist in ZK here; these are
# just local objects until they are serialized the first
# time. Since we don't hold the pipeline lock, we can't
# reliably perform any read or write operations; we just
# need to ensure we have in-memory objects to work with
# and they will be initialized or loaded on the next
# refresh.
# These will be out of date until they are refreshed later.
self.pipeline.state = PipelineState.create(
self.pipeline, layout.uuid, self.pipeline.state)
self.pipeline.change_list = PipelineChangeList.create(
self.pipeline)
# Now, try to acquire a non-blocking pipeline lock and refresh
# them for the side effect of initializing them if necessary.
# In the case of a new pipeline, no one else should have a
# lock anyway, and this helps us avoid emitting a whole bunch
# of errors elsewhere on startup when these objects don't
# exist. If the pipeline already exists and we can't acquire
# the lock, that's fine, we're much less likely to encounter
# read errors elsewhere in that case anyway.
try:
with pipeline_lock(
self.sched.zk_client, self.pipeline.tenant.name,
self.pipeline.name, blocking=False) as lock,\
self.sched.createZKContext(lock, self.log) as ctx,\
self.currentContext(ctx):
if not self.pipeline.state.exists(ctx):
# We only do this if the pipeline doesn't exist in
# ZK because in that case, this process should be
# fast since it's empty. If it does exist,
# refreshing it may be slow and since other actors
# won't encounter errors due to its absence, we
# would rather defer the work to later.
self.pipeline.state.refresh(ctx)
self.pipeline.change_list.refresh(ctx)
except LockException:
pass
def buildChangeQueues(self, layout):
self.log.debug("Building relative_priority queues")

View File

@ -620,6 +620,18 @@ class PipelineState(zkobject.ZKObject):
_read_only=False,
)
def _lateInitData(self):
# If we're initializing the object on our initial refresh,
# reset the data to this.
return dict(
state=Pipeline.STATE_NORMAL,
queues=[],
old_queues=[],
consecutive_failures=0,
disabled=False,
layout_uuid=self.pipeline.tenant.layout.uuid,
)
@classmethod
def fromZK(klass, context, path, pipeline, **kw):
obj = klass()
@ -632,20 +644,22 @@ class PipelineState(zkobject.ZKObject):
@classmethod
def create(cls, pipeline, layout_uuid, old_state=None):
# If the object does not exist in ZK, create it with the
# default attributes and the supplied layout UUID. Otherwise,
# return an initialized object (or the old object for reuse)
# without loading any data so that data can be loaded on the
# next refresh.
ctx = pipeline.manager.current_context
# If we are resetting an existing pipeline, we will have an
# old_state, so just clean up the object references there and
# let the next refresh handle updating any data.
if old_state:
old_state._resetObjectRefs()
return old_state
# Otherwise, we are initializing a pipeline that we haven't
# seen before. It still might exist in ZK, but since we
# haven't seen it, we don't have any object references to
# clean up. We can just start with a clean object, set the
# pipeline reference, and let the next refresh deal with
# whether there might be any data in ZK.
state = cls()
state._set(pipeline=pipeline)
if state.exists(ctx):
if old_state:
old_state._resetObjectRefs()
return old_state
return state
return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)
return state
def _resetObjectRefs(self):
# Update the pipeline references on the queue objects.
@ -712,8 +726,34 @@ class PipelineState(zkobject.ZKObject):
# This is so that we can refresh the object in circumstances
# where we haven't verified that our local layout matches
# what's in ZK.
# Notably, this need not prevent us from performing the
# initialization below if necessary. The case of the object
# being brand new in ZK supercedes our worry that our old copy
# might be out of date since our old copy is, itself, brand
# new.
self._set(_read_only=read_only)
return super().refresh(context)
try:
return super().refresh(context)
except NoNodeError:
# If the object doesn't exist we will receive a
# NoNodeError. This happens because the postConfig call
# creates this object without holding the pipeline lock,
# so it can't determine whether or not it exists in ZK.
# We do hold the pipeline lock here, so if we get this
# error, we know we're initializing the object, and we
# should write it to ZK.
# Note that typically this code is not used since
# currently other objects end up creating the pipeline
# path in ZK first. It is included in case that ever
# changes. Currently the empty byte-string code path in
# deserialize() is used instead.
context.log.warning("Initializing pipeline state for %s; "
"this is expected only for new pipelines",
self.pipeline.name)
self._set(**self._lateInitData())
self.internalCreate(context)
def deserialize(self, raw, context):
# We may have old change objects in the pipeline cache, so
@ -721,6 +761,20 @@ class PipelineState(zkobject.ZKObject):
# source change cache.
self.pipeline.manager.clearCache()
# If the object doesn't exist we will get back an empty byte
# string. This happens because the postConfig call creates
# this object without holding the pipeline lock, so it can't
# determine whether or not it exists in ZK. We do hold the
# pipeline lock here, so if we get the empty byte string, we
# know we're initializing the object. In that case, we should
# initialize the layout id to the current layout. Nothing
# else needs to be set.
if raw == b'':
context.log.warning("Initializing pipeline state for %s; "
"this is expected only for new pipelines",
self.pipeline.name)
return self._lateInitData()
data = super().deserialize(raw, context)
if not self._read_only:
@ -898,9 +952,31 @@ class PipelineChangeList(zkobject.ShardedZKObject):
_change_keys=[],
)
def refresh(self, context):
self._retry(context, super().refresh,
context, max_tries=5)
def refresh(self, context, allow_init=True):
# Set allow_init to false to indicate that we don't hold the
# lock and we should not try to initialize the object in ZK if
# it does not exist.
try:
self._retry(context, super().refresh,
context, max_tries=5)
except NoNodeError:
# If the object doesn't exist we will receive a
# NoNodeError. This happens because the postConfig call
# creates this object without holding the pipeline lock,
# so it can't determine whether or not it exists in ZK.
# We do hold the pipeline lock here, so if we get this
# error, we know we're initializing the object, and
# we should write it to ZK.
if allow_init:
context.log.warning(
"Initializing pipeline change list for %s; "
"this is expected only for new pipelines",
self.pipeline.name)
self.internalCreate(context)
else:
# If we're called from a context where we can't
# initialize the change list, re-raise the exception.
raise
def getPath(self):
return self.getChangeListPath(self.pipeline)
@ -911,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject):
return pipeline_path + '/change_list'
@classmethod
def create(cls, pipeline, old_list=None):
# If the object does not exist in ZK, create it with the
# default attributes. Otherwise, return an initialized object
# (or the old object for reuse) without loading any data so
# that data can be loaded on the next refresh.
ctx = pipeline.manager.current_context
def create(cls, pipeline):
# This object may or may not exist in ZK, but we using any of
# that data here. We can just start with a clean object, set
# the pipeline reference, and let the next refresh deal with
# whether there might be any data in ZK.
change_list = cls()
change_list._set(pipeline=pipeline)
if change_list.exists(ctx):
if old_list:
return old_list
return change_list
return cls.new(ctx, pipeline=pipeline)
return change_list
def serialize(self, context):
data = {
@ -931,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject):
}
return json.dumps(data, sort_keys=True).encode("utf8")
def deserialize(self, data, context):
data = super().deserialize(data, context)
def deserialize(self, raw, context):
data = super().deserialize(raw, context)
change_keys = []
# We must have a dictionary with a 'changes' key; otherwise we
# may be reading immediately after truncating. Allow the

View File

@ -2354,7 +2354,9 @@ class Scheduler(threading.Thread):
for pipeline in tenant.layout.pipelines.values():
self.log.debug("Gather relevant cache items for: %s %s",
tenant.name, pipeline.name)
pipeline.change_list.refresh(ctx)
# This will raise an exception and abort the process if
# unable to refresh the change list.
pipeline.change_list.refresh(ctx, allow_init=False)
change_keys = pipeline.change_list.getChangeKeys()
relevant_changes = pipeline.manager.resolveChangeKeys(
change_keys)
@ -2383,8 +2385,16 @@ class Scheduler(threading.Thread):
# Update the pipeline changes
ctx = self.createZKContext(None, self.log)
for pipeline in tenant.layout.pipelines.values():
# This will raise an exception if it is unable to
# refresh the change list. We will proceed anyway
# and use our data from the last time we did
# refresh in order to avoid stalling trigger
# processing. In this case we may not forward
# some events which are related to changes in the
# pipeline but don't match the pipeline trigger
# criteria.
try:
pipeline.change_list.refresh(ctx)
pipeline.change_list.refresh(ctx, allow_init=False)
except Exception:
self.log.exception(
"Unable to refresh pipeline change list for %s",

View File

@ -233,7 +233,18 @@ class ZKObject:
obj._load(context, path=path)
return obj
def internalCreate(self, context):
"""Create the object in ZK from an existing ZKObject
This should only be used in special circumstances: when we
know it's safe to start using a ZKObject before it's actually
created in ZK. Normally use .new()
"""
data = self._trySerialize(context)
self._save(context, data, create=True)
def refresh(self, context):
"""Update data from ZK"""
self._load(context)