Add support for websocket-proxy

Reuse the websocketify project

Implements: blueprint stream-via-rest-api

Procedure to use the websocket proxy:

1.Apply the patches in sequence:
https://review.openstack.org/462373
https://review.openstack.org/459086
https://review.openstack.org/462374

2.Run devstack with the above patch. Make sure the "RECLONE=no". Devstack will automatically start a service "zun-wsproxy" in the
same node with zun-api.

3.By default the websocket proxy server will running at 127.0.0.1:6784

4.Create a container with interactive:
    zun create --name test0 -i cirros "/bin/sh"
    zun start test0

5.Run attach command:
    zun attach test0

6.Once connected, you can see the output info from screen service "zun-wsproxy"

Change-Id: Ib905d4852b9b5f6398316651da5224232b003dbc
Signed-off-by: Kevin Zhao <kevin.zhao@linaro.org>
This commit is contained in:
Kevin Zhao 2017-05-03 15:49:51 +08:00
parent 294a5f1204
commit 72a7997080
10 changed files with 478 additions and 5 deletions

39
zun/cmd/wsproxy.py Normal file
View File

@ -0,0 +1,39 @@
# Copyright 2017 Linaro Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
import sys
from zun.common import config
from zun.common import service as zun_service
import zun.conf
from zun.websocket import websocketproxy
CONF = zun.conf.CONF
LOG = logging.getLogger(__name__)
def main():
zun_service.prepare_service(sys.argv)
config.parse_args(sys.argv)
LOG.info("start websocket proxy")
host = CONF.websocket_proxy.wsproxy_host
port = CONF.websocket_proxy.wsproxy_port
websocketproxy.ZunWebSocketProxy(
listen_host=host,
listen_port=port,
file_only=True,
RequestHandlerClass=websocketproxy.ZunProxyRequestHandler
).start_server()

View File

@ -475,3 +475,23 @@ class ClassNotFound(NotFound):
class ApiVersionsIntersect(ZunException):
message = _("Version of %(name)s %(min_ver)s %(max_ver)s intersects "
"with another versions.")
class ConnectionFailed(ZunException):
msg_fmt = _("Failed to connect to remote host")
class SocketException(ZunException):
msg_fmt = _("Socket exceptions")
class InvalidWebsocketUrl(ZunException):
msg_fmt = _("Websocket Url invalid")
class InvalidWebsocketToken(ZunException):
msg_fmt = _("Websocket token is invalid")
class ValidationError(ZunException):
msg_fmt = _("Validation error")

View File

@ -16,6 +16,7 @@ import six
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import uuidutils
from zun.common import consts
from zun.common import exception
@ -401,7 +402,12 @@ class Manager(object):
LOG.debug('Get websocket url from the container: %s', container.uuid)
try:
url = self.driver.get_websocket_url(container)
return url
token = uuidutils.generate_uuid()
access_url = '%s?token=%s' % (CONF.websocket_proxy.base_url, token)
container.websocket_url = url
container.websocket_token = token
container.save(context)
return access_url
except Exception as e:
LOG.error(("Error occurred while calling "
"get websocket url function: %s"),

View File

@ -29,6 +29,7 @@ from zun.conf import profiler
from zun.conf import scheduler
from zun.conf import services
from zun.conf import ssl
from zun.conf import websocket_proxy
from zun.conf import zun_client
CONF = cfg.CONF
@ -49,3 +50,4 @@ ssl.register_opts(CONF)
profiler.register_opts(CONF)
neutron_client.register_opts(CONF)
network.register_opts(CONF)
websocket_proxy.register_opts(CONF)

View File

@ -0,0 +1,92 @@
# Copyright 2017 Linaro Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
wsproxy_group = cfg.OptGroup("websocket_proxy",
title="Websocket Proxy Group",
help="""
Users use the websocket proxy to connect to containers, instead of
connecting to containers directly, hence protects the socket daemon.
""")
wsproxy_opts = [
cfg.URIOpt('base_url',
default='ws://$wsproxy_host:$wsproxy_port/',
help="""
The URL an end user would use to connect to the ``zun-wsproxy`` service.
The ``zun-wsproxy`` service is called with this token enriched URL
and establishes the connection to the proper instance.
Related options:
* The IP address must be the same as the address to which the
``zun-wsproxy`` service is listening (see option ``wsproxy_host``
in this section).
* The port must be the same as ``wsproxy_port``in this section.
"""),
cfg.StrOpt('wsproxy_host',
default='127.0.0.1',
help="""
The IP address which is used by the ``zun-wsproxy`` service to listen
for incoming requests.
The ``zun-wsproxy`` service listens on this IP address for incoming
connection requests.
Related options:
* Ensure that this is the same IP address which is defined in the option
``base_url`` of this section or use ``0.0.0.0`` to listen on all addresses.
"""),
cfg.PortOpt('wsproxy_port',
default=6784,
help="""
The port number which is used by the ``zun-wsproxy`` service to listen
for incoming requests.
The ``zun-wsproxy`` service listens on this port number for incoming
connection requests.
Related options:
* Ensure that this is the same port number as that defined in the option
``base_url`` of this section.
"""),
cfg.ListOpt('allowed_origins',
default=[],
help="""
Adds list of allowed origins to the console websocket proxy to allow
connections from other origin hostnames.
Websocket proxy matches the host header with the origin header to
prevent cross-site requests. This list specifies if any there are
values other than host are allowed in the origin header.
Possible values:
* A list where each element is an allowed origin hostnames, else an empty list
"""),
]
ALL_OPTS = (wsproxy_opts)
def register_opts(conf):
conf.register_group(wsproxy_group)
conf.register_opts(wsproxy_opts, group=wsproxy_group)
def list_opts():
return {wsproxy_group: ALL_OPTS}

View File

@ -453,12 +453,12 @@ class TestManager(base.TestCase):
self.context, container, {})
@mock.patch.object(fake_driver, 'attach')
@mock.patch('zun.container.driver.ContainerDriver.get_websocket_url')
def test_container_attach(self, mock_attach, mock_getwebsocket_url):
@mock.patch.object(Container, 'save')
def test_container_attach(self, mock_save, mock_attach):
container = Container(self.context, **utils.get_test_container())
mock_getwebsocket_url.return_value = "ws://test"
mock_attach.return_value = "ws://test"
self.compute_manager.container_attach(self.context, container)
mock_attach.assert_called_once_with(container)
mock_save.assert_called_with(self.context)
@mock.patch.object(fake_driver, 'attach')
def test_container_attach_failed(self, mock_attach):

