Add support for Zaqar websockets

This patch adds a new zaqar.use_websockets option which if set to True
in the config file will configure the Zaqar collector to use
the websocket transport instead of wsgi. This can be more efficient
where you want to avoid the continuous polling of o-c-c and instead
just listen on the websocket that is subscribed to a queue.

Like other collectors each iteration creates a new socket object. This
allows us to use the normal re-exec logic in o-c-c and thus gives
the option to re-configure the agent in the future to use other types
of collectors. We could (optionally) look into a higher level option in
the future that would allow o-c-c to avoid re-exec'ing and thus re-use
the same websocket for multiple sets of metadata.

Depends-On: Ia2a8deb599252d8308e44d595eb2bf443999aaad
Change-Id: Id5c7ed590df776844b6c7961eb40f89206cd24e0
This commit is contained in:
Dan Prince 2016-07-14 21:07:28 -04:00
parent 358b73e239
commit b6364df3e5
2 changed files with 176 additions and 14 deletions

View File

@ -12,13 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import fixtures
from keystoneclient import discover as ks_discover
import mock
from oslo_config import cfg
from oslo_config import fixture as config_fixture
import testtools
from testtools import matchers
from zaqarclient.queues.v1 import message
from zaqarclient import transport
from zaqarclient.transport import response
from os_collect_config import collect
from os_collect_config import exc
@ -34,6 +39,14 @@ class FakeKeystoneClient(test_heat.FakeKeystoneClient):
return 'http://192.0.2.1:8888/'
class FakeKeystoneClientWebsocket(test_heat.FakeKeystoneClient):
def url_for(self, service_type, endpoint_type):
self._test.assertEqual('messaging-websocket', service_type)
self._test.assertEqual('publicURL', endpoint_type)
return 'ws://127.0.0.1:9000/'
class FakeZaqarClient(object):
def __init__(self, testcase):
@ -50,6 +63,31 @@ class FakeZaqarClient(object):
return FakeQueue()
class FakeZaqarWebsocketClient(object):
def __init__(self, options, messages=None, testcase=None):
self._messages = messages
self._test = testcase
def send(self, request):
self._test.assertEqual('ws://127.0.0.1:9000/', request.endpoint)
if request.operation == 'message_list':
body = json.loads(request.content)
self._test.assertEqual(
'4f3f46d3-09f1-42a7-8c13-f91a5457192c', body['queue_name'])
return response.Response(request, content=json.dumps(self._messages),
status_code=200)
def recv(self):
return {'body': test_heat.META_DATA}
def __enter__(self):
return self
def __exit__(self, *exc):
pass
class FakeQueue(object):
def pop(self):
@ -87,11 +125,17 @@ class TestZaqar(testtools.TestCase):
self.log = self.useFixture(fixtures.FakeLogger())
self.useFixture(fixtures.NestedTempfile())
collect.setup_conf()
cfg.CONF.zaqar.auth_url = 'http://192.0.2.1:5000/v3'
cfg.CONF.zaqar.user_id = '0123456789ABCDEF'
cfg.CONF.zaqar.password = 'FEDCBA9876543210'
cfg.CONF.zaqar.project_id = '9f6b09df-4d7f-4a33-8ec3-9924d8f46f10'
cfg.CONF.zaqar.queue_id = '4f3f46d3-09f1-42a7-8c13-f91a5457192c'
conf = config_fixture.Config()
self.useFixture(conf)
conf.config(group='zaqar', use_websockets=False)
conf.config(group='zaqar', auth_url='http://192.0.2.1:5000/v3')
conf.config(group='zaqar', user_id='0123456789ABCDEF')
conf.config(group='zaqar', password='FEDCBA9876543210')
conf.config(group='zaqar',
project_id='9f6b09df-4d7f-4a33-8ec3-9924d8f46f10')
conf.config(group='zaqar',
queue_id='4f3f46d3-09f1-42a7-8c13-f91a5457192c')
@mock.patch.object(ks_discover.Discover, '__init__')
@mock.patch.object(ks_discover.Discover, 'url_for')
@ -176,3 +220,51 @@ class TestZaqar(testtools.TestCase):
self.assertRaises(
exc.ZaqarMetadataNotConfigured, zaqar_collect.collect)
self.assertIn('No queue_id configured', self.log.output)
@mock.patch.object(transport, 'get_transport_for')
@mock.patch.object(ks_discover.Discover, '__init__')
@mock.patch.object(ks_discover.Discover, 'url_for')
def test_collect_zaqar_websocket(self, mock_url_for, mock___init__,
mock_transport):
mock___init__.return_value = None
mock_url_for.return_value = cfg.CONF.zaqar.auth_url
conf = config_fixture.Config()
self.useFixture(conf)
conf.config(group='zaqar', use_websockets=True)
messages = {'messages': [{'body': test_heat.META_DATA, 'id': 1}]}
ws = FakeZaqarWebsocketClient({}, messages=messages, testcase=self)
mock_transport.return_value = ws
zaqar_md = zaqar.Collector(
keystoneclient=FakeKeystoneClientWebsocket(self, cfg.CONF.zaqar)
).collect()
self.assertThat(zaqar_md, matchers.IsInstance(list))
self.assertEqual('zaqar', zaqar_md[0][0])
zaqar_md = zaqar_md[0][1]
for k in ('int1', 'strfoo', 'map_ab'):
self.assertIn(k, zaqar_md)
self.assertEqual(zaqar_md[k], test_heat.META_DATA[k])
@mock.patch.object(transport, 'get_transport_for')
@mock.patch.object(ks_discover.Discover, '__init__')
@mock.patch.object(ks_discover.Discover, 'url_for')
def test_collect_zaqar_websocket_recv(self, mock_url_for, mock___init__,
mock_transport):
mock___init__.return_value = None
mock_url_for.return_value = cfg.CONF.zaqar.auth_url
ws = FakeZaqarWebsocketClient({}, messages={}, testcase=self)
mock_transport.return_value = ws
conf = config_fixture.Config()
self.useFixture(conf)
conf.config(group='zaqar', use_websockets=True)
zaqar_md = zaqar.Collector(
keystoneclient=FakeKeystoneClientWebsocket(self, cfg.CONF.zaqar),
).collect()
self.assertThat(zaqar_md, matchers.IsInstance(list))
self.assertEqual('zaqar', zaqar_md[0][0])
zaqar_md = zaqar_md[0][1]
for k in ('int1', 'strfoo', 'map_ab'):
self.assertIn(k, zaqar_md)
self.assertEqual(zaqar_md[k], test_heat.META_DATA[k])

