Merge "Add web-based console log streaming" into feature/zuulv3
This commit is contained in:
commit
e128b517d0
|
@ -33,6 +33,10 @@ default_username=zuul
|
|||
trusted_ro_dirs=/opt/zuul-scripts:/var/cache
|
||||
trusted_rw_dirs=/opt/zuul-logs
|
||||
|
||||
[web]
|
||||
listen_address=127.0.0.1
|
||||
port=9000
|
||||
|
||||
[webapp]
|
||||
listen_address=0.0.0.0
|
||||
port=8001
|
||||
|
|
|
@ -24,3 +24,5 @@ cryptography>=1.6
|
|||
cachecontrol
|
||||
pyjwt
|
||||
iso8601
|
||||
aiohttp
|
||||
uvloop;python_version>='3.5'
|
||||
|
|
|
@ -26,6 +26,7 @@ console_scripts =
|
|||
zuul-cloner = zuul.cmd.cloner:main
|
||||
zuul-executor = zuul.cmd.executor:main
|
||||
zuul-bwrap = zuul.driver.bubblewrap:main
|
||||
zuul-web = zuul.cmd.web:main
|
||||
|
||||
[build_sphinx]
|
||||
source-dir = doc/source
|
||||
|
|
|
@ -1266,7 +1266,6 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
|
|||
self.build_history = []
|
||||
self.fail_tests = {}
|
||||
self.job_builds = {}
|
||||
self.hostname = 'zl.example.com'
|
||||
|
||||
def failJob(self, name, change):
|
||||
"""Instruct the executor to report matching builds as failures.
|
||||
|
|
|
@ -121,7 +121,8 @@ class TestSQLConnection(ZuulDBTestCase):
|
|||
self.assertEqual('project-merge', buildset0_builds[0]['job_name'])
|
||||
self.assertEqual("SUCCESS", buildset0_builds[0]['result'])
|
||||
self.assertEqual(
|
||||
'finger://zl.example.com/{uuid}'.format(
|
||||
'finger://{hostname}/{uuid}'.format(
|
||||
hostname=self.executor_server.hostname,
|
||||
uuid=buildset0_builds[0]['uuid']),
|
||||
buildset0_builds[0]['log_url'])
|
||||
self.assertEqual('check', buildset1['pipeline'])
|
||||
|
@ -144,7 +145,8 @@ class TestSQLConnection(ZuulDBTestCase):
|
|||
self.assertEqual('project-test1', buildset1_builds[-2]['job_name'])
|
||||
self.assertEqual("FAILURE", buildset1_builds[-2]['result'])
|
||||
self.assertEqual(
|
||||
'finger://zl.example.com/{uuid}'.format(
|
||||
'finger://{hostname}/{uuid}'.format(
|
||||
hostname=self.executor_server.hostname,
|
||||
uuid=buildset1_builds[-2]['uuid']),
|
||||
buildset1_builds[-2]['log_url'])
|
||||
|
||||
|
|
|
@ -14,6 +14,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import logging
|
||||
import json
|
||||
import os
|
||||
import os.path
|
||||
import socket
|
||||
|
@ -21,6 +25,7 @@ import tempfile
|
|||
import threading
|
||||
import time
|
||||
|
||||
import zuul.web
|
||||
import zuul.lib.log_streamer
|
||||
import tests.base
|
||||
|
||||
|
@ -57,6 +62,7 @@ class TestLogStreamer(tests.base.BaseTestCase):
|
|||
class TestStreaming(tests.base.AnsibleZuulTestCase):
|
||||
|
||||
tenant_config_file = 'config/streamer/main.yaml'
|
||||
log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming")
|
||||
|
||||
def setUp(self):
|
||||
super(TestStreaming, self).setUp()
|
||||
|
@ -146,9 +152,116 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
# job and deleted. However, we still have a file handle to it, so we
|
||||
# can make sure that we read the entire contents at this point.
|
||||
# Compact the returned lines into a single string for easy comparison.
|
||||
file_contents = ''.join(logfile.readlines())
|
||||
file_contents = logfile.read()
|
||||
logfile.close()
|
||||
|
||||
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
|
||||
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
|
||||
self.assertEqual(file_contents, self.streaming_data)
|
||||
|
||||
def runWSClient(self, build_uuid, event):
|
||||
async def client(loop, build_uuid, event):
|
||||
uri = 'http://127.0.0.1:9000/console-stream'
|
||||
try:
|
||||
session = aiohttp.ClientSession(loop=loop)
|
||||
async with session.ws_connect(uri) as ws:
|
||||
req = {'uuid': build_uuid, 'logfile': None}
|
||||
ws.send_str(json.dumps(req))
|
||||
event.set() # notify we are connected and req sent
|
||||
async for msg in ws:
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
self.ws_client_results += msg.data
|
||||
elif msg.type == aiohttp.WSMsgType.CLOSED:
|
||||
break
|
||||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||
break
|
||||
session.close()
|
||||
except Exception as e:
|
||||
self.log.exception("client exception:")
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
loop.set_debug(True)
|
||||
loop.run_until_complete(client(loop, build_uuid, event))
|
||||
loop.close()
|
||||
|
||||
def test_websocket_streaming(self):
|
||||
# Need to set the streaming port before submitting the job
|
||||
finger_port = 7902
|
||||
self.executor_server.log_streaming_port = finger_port
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
|
||||
# We don't have any real synchronization for the ansible jobs, so
|
||||
# just wait until we get our running build.
|
||||
while not len(self.builds):
|
||||
time.sleep(0.1)
|
||||
build = self.builds[0]
|
||||
self.assertEqual(build.name, 'python27')
|
||||
|
||||
build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
|
||||
while not os.path.exists(build_dir):
|
||||
time.sleep(0.1)
|
||||
|
||||
# Need to wait to make sure that jobdir gets set
|
||||
while build.jobdir is None:
|
||||
time.sleep(0.1)
|
||||
build = self.builds[0]
|
||||
|
||||
# Wait for the job to begin running and create the ansible log file.
|
||||
# The job waits to complete until the flag file exists, so we can
|
||||
# safely access the log here. We only open it (to force a file handle
|
||||
# to be kept open for it after the job finishes) but wait to read the
|
||||
# contents until the job is done.
|
||||
ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
|
||||
while not os.path.exists(ansible_log):
|
||||
time.sleep(0.1)
|
||||
logfile = open(ansible_log, 'r')
|
||||
self.addCleanup(logfile.close)
|
||||
|
||||
# Start the finger streamer daemon
|
||||
streamer = zuul.lib.log_streamer.LogStreamer(
|
||||
None, self.host, finger_port, self.executor_server.jobdir_root)
|
||||
self.addCleanup(streamer.stop)
|
||||
|
||||
# Start the web server
|
||||
web_server = zuul.web.ZuulWeb(
|
||||
listen_address='127.0.0.1', listen_port=9000,
|
||||
gear_server='127.0.0.1', gear_port=self.gearman_server.port)
|
||||
loop = asyncio.new_event_loop()
|
||||
loop.set_debug(True)
|
||||
ws_thread = threading.Thread(target=web_server.run, args=(loop,))
|
||||
ws_thread.start()
|
||||
self.addCleanup(loop.close)
|
||||
self.addCleanup(ws_thread.join)
|
||||
self.addCleanup(web_server.stop)
|
||||
|
||||
# Wait until web server is started
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
while s.connect_ex((self.host, 9000)):
|
||||
time.sleep(0.1)
|
||||
|
||||
# Start a thread with the websocket client
|
||||
ws_client_event = threading.Event()
|
||||
self.ws_client_results = ''
|
||||
ws_client_thread = threading.Thread(
|
||||
target=self.runWSClient, args=(build.uuid, ws_client_event)
|
||||
)
|
||||
ws_client_thread.start()
|
||||
ws_client_event.wait()
|
||||
|
||||
# Allow the job to complete
|
||||
flag_file = os.path.join(build_dir, 'test_wait')
|
||||
open(flag_file, 'w').close()
|
||||
|
||||
# Wait for the websocket client to complete, which it should when
|
||||
# it's received the full log.
|
||||
ws_client_thread.join()
|
||||
|
||||
self.waitUntilSettled()
|
||||
|
||||
file_contents = logfile.read()
|
||||
logfile.close()
|
||||
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
|
||||
self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
|
||||
self.assertEqual(file_contents, self.ws_client_results)
|
||||
|
|
|
@ -2289,22 +2289,40 @@ class TestScheduler(ZuulTestCase):
|
|||
status_jobs.append(job)
|
||||
self.assertEqual('project-merge', status_jobs[0]['name'])
|
||||
# TODO(mordred) pull uuids from self.builds
|
||||
self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
|
||||
status_jobs[0]['url'])
|
||||
self.assertEqual(
|
||||
'finger://{hostname}/{uuid}'.format(
|
||||
hostname=self.executor_server.hostname,
|
||||
uuid=status_jobs[0]['uuid']),
|
||||
status_jobs[0]['url'])
|
||||
# TOOD(mordred) configure a success-url on the base job
|
||||
self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
|
||||
status_jobs[0]['report_url'])
|
||||
self.assertEqual(
|
||||
'finger://{hostname}/{uuid}'.format(
|
||||
hostname=self.executor_server.hostname,
|
||||
uuid=status_jobs[0]['uuid']),
|
||||
status_jobs[0]['report_url'])
|
||||
self.assertEqual('project-test1', status_jobs[1]['name'])
|
||||
self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
|
||||
status_jobs[1]['url'])
|
||||
self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
|
||||
status_jobs[1]['report_url'])
|
||||
self.assertEqual(
|
||||
'finger://{hostname}/{uuid}'.format(
|
||||
hostname=self.executor_server.hostname,
|
||||
uuid=status_jobs[1]['uuid']),
|
||||
status_jobs[1]['url'])
|
||||
self.assertEqual(
|
||||
'finger://{hostname}/{uuid}'.format(
|
||||
hostname=self.executor_server.hostname,
|
||||
uuid=status_jobs[1]['uuid']),
|
||||
status_jobs[1]['report_url'])
|
||||
|
||||
self.assertEqual('project-test2', status_jobs[2]['name'])
|
||||
self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
|
||||
status_jobs[2]['url'])
|
||||
self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
|
||||
status_jobs[2]['report_url'])
|
||||
self.assertEqual(
|
||||
'finger://{hostname}/{uuid}'.format(
|
||||
hostname=self.executor_server.hostname,
|
||||
uuid=status_jobs[2]['uuid']),
|
||||
status_jobs[2]['url'])
|
||||
self.assertEqual(
|
||||
'finger://{hostname}/{uuid}'.format(
|
||||
hostname=self.executor_server.hostname,
|
||||
uuid=status_jobs[2]['uuid']),
|
||||
status_jobs[2]['report_url'])
|
||||
|
||||
def test_live_reconfiguration(self):
|
||||
"Test that live reconfiguration works"
|
||||
|
@ -3577,8 +3595,11 @@ For CI problems and help debugging, contact ci@example.org"""
|
|||
self.assertEqual('project-merge', job['name'])
|
||||
self.assertEqual('gate', job['pipeline'])
|
||||
self.assertEqual(False, job['retry'])
|
||||
self.assertEqual('finger://zl.example.com/%s' % job['uuid'],
|
||||
job['url'])
|
||||
self.assertEqual(
|
||||
'finger://{hostname}/{uuid}'.format(
|
||||
hostname=self.executor_server.hostname,
|
||||
uuid=job['uuid']),
|
||||
job['url'])
|
||||
self.assertEqual(2, len(job['worker']))
|
||||
self.assertEqual(False, job['canceled'])
|
||||
self.assertEqual(True, job['voting'])
|
||||
|
@ -4674,7 +4695,8 @@ class TestSchedulerSuccessURL(ZuulTestCase):
|
|||
|
||||
# NOTE: This default URL is currently hard-coded in executor/server.py
|
||||
self.assertIn(
|
||||
'- docs-draft-test2 finger://zl.example.com/{uuid}'.format(
|
||||
'- docs-draft-test2 finger://{hostname}/{uuid}'.format(
|
||||
hostname=self.executor_server.hostname,
|
||||
uuid=uuid_test2),
|
||||
body[3])
|
||||
|
||||
|
|
|
@ -0,0 +1,115 @@
|
|||
#!/usr/bin/env python
|
||||
# Copyright 2017 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import daemon
|
||||
import extras
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
|
||||
import zuul.cmd
|
||||
import zuul.web
|
||||
|
||||
from zuul.lib.config import get_default
|
||||
|
||||
# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
|
||||
# instead it depends on lockfile-0.9.1 which uses pidfile.
|
||||
pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
|
||||
|
||||
|
||||
class WebServer(zuul.cmd.ZuulApp):
|
||||
|
||||
def parse_arguments(self):
|
||||
parser = argparse.ArgumentParser(description='Zuul Web Server.')
|
||||
parser.add_argument('-c', dest='config',
|
||||
help='specify the config file')
|
||||
parser.add_argument('-d', dest='nodaemon', action='store_true',
|
||||
help='do not run as a daemon')
|
||||
parser.add_argument('--version', dest='version', action='version',
|
||||
version=self._get_version(),
|
||||
help='show zuul version')
|
||||
self.args = parser.parse_args()
|
||||
|
||||
def exit_handler(self, signum, frame):
|
||||
self.web.stop()
|
||||
|
||||
def _main(self):
|
||||
params = dict()
|
||||
|
||||
params['listen_address'] = get_default(self.config,
|
||||
'web', 'listen_address',
|
||||
'127.0.0.1')
|
||||
params['listen_port'] = get_default(self.config, 'web', 'port', 9000)
|
||||
params['gear_server'] = get_default(self.config, 'gearman', 'server')
|
||||
params['gear_port'] = get_default(self.config, 'gearman', 'port', 4730)
|
||||
params['ssl_key'] = get_default(self.config, 'gearman', 'ssl_key')
|
||||
params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
|
||||
try:
|
||||
self.web = zuul.web.ZuulWeb(**params)
|
||||
except Exception as e:
|
||||
self.log.exception("Error creating ZuulWeb:")
|
||||
sys.exit(1)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
signal.signal(signal.SIGUSR1, self.exit_handler)
|
||||
signal.signal(signal.SIGTERM, self.exit_handler)
|
||||
|
||||
self.log.info('Zuul Web Server starting')
|
||||
self.thread = threading.Thread(target=self.web.run,
|
||||
args=(loop,),
|
||||
name='web')
|
||||
self.thread.start()
|
||||
|
||||
try:
|
||||
signal.pause()
|
||||
except KeyboardInterrupt:
|
||||
print("Ctrl + C: asking web server to exit nicely...\n")
|
||||
self.exit_handler(signal.SIGINT, None)
|
||||
|
||||
self.thread.join()
|
||||
loop.stop()
|
||||
loop.close()
|
||||
self.log.info("Zuul Web Server stopped")
|
||||
|
||||
def main(self):
|
||||
self.setup_logging('web', 'log_config')
|
||||
self.log = logging.getLogger("zuul.WebServer")
|
||||
|
||||
try:
|
||||
self._main()
|
||||
except Exception:
|
||||
self.log.exception("Exception from WebServer:")
|
||||
|
||||
|
||||
def main():
|
||||
server = WebServer()
|
||||
server.parse_arguments()
|
||||
server.read_config()
|
||||
|
||||
pid_fn = get_default(server.config, 'web', 'pidfile',
|
||||
'/var/run/zuul-web/zuul-web.pid', expand_user=True)
|
||||
|
||||
pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
|
||||
|
||||
if server.args.nodaemon:
|
||||
server.main()
|
||||
else:
|
||||
with daemon.DaemonContext(pidfile=pid):
|
||||
server.main()
|
|
@ -15,6 +15,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import os.path
|
||||
import pwd
|
||||
|
@ -212,6 +213,8 @@ class LogStreamer(object):
|
|||
'''
|
||||
|
||||
def __init__(self, user, host, port, jobdir_root):
|
||||
self.log = logging.getLogger('zuul.lib.LogStreamer')
|
||||
self.log.debug("LogStreamer starting on port %s", port)
|
||||
self.server = CustomForkingTCPServer((host, port),
|
||||
RequestHandler,
|
||||
user=user,
|
||||
|
@ -227,3 +230,4 @@ class LogStreamer(object):
|
|||
if self.thd.isAlive():
|
||||
self.server.shutdown()
|
||||
self.server.server_close()
|
||||
self.log.debug("LogStreamer stopped")
|
||||
|
|
|
@ -86,3 +86,11 @@ class RPCClient(object):
|
|||
|
||||
def shutdown(self):
|
||||
self.gearman.shutdown()
|
||||
|
||||
def get_job_log_stream_address(self, uuid, logfile='console.log'):
|
||||
data = {'uuid': uuid, 'logfile': logfile}
|
||||
job = self.submitJob('zuul:get_job_log_stream_address', data)
|
||||
if job.failure:
|
||||
return False
|
||||
else:
|
||||
return json.loads(job.data[0])
|
||||
|
|
|
@ -53,6 +53,7 @@ class RPCListener(object):
|
|||
self.worker.registerFunction("zuul:enqueue_ref")
|
||||
self.worker.registerFunction("zuul:promote")
|
||||
self.worker.registerFunction("zuul:get_running_jobs")
|
||||
self.worker.registerFunction("zuul:get_job_log_stream_address")
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
|
@ -173,3 +174,29 @@ class RPCListener(object):
|
|||
running_items.append(item.formatJSON())
|
||||
|
||||
job.sendWorkComplete(json.dumps(running_items))
|
||||
|
||||
def handle_get_job_log_stream_address(self, job):
|
||||
# TODO: map log files to ports. Currently there is only one
|
||||
# log stream for a given job. But many jobs produce many
|
||||
# log files, so this is forwards compatible with a future
|
||||
# where there are more logs to potentially request than
|
||||
# "console.log"
|
||||
def find_build(uuid):
|
||||
for tenant in self.sched.abide.tenants.values():
|
||||
for pipeline_name, pipeline in tenant.layout.pipelines.items():
|
||||
for queue in pipeline.queues:
|
||||
for item in queue.queue:
|
||||
for bld in item.current_build_set.getBuilds():
|
||||
if bld.uuid == uuid:
|
||||
return bld
|
||||
return None
|
||||
|
||||
args = json.loads(job.arguments)
|
||||
uuid = args['uuid']
|
||||
# TODO: logfile = args['logfile']
|
||||
job_log_stream_address = {}
|
||||
build = find_build(uuid)
|
||||
if build:
|
||||
job_log_stream_address['server'] = build.worker.hostname
|
||||
job_log_stream_address['port'] = build.worker.log_port
|
||||
job.sendWorkComplete(json.dumps(job_log_stream_address))
|
||||
|
|
|
@ -0,0 +1,232 @@
|
|||
#!/usr/bin/env python
|
||||
# Copyright (c) 2017 Red Hat
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import uvloop
|
||||
|
||||
import aiohttp
|
||||
from aiohttp import web
|
||||
|
||||
import zuul.rpcclient
|
||||
|
||||
|
||||
class LogStreamingHandler(object):
|
||||
log = logging.getLogger("zuul.web.LogStreamingHandler")
|
||||
|
||||
def __init__(self, loop, gear_server, gear_port,
|
||||
ssl_key=None, ssl_cert=None, ssl_ca=None):
|
||||
self.event_loop = loop
|
||||
self.gear_server = gear_server
|
||||
self.gear_port = gear_port
|
||||
self.ssl_key = ssl_key
|
||||
self.ssl_cert = ssl_cert
|
||||
self.ssl_ca = ssl_ca
|
||||
|
||||
def _getPortLocation(self, job_uuid):
|
||||
'''
|
||||
Query Gearman for the executor running the given job.
|
||||
|
||||
:param str job_uuid: The job UUID we want to stream.
|
||||
'''
|
||||
# TODO: Fetch the entire list of uuid/file/server/ports once and
|
||||
# share that, and fetch a new list on cache misses perhaps?
|
||||
# TODO: Avoid recreating a client for each request.
|
||||
rpc = zuul.rpcclient.RPCClient(self.gear_server, self.gear_port,
|
||||
self.ssl_key, self.ssl_cert,
|
||||
self.ssl_ca)
|
||||
ret = rpc.get_job_log_stream_address(job_uuid)
|
||||
rpc.shutdown()
|
||||
return ret
|
||||
|
||||
async def _fingerClient(self, ws, server, port, job_uuid):
|
||||
'''
|
||||
Create a client to connect to the finger streamer and pull results.
|
||||
|
||||
:param aiohttp.web.WebSocketResponse ws: The websocket response object.
|
||||
:param str server: The executor server running the job.
|
||||
:param str port: The executor server port.
|
||||
:param str job_uuid: The job UUID to stream.
|
||||
'''
|
||||
self.log.debug("Connecting to finger server %s:%s", server, port)
|
||||
reader, writer = await asyncio.open_connection(host=server, port=port,
|
||||
loop=self.event_loop)
|
||||
|
||||
self.log.debug("Sending finger request for %s", job_uuid)
|
||||
msg = "%s\n" % job_uuid # Must have a trailing newline!
|
||||
|
||||
writer.write(msg.encode('utf8'))
|
||||
await writer.drain()
|
||||
|
||||
while True:
|
||||
data = await reader.read(1024)
|
||||
if data:
|
||||
await ws.send_str(data.decode('utf8'))
|
||||
else:
|
||||
writer.close()
|
||||
return
|
||||
|
||||
async def _streamLog(self, ws, request):
|
||||
'''
|
||||
Stream the log for the requested job back to the client.
|
||||
|
||||
:param aiohttp.web.WebSocketResponse ws: The websocket response object.
|
||||
:param dict request: The client request parameters.
|
||||
'''
|
||||
for key in ('uuid', 'logfile'):
|
||||
if key not in request:
|
||||
return (4000, "'{key}' missing from request payload".format(
|
||||
key=key))
|
||||
|
||||
# Schedule the blocking gearman work in an Executor
|
||||
gear_task = self.event_loop.run_in_executor(
|
||||
None, self._getPortLocation, request['uuid'])
|
||||
|
||||
try:
|
||||
port_location = await asyncio.wait_for(gear_task, 10)
|
||||
except asyncio.TimeoutError:
|
||||
return (4010, "Gearman timeout")
|
||||
|
||||
if not port_location:
|
||||
return (4011, "Error with Gearman")
|
||||
|
||||
await self._fingerClient(
|
||||
ws, port_location['server'], port_location['port'], request['uuid']
|
||||
)
|
||||
|
||||
return (1000, "No more data")
|
||||
|
||||
async def processRequest(self, request):
|
||||
'''
|
||||
Handle a client websocket request for log streaming.
|
||||
|
||||
:param aiohttp.web.Request request: The client request.
|
||||
'''
|
||||
try:
|
||||
ws = web.WebSocketResponse()
|
||||
await ws.prepare(request)
|
||||
async for msg in ws:
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
req = json.loads(msg.data)
|
||||
self.log.debug("Websocket request: %s", req)
|
||||
code, msg = await self._streamLog(ws, req)
|
||||
|
||||
# We expect to process only a single message. I.e., we
|
||||
# can stream only a single file at a time.
|
||||
await ws.close(code=code, message=msg)
|
||||
break
|
||||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||
self.log.error(
|
||||
"Websocket connection closed with exception %s",
|
||||
ws.exception()
|
||||
)
|
||||
break
|
||||
elif msg.type == aiohttp.WSMsgType.CLOSED:
|
||||
break
|
||||
except Exception as e:
|
||||
self.log.exception("Websocket exception:")
|
||||
await ws.close(code=4009, message=str(e).encode('utf-8'))
|
||||
return ws
|
||||
|
||||
|
||||
class ZuulWeb(object):
|
||||
|
||||
log = logging.getLogger("zuul.web.ZuulWeb")
|
||||
|
||||
def __init__(self, listen_address, listen_port,
|
||||
gear_server, gear_port,
|
||||
ssl_key=None, ssl_cert=None, ssl_ca=None):
|
||||
self.listen_address = listen_address
|
||||
self.listen_port = listen_port
|
||||
self.gear_server = gear_server
|
||||
self.gear_port = gear_port
|
||||
self.ssl_key = ssl_key
|
||||
self.ssl_cert = ssl_cert
|
||||
self.ssl_ca = ssl_ca
|
||||
|
||||
async def _handleWebsocket(self, request):
|
||||
handler = LogStreamingHandler(self.event_loop,
|
||||
self.gear_server, self.gear_port,
|
||||
self.ssl_key, self.ssl_cert, self.ssl_ca)
|
||||
return await handler.processRequest(request)
|
||||
|
||||
def run(self, loop=None):
|
||||
'''
|
||||
Run the websocket daemon.
|
||||
|
||||
Because this method can be the target of a new thread, we need to
|
||||
set the thread event loop here, rather than in __init__().
|
||||
|
||||
:param loop: The event loop to use. If not supplied, the default main
|
||||
thread event loop is used. This should be supplied if ZuulWeb
|
||||
is run within a separate (non-main) thread.
|
||||
'''
|
||||
routes = [
|
||||
('GET', '/console-stream', self._handleWebsocket)
|
||||
]
|
||||
|
||||
self.log.debug("ZuulWeb starting")
|
||||
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
||||
user_supplied_loop = loop is not None
|
||||
if not loop:
|
||||
loop = asyncio.get_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
self.event_loop = loop
|
||||
|
||||
app = web.Application()
|
||||
for method, path, handler in routes:
|
||||
app.router.add_route(method, path, handler)
|
||||
handler = app.make_handler(loop=self.event_loop)
|
||||
|
||||
# create the server
|
||||
coro = self.event_loop.create_server(handler,
|
||||
self.listen_address,
|
||||
self.listen_port)
|
||||
self.server = self.event_loop.run_until_complete(coro)
|
||||
|
||||
self.term = asyncio.Future()
|
||||
|
||||
# start the server
|
||||
self.event_loop.run_until_complete(self.term)
|
||||
|
||||
# cleanup
|
||||
self.log.debug("ZuulWeb stopping")
|
||||
self.server.close()
|
||||
self.event_loop.run_until_complete(self.server.wait_closed())
|
||||
self.event_loop.run_until_complete(app.shutdown())
|
||||
self.event_loop.run_until_complete(handler.shutdown(60.0))
|
||||
self.event_loop.run_until_complete(app.cleanup())
|
||||
self.log.debug("ZuulWeb stopped")
|
||||
|
||||
# Only run these if we are controlling the loop - they need to be
|
||||
# run from the main thread
|
||||
if not user_supplied_loop:
|
||||
loop.stop()
|
||||
loop.close()
|
||||
|
||||
def stop(self):
|
||||
self.event_loop.call_soon_threadsafe(self.term.set_result, True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.set_debug(True)
|
||||
z = ZuulWeb()
|
||||
z.run(loop)
|
Loading…
Reference in New Issue