View File

@ -79,6 +79,10 @@ class FakeDriver(driver.ContainerDriver):
def attach(self, container):
pass
@check_container_id
def get_websocket_url(self, container):
pass
@check_container_id
def resize(self, container, height, weight):
pass

View File

View File

@ -0,0 +1,44 @@
# Copyright 2017 Linaro Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import socket
import websocket
from zun.common import exception
LOG = logging.getLogger(__name__)
class WebSocketClient(object):
def __init__(self, host_url, escape='~',
close_wait=0.5):
self.escape = escape
self.close_wait = close_wait
self.host_url = host_url
self.cs = None
def connect(self):
url = self.host_url
try:
self.ws = websocket.create_connection(url,
skip_utf8_validation=True)
except socket.error as e:
raise exception.ConnectionFailed(e)
except websocket.WebSocketConnectionClosedException as e:
raise exception.ConnectionFailed(e)
except websocket.WebSocketBadStatusException as e:
raise exception.ConnectionFailed(e)

View File

@ -0,0 +1,266 @@
# Copyright (c) 2017 Linaro Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
'''
Websocket proxy that is compatible with OpenStack Zun.
Leverages websockify.py by Joel Martin
'''
import errno
import select
import socket
import sys
import time
from oslo_log import log as logging
from oslo_utils import uuidutils
import six.moves.urllib.parse as urlparse
import websockify
from zun.common import context
from zun.common import exception
from zun.common.i18n import _
import zun.conf
from zun.db import api as db_api
from zun.websocket.websocketclient import WebSocketClient
LOG = logging.getLogger(__name__)
CONF = zun.conf.CONF
class ZunProxyRequestHandlerBase(object):
def verify_origin_proto(self, access_url, origin_proto):
if not access_url:
detail = _("No access_url available."
"Cannot validate protocol")
raise exception.ValidationError(detail=detail)
expected_protos = [urlparse.urlparse(access_url).scheme]
# NOTE: For serial consoles the expected protocol could be ws or
# wss which correspond to http and https respectively in terms of
# security.
if 'ws' in expected_protos:
expected_protos.append('http')
if 'wss' in expected_protos:
expected_protos.append('https')
return origin_proto in expected_protos
def _send_buffer(self, buff, target, send_all=False):
size = len(buff)
tosend = size
already_sent = 0
while tosend > 0:
try:
# i should be able to send a bytearray
sent = target.send(buff[already_sent:])
if sent == 0:
raise RuntimeError('socket connection broken')
already_sent += sent
tosend -= sent
except socket.error as e:
# if full buffers then wait for them to drain and try again
if e.errno in [errno.EAGAIN, errno.EWOULDBLOCK]:
if send_all:
continue
return buff[already_sent:]
else:
raise exception.SocketException(str(e))
return None
def _handle_ins_outs(self, target, ins, outs):
'''Handle the select file ins and outs
handle the operation ins and outs from select
'''
if self.request in outs:
# Send queued target data to the client
self.c_pend = self.send_frames(self.cqueue)
self.cqueue = []
if self.request in ins:
# Receive client data, decode it, and queue for target
bufs, closed = self.recv_frames()
self.tqueue.extend(bufs)
if closed:
self.msg(_("Client closed connection:"
"%(host)s:%(port)s") % {
'host': self.server.target_host,
'port': self.server.target_port})
raise self.CClose(closed['code'], closed['reason'])
if target in outs:
while self.tqueue:
payload = self.tqueue.pop(0)
remaining = self._send_buffer(payload, target)
if remaining is not None:
self.tqueue.appendleft(remaining)
break
if target in ins:
# Receive target data, encode it and queue for client
buf = target.recv()
if len(buf) == 0:
self.msg(_("Client closed connection:"
"%(host)s:%(port)s") % {
'host': self.server.target_host,
'port': self.server.target_port})
raise self.CClose(1000, "Target closed")
self.cqueue.append(buf)
def do_proxy(self, target):
'''Proxy websocket link
Proxy client WebSocket to normal target socket.
'''
self.cqueue = []
self.tqueue = []
self.c_pend = 0
rlist = [self.request, target]
if self.server.heartbeat:
now = time.time()
self.heartbeat = now + self.server.heartbeat
else:
self.heartbeat = None
while True:
wlist = []
if self.heartbeat is not None:
now = time.time()
if now > self.heartbeat:
self.heartbeat = now + self.server.heartbeat
self.send_ping()
if self.tqueue:
wlist.append(target)
if self.cqueue or self.c_pend:
wlist.append(self.request)
try:
ins, outs, excepts = select.select(rlist, wlist, [], 1)
except (select.error, OSError):
exc = sys.exc_info()[1]
if hasattr(exc, 'errno'):
err = exc.errno
else:
err = exc[0]
if err != errno.EINTR:
raise
else:
continue
if excepts:
raise exception.SocketException()
self._handle_ins_outs(target, ins, outs)
def new_websocket_client(self):
"""Called after a new WebSocket connection has been established."""
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
from eventlet import hubs
hubs.use_hub()
# The zun expected behavior is to have token
# passed to the method GET of the request
parse = urlparse.urlparse(self.path)
if parse.scheme not in ('http', 'https'):
# From a bug in urlparse in Python < 2.7.4 we cannot support
# special schemes (cf: http://bugs.python.org/issue9374)
if sys.version_info < (2, 7, 4):
raise exception.ZunException(
_("We do not support scheme '%s' under Python < 2.7.4, "
"please use http or https") % parse.scheme)
query = parse.query
token = urlparse.parse_qs(query).get("token", [""]).pop()
dbapi = db_api._get_dbdriver_instance()
ctx = context.get_admin_context(all_tenants=True)
self.headerid = self.headers.get("User-Agent")
if uuidutils.is_uuid_like(self.headerid):
container = dbapi.get_container_by_uuid(ctx, self.headerid)
else:
container = dbapi.get_container_by_name(ctx, self.headerid)
if token != container.websocket_token:
raise exception.InvalidWebsocketToken(token)
access_url = '%s?token=%s' % (CONF.websocket_proxy.base_url, token)
# Verify Origin
expected_origin_hostname = self.headers.get('Host')
if ':' in expected_origin_hostname:
e = expected_origin_hostname
if '[' in e and ']' in e:
expected_origin_hostname = e.split(']')[0][1:]
else:
expected_origin_hostname = e.split(':')[0]
expected_origin_hostnames = CONF.websocket_proxy.allowed_origins
expected_origin_hostnames.append(expected_origin_hostname)
origin_url = self.headers.get('Origin')
# missing origin header indicates non-browser client which is OK
if origin_url is not None:
origin = urlparse.urlparse(origin_url)
origin_hostname = origin.hostname
origin_scheme = origin.scheme
if origin_hostname == '' or origin_scheme == '':
detail = _("Origin header not valid.")
raise exception.ValidationError(detail)
if origin_hostname not in expected_origin_hostnames:
detail = _("Origin header does not match this host.")
raise exception.ValidationError(detail)
if not self.verify_origin_proto(access_url, origin_scheme):
detail = _("Origin header protocol does not match this host.")
raise exception.ValidationError(detail)
if container.websocket_url:
target_url = container.websocket_url
escape = "~"
close_wait = 0.5
wscls = WebSocketClient(host_url=target_url, escape=escape,
close_wait=close_wait)
wscls.connect()
self.target = wscls
else:
raise exception.InvalidWebsocketUrl()
# Start proxying
try:
self.do_proxy(self.target.ws)
except Exception as e:
if self.target.ws:
self.target.ws.close()
self.vmsg(_("%Websocket client or target closed"))
raise
class ZunProxyRequestHandler(ZunProxyRequestHandlerBase,
websockify.ProxyRequestHandler):
def __init__(self, *args, **kwargs):
websockify.ProxyRequestHandler.__init__(self, *args, **kwargs)
def socket(self, *args, **kwargs):
return websockify.WebSocketServer.socket(*args, **kwargs)
class ZunWebSocketProxy(websockify.WebSocketProxy):
@staticmethod
def get_logger():
return LOG