View File

@ -12,11 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from keystoneclient.v3 import client as keystoneclient
from oslo_config import cfg
from oslo_log import log
import six
from zaqarclient.queues.v1 import client as zaqarclient
from zaqarclient import transport
from zaqarclient.transport import request
from os_collect_config import exc
from os_collect_config import keystone
@ -36,6 +40,9 @@ opts = [
help='URL for API authentication'),
cfg.StrOpt('queue-id',
help='ID of the queue to be checked'),
cfg.BoolOpt('use-websockets',
default=False,
help='Use the websocket transport to connect to Zaqar.'),
]
name = 'zaqar'
@ -44,10 +51,75 @@ class Collector(object):
def __init__(self,
keystoneclient=keystoneclient,
zaqarclient=zaqarclient,
discover_class=None):
discover_class=None,
transport=transport):
self.keystoneclient = keystoneclient
self.zaqarclient = zaqarclient
self.discover_class = discover_class
self.transport = transport
def get_data_wsgi(self, ks, conf):
endpoint = ks.service_catalog.url_for(
service_type='messaging', endpoint_type='publicURL')
logger.debug('Fetching metadata from %s' % endpoint)
zaqar = self.zaqarclient.Client(endpoint, conf=conf, version=1.1)
queue = zaqar.queue(CONF.zaqar.queue_id)
r = six.next(queue.pop())
return r.body
def _create_req(self, endpoint, action, body):
return request.Request(endpoint, action, content=json.dumps(body))
def get_data_websocket(self, ks, conf):
endpoint = ks.service_catalog.url_for(
service_type='messaging-websocket', endpoint_type='publicURL')
logger.debug('Fetching metadata from %s' % endpoint)
with self.transport.get_transport_for(endpoint, options=conf) as ws:
# create queue
req = self._create_req(endpoint, 'queue_create',
{'queue_name': CONF.zaqar.queue_id})
ws.send(req)
# subscribe to queue messages
req = self._create_req(endpoint, 'subscription_create',
{'queue_name': CONF.zaqar.queue_id,
'ttl': 10000})
ws.send(req)
# TODO(dprince) would be nice to use message_delete_many but
# websockets doesn't support parameters so we can't send 'pop'.
# This would allow us to avoid the 'message_delete' below. Example:
# req = self._create_req(endpoint, 'message_delete_many',
# {'queue_name': CONF.zaqar.queue_id, 'pop': 1})
req = self._create_req(endpoint, 'message_list',
{'queue_name': CONF.zaqar.queue_id,
'echo': True})
resp = ws.send(req)
messages = json.loads(resp.content).get('messages', [])
if len(messages) > 0:
# NOTE(dprince) In this case we are checking for queue
# messages that arrived before we subscribed.
logger.debug('Websocket message_list found...')
msg_0 = messages[0]
data = msg_0['body']
req = self._create_req(endpoint, 'message_delete',
{'queue_name': CONF.zaqar.queue_id,
'message_id': msg_0['id']})
ws.send(req)
else:
# NOTE(dprince) This will block until there is data available
# or the socket times out. Because we subscribe to the queue
# it will allow us to process data immediately.
logger.debug('websocket recv()')
data = ws.recv()['body']
return data
def collect(self):
if CONF.zaqar.auth_url is None:
@ -74,9 +146,7 @@ class Collector(object):
project_id=CONF.zaqar.project_id,
keystoneclient=self.keystoneclient,
discover_class=self.discover_class).client
endpoint = ks.service_catalog.url_for(
service_type='messaging', endpoint_type='publicURL')
logger.debug('Fetching metadata from %s' % endpoint)
conf = {
'auth_opts': {
'backend': 'keystone',
@ -87,13 +157,13 @@ class Collector(object):
}
}
zaqar = self.zaqarclient.Client(endpoint, conf=conf, version=1.1)
queue = zaqar.queue(CONF.zaqar.queue_id)
r = six.next(queue.pop())
if CONF.zaqar.use_websockets:
data = self.get_data_websocket(ks, conf)
else:
data = self.get_data_wsgi(ks, conf)
final_list = merger.merged_list_from_content(
r.body, cfg.CONF.deployment_key, name)
data, cfg.CONF.deployment_key, name)
return final_list
except Exception as e: