Merge "Don't refresh the pipeline state during reconfig"

commit 66c0865985
@@ -117,8 +117,12 @@ class PipelineManager(metaclass=ABCMeta):
             ctx = self.sched.createZKContext(lock, self.log)
             with self.currentContext(ctx):
                 # Since the layout UUID is new, this will move queues
-                # to "old_queues" and refresh the pipeline state as a
-                # side effect.
+                # to "old_queues".  Note that it will *not* refresh
+                # the contents, in fact, we will get a new
+                # PipelineState python object with no queues, just as
+                # above.  Our state is guaranteed to be out of date
+                # now, but we don't need to do anything with it, we
+                # will let the next actor to use it refresh it then.
                 self.pipeline.state = PipelineState.resetOrCreate(
                     self.pipeline, layout.uuid)
                 self.pipeline.change_list = PipelineChangeList.create(

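The rewritten comment describes the pattern this change adopts: reconfiguration only shelves the existing queues in ZooKeeper and installs a fresh, empty in-memory state object, deferring the cost of reloading to the next actor that actually uses the pipeline. A self-contained sketch of that "reset now, refresh on first use" idea (the class and store below are illustrative stand-ins, not Zuul code):

    # Illustrative stand-in for the reset/refresh split described above.
    class LazyState:
        def __init__(self):
            self.layout_uuid = None
            self.queues = []            # empty until refreshed

        def reset(self, layout_uuid):
            # Cheap: remember the new layout, leave the contents stale/empty.
            self.layout_uuid = layout_uuid
            self.queues = []

        def refresh(self, store):
            # Expensive: done lazily by the next actor that needs the queues.
            self.queues = list(store.get(self.layout_uuid, []))

    store = {"layout-2": ["queue-a", "queue-b"]}
    state = LazyState()
    state.reset("layout-2")    # during reconfiguration: no refresh
    state.refresh(store)       # later, on first use
    print(state.queues)        # ['queue-a', 'queue-b']
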
@@ -597,18 +597,15 @@ class PipelineState(zkobject.ZKObject):
 
     def __init__(self):
         super().__init__()
-        self._set(**self.defaultState())
-
-    @classmethod
-    def defaultState(cls):
-        return dict(
+        self._set(
             state=Pipeline.STATE_NORMAL,
             queues=[],
             old_queues=[],
             consecutive_failures=0,
             disabled=False,
-            pipeline=None,
             layout_uuid=None,
+            # Local pipeline reference (not persisted in Zookeeper)
+            pipeline=None,
         )
 
     @classmethod

@@ -639,22 +636,46 @@ class PipelineState(zkobject.ZKObject):
 
     @classmethod
     def resetOrCreate(cls, pipeline, layout_uuid):
+        # If there is an object in ZK, all the queues will be moved to
+        # old_queues.  Then this method will create a new python
+        # object with no attached queues regardless of the contents of
+        # ZK (so refresh() will need to be called before use).
         ctx = pipeline.manager.current_context
         try:
-            state = cls.fromZK(ctx, cls.pipelinePath(pipeline),
-                               pipeline=pipeline)
-            if state.layout_uuid != layout_uuid:
-                reset_state = {
-                    **cls.defaultState(),
-                    "layout_uuid": layout_uuid,
-                    "pipeline": pipeline,
-                    "old_queues": state.old_queues + state.queues,
-                }
-                state.updateAttributes(ctx, **reset_state)
+            state = cls()
+            state._set(pipeline=pipeline)
+            state._reset(ctx, layout_uuid)
             return state
         except NoNodeError:
             return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)
 
+    def _reset(self, context, layout_uuid):
+        # Deserialize will recursively load/refresh children, but we
+        # want to avoid that here, so we just load the raw data and
+        # manipulate the top level attributes.
+        raw = self._load(context, deserialize=False)
+        state = json.loads(raw.decode("utf8"))
+
+        if state["layout_uuid"] == layout_uuid:
+            return
+
+        # Note this differs from normal serialization in that we are
+        # dealing with queues only as string path references rather
+        # than objects.
+        reset_state = dict(
+            state=Pipeline.STATE_NORMAL,
+            queues=[],
+            old_queues=state["old_queues"] + state["queues"],
+            consecutive_failures=0,
+            disabled=False,
+            layout_uuid=layout_uuid,
+        )
+
+        raw = json.dumps(reset_state, sort_keys=True).encode("utf8")
+        # Since we only save the object when the layout UUID changes, we can
+        # skip the hash check that we have in other places.
+        self._save(context, raw)
+
     def getPath(self):
         if hasattr(self, '_path'):
             return self._path

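The new _reset() works on the raw JSON document instead of deserializing the state: it loads the bytes with deserialize=False, compares layout_uuid, and only when the layout changed rewrites the top-level fields, keeping the queues as string path references throughout. A self-contained sketch of that approach, using a plain dict in place of ZooKeeper (the store, path, and helper name below are hypothetical, not Zuul's API):

    import json

    STATE_NORMAL = "normal"    # stand-in for Pipeline.STATE_NORMAL

    def reset_raw_state(store, path, layout_uuid):
        # Load raw bytes only; child queue objects are never instantiated.
        raw = store[path]
        state = json.loads(raw.decode("utf8"))

        if state["layout_uuid"] == layout_uuid:
            return    # same layout: nothing to save

        # Queues are shelved by reference; only top-level fields are rewritten.
        reset_state = dict(
            state=STATE_NORMAL,
            queues=[],
            old_queues=state["old_queues"] + state["queues"],
            consecutive_failures=0,
            disabled=False,
            layout_uuid=layout_uuid,
        )
        store[path] = json.dumps(reset_state, sort_keys=True).encode("utf8")

    store = {"/zuul/pipelines/check": json.dumps(dict(
        state=STATE_NORMAL, queues=["/q/1"], old_queues=[],
        consecutive_failures=0, disabled=False,
        layout_uuid="old-uuid")).encode("utf8")}
    reset_raw_state(store, "/zuul/pipelines/check", "new-uuid")
    print(json.loads(store["/zuul/pipelines/check"])["old_queues"])    # ['/q/1']
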
@@ -1514,6 +1514,7 @@ class Scheduler(threading.Thread):
                 self.zk_client, event.tenant_name,
                 identifier=RECONFIG_LOCK_ID) as lock,\
                 self.statsd_timer(f'{stats_key}.reconfiguration_time'):
+            self.log.debug("Loading tenant %s", event.tenant_name)
             loader.loadTenant(
                 self.abide, event.tenant_name, self.ansible_manager,
                 self.unparsed_abide, min_ltimes=min_ltimes,

@@ -293,7 +293,7 @@ class ZKObject:
             context.cumulative_read_bytes += len(compressed_data)
             return compressed_data, zstat
 
-    def _load(self, context, path=None):
+    def _load(self, context, path=None, deserialize=True):
         if path is None:
             path = self.getPath()
         if context.sessionIsInvalid():

@@ -306,12 +306,15 @@ class ZKObject:
             context.log.error(
                 "Exception loading ZKObject %s at %s", self, path)
             raise
-        self._set(_zkobject_hash=None)
+        if deserialize:
+            self._set(_zkobject_hash=None)
         try:
             data = zlib.decompress(compressed_data)
         except zlib.error:
             # Fallback for old, uncompressed data
             data = compressed_data
+        if not deserialize:
+            return data
         self._set(**self.deserialize(data, context))
         self._set(_zstat=zstat,
                   _zkobject_hash=hash(data),

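With the new deserialize flag, _load() can hand back the decompressed bytes before any object bookkeeping happens: the cached hash is not cleared, _zstat is not updated, and self.deserialize() is never called. A minimal sketch of that split, assuming the same zlib-compressed payload (the function below is illustrative, not the ZKObject method itself):

    import json
    import zlib

    def load(compressed_data, deserialize=True):
        try:
            data = zlib.decompress(compressed_data)
        except zlib.error:
            # Fallback for old, uncompressed data
            data = compressed_data
        if not deserialize:
            return data              # raw bytes, no deserialization
        return json.loads(data)      # stand-in for self.deserialize()

    blob = zlib.compress(json.dumps({"layout_uuid": "abc"}).encode("utf8"))
    print(load(blob, deserialize=False))    # b'{"layout_uuid": "abc"}'
    print(load(blob))                       # {'layout_uuid': 'abc'}
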
@@ -347,7 +350,7 @@ class ZKObject:
         if hasattr(self, '_zstat'):
             version = self._zstat.version
         else:
-            version = None
+            version = -1
         zstat = self._retry(context, self._retryableSave,
                             context, create, path, compressed_data,
                             version)

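The version passed to the ZooKeeper write acts as a compare-and-set check: a non-negative number must match the znode's current version, while -1 means "write regardless of version", which is what an object saved without a loaded _zstat now requests. A small kazoo sketch of the difference (assumes a ZooKeeper server at the placeholder address):

    from kazoo.client import KazooClient
    from kazoo.exceptions import BadVersionError

    zk = KazooClient(hosts="127.0.0.1:2181")    # placeholder address
    zk.start()
    zk.create("/demo", b"v0", makepath=True)

    stat = zk.exists("/demo")
    zk.set("/demo", b"v1", version=stat.version)        # compare-and-set: versions match
    try:
        zk.set("/demo", b"v2", version=stat.version)    # version is now stale
    except BadVersionError:
        pass
    zk.set("/demo", b"v3", version=-1)                  # unconditional overwrite
    zk.stop()
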