Use pipeline summary from ZooKeeper in zuul-web

With this change, zuul-web generates the status JSON on its own, directly
from the data in ZooKeeper, instead of requesting it from the scheduler via
RPC. This includes the event queue lengths as well as the pipeline summary.

Change-Id: Ib80d9c019a15dd9de9d694cb62fd34030016c311
This commit is contained in:
Simon Westphahl 2021-11-08 14:12:40 +01:00 committed by Felix Edel
parent 660b4237ae
commit 59edeaf3d1
6 changed files with 105 additions and 103 deletions

View File

@ -4064,6 +4064,7 @@ class ZuulWebFixture(fixtures.Fixture):
info=self.info,
connections=self.connections,
authenticators=self.authenticators)
self.connections.load(self.web.zk_client)
self.web.start()
self.addCleanup(self.stop)

View File

@ -1075,19 +1075,18 @@ class TestScheduler(ZuulTestCase):
self.waitUntilSettled()
time.sleep(2)
data = json.loads(self.scheds.first.sched
.formatStatusJSON('tenant-one'))
found_job = None
for pipeline in data['pipelines']:
if pipeline['name'] != 'gate':
continue
for queue in pipeline['change_queues']:
for head in queue['heads']:
for item in head:
for job in item['jobs']:
if job['name'] == 'project-merge':
found_job = job
break
pipeline = self.scheds.first.sched.abide.tenants[
'tenant-one'].layout.pipelines['gate']
pipeline_status = pipeline.formatStatusJSON(
self.scheds.first.sched.globals.websocket_url)
for queue in pipeline_status['change_queues']:
for head in queue['heads']:
for item in head:
for job in item['jobs']:
if job['name'] == 'project-merge':
found_job = job
break
self.assertIsNotNone(found_job)
if iteration == 1:
@ -4155,10 +4154,11 @@ class TestScheduler(ZuulTestCase):
self.waitUntilSettled()
def get_job():
data = json.loads(self.scheds.first.sched
.formatStatusJSON('tenant-one'))
for pipeline in data['pipelines']:
for queue in pipeline['change_queues']:
tenant = self.scheds.first.sched.abide.tenants['tenant-one']
for pipeline in tenant.layout.pipelines.values():
pipeline_status = pipeline.formatStatusJSON(
self.scheds.first.sched.globals.websocket_url)
for queue in pipeline_status['change_queues']:
for head in queue['heads']:
for item in head:
for job in item['jobs']:
@ -4333,11 +4333,13 @@ class TestScheduler(ZuulTestCase):
# Ensure that the status json has the ref so we can render it in the
# web ui.
data = json.loads(self.scheds.first.sched
.formatStatusJSON('tenant-one'))
pipeline = [x for x in data['pipelines'] if x['name'] == 'periodic'][0]
first = pipeline['change_queues'][0]['heads'][0][0]
second = pipeline['change_queues'][1]['heads'][0][0]
pipeline = self.scheds.first.sched.abide.tenants[
'tenant-one'].layout.pipelines['periodic']
pipeline_status = pipeline.formatStatusJSON(
self.scheds.first.sched.globals.websocket_url)
first = pipeline_status['change_queues'][0]['heads'][0][0]
second = pipeline_status['change_queues'][1]['heads'][0][0]
self.assertIn(first['ref'], ['refs/heads/master', 'refs/heads/stable'])
self.assertIn(second['ref'],
['refs/heads/master', 'refs/heads/stable'])

View File

@ -61,6 +61,7 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
connections=self.connections,
authenticators=self.authenticators,
)
self.connections.load(self.web.zk_client)
except Exception:
self.log.exception("Error creating ZuulWeb:")
sys.exit(1)

View File

@ -15,7 +15,6 @@
import json
import logging
import time
from abc import ABCMeta
from typing import List
@ -148,7 +147,6 @@ class RPCListener(RPCListenerBase):
'get_admin_tenants',
'get_running_jobs',
'tenant_list',
'status_get',
'job_get',
'job_list',
'project_get',
@ -311,16 +309,6 @@ class RPCListener(RPCListenerBase):
'queue': queue_size})
job.sendWorkComplete(json.dumps(output))
# RPC handler for 'status_get': reads the tenant name from the job's JSON
# arguments, asks the scheduler to format that tenant's status JSON, logs how
# long formatting took and the size of the output, and hands the output to
# job.sendWorkComplete().
# NOTE(review): the hunk header (16 lines before, 6 after) suggests this whole
# handler is deleted by the change; the +/- markers are not visible in this
# rendering, so confirm against the raw diff.
def handle_status_get(self, job):
args = json.loads(job.arguments)
# Monotonic clock: only the elapsed time is of interest here.
start = time.monotonic()
output = self.sched.formatStatusJSON(args.get("tenant"))
end = time.monotonic()
self.log.debug('Formatting tenant %s status took %.3f seconds for '
'%d bytes', args.get("tenant"), end - start,
len(output))
job.sendWorkComplete(output)
def handle_job_get(self, gear_job):
args = json.loads(gear_job.arguments)
tenant = self.sched.abide.tenants.get(args.get("tenant"))

View File

