Implement HTTPListener and HTTPClient and add OpenAPI specs

- `HTTPListener`: This class sets up and manages an HTTP server
to listen for incoming messages. It also manages the FlaskApp app
that runs on the HTTP server, registers and deregisters services
with the service broker, handles concurrent requests, listens to
incoming messages, and processes them in batches.

- `HTTPClient`: it represents an HTTP client for making RPC calls
to a target service. It provides methods for performing different
types of RPC calls using the HTTP protocol.

In addition, two placeholder modules `openapi.server` and `openapi.client`
have been added for future auto-generated OpenAPI code integration.
The OpenAPI configuration file that outlines the API specifications
is included as `openapi.yaml`.

Partially-implements: blueprint oslo-http-driver
Change-Id: Ic076f2b83cb6f6aa074ebedc3103463ee0e66667
This commit is contained in:
Xiang Wang 2024-04-03 20:07:53 +09:00 committed by xiang-roger-wang
parent 7bdb665d84
commit 5fee2d754c
9 changed files with 1324 additions and 28 deletions

View File

@ -0,0 +1,71 @@
# Copyright 2024 LY Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module is a temporary placeholder for an auto-generated module of a
Restful API service.
The actual implementation of the API service will be generated and added
by OpenAPI. This module only contains placeholder classes and methods that
are currently used by HTTP driver.
"""
class RpcMessage:
def __init__(self, args, **kwargs):
self.args = args
def __eq__(self, other) :
return vars(self) == vars(other)
class FanoutMessage:
def __init__(self, *args, **kwargs):
pass
def __eq__(self, other) :
return vars(self) == vars(other)
class Configuration:
def __init__(self, *args, **kwargs):
self.verify_ssl = True
self.retries = 3
class ApiClient:
def __init__(self, *args, **kwargs):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
pass
class DefaultApi:
def __init__(self, *args, **kwargs):
pass
def rpc_call(self, *args, **kwargs):
return {'result': 'data'}
def rpc_cast(self, *args, **kwargs):
pass
def rpc_cast_fanout(self, *args, **kwargs):
pass
class ApiException(Exception):
pass

View File

@ -0,0 +1,224 @@
openapi: 3.0.0
info:
description: HTTP Driver RPC API
title: HTTP_RPC
version: "1.0"
servers:
- url: http://localhost:3000
paths:
/call:
post:
description: RPC Call
operationId: rpc_call
parameters:
- explode: false
in: header
name: X-Reverse-Proxy-Endpoint
required: false
schema:
type: string
style: simple
- explode: false
in: header
name: X-Rpchost-Port
required: false
schema:
format: int32
type: integer
style: simple
- explode: false
in: header
name: X-Rpchost
required: false
schema:
type: string
style: simple
requestBody:
content:
application/json:
examples: {}
schema:
$ref: '#/components/schemas/RpcMessage'
responses:
"201":
content:
application/json:
schema:
additionalProperties: true
type: object
description: RPC Call Success
security:
- api_key: []
summary: RPC Call
tags: []
x-openapi-router-controller: oslo_messaging._drivers.http_driver.openapi.server.controllers.default_controller
/cast:
post:
description: RPC Call
operationId: rpc_cast
parameters:
- explode: false
in: header
name: X-Reverse-Proxy-Endpoint
required: false
schema:
type: string
style: simple
- explode: false
in: header
name: X-Rpchost-Port
required: false
schema:
format: int32
type: integer
style: simple
- explode: false
in: header
name: X-Rpchost
required: false
schema:
type: string
style: simple
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/RpcMessage'
responses:
"200":
description: OK
security:
- api_key: []
summary: RPC Call
tags: []
x-openapi-router-controller: oslo_messaging._drivers.http_driver.openapi.server.controllers.default_controller
/cast_fanout:
post:
description: RPC Cast Fanout
operationId: rpc_cast_fanout
parameters:
- explode: false
in: header
name: X-Reverse-Proxy-Endpoint
required: false
schema:
type: string
style: simple
- explode: false
in: header
name: X-Rpchost-Port
required: false
schema:
format: int32
type: integer
style: simple
- explode: false
in: header
name: X-Rpchost
required: false
schema:
type: string
style: simple
requestBody:
$ref: '#/components/requestBodies/FanoutMessage'
responses:
"200":
description: OK
security:
- api_key: []
summary: RPC Call
tags: []
x-openapi-router-controller: oslo_messaging._drivers.http_driver.openapi.server.controllers.default_controller
/healthcheck:
get:
description: It returns a 200 OK if this endpoint is considered healthy.
operationId: healthcheck_get
responses:
"200":
content:
application/json:
schema:
$ref: '#/components/schemas/Healthcheck'
application/json+healthcheck:
examples:
response:
value: |-
{
"checked_at": "2018-06-01T14:33:33.383Z",
}
description: Health Check
summary: Health Check
x-openapi-router-controller: oslo_messaging._drivers.http_driver.openapi.server.controllers.default_controller
components:
requestBodies:
FanoutMessage:
content:
application/json:
schema:
$ref: '#/components/schemas/FanoutMessage'
schemas:
RpcMessage:
example:
args: '{}'
method: method
namespace: namespace
version: version
properties:
method:
minLength: 1
type: string
version:
minLength: 1
type: string
namespace:
minLength: 1
type: string
args:
type: object
required:
- args
- method
type: object
x-examples:
example-1:
method: string
version: string
namespace: string
args: {}
FanoutMessage:
example:
server: server
topic: topic
exchange: exchange
RpcMessage:
args: '{}'
method: method
namespace: namespace
version: version
properties:
topic:
type: string
server:
type: string
exchange:
type: string
RpcMessage:
$ref: '#/components/schemas/RpcMessage'
type: object
Healthcheck:
example:
checked_at: 2000-01-23T04:56:07.000+00:00
properties:
checked_at:
description: The ISO8601 date and time representing the time of a healthcheck
format: date-time
type: string
required:
- checked_at
type: object
securitySchemes:
api_key:
in: header
name: X-Auth
type: apiKey
x-apikeyInfoFunc: oslo_messaging._drivers.http_driver.openapi.server.auth.apikey_auth

View File

@ -0,0 +1,26 @@
# Copyright 2024 LY Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module is a temporary placeholder for an auto-generated module of a
Restful API service.
The actual implementation of the API service will be generated and added
by OpenAPI. This module only contains placeholder classes and methods that
are currently used by HTTP driver.
"""
class JSONEncoder(object):
pass

View File

@ -12,20 +12,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import secrets
import socket
import threading
import uuid
import connexion
import eventlet
from eventlet import wsgi
import greenlet
from oslo_config import cfg
from oslo_log import log as logging
import urllib3
from urllib3.exceptions import TimeoutError
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers.http_driver import consul_operator
from oslo_messaging._drivers.http_driver.openapi import client as \
openapi_client
from oslo_messaging._drivers.http_driver.openapi.server import encoder
from oslo_messaging._drivers.http_driver import service_broker
from oslo_messaging import exceptions
DEFAULT_HOST = socket.gethostbyname(socket.getfqdn())
LOG = logging.getLogger(__name__)
urllib3.disable_warnings()
@ -33,6 +44,42 @@ http_opts = [
cfg.IntOpt('max_rpc_request_retries',
default=5,
help='Maximum number of retries for sending RPC requests'),
cfg.StrOpt('reverse_proxy_endpoint',
help="Reverse Proxy endpoint URL format."
"https://<host>:<port> is an expected format"),
cfg.StrOpt('proxy_endpoint',
help="Proxy endpoint URL format."
"https://<host>:<port> is an expected format"),
cfg.BoolOpt('tls_skip_verify',
default=False,
help='Skip TLS certificate verification when sending '
'requests to the HTTP server of each service. '
'Set this option to True to disable TLS certificate '
'verification. Use with caution as it may expose '
'your application to security risks.'),
cfg.StrOpt('api_config_file',
default="openapi.yaml",
help='The filename of the OpenAPI definition file'),
cfg.StrOpt('api_auth_token',
default="",
help='The authentication token used in the X-Auth header '
'for the HTTP server of each service'),
cfg.IntOpt('listen_timeout',
default=60,
help='The timeout in seconds for listening from HTTP servers'),
cfg.IntOpt('max_concurrent_requests',
default=20,
help='The maximum number of simultaneous requests '
'each HTTP server can handle'),
cfg.BoolOpt('enable_ssl',
default=False,
help='Enable SSL for the HTTP server of this service'),
cfg.StrOpt('ssl_certfile',
default="/etc/oslo_messaging/server.cert",
help='The path to the SSL certificate file for enabling SSL'),
cfg.StrOpt('ssl_keyfile',
default="/etc/oslo_messaging/server.key",
help='The path to the SSL private key file for enabling SSL'),
]
@ -43,42 +90,396 @@ def register_driver_opts(conf):
conf.register_opts(http_opts, group=opt_group)
class HTTPRequestException(Exception):
def __init__(self, message):
message = "Fail to send HTTP request: %s" % message
super(HTTPRequestException, self).__init__(message)
class CallRPCIncomingMessage(base.RpcIncomingMessage):
def __init__(self, ctxt, message, msg_id=None):
super(CallRPCIncomingMessage, self).__init__(ctxt, message)
self.message = message
self.reply_data = None
self.failure = None
self.reply_done = threading.Event()
self.method = message['method']
self.msg_id = msg_id
def __str__(self):
return "msg_id %(msg_id)s, method %(method)s" % \
{'msg_id': self.msg_id,
'method': self.method}
def acknowledge(self):
# Do nothing
pass
def requeue(self):
# Do nothing. Requeue is for the notification driver
pass
def reply(self, reply=None, failure=None, log_failure=True):
"""Sends a reply or failure response.
:param reply: The reply data to be sent.
:type reply: Any
:param failure: The failure data to be sent.
:type failure: Any
:param log_failure: A boolean indicating whether to log the failure.
:type log_failure: bool
:return: None
:rtype: None
"""
self.reply_data = json.dumps(reply, default=self.__json_serialize)
self.failure = failure
if failure:
# Handle failure
data = {"failure": driver_common.serialize_remote_exception(
failure)}
self.reply_data = data
self.reply_done.set()
def get_result(self, timeout=None):
# Since the message is submitted to GreenThreadPoolExecutor
# to be executed asynchronously, we need to wait until the result is
# ready.
is_completed = self.reply_done.wait(timeout)
if not is_completed:
# Operation times out.
raise CallbackTimeoutException()
return self.reply_data, self.failure
def heartbeat(self):
LOG.warning("heartbeat is not supported")
# Serialize set/datetime object.
def __json_serialize(self, obj):
if hasattr(obj, '__iter__'):
return list(obj)
elif isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
else:
return str(obj)
class CastRPCIncomingMessage(base.RpcIncomingMessage):
def __init__(self, ctxt, message, msg_id=None):
super(CastRPCIncomingMessage, self).__init__(ctxt, message,)
self.acknowledge_done = threading.Event()
self.method = message['method']
self.msg_id = msg_id
def __str__(self):
return "msg_id %(msg_id)s, method %(method)s" % \
{'msg_id': self.msg_id,
'method': self.method}
def acknowledge(self):
self.acknowledge_done.set()
def requeue(self):
# Do nothing. Requeue is for the notification driver
pass
def reply(self, reply=None, failure=None, log_failure=True):
# Do nothing
pass
def wait_for_acknowledge(self, timeout=None):
# The message is submitted to GreenThreadPoolExecutor
# to be executed asynchronously, wait for acknowledgement.
is_acknowledged = self.acknowledge_done.wait(timeout)
if not is_acknowledged:
# Operation times out.
raise CallbackTimeoutException()
def heartbeat(self):
LOG.warning("heartbeat is not supported")
class HTTPClient(object):
def __init__(self, conf, allowed_remote_exmods):
"""To be implemented."""
pass # TODO(xiang-roger-wang): Implement __init__ method.
"""Represents an HTTP client for making RPC calls to a target service.
def call(self, service_name, message, timeout):
"""To be implemented."""
pass # TODO(xiang-roger-wang): Implement call method.
This class provides methods for performing different types of RPC calls,
such as `call`, `cast`, and `fanout`, using the HTTP protocol.
def cast(self, service_name, message):
"""To be implemented."""
pass # TODO(xiang-roger-wang): Implement cast method.
The `HTTPClient` class requires an API authentication token,
a list of allowed remote exceptions, a proxy endpoint,
and an optional flag to skip TLS verification.
"""
def fanout(self, service_name, topic, server, exchange, message):
"""To be implemented."""
pass # TODO(xiang-roger-wang): Implement fanout method.
def __init__(self, api_auth_token, allowed_remote_exmods, proxy_endpoint,
tls_skip_verify=False):
self.api_auth_token = api_auth_token
self.allowed_remote_exmods = allowed_remote_exmods
self.proxy_endpoint = proxy_endpoint
self.tls_skip_verify = tls_skip_verify
def request_http(self, service_name, rpc_type, rpc_data, timeout=None):
"""To be implemented."""
pass # TODO(xiang-roger-wang): Implement request_http method.
def call(self, target_service, message, timeout):
# Generate RPC message for target HTTP server
rpc_message = openapi_client.RpcMessage(
method=message['method'],
version=message.get('version', None),
namespace=message.get('namespace', None),
args=message['args'])
ret = self.request_http(target_service, "Call", rpc_message,
timeout=timeout)
return ret
def cast(self, target_service, message):
# Generate RPC message for target HTTP server
rpc_message = openapi_client.RpcMessage(
method=message['method'],
version=message.get('version', None),
namespace=message.get('namespace', None),
args=message['args'])
self.request_http(target_service, "Cast", rpc_message)
def fanout(self, target_service, topic, server, exchange, message):
# Generate RPC message for target HTTP server
fanout_message = openapi_client.FanoutMessage(
topic=topic,
server=server,
exchange=exchange,
rpc_message=openapi_client.RpcMessage(
method=message['method'],
version=message.get('version', None),
namespace=message.get('namespace', None),
args=message['args'])
)
self.request_http(target_service, "Fanout", fanout_message)
def request_http(self, target_service, rpc_type, rpc_data, timeout=None):
"""Sends an HTTP request to the target service.
:param target_service: The target service to send the request to.
:type target_service: service_broker.Service
:param rpc_type: The type of RPC to perform.
:type rpc_type: str
:param rpc_data: The data to send in the RPC.
:type rpc_data: dict
:param timeout: The timeout for the request in seconds.
:type timeout: float, optional
:return: The result of the RPC call, if applicable.
:rtype: Any
:raises HTTPRequestException: If the HTTP request fails.
"""
# If the client has a Proxy, the send the request to the Proxy.
if self.proxy_endpoint:
host = self.proxy_endpoint
else:
host = target_service.host
# Set openapi configurations
openapi_conf = openapi_client.Configuration(
host=host,
api_key={'X-Auth': self.api_auth_token}
)
openapi_conf.verify_ssl = not self.tls_skip_verify
# Disable retry on the urllib3 level.
openapi_conf.retries = 0
x_reverse_proxy_endpoint = target_service.reverse_proxy_endpoint
x_rpchost = target_service.rpchost
x_rpchost_port = target_service.rpchost_port
x_msg_id = uuid.uuid4().hex
with openapi_client.ApiClient(openapi_conf) as api_client:
api_instance = openapi_client.DefaultApi(api_client)
LOG.info(
"Send %(type)s to %(host)s. "
"msg_id: %(msg_id)s, "
"x_reverse_proxy_endpoint: %(rproxy)s, "
"x_rpchost_port: %(rpchost_port)s, x_rpchost: %(rpchost)s"
% {"type": rpc_type, "host": host,
"msg_id": x_msg_id,
"rpchost_port": x_rpchost_port,
"rproxy": x_reverse_proxy_endpoint,
"rpchost": x_rpchost})
try:
if rpc_type == "Call":
result = api_instance.rpc_call(
x_msg_id=x_msg_id,
rpc_message=rpc_data,
_request_timeout=timeout,
x_reverse_proxy_endpoint=x_reverse_proxy_endpoint,
x_rpchost=x_rpchost,
x_rpchost_port=x_rpchost_port).get('result')
if isinstance(result, dict):
if result.get('failure'):
e = driver_common.deserialize_remote_exception(
result.get('failure'),
self.allowed_remote_exmods)
raise e
return result
elif rpc_type == "Cast":
api_instance.rpc_cast(
x_msg_id=x_msg_id,
rpc_message=rpc_data,
x_reverse_proxy_endpoint=x_reverse_proxy_endpoint,
x_rpchost=x_rpchost,
x_rpchost_port=x_rpchost_port)
elif rpc_type == "Fanout":
api_instance.rpc_cast_fanout(
x_msg_id=x_msg_id,
fanout_message=rpc_data,
x_reverse_proxy_endpoint=x_reverse_proxy_endpoint,
x_rpchost=x_rpchost,
x_rpchost_port=x_rpchost_port)
except openapi_client.ApiException as e:
msg = "HTTP request to an endpoint failed: %s" % e
raise HTTPRequestException(msg)
class HTTPListener(base.Listener):
"""To be implemented."""
"""HTTPListener represents a listener for HTTP messages.
def __init__(self, driver, target, service_broker,
batch_size, batch_timeout):
This class is responsible for setting up and managing an HTTP server to
listen for incoming messages. It handles the configuration of the server,
including SSL settings, authentication token, and timeouts.
The incoming messages are stored in a list for further processing.
The HTTPListener class provides the following functionality:
- Setting up the FlaskApp app that runs on the HTTP server.
- Registering and deregistering services with the service broker.
- Handling concurrent requests using an eventlet green pool.
- Listening to incoming messages on a specified address and port range.
- Processing incoming messages in batches with configurable batch size and
timeout.
Note: The actual implementation of the HTTP server and message processing
is in the FlaskApp app that is auto-generated by OpenAPI.
"""
def __init__(self, target, exchange, service_broker, api_config_file,
api_auth_token, listen_timeout, max_concurrent_requests,
address, port_range, reserver_proxy_endpoint,
batch_size, batch_timeout, prefetch_size,
enable_ssl, ssl_certfile=None, ssl_keyfile=None):
super(HTTPListener, self).__init__(
batch_size, batch_timeout, driver.prefetch_size
batch_size, batch_timeout, prefetch_size
)
self.incoming = []
self.target = target
self.exchange = exchange
self.service_broker = service_broker
api_config_file = api_config_file
# Set up FlaskApp app that runs on the http server.
app = connexion.App(__name__, specification_dir='./openapi/')
app.app.json_encoder = encoder.JSONEncoder
app.app.config['http_token'] = api_auth_token
app.app.config['listen_timeout'] = listen_timeout
app.add_api(api_config_file,
arguments={'title': 'HTTP_RPC'},
pythonic_params=True)
self.app = app
self.max_concurrent_requests = max_concurrent_requests
self.address = address
self.port_range = port_range
self.reserver_proxy_endpoint = reserver_proxy_endpoint
# SSL settings
self.enable_ssl = enable_ssl
if self.enable_ssl:
self.ssl_certfile = ssl_certfile
self.ssl_keyfile = ssl_keyfile
# service id will be initialized later when the port is decided.
self.service_id = None
def _run_http_server(self, eventlet_socket, app):
wsgi.server(eventlet_socket, app,
custom_pool=self._thread_pool)
def _attempt_to_listen_on_port(self, port):
try:
eventlet_socket = eventlet.listen((self.address, port),
reuse_port=False)
except Exception:
LOG.debug("Listening on %s:%s failed.", self.address, port)
return None
else:
LOG.debug("Listening on %s:%s succeeded.", self.address, port)
return eventlet_socket
def _create_socket(self):
start = int(self.port_range[0])
end = int(self.port_range[1])
port = secrets.choice(range(start, end))
initial_port = port
# Try to enumerate all ports in the range until
# we found one available.
eventlet_socket = self._attempt_to_listen_on_port(port)
while not eventlet_socket:
if port + 1 >= end:
port = start
else:
port += 1
if port == initial_port:
msg = "No available ports to use. Port range: %s" % \
str(self.port_range)
raise InsufficientPortsException(msg)
eventlet_socket = self._attempt_to_listen_on_port(port)
if self.enable_ssl:
eventlet_socket = eventlet.wrap_ssl(eventlet_socket,
certfile=self.ssl_certfile,
keyfile=self.ssl_keyfile,
server_side=True)
return eventlet_socket
def _register_service(self, address, port):
self.service_id = '{}:{}'.format(self.target.server, str(port))
LOG.debug("Start registration on the service broker")
name = '{}.{}'.format(self.exchange, self.target.topic)
tags = [self.target.server]
self.service_broker.register_service(name, self.service_id,
address,
port, tags,
self.reserver_proxy_endpoint,
self.enable_ssl)
LOG.info("Registered service on the service broker successfully")
def start(self, on_incoming_callback):
super(HTTPListener, self).start(on_incoming_callback)
# Step 1: Try to create a socket.
self.eventlet_socket = self._create_socket()
address, port = self.eventlet_socket.getsockname()
# Step 2: Try to register the service on the service broker.
self._register_service(address, port)
prefix = 'https://%s:%s' if self.enable_ssl else 'http://%s:%s'
# Step 3: Start the wsgi server.
LOG.debug('Starting WSGI server with ' + prefix, address, port)
# Pass the callback function for processing incoming messages from
# RPC server to http server.
self.app.app.config['callback'] = on_incoming_callback
# Start the http server.
self._thread_pool = eventlet.GreenPool(self.max_concurrent_requests)
self.server_thread = eventlet.spawn(self._run_http_server,
self.eventlet_socket, self.app)
def stop(self):
"""Stops eventlet server. Doesn't allow accept new connecting."""
self.service_broker.deregister_service(self.service_id)
LOG.debug("Stopping WSGI server")
self._thread_pool.resize(0)
self.server_thread.kill()
def cleanup(self):
"""Block, until the server has stopped.
Waits on the server's eventlet to finish, then returns.
"""
try:
num = self._thread_pool.running()
LOG.debug("Waiting WSGI server to finish %d requests.", num)
self._thread_pool.waitall()
except greenlet.GreenletExit:
pass
LOG.debug("WSGI server stopped")
class HTTPDriver(base.BaseDriver):
@ -91,16 +492,42 @@ class HTTPDriver(base.BaseDriver):
# consul_operator is the only supported service broker for now.
self.service_broker = consul_operator.ConsulOperator()
self.http_client = HTTPClient(conf, allowed_remote_exmods)
self.http_client = HTTPClient(conf.oslo_messaging_http.api_auth_token,
allowed_remote_exmods,
conf.oslo_messaging_http.proxy_endpoint,
conf.oslo_messaging_http.tls_skip_verify)
self.retries = conf.oslo_messaging_http.max_rpc_request_retries
self.config = conf
def _get_exchange(self, target):
return target.exchange or self._default_exchange
def listen(self, target, batch_size=1, batch_timeout=1):
# Provide arguments to create a HTTPListener instance.
exchange = self._get_exchange(target)
service_broker = self.service_broker
prefetch_size = self.prefetch_size
api_config_file = self.conf.oslo_messaging_http.api_config_file
api_auth_token = self.conf.oslo_messaging_http.api_auth_token
listen_timeout = self.conf.oslo_messaging_http.listen_timeout
max_concurrent_requests = self.conf.oslo_messaging_http.\
max_concurrent_requests
address = self.conf.oslo_messaging_http.address
port_range = self.conf.oslo_messaging_http.port_range.split(':')
reserver_proxy_endpoint = self.conf.oslo_messaging_http.\
reserver_proxy_endpoint
enable_ssl = self.conf.oslo_messaging_http.enable_ssl
ssl_certfile = self.conf.oslo_messaging_http.ssl_certfile
ssl_keyfile = self.conf.oslo_messaging_http.ssl_keyfile
listener = HTTPListener(
self, target, self.service_broker, batch_size,
batch_timeout)
target, exchange, service_broker, api_config_file,
api_auth_token, listen_timeout, max_concurrent_requests,
address, port_range, reserver_proxy_endpoint,
batch_size, batch_timeout, prefetch_size,
enable_ssl, ssl_certfile, ssl_keyfile)
return listener
def cleanup(self, *args):
@ -248,3 +675,17 @@ def retry_decorator(func, times, excluded_exceptions=()):
attempt += 1
return func(*args, **kwargs)
return new_func
class CallbackTimeoutException(Exception):
"""Raised if invoking callback functions times out."""
class InsufficientPortsException(Exception):
"""Raised if there are not enough ports available."""
class HTTPRequestException(Exception):
def __init__(self, message):
message = "Fail to send HTTP request: %s" % message
super(HTTPRequestException, self).__init__(message)

