Require latest layout for processing mgmt events

A scheduler should only be allowed to process the management event
queue for a tenant when the local layout is up-to-date. This is already
the case for the tenant trigger event queue.

Otherwise the scheduler could forward management events to pipelines that
no longer exist.

Change-Id: I3c485b9e54a65e7c1c09dc51b1f2b4c83c45de19
This commit is contained in:
Simon Westphahl 2023-01-19 15:55:00 +01:00
parent 944b9852c9
commit 19668e0bc7
No known key found for this signature in database
1 changed files with 21 additions and 15 deletions

View File

@ -1292,6 +1292,21 @@ class Scheduler(threading.Thread):
self.log.info("Local layout update complete for %s (duration: %s "
"seconds)", tenant_name, duration)
def isTenantLayoutUpToDate(self, tenant_name):
    """Report whether the local layout state of a tenant is current.

    The state stored in ``self.tenant_layout_state`` is compared with
    the one in ``self.local_layout_state`` for the same tenant.

    Side effects:
      * If ``tenant_layout_state`` has no entry for the tenant,
        ``self.wake_event`` is set.
      * If the local entry is missing or compares older, a debug
        message is logged and ``self.layout_update_event`` is set.

    :param tenant_name: Name of the tenant to check.
    :returns: True only when both states exist and the state from
        ``tenant_layout_state`` is not greater than the local one;
        False otherwise.
    """
    shared = self.tenant_layout_state.get(tenant_name)
    if shared is None:
        # The tenant may still be in the process of initial
        # configuration, so just nudge the scheduler and bail out.
        self.wake_event.set()
        return False

    mine = self.local_layout_state.get(tenant_name)
    is_current = mine is not None and not (shared > mine)
    if is_current:
        return True

    # Local layout is missing or stale: request a layout update.
    self.log.debug("Local layout of tenant %s not up to date",
                   tenant_name)
    self.layout_update_event.set()
    return False
def _checkTenantSourceConf(self, config):
tenant_config = None
script = False
@ -2034,21 +2049,7 @@ class Scheduler(threading.Thread):
with tenant_read_lock(
self.zk_client, tenant_name, blocking=False
) as tlock:
remote_state = self.tenant_layout_state.get(
tenant_name)
if remote_state is None:
# The tenant may still be in the
# process of initial configuration
self.wake_event.set()
continue
local_state = self.local_layout_state.get(
tenant_name)
if (local_state is None or
remote_state > local_state):
self.log.debug(
"Local layout of tenant %s not up to date",
tenant.name)
self.layout_update_event.set()
if not self.isTenantLayoutUpToDate(tenant_name):
continue
# Get tenant again, as it might have been updated
@ -2470,6 +2471,11 @@ class Scheduler(threading.Thread):
with management_queue_lock(
self.zk_client, tenant.name, blocking=False
):
if not self.isTenantLayoutUpToDate(tenant.name):
self.log.debug(
"Skipping management event queue for tenant %s",
tenant.name)
return
self._process_tenant_management_queue(tenant)
except LockException:
self.log.debug("Skipping locked management event queue"