@ -15,7 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import socket
import sys
@ -2287,58 +2286,6 @@ class Scheduler(threading.Thread):
self.log.warning("Duplicate nodes provisioned event: %s",
event)
# Build the status document for one tenant and return it as a JSON string.
# The document carries the Zuul version, the lengths of the tenant-level
# trigger/management event queues, the per-connection event queue lengths,
# the last reconfiguration time and one status entry per pipeline.
# For a tenant that is not loaded, a small JSON object with "message" and
# "code" (404 for an unknown tenant, 204 for one that is not ready) is
# returned instead.
# NOTE(review): the hunk header (58 lines before, 6 after) suggests this
# method is deleted by the change; confirm against the raw diff.
def formatStatusJSON(self, tenant_name):
# TODOv3(jeblair): use tenants
data = {}
data['zuul_version'] = self.zuul_version
data['trigger_event_queue'] = {}
data['trigger_event_queue']['length'] = len(
self.trigger_events[tenant_name])
# The management queue length also counts pending entries in the
# scheduler's reconfigure_event_queue.
data['management_event_queue'] = {}
data['management_event_queue']['length'] = len(
self.management_events[tenant_name]
) + self.reconfigure_event_queue.qsize()
data['connection_event_queues'] = {}
for connection in self.connections.connections.values():
queue = connection.getEventQueue()
# Connections whose getEventQueue() returns None are left out.
if queue is not None:
data['connection_event_queues'][connection.connection_name] = {
'length': len(queue),
}
# 'last_reconfigured' is only set when a layout state exists for the tenant.
layout_state = self.tenant_layout_state.get(tenant_name)
if layout_state:
# presumably converts seconds to milliseconds — TODO confirm the unit
data['last_reconfigured'] = layout_state.last_reconfigured * 1000
pipelines = []
data['pipelines'] = pipelines
tenant = self.abide.tenants.get(tenant_name)
if not tenant:
# Distinguish a tenant that is absent from the unparsed abide (unknown)
# from one that is present there but not in the loaded abide (not ready).
if tenant_name not in self.unparsed_abide.tenants:
return json.dumps({
"message": "Unknown tenant",
"code": 404
})
self.log.warning("Tenant %s isn't loaded" % tenant_name)
return json.dumps({
"message": "Tenant %s isn't ready" % tenant_name,
"code": 204
})
trigger_event_queues = self.pipeline_trigger_events[tenant_name]
result_event_queues = self.pipeline_result_events[tenant_name]
management_event_queues = self.pipeline_management_events[tenant_name]
# Each pipeline's own status dict is extended with its three event queue
# lengths before being appended to the result.
for pipeline in tenant.layout.pipelines.values():
status = pipeline.formatStatusJSON(self.globals.websocket_url)
status['trigger_events'] = len(
trigger_event_queues[pipeline.name])
status['result_events'] = len(
result_event_queues[pipeline.name])
status['management_events'] = len(
management_event_queues[pipeline.name])
pipelines.append(status)
return json.dumps(data)
def cancelJob(self, buildset, job, build=None, final=False,
force=False):
"""Cancel a running build

View File

@ -58,13 +58,19 @@ from zuul.version import get_version_string
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ComponentRegistry, WebComponent
from zuul.zk.config_cache import SystemConfigCache
from zuul.zk.event_queues import PipelineManagementEventQueue
from zuul.zk.event_queues import (
TenantManagementEventQueue,
TenantTriggerEventQueue,
PipelineManagementEventQueue,
PipelineResultEventQueue,
PipelineTriggerEventQueue,
)
from zuul.zk.executor import ExecutorApi
from zuul.zk.layout import LayoutStateStore
from zuul.zk.locks import tenant_read_lock
from zuul.zk.nodepool import ZooKeeperNodepool
from zuul.zk.system import ZuulSystem
from zuul.zk.zkobject import LocalZKContext
from zuul.zk.zkobject import LocalZKContext, ZKContext
from zuul.lib.auth import AuthenticatorRegistry
from zuul.lib.config import get_default
@ -898,26 +904,68 @@ class ZuulWebAPI(object):
resp.headers["Access-Control-Allow-Origin"] = "*"
return ret
# Return the (cached) status payload for a tenant and set the caching and
# CORS headers on the CherryPy response.
# NOTE(review): this hunk is rendered without +/- markers, so pre-change and
# post-change lines are interleaved below (both signatures of _getStatus, the
# RPC-based cache fill and the formatStatus()-based one, two Cache-Control
# assignments and two last_modified assignments). Read it against the raw diff.
def _getStatus(self, tenant):
def _getStatus(self, tenant_name):
tenant = self.zuulweb.abide.tenants.get(tenant_name)
if not tenant:
if tenant_name not in self.zuulweb.unparsed_abide.tenants:
raise cherrypy.HTTPError(404, "Unknown tenant")
self.log.warning("Tenant %s isn't loaded", tenant_name)
# NOTE(review): CherryPy documents HTTPError for 4xx-5xx statuses and, as far
# as I recall, rejects codes outside 400-599 with a ValueError. Confirm that
# raising it with 204 behaves as intended; otherwise set
# cherrypy.response.status = 204 and return an empty payload instead.
raise cherrypy.HTTPError(204, f"Tenant {tenant_name} isn't ready")
with self.status_lock:
# NOTE(review): in the new signature `tenant` is the object returned by
# abide.tenants.get(tenant_name), while the cache entries below are written
# under `tenant_name`. If the next two lines are unchanged context (no
# replacement for them appears in this hunk), the membership test looks like
# it can never hit, so the status is rebuilt on every request and
# cache_expiry has no effect. Consider `tenant_name not in self.cache` and
# `self.cache_time[tenant_name]` here.
if tenant not in self.cache or \
(time.time() - self.cache_time[tenant]) > self.cache_expiry:
job = self.rpc.submitJob('zuul:status_get',
{'tenant': tenant})
self.cache[tenant] = json.loads(job.data[0])
self.cache_time[tenant] = time.time()
payload = self.cache[tenant]
if payload.get('code') == 404:
raise cherrypy.HTTPError(404, payload['message'])
# Post-change cache fill: the payload is built locally by formatStatus()
# rather than fetched from the scheduler through the RPC job above.
self.cache[tenant_name] = self.formatStatus(tenant)
self.cache_time[tenant_name] = time.time()
payload = self.cache[tenant_name]
resp = cherrypy.response
resp.headers["Cache-Control"] = "public, max-age=%d" % \
self.cache_expiry
last_modified = datetime.utcfromtimestamp(self.cache_time[tenant])
resp.headers["Cache-Control"] = f"public, max-age={self.cache_expiry}"
# Last-modified reflects when the cached payload was generated.
last_modified = datetime.utcfromtimestamp(self.cache_time[tenant_name])
last_modified_header = last_modified.strftime('%a, %d %b %Y %X GMT')
resp.headers["Last-modified"] = last_modified_header
resp.headers['Access-Control-Allow-Origin'] = '*'
return payload
# Build the status document for an already-loaded tenant object and return it
# as a dict (not a JSON string). The document carries the Zuul version taken
# from this web component's component_info, the tenant-level trigger and
# management event queue lengths, the per-connection event queue lengths, the
# last reconfiguration time and one status entry per pipeline.
def formatStatus(self, tenant):
data = {}
data['zuul_version'] = self.zuulweb.component_info.version
data['trigger_event_queue'] = {}
data['trigger_event_queue']['length'] = len(
self.zuulweb.trigger_events[tenant.name])
# NOTE(review): unlike the scheduler's formatStatusJSON shown earlier on this
# page, the length below does not add reconfigure_event_queue.qsize(); the
# reported number can therefore differ from before — confirm this is intended.
data['management_event_queue'] = {}
data['management_event_queue']['length'] = len(
self.zuulweb.management_events[tenant.name]
)
data['connection_event_queues'] = {}
for connection in self.zuulweb.connections.connections.values():
queue = connection.getEventQueue()
# Connections whose getEventQueue() returns None are left out.
if queue is not None:
data['connection_event_queues'][connection.connection_name] = {
'length': len(queue),
}
# NOTE(review): direct indexing, where the scheduler version used .get() and
# skipped 'last_reconfigured' when no layout state existed. Presumably a
# loaded tenant always has a layout state — verify, or guard the lookup.
layout_state = self.zuulweb.tenant_layout_state[tenant.name]
# presumably converts seconds to milliseconds — TODO confirm the unit
data['last_reconfigured'] = layout_state.last_reconfigured * 1000
pipelines = []
data['pipelines'] = pipelines
trigger_event_queues = self.zuulweb.pipeline_trigger_events[
tenant.name]
result_event_queues = self.zuulweb.pipeline_result_events[tenant.name]
management_event_queues = self.zuulweb.pipeline_management_events[
tenant.name]
for pipeline in tenant.layout.pipelines.values():
# The per-pipeline status comes from pipeline.summary.refresh() using the
# web component's ZK context; it is treated as a mutable mapping below
# (presumably the summary dict stored in ZooKeeper — TODO confirm).
status = pipeline.summary.refresh(self.zuulweb.zk_context)
status['trigger_events'] = len(trigger_event_queues[pipeline.name])
status['result_events'] = len(result_event_queues[pipeline.name])
status['management_events'] = len(
management_event_queues[pipeline.name])
pipelines.append(status)
return data
@cherrypy.expose
@cherrypy.tools.save_params()
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
@ -1498,15 +1546,30 @@ class ZuulWeb(object):
self.zk_client, self.system_config_cache_wake_event.set)
self.local_layout_state = {}
self.pipeline_management_events = (
PipelineManagementEventQueue.createRegistry(self.zk_client)
)
self.connections = connections
self.authenticators = authenticators
self.stream_manager = StreamManager()
self.zone = get_default(self.config, 'web', 'zone')
self.management_events = TenantManagementEventQueue.createRegistry(
self.zk_client)
self.pipeline_management_events = (
PipelineManagementEventQueue.createRegistry(self.zk_client)
)
self.trigger_events = TenantTriggerEventQueue.createRegistry(
self.zk_client, self.connections
)
self.pipeline_trigger_events = (
PipelineTriggerEventQueue.createRegistry(
self.zk_client, self.connections
)
)
self.pipeline_result_events = PipelineResultEventQueue.createRegistry(
self.zk_client
)
self.zk_context = ZKContext(self.zk_client, None, None, self.log)
command_socket = get_default(
self.config, 'web', 'command_socket',
'/var/lib/zuul/web.socket'