View File

@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import date
from datetime import datetime
import json
from unittest import mock
from oslo_config import cfg
@ -19,9 +22,13 @@ import testscenarios
from urllib3.exceptions import TimeoutError
import oslo_messaging
from oslo_messaging._drivers import common as driver_common
import oslo_messaging._drivers.http_driver.openapi.client as openapi_client
from oslo_messaging._drivers.http_driver import service_broker
from oslo_messaging._drivers import impl_http
from oslo_messaging import exceptions
from oslo_messaging.rpc import dispatcher
from oslo_messaging.rpc import server
from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
@ -73,6 +80,531 @@ class FakeException(Exception):
pass
class TestCallRPCIncomingMessage(test_utils.BaseTestCase):
def setUp(self):
super(TestCallRPCIncomingMessage, self).setUp()
self.ctxt = 'message-ctxt'
self.message = {
'method': 'rpc_method_name',
'args': {
'arg1': 'value1',
'arg2': 1
},
'namespace': 'test-namespace',
'version': '1.0',
}
@server.expected_exceptions(FakeException)
def raise_fake_exception(self, msg):
raise FakeException(msg)
def test_init_CallRPCIncomingMessage(self):
call_msg = impl_http.CallRPCIncomingMessage(self.ctxt, self.message)
# Check RpcIncomingMessage value
self.assertEqual(call_msg.ctxt, 'message-ctxt')
self.assertEqual(call_msg.message['method'], 'rpc_method_name')
self.assertEqual(call_msg.message['args'],
{'arg1': 'value1', 'arg2': 1})
self.assertEqual(call_msg.message['namespace'], 'test-namespace')
self.assertEqual(call_msg.message['version'], '1.0')
def test_acknowledge(self):
call_msg = impl_http.CallRPCIncomingMessage(self.ctxt, self.message)
call_msg.acknowledge()
# Check no change in the message data
self.assertEqual(call_msg.ctxt, 'message-ctxt')
self.assertEqual(call_msg.message['method'], 'rpc_method_name')
self.assertEqual(call_msg.message['args'],
{'arg1': 'value1', 'arg2': 1})
self.assertEqual(call_msg.message['namespace'], 'test-namespace')
self.assertEqual(call_msg.message['version'], '1.0')
def test_requeue(self):
call_msg = impl_http.CallRPCIncomingMessage(self.ctxt, self.message)
call_msg.requeue()
# Check no change in the message data
self.assertEqual(call_msg.ctxt, 'message-ctxt')
self.assertEqual(call_msg.message['method'], 'rpc_method_name')
self.assertEqual(call_msg.message['args'],
{'arg1': 'value1', 'arg2': 1})
self.assertEqual(call_msg.message['namespace'], 'test-namespace')
self.assertEqual(call_msg.message['version'], '1.0')
def test_reply(self):
call_msg = impl_http.CallRPCIncomingMessage(self.ctxt, self.message)
reply = 'dummy-reply'
call_msg.reply(reply=reply)
def test_reply_with_none(self):
call_msg = impl_http.CallRPCIncomingMessage(self.ctxt, self.message)
call_msg.reply(reply=None)
result, failure = call_msg.get_result()
def test_reply_including_set_object(self):
call_msg = impl_http.CallRPCIncomingMessage(self.ctxt, self.message)
reply = {
'result': {
'devices': set(["192.168.1.25", "192.168.1.20"]),
}
}
# No exception to dump JSON format with set object
call_msg.reply(reply=reply)
def test_reply_unsupported_type_except_set(self):
call_msg = impl_http.CallRPCIncomingMessage(self.ctxt, self.message)
reply = {
'result': {
'dict': dict(),
'date': date.today(),
'datetime': datetime.now(),
'object': object,
'instance': mock.MagicMock(),
}
}
# No exception to dump JSON format
call_msg.reply(reply=reply)
def test_reply_with_failure(self):
call_msg = impl_http.CallRPCIncomingMessage(self.ctxt, self.message)
failure = None
try:
self.raise_fake_exception('test-exception')
except dispatcher.ExpectedException as e:
failure = e.exc_info
call_msg.reply(failure=failure)
expected_failure = {
'class': 'FakeException',
'module': 'oslo_messaging.tests.drivers.test_impl_http',
'message': 'test-exception',
'args': ['test-exception'],
'kwargs': {}
}
expect = {'failure': driver_common.serialize_remote_exception(failure)}
# NOTE: Skip to value check of traceback 'tb' key because the line
# number will be changed when updating the test code.
actual_failure = json.loads(expect['failure'])
for k, v in expected_failure.items():
self.assertEqual(v, actual_failure[k])
self.assertIsNot(0, len(actual_failure['tb']))
@mock.patch('threading.Event')
def test_reply_with_timeout(self, m_event):
# To trigger a timeout to happen.
event_obj = mock.Mock()
event_obj.wait.return_value = False
m_event.return_value = event_obj
call_msg = impl_http.CallRPCIncomingMessage(self.ctxt, self.message)
reply = 'dummy-reply'
call_msg.reply(reply=reply)
self.assertRaises(impl_http.CallbackTimeoutException,
call_msg.get_result)
class TestCastRPCIncomingMessage(test_utils.BaseTestCase):
def setUp(self):
super(TestCastRPCIncomingMessage, self).setUp()
self.ctxt = 'message-ctxt'
self.message = {
'method': 'rpc_method_name',
'args': {
'arg1': 'value1',
'arg2': 1
},
'namespace': 'test-namespace',
'version': '1.0'
}
def test_init_CastRPCIncomingMessage(self):
cast_msg = impl_http.CastRPCIncomingMessage(self.ctxt, self.message)
# Check RpcIncomingMessage value
self.assertEqual(cast_msg.ctxt, 'message-ctxt')
self.assertEqual(cast_msg.message['method'], 'rpc_method_name')
self.assertEqual(cast_msg.message['args'],
{'arg1': 'value1', 'arg2': 1})
self.assertEqual(cast_msg.message['namespace'], 'test-namespace')
self.assertEqual(cast_msg.message['version'], '1.0')
def test_acknowledge(self):
cast_msg = impl_http.CastRPCIncomingMessage(self.ctxt, self.message)
cast_msg.acknowledge()
# Check RpcIncomingMessage value
self.assertEqual(cast_msg.ctxt, 'message-ctxt')
self.assertEqual(cast_msg.message['method'], 'rpc_method_name')
self.assertEqual(cast_msg.message['args'],
{'arg1': 'value1', 'arg2': 1})
self.assertEqual(cast_msg.message['namespace'], 'test-namespace')
self.assertEqual(cast_msg.message['version'], '1.0')
def test_requeue(self):
cast_msg = impl_http.CastRPCIncomingMessage(self.ctxt, self.message)
cast_msg.requeue()
# Check RpcIncomingMessage value
self.assertEqual(cast_msg.ctxt, 'message-ctxt')
self.assertEqual(cast_msg.message['method'], 'rpc_method_name')
self.assertEqual(cast_msg.message['args'],
{'arg1': 'value1', 'arg2': 1})
self.assertEqual(cast_msg.message['namespace'], 'test-namespace')
self.assertEqual(cast_msg.message['version'], '1.0')
def test_reply(self):
cast_msg = impl_http.CastRPCIncomingMessage(self.ctxt, self.message)
cast_msg.reply('method-result')
# Check RpcIncomingMessage value
self.assertEqual(cast_msg.ctxt, 'message-ctxt')
self.assertEqual(cast_msg.message['method'], 'rpc_method_name')
self.assertEqual(cast_msg.message['args'],
{'arg1': 'value1', 'arg2': 1})
self.assertEqual(cast_msg.message['namespace'], 'test-namespace')
self.assertEqual(cast_msg.message['version'], '1.0')
def test_reply_with_failure(self):
cast_msg = impl_http.CastRPCIncomingMessage(self.ctxt, self.message)
input_exception = dispatcher.ExpectedException()
cast_msg.reply(failure=input_exception.exc_info)
# Check RpcIncomingMessage value
self.assertEqual(cast_msg.ctxt, 'message-ctxt')
self.assertEqual(cast_msg.message['method'], 'rpc_method_name')
self.assertEqual(cast_msg.message['args'],
{'arg1': 'value1', 'arg2': 1})
self.assertEqual(cast_msg.message['namespace'], 'test-namespace')
self.assertEqual(cast_msg.message['version'], '1.0')
@mock.patch('threading.Event')
def test_acknowledge_timeout(self, m_event):
# To trigger a timeout to happen.
event_obj = mock.Mock()
event_obj.wait.return_value = False
m_event.return_value = event_obj
cast_msg = impl_http.CastRPCIncomingMessage(self.ctxt, self.message)
cast_msg.acknowledge()
self.assertRaises(impl_http.CallbackTimeoutException,
cast_msg.wait_for_acknowledge)
class TestHTTPClient(test_utils.BaseTestCase):
def setUp(self):
conf = cfg.ConfigOpts()
super(TestHTTPClient, self).setUp(conf=conf)
impl_http.register_driver_opts(conf)
self.messaging_conf.transport_url = 'http://'
self.http_client = impl_http.HTTPClient("token", None, None)
self.message = message.copy()
self.message['args']['context'] = ctxt
self.data_with_context = {
'args': {
'spec_obj': {
'nova_object.version': '1.5',
'nova_object.name': 'RequestSpec',
'nova_object.data': {
'instance_uuid': 'dummy_uuid',
'nova_object.namespace': 'nova'
}
}
},
'context': {
'domain': None,
'project_name': 'test-project',
'project_domain': None,
'timestamp': '2021-11-29T13:07:16.305218',
'auth_token': 'dummy_token',
'remote_address': '127.0.0.1',
'is_admin': 1,
'user': 'dummy_uuid',
'tenant': 'dummy_tenant_id',
'project_id': 'dummy_tenant_id',
'user_id': u'dummy_user_id',
'roles': [u'member', u'power_user'],
'request_id': 'req-12345',
'user_name': 'dummy_user_name'
}
}
self.expected_service = service_broker.Service(
host='https://172.25.0.4:3000',
reverse_proxy_endpoint='',
rpchost='172.25.0.4',
rpchost_port=3000)
@mock.patch('oslo_messaging._drivers.http_driver.openapi.client.'
'DefaultApi.rpc_call')
def test_call_http_request(self, mock_stub):
fake_reply = {'result': {'key1': 'value1', 'key2': 'value2'}}
mock_stub.return_value = fake_reply
ret = self.http_client.call(self.expected_service,
self.message, None)
request_data = openapi_client.RpcMessage(
method='select_destinations',
version='4.3',
namespace='nova',
args=self.data_with_context
)
self.assertEqual(ret, {'key1': 'value1', 'key2': 'value2'})
mock_stub.assert_called_once_with(
x_msg_id=mock.ANY,
_request_timeout=None,
rpc_message=request_data,
x_reverse_proxy_endpoint='',
x_rpchost='172.25.0.4',
x_rpchost_port=3000
)
@mock.patch('oslo_messaging._drivers.common.deserialize_remote_exception')
@mock.patch('oslo_messaging._drivers.http_driver.openapi.client.'
'DefaultApi.rpc_call')
def test_call_http_request_failure(self, mock_stub, mock_deserialize):
fake_reply = {'result': {'failure': 'fake'}}
mock_stub.return_value = fake_reply
mock_deserialize.return_value = BaseException
self.assertRaises(
BaseException,
self.http_client.call,
self.expected_service, self.message, None)
mock_deserialize.assert_called_once_with('fake', None)
@mock.patch('oslo_messaging._drivers.http_driver.openapi.client.'
'DefaultApi.rpc_call')
def test_call_http_request_apiexception(self, mock_stub):
mock_stub.side_effect = openapi_client.ApiException
self.assertRaises(
impl_http.HTTPRequestException,
self.http_client.call,
self.expected_service, self.message, None)
@mock.patch('oslo_messaging._drivers.http_driver.openapi.client.'
'DefaultApi.rpc_call')
def test_call_http_request_with_reverse_proxy(self, mock_stub):
fake_reply = {'result': {'key1': 'value1', 'key2': 'value2'}}
mock_stub.return_value = fake_reply
# Set reverse-proxy.
target_service = self.expected_service
target_service.reverse_proxy_endpoint = \
"https://reverse-proxy.example.com"
ret = self.http_client.call(target_service, self.message, None)
request_data = openapi_client.RpcMessage(
method='select_destinations',
version='4.3',
namespace='nova',
args=self.data_with_context
)
self.assertEqual(ret, {'key1': 'value1', 'key2': 'value2'})
mock_stub.assert_called_once_with(
x_msg_id=mock.ANY,
_request_timeout=None,
rpc_message=request_data,
x_reverse_proxy_endpoint='https://reverse-proxy.example.com',
x_rpchost='172.25.0.4',
x_rpchost_port=3000
)
@mock.patch('oslo_messaging._drivers.http_driver.openapi.client.'
'Configuration')
@mock.patch('oslo_messaging._drivers.http_driver.openapi.client.'
'DefaultApi.rpc_call')
def test_call_http_request_with_proxy_and_reverse_proxy(self, m_rpc_call,
m_config):
fake_reply = {'result': {'key1': 'value1', 'key2': 'value2'}}
m_rpc_call.return_value = fake_reply
# Set proxy.
self.http_client.proxy_endpoint = 'https://proxy.example.com:1234'
# Set reverse-proxy.
target_service = self.expected_service
target_service.reverse_proxy_endpoint = \
"https://reverse-proxy.example.com"
ret = self.http_client.call(target_service, self.message, None)
# Check if openapi client config set the proxy as the host to send
# requests.
m_config.assert_called_once_with(host='https://proxy.example.com:1234',
api_key={'X-Auth': 'token'})
request_data = openapi_client.RpcMessage(
method='select_destinations',
version='4.3',
namespace='nova',
args=self.data_with_context
)
self.assertEqual(ret, {'key1': 'value1', 'key2': 'value2'})
m_rpc_call.assert_called_once_with(
x_msg_id=mock.ANY,
_request_timeout=None,
rpc_message=request_data,
x_reverse_proxy_endpoint='https://reverse-proxy.example.com',
x_rpchost='172.25.0.4',
x_rpchost_port=3000
)
@mock.patch('oslo_messaging._drivers.http_driver.openapi.client.'
'DefaultApi.rpc_cast')
def test_cast_http_request(self, mock_stub):
mock_stub.return_value = mock.MagicMock()
ret = self.http_client.cast(self.expected_service, self.message)
request_data = openapi_client.RpcMessage(
method='select_destinations',
version='4.3',
namespace='nova',
args=self.data_with_context
)
request_data.args.update(self.data_with_context)
self.assertIsNone(ret)
mock_stub.assert_called_once_with(
x_msg_id=mock.ANY,
rpc_message=request_data,
x_reverse_proxy_endpoint='',
x_rpchost='172.25.0.4',
x_rpchost_port=3000
)
@mock.patch('oslo_messaging._drivers.http_driver.openapi.client.'
'DefaultApi.rpc_cast_fanout')
def test_fanout_http_request(self, mock_stub):
mock_stub.return_value = mock.MagicMock()
ret = self.http_client.fanout(self.expected_service, 'scheduler',
None, 'nova', self.message)
request_data = openapi_client.FanoutMessage(
topic='scheduler',
server=None,
exchange='nova',
rpc_message=openapi_client.RpcMessage(
method='select_destinations',
version='4.3',
namespace='nova',
args=self.data_with_context
)
)
self.assertIsNone(ret)
mock_stub.assert_called_once_with(
x_msg_id=mock.ANY,
fanout_message=request_data,
x_reverse_proxy_endpoint='',
x_rpchost='172.25.0.4',
x_rpchost_port=3000
)
@mock.patch('connexion.App')
class TestHTTPListener(test_utils.BaseTestCase):
def setUp(self):
conf = cfg.ConfigOpts()
super(TestHTTPListener, self).setUp(conf=conf)
impl_http.register_driver_opts(conf)
self.messaging_conf.transport_url = 'http://'
transport = oslo_messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
self.target = Target()
def setup_listener(self, m_service_broker):
# Config setup
self.target.server = 'fake-service-id'
exchange = 'nova'
return impl_http.HTTPListener(self.target, exchange, m_service_broker,
api_config_file='openapi.yaml',
api_auth_token="", listen_timeout=60,
max_concurrent_requests=5,
address='127.0.0.1',
port_range=(10000, 10001),
reserver_proxy_endpoint="",
batch_size=1, batch_timeout=1,
prefetch_size=0,
enable_ssl=False, ssl_certfile=None,
ssl_keyfile=None)
@mock.patch('eventlet.spawn')
@mock.patch('eventlet.listen')
def test_listener_lifecycle(self, m_listen, m_spawn, m_app):
m_service_broker = mock.Mock()
listener = self.setup_listener(m_service_broker)
m_service_broker.register_service.called_once()
eventlet_socket = m_listen.return_value
eventlet_socket.getsockname.return_value = '127.0.0.1', 10000
m_server_thread = m_spawn.return_value
callback = mock.Mock()
# Step 1: test run.
listener.start(callback)
m_listen.assert_called_once_with(('127.0.0.1', 10000),
reuse_port=False)
m_spawn.assert_called_once_with(listener._run_http_server,
eventlet_socket, m_app.return_value)
# Check if the callback function is set correctly.
self.assertEqual(listener.on_incoming_callback, callback)
# Step 2: test stop.
listener.stop()
m_service_broker.deregister_service.assert_called_once_with(
'fake-service-id:10000')
m_server_thread.kill.assert_called_once()
@mock.patch('eventlet.listen')
def test__create_socket_with_single_port(self, mock_listen, m_app):
listener = self.setup_listener(mock.Mock())
listener.port_range = 10000, 10001
mock_listen.side_effect = lambda port, reuse_port: port
result = listener._create_socket()
self.assertEqual(('127.0.0.1', 10000), result)
@mock.patch('secrets.choice')
@mock.patch('eventlet.listen')
def test__create_socket_with_exceptions(self, mock_listen, m_choice,
m_app):
listener = self.setup_listener(mock.Mock())
listener.port_range = 10000, 10005
m_choice.return_value = 10001 # Use 10001 as the initial port.
mock_listen.side_effect = Exception()
self.assertRaises(impl_http.InsufficientPortsException,
listener._create_socket)
# Check if we enumerate ports in the correct order.
mock_listen.called_once_with(('127.0.0.1', 10001), reuse_port=False)
mock_listen.called_once_with(('127.0.0.1', 10002), reuse_port=False)
mock_listen.called_once_with(('127.0.0.1', 10003), reuse_port=False)
mock_listen.called_once_with(('127.0.0.1', 10004), reuse_port=False)
mock_listen.called_once_with(('127.0.0.1', 10000), reuse_port=False)
@mock.patch('secrets.choice')
@mock.patch('eventlet.listen')
def test__create_socket_with_exceptions_and_success(self, mock_listen,
m_choice, m_app):
listener = self.setup_listener(mock.Mock())
listener.port_range = 10000, 10003
m_choice.return_value = 10000 # Use 10000 as the initial port.
# Fail for the first two attempts.
mock_listen.side_effect = [Exception(), Exception(),
('127.0.0.1', 10002)]
result = listener._create_socket()
self.assertEqual(('127.0.0.1', 10002), result)
# Check if we enumerate ports in the correct order.
mock_listen.called_once_with(('127.0.0.1', 10000))
mock_listen.called_once_with(('127.0.0.1', 10001))
mock_listen.called_once_with(('127.0.0.1', 10002))
class TestHTTPDriver(test_utils.BaseTestCase):
def setUp(self):

View File

@ -32,6 +32,7 @@ kafka =
confluent-kafka>=1.3.0 # Apache-2.0
http =
python-consul2>=0.1.5 # Apache-2.0
connexion[swagger-ui]>=2.6.0 # Apache-2.0
[files]
packages =

View File

@ -14,6 +14,7 @@ confluent-kafka>=1.3.0 # Apache-2.0
# for test_impl_http
python-consul2>=0.1.5 # Apache-2.0
connexion[swagger-ui]>=2.6.0 # Apache-2.0
# when we can require tox>= 1.4, this can go into tox.ini:
# [testenv:cover]