Merge "Create a websocket transport"

This commit is contained in:
Jenkins 2016-07-21 05:21:42 +00:00 committed by Gerrit Code Review
commit fa1a86c846
6 changed files with 232 additions and 15 deletions

View File

@ -9,6 +9,7 @@ jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
# Oslo Packages
oslo.i18n>=2.1.0 # Apache-2.0
oslo.utils>=3.15.0 # Apache-2.0
python-keystoneclient!=1.8.0,!=2.1.0,>=1.7.0 # Apache-2.0
osc-lib>=0.3.0 # Apache-2.0

View File

@ -40,6 +40,10 @@ zaqarclient.transport =
http.v2 = zaqarclient.transport.http:HttpTransport
https.v2 = zaqarclient.transport.http:HttpTransport
ws.v1 = zaqarclient.transport.ws:WebsocketTransport
ws.v1.1 = zaqarclient.transport.ws:WebsocketTransport
ws.v2 = zaqarclient.transport.ws:WebsocketTransport
zaqarclient.api =
queues.v1 = zaqarclient.queues.v1.api:V1
queues.v1.1 = zaqarclient.queues.v1.api:V1_1

View File

@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
from zaqarclient.tests import base
from zaqarclient.transport import request
from zaqarclient.transport import ws
class TestWsTransport(base.TestBase):
def setUp(self):
super(TestWsTransport, self).setUp()
os_opts = {
'os_auth_token': 'FAKE_TOKEN',
'os_auth_url': 'http://127.0.0.0:5000/v3',
'os_project_id': 'admin',
'os_service_type': 'messaging-websocket',
}
auth_opts = {'backend': 'keystone',
'options': os_opts}
self.options = {'auth_opts': auth_opts}
self.endpoint = 'ws://127.0.0.1:9000'
@mock.patch.object(ws.WebsocketTransport, "_create_connection")
def test_make_client(self, ws_create_connection):
ws_create_connection.return_value.recv.return_value = json.dumps({
"headers": {
"status": 200
}
})
transport = ws.WebsocketTransport(self.options)
req = request.Request(self.endpoint)
transport.send(req)
ws_create_connection.assert_called_with("ws://127.0.0.1:9000")
@mock.patch.object(ws.WebsocketTransport, "recv")
@mock.patch.object(ws.WebsocketTransport, "_create_connection")
def test_recv(self, ws_create_connection, recv_mock):
send_ack = {
"headers": {
"status": 200
}
}
recv_mock.side_effect = [send_ack, send_ack, send_ack, {
"body": {
"payload": "foo"
}
}, send_ack]
transport = ws.WebsocketTransport(self.options)
req = request.Request(self.endpoint)
transport.send(req)
count = 0
while True:
count += 1
data = transport.recv()
if 'body' in data:
self.assertEqual(data['body']['payload'], 'foo')
break
if count >= 4:
self.fail('Failed to recieve expected message.')

View File

@ -17,10 +17,23 @@ import abc
import six
from zaqarclient.transport import errors
@six.add_metaclass(abc.ABCMeta)
class Transport(object):
# common HTTP codes used by multiple transports
http_to_zaqar = {
400: errors.MalformedRequest,
401: errors.UnauthorizedError,
403: errors.ForbiddenError,
404: errors.ResourceNotFound,
409: errors.ConflictError,
500: errors.InternalServerError,
503: errors.ServiceUnavailableError
}
def __init__(self, options):
self.options = options

View File

@ -18,26 +18,11 @@ import json
from zaqarclient.common import http
from zaqarclient.transport import base
# NOTE(flaper87): Something is completely borked
# with some imports. Using `from ... import errors`
# will end up importing `zaqarclient.errors` instead
# of transports
import zaqarclient.transport.errors as errors
from zaqarclient.transport import response
class HttpTransport(base.Transport):
http_to_zaqar = {
400: errors.MalformedRequest,
401: errors.UnauthorizedError,
403: errors.ForbiddenError,
404: errors.ResourceNotFound,
409: errors.ConflictError,
500: errors.InternalServerError,
503: errors.ServiceUnavailableError
}
def __init__(self, options):
super(HttpTransport, self).__init__(options)
self.client = http.Client()

134
zaqarclient/transport/ws.py Normal file
View File

@ -0,0 +1,134 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import json
import logging
import uuid
from oslo_utils import importutils
from zaqarclient.transport import base
from zaqarclient.transport import request
from zaqarclient.transport import response
websocket = importutils.try_import('websocket')
LOG = logging.getLogger(__name__)
class WebsocketTransport(base.Transport):
"""Zaqar websocket transport.
*NOTE:* Zaqar's websocket interface does not yet appear to work
well with parameters. Until it does the websocket transport may not
integrate with all of zaqarclients higherlevel request. Even so...
websockets today is still quite usable and use of the transport
via lower level API's in zaqarclient work quite nicely. Example:
conf = {
'auth_opts': {
'backend': 'keystone',
'options': {
'os_auth_token': ks.auth_token,
'os_project_id': CONF.zaqar.project_id
}
}
}
endpoint = 'ws://172.19.0.3:9000'
with transport.get_transport_for(endpoint, options=conf) as ws:
req = request.Request(endpoint, 'queue_create',
content=json.dumps({'queue_name': 'foo'}))
resp = ws.send(req)
"""
def __init__(self, options):
super(WebsocketTransport, self).__init__(options)
self._project_id = options['auth_opts']['options']['os_project_id']
self._token = options['auth_opts']['options']['os_auth_token']
self._websocket_client_id = None
self._ws = None
def _init_client(self, endpoint):
"""Initialize a websocket transport client.
:param endpoint: The websocket endpoint. Example: ws://127.0.0.1:9000/.
Required.
:type endpoint: string
"""
self._websocket_client_id = str(uuid.uuid4())
LOG.debug('Instantiating messaging websocket client: %s', endpoint)
self._ws = self._create_connection(endpoint)
auth_req = request.Request(endpoint, 'authenticate',
headers={'X-Auth-Token': self._token})
self.send(auth_req)
def _create_connection(self, endpoint):
return websocket.create_connection(endpoint)
def send(self, request):
if not self._ws:
self._init_client(request.endpoint)
headers = request.headers.copy()
headers.update({
'Client-ID': self._websocket_client_id,
'X-Project-ID': self._project_id
})
msg = {'action': request.operation, 'headers': headers}
if request.content:
msg['body'] = json.loads(request.content)
# NOTE(dprince): Zaqar websockets do not yet seem to support params?!
# Users of this protocol will need to send everything in the body.
if request.params:
LOG.warning('Websocket transport does not yet support params.')
self._ws.send(json.dumps(msg))
ret = self.recv()
resp = response.Response(request, json.dumps(ret.get('body', '')),
headers=ret['headers'],
status_code=int(ret['headers']['status']))
if resp.status_code in self.http_to_zaqar:
kwargs = {}
try:
error_body = json.loads(resp.content)
kwargs['title'] = 'Websocket Transport Error'
kwargs['description'] = error_body['error']
except Exception:
kwargs['text'] = resp.content
raise self.http_to_zaqar[resp.status_code](**kwargs)
return resp
def recv(self):
return json.loads(self._ws.recv())
def cleanup(self):
if self._ws:
self._ws.close()
self._ws = None
def __enter__(self):
"""Return self to allow usage as a context manager"""
return self
def __exit__(self, *exc):
"""Call cleanup when exiting the context manager"""
self.cleanup()