Add a trust notifier task

This adds the ability to send keystone authentified notifications using
trusts. To do so, you specify the posted URL with the "trust+" prefix,
and Zaqar will create and store a trust when subscribing to a queue, if
the trust not provided in the subscription options

It also add a capability to the webhook task to be able to send more
structured data in the notification, allowing to include the Zaqar
message in the data.

blueprint mistral-notifications
DocImpact
Change-Id: I12b9c1b34cdd220fcf1bdc2720043d4a8f75dc85
This commit is contained in:
Thomas Herve 2016-06-08 16:06:28 +02:00
parent 39be4c7d58
commit 51604b4954
22 changed files with 365 additions and 38 deletions

View File

@ -87,7 +87,7 @@ function configure_zaqar {
iniset $ZAQAR_CONF signed_url secret_key notreallysecret
if is_service_enabled key; then
iniset $ZAQAR_CONF DEFAULT auth_strategy keystone
iniset $ZAQAR_CONF DEFAULT auth_strategy keystone
fi
iniset $ZAQAR_CONF storage message_pipeline zaqar.notification.notifier
@ -100,6 +100,12 @@ function configure_zaqar {
configure_auth_token_middleware $ZAQAR_CONF zaqar $ZAQAR_AUTH_CACHE_DIR
iniset $ZAQAR_CONF trustee auth_plugin password
iniset $ZAQAR_CONF trustee auth_url $KEYSTONE_AUTH_URI
iniset $ZAQAR_CONF trustee username $ZAQAR_TRUSTEE_USER
iniset $ZAQAR_CONF trustee password $ZAQAR_TRUSTEE_PASSWORD
iniset $ZAQAR_CONF trustee user_domain_id $ZAQAR_TRUSTEE_DOMAIN
iniset $ZAQAR_CONF DEFAULT pooling True
iniset $ZAQAR_CONF 'pooling:catalog' enable_virtual_pool True

View File

@ -32,6 +32,11 @@ ZAQAR_SERVICE_PORT=${ZAQAR_SERVICE_PORT:-8888}
ZAQAR_WEBSOCKET_PORT=${ZAQAR_WEBSOCKET_PORT:-9000}
ZAQAR_SERVICE_PROTOCOL=${ZAQAR_SERVICE_PROTOCOL:-$SERVICE_PROTOCOL}
# Set Zaqar trust configuration
ZAQAR_TRUSTEE_USER=${ZAQAR_TRUSTEE_USER:-zaqar}
ZAQAR_TRUSTEE_PASSWORD=${ZAQAR_TRUSTEE_PASSWORD:-$SERVICE_PASSWORD}
ZAQAR_TRUSTEE_DOMAIN=${ZAQAR_TRUSTEE_DOMAIN:-default}
# Tell Tempest this project is present
TEMPEST_SERVICES+=,zaqar

View File

@ -1,6 +1,7 @@
[DEFAULT]
output_file = etc/zaqar.conf.sample
namespace = zaqar.bootstrap
namespace = zaqar.common.auth
namespace = zaqar.common.configs
namespace = zaqar.storage.pipeline
namespace = zaqar.storage.pooling

View File

@ -0,0 +1,10 @@
---
features:
- Add a new webhook notifier using trust authentication. When using the
'trust+' URL prefix, Zaqar will create a Keystone trust for the user, and
then use it when a notification happens to authenticate against Keystone
and send the token to the endpoint.
- Support 'post_data' and 'post_headers' options on subscribers, allowing
customization of the payload when having a webhook subscriber. The
'post_data' option supports the '$zaqar_message$' string template, which
will be replaced by the serialized JSON message if specified.

View File

@ -48,6 +48,7 @@ zaqar.transport =
websocket = zaqar.transport.websocket.driver:Driver
oslo.config.opts =
zaqar.common.auth = zaqar.common.auth:_config_options
zaqar.common.configs = zaqar.common.configs:_config_options
zaqar.storage.pipeline = zaqar.storage.pipeline:_config_options
zaqar.storage.pooling = zaqar.storage.pooling:_config_options
@ -72,6 +73,8 @@ zaqar.notification.tasks =
http = zaqar.notification.tasks.webhook:WebhookTask
https = zaqar.notification.tasks.webhook:WebhookTask
mailto = zaqar.notification.tasks.mailto:MailtoTask
trust+http = zaqar.notification.tasks.trust:TrustTask
trust+https = zaqar.notification.tasks.trust:TrustTask
tempest.test_plugins =
zaqar_tests = zaqar.tests.tempest_plugin.plugin:ZaqarTempestPlugin

View File

@ -91,7 +91,7 @@ class Handler(object):
return response.Response(req, body, headers)
@staticmethod
def create_request(payload=None):
def create_request(payload=None, env=None):
if payload is None:
payload = {}
action = payload.get('action')
@ -99,7 +99,7 @@ class Handler(object):
headers = payload.get('headers')
return request.Request(action=action, body=body,
headers=headers, api="v2")
headers=headers, api="v2", env=env)
def get_defaults(self):
return self.v2_endpoints._defaults

View File

@ -12,7 +12,10 @@
# License for the specific language governing permissions and limitations under
# the License.
from stevedore import driver
from oslo_log import log as logging
from oslo_utils import netutils
from zaqar.common.api import errors as api_errors
from zaqar.common.api import response
@ -798,6 +801,8 @@ class Endpoints(object):
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
options = req._body.get('options', {})
ttl = req._body.get('ttl', self._defaults.subscription_ttl)
LOG.debug(
u'Subscription create - queue: %(queue)s, project: %(project)s',
@ -805,9 +810,15 @@ class Endpoints(object):
'project': project_id})
try:
url = netutils.urlsplit(subscriber)
mgr = driver.DriverManager('zaqar.notification.tasks', url.scheme,
invoke_on_load=True)
req_data = req._env.copy()
mgr.driver.register(subscriber, options, ttl, project_id, req_data)
data = {'subscriber': subscriber,
'options': req._body.get('options'),
'ttl': req._body.get('ttl')}
'options': options,
'ttl': ttl}
self._validate.subscription_posting(data)
self._validate.queue_identification(queue_name, project_id)
if not self._queue_controller.exists(queue_name, project_id):

View File

@ -14,10 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from zaqar.common import decorators
class Request(object):
"""General data for a Zaqar request
@ -33,20 +29,17 @@ class Request(object):
:type headers: dict
:param api: Api entry point. i.e: 'queues.v1'
:type api: `six.text_type`.
:param env: Request environment. Default: None
:type env: dict
"""
def __init__(self, action,
body=None, headers=None, api=None):
body=None, headers=None, api=None, env=None):
self._action = action
self._body = body
self._headers = headers or {}
self._api = api
@decorators.lazy_property()
def deserialized_content(self):
if self._body is not None:
return json.loads(self._body)
return None
self._env = env or {}
def get_request(self):
return {'action': self._action,

66
zaqar/common/auth.py Normal file
View File

@ -0,0 +1,66 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from keystoneauth1 import loading
from keystoneauth1 import session
from keystoneclient.v3 import client
from oslo_config import cfg
PASSWORD_PLUGIN = 'password'
TRUSTEE_CONF_GROUP = 'trustee'
loading.register_auth_conf_options(cfg.CONF, TRUSTEE_CONF_GROUP)
def _config_options():
trustee_opts = loading.get_auth_common_conf_options()
trustee_opts.extend(loading.get_auth_plugin_conf_options(PASSWORD_PLUGIN))
yield TRUSTEE_CONF_GROUP, trustee_opts
def get_trusted_token(trust_id):
"""Return a Keystone token using the given trust_id."""
auth_plugin = loading.load_auth_from_conf_options(
cfg.CONF, TRUSTEE_CONF_GROUP, trust_id=trust_id)
trust_session = session.Session(auth=auth_plugin)
return trust_session.auth.get_access(trust_session).auth_token
def _get_admin_session():
auth_plugin = loading.load_auth_from_conf_options(
cfg.CONF, TRUSTEE_CONF_GROUP)
return session.Session(auth=auth_plugin)
def _get_user_client(auth_plugin):
sess = session.Session(auth=auth_plugin)
return client.Client(session=sess)
def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles,
expires_at):
"""Create a trust with the given user for the configured trustee user."""
admin_session = _get_admin_session()
trustee_user_id = admin_session.get_user_id()
client = _get_user_client(auth_plugin)
trust = client.trusts.create(trustor_user=trustor_user_id,
trustee_user=trustee_user_id,
project=trustor_project_id,
impersonation=True,
role_names=roles,
expires_at=expires_at)
return trust.id

View File

@ -51,3 +51,6 @@ class MailtoTask(object):
'because %s.') % str(err))
except Exception as exc:
LOG.exception(_LE('Failed to send email because %s.') % str(exc))
def register(self, subscriber, options, ttl, project_id, request_data):
pass

View File

@ -0,0 +1,63 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
from oslo_utils import timeutils
from zaqar.common import auth
from zaqar.notification.tasks import webhook
class TrustTask(webhook.WebhookTask):
"""A webhook using trust authentication.
This webhook will use the trust stored in the subscription to ask for a
token, which will then be passed to the notified service.
"""
def execute(self, subscription, messages, **kwargs):
subscription = copy.deepcopy(subscription)
subscriber = subscription['subscriber']
trust_id = subscription['options']['trust_id']
token = auth.get_trusted_token(trust_id)
subscription['subscriber'] = subscriber[6:]
headers = {'X-Auth-Token': token,
'Content-Type': 'application/json'}
super(TrustTask, self).execute(subscription, messages, headers,
**kwargs)
def register(self, subscriber, options, ttl, project_id, request_data):
if 'trust_id' not in options:
# We have a trust subscriber without a trust ID,
# create it
trustor_user_id = request_data.get('X-USER-ID')
roles = request_data.get('X-ROLES', '')
if roles:
roles = roles.split(',')
else:
roles = []
auth_plugin = request_data.get('keystone.token_auth')
expires_at = None
if ttl:
expires_at = timeutils.utcnow() + datetime.timedelta(
seconds=ttl)
trust_id = auth.create_trust_id(
auth_plugin, trustor_user_id, project_id, roles,
expires_at)
options['trust_id'] = trust_id

View File

@ -24,15 +24,26 @@ LOG = logging.getLogger(__name__)
class WebhookTask(object):
def execute(self, subscription, messages, **kwargs):
def execute(self, subscription, messages, headers=None, **kwargs):
if headers is None:
headers = {'Content-Type': 'application/json'}
headers.update(subscription['options'].get('post_headers', {}))
try:
for msg in messages:
# NOTE(Eva-i): Unfortunately this will add 'queue_name' key to
# our original messages(dicts) which will be later consumed in
# the storage controller. It seems safe though.
msg['queue_name'] = subscription['source']
if 'post_data' in subscription['options']:
data = subscription['options']['post_data']
data = data.replace('"$zaqar_message$"', json.dumps(msg))
else:
data = json.dumps(msg)
requests.post(subscription['subscriber'],
data=json.dumps(msg),
headers={'Content-Type': 'application/json'})
data=data,
headers=headers)
except Exception as e:
LOG.exception(_LE('webhook task got exception: %s.') % str(e))
def register(self, subscriber, options, ttl, project_id, request_data):
pass

View File

@ -130,7 +130,7 @@ message_ttl = {
list_messages_links = {
'type': 'array',
'maxItems': 1,
'minItems': 1,
'minItems': 0,
'items': {
'type': 'object',
'properties': {
@ -143,7 +143,7 @@ list_messages_links = {
list_messages_response = {
'type': 'array',
'minItems': 1,
'minItems': 0,
'items': {
'type': 'object',
'properties': {

View File

@ -13,9 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import uuid
from tempest.common.utils import data_utils
from tempest.lib import decorators
from tempest import test
from zaqar.tests.tempest_plugin.tests import base
@ -91,6 +94,32 @@ class TestSubscriptions(base.BaseV2MessagingTest):
subscription_id = result[1]["subscription_id"]
self.delete_subscription(self.queue_name, subscription_id)
@decorators.idempotent_id('ff4344b4-ba78-44c5-9ffc-44e53e484f76')
def test_trust_subscription(self):
sub_queue = data_utils.rand_name('Queues-Test')
self.addCleanup(self.client.delete_queue, sub_queue)
subscriber = 'trust+{0}/{1}/queues/{2}/messages'.format(
self.client.base_url, self.client.uri_prefix, sub_queue)
post_body = json.dumps(
{'messages': [{'body': '$zaqar_message$', 'ttl': 60}]})
post_headers = {'X-Project-ID': self.client.tenant_id,
'Client-ID': str(uuid.uuid4())}
sub_body = {'ttl': 1200, 'subscriber': subscriber,
'options': {'post_data': post_body,
'post_headers': post_headers}}
self.create_subscription(queue_name=self.queue_name, rbody=sub_body)
message_body = self.generate_message_body()
self.post_messages(queue_name=self.queue_name, rbody=message_body)
if not test.call_until_true(
lambda: self.list_messages(sub_queue)[1]['messages'], 10, 1):
self.fail("Couldn't get messages")
messages = self.list_messages(sub_queue)
expected = message_body['messages'][0]
expected['queue_name'] = self.queue_name
self.assertEqual(expected, messages[1]['messages'][0]['body'])
@classmethod
def resource_cleanup(cls):
cls.delete_queue(cls.queue_name)

View File

@ -21,8 +21,11 @@ from zaqar.tests import base
class TestRequest(base.TestBase):
def test_request_deserialized(self):
def test_request(self):
action = 'message_post'
data = '{"data": "tons of GBs"}'
req = request.Request(action=action, body=data)
self.assertIsInstance(req.deserialized_content, dict)
data = 'body'
env = {'foo': 'bar'}
req = request.Request(action=action, body=data, env=env)
self.assertEqual({'foo': 'bar'}, req._env)
self.assertEqual('body', req._body)
self.assertEqual('message_post', req._action)

View File

@ -54,11 +54,14 @@ class NotifierTest(testing.TestBase):
def test_webhook(self):
subscription = [{'subscriber': 'http://trigger_me',
'source': 'fake_queue'},
'source': 'fake_queue',
'options': {}},
{'subscriber': 'http://call_me',
'source': 'fake_queue'},
'source': 'fake_queue',
'options': {}},
{'subscriber': 'http://ping_me',
'source': 'fake_queue'}]
'source': 'fake_queue',
'options': {}}]
ctlr = mock.MagicMock()
ctlr.list = mock.Mock(return_value=iter([subscription, {}]))
driver = notifier.NotifierDriver(subscription_controller=ctlr)
@ -98,11 +101,45 @@ class NotifierTest(testing.TestBase):
], any_order=True)
self.assertEqual(6, len(mock_post.mock_calls))
def test_webhook_post_data(self):
post_data = {'foo': 'bar', 'egg': '$zaqar_message$'}
subscription = [{'subscriber': 'http://trigger_me',
'source': 'fake_queue',
'options': {'post_data': json.dumps(post_data)}}]
ctlr = mock.MagicMock()
ctlr.list = mock.Mock(return_value=iter([subscription, {}]))
driver = notifier.NotifierDriver(subscription_controller=ctlr)
headers = {'Content-Type': 'application/json'}
with mock.patch('requests.post') as mock_post:
driver.post('fake_queue', self.messages, self.client_id,
self.project)
driver.executor.shutdown()
# Let's deserialize "data" from JSON string to dict in each mock
# call, so we can do dict comparisons. JSON string comparisons
# often fail, because dict keys can be serialized in different
# order inside the string.
for call in mock_post.call_args_list:
call[1]['data'] = json.loads(call[1]['data'])
# These are not real calls. In real calls each "data" argument is
# serialized by json.dumps. But we made a substitution before,
# so it will work.
mock_post.assert_has_calls([
mock.call(subscription[0]['subscriber'],
data={'foo': 'bar', 'egg': self.notifications[0]},
headers=headers),
mock.call(subscription[0]['subscriber'],
data={'foo': 'bar', 'egg': self.notifications[1]},
headers=headers),
], any_order=True)
self.assertEqual(2, len(mock_post.mock_calls))
def test_marker(self):
subscription1 = [{'subscriber': 'http://trigger_me1',
'source': 'fake_queue'}]
'source': 'fake_queue',
'options': {}}]
subscription2 = [{'subscriber': 'http://trigger_me2',
'source': 'fake_queue'}]
'source': 'fake_queue',
'options': {}}]
ctlr = mock.MagicMock()
def mock_list(queue, project, marker):
@ -141,12 +178,12 @@ class NotifierTest(testing.TestBase):
def test_mailto(self, mock_popen):
subscription = [{'subscriber': 'mailto:aaa@example.com',
'source': 'fake_queue',
'options': {'subject': 'Hello',
'from': 'zaqar@example.com'}},
'options': {'subject': 'Hello',
'from': 'zaqar@example.com'}},
{'subscriber': 'mailto:bbb@example.com',
'source': 'fake_queue',
'options': {'subject': 'Hello',
'from': 'zaqar@example.com'}}]
'options': {'subject': 'Hello',
'from': 'zaqar@example.com'}}]
ctlr = mock.MagicMock()
ctlr.list = mock.Mock(return_value=iter([subscription, {}]))
driver = notifier.NotifierDriver(subscription_controller=ctlr)
@ -208,7 +245,8 @@ class NotifierTest(testing.TestBase):
def test_proper_notification_data(self):
subscription = [{'subscriber': 'http://trigger_me',
'source': 'fake_queue'}]
'source': 'fake_queue',
'options': {}}]
ctlr = mock.MagicMock()
ctlr.list = mock.Mock(return_value=iter([subscription, {}]))
driver = notifier.NotifierDriver(subscription_controller=ctlr)

View File

@ -67,6 +67,18 @@ class AuthTest(base.V2Base):
self.assertEqual(1, len(responses))
self.assertEqual('200 OK', responses[0])
# Check that the env is available to future requests
req = json.dumps({'action': 'message_list',
'body': {'queue_name': 'myqueue'},
'headers': self.headers})
process_request = mock.patch.object(self.protocol._handler,
'process_request').start()
process_request.return_value = self.protocol._handler.create_response(
200, {})
self.protocol.onMessage(req, False)
self.assertEqual(1, process_request.call_count)
self.assertEqual(self.env, process_request.call_args[0][0]._env)
def test_post_between_auth(self):
headers = self.headers.copy()
headers['X-Auth-Token'] = 'mytoken1'

View File

@ -20,6 +20,7 @@ import uuid
import mock
import msgpack
from zaqar.common import auth
from zaqar.storage import errors as storage_errors
from zaqar.tests.unit.transport.websocket import base
from zaqar.tests.unit.transport.websocket import utils as test_utils
@ -118,6 +119,37 @@ class SubscriptionTest(base.V1_1Base):
'kitkat', self.project_id)))
self.assertEqual([], subscribers)
@mock.patch.object(auth, 'create_trust_id')
def test_subscription_create_trust(self, create_trust):
create_trust.return_value = 'trust_id'
action = 'subscription_create'
body = {'queue_name': 'kitkat', 'ttl': 600,
'subscriber': 'trust+http://example.com'}
self.protocol._auth_env = {}
self.protocol._auth_env['X-USER-ID'] = 'user-id'
self.protocol._auth_env['X-ROLES'] = 'my-roles'
send_mock = mock.patch.object(self.protocol, 'sendMessage')
self.addCleanup(send_mock.stop)
send_mock.start()
req = test_utils.create_request(action, body, self.headers)
self.protocol.onMessage(req, False)
[subscriber] = list(
next(
self.boot.storage.subscription_controller.list(
'kitkat', self.project_id)))
self.addCleanup(
self.boot.storage.subscription_controller.delete, 'kitkat',
subscriber['id'], project=self.project_id)
self.assertEqual('trust+http://example.com',
subscriber['subscriber'])
self.assertEqual({'trust_id': 'trust_id'}, subscriber['options'])
self.assertEqual('user-id', create_trust.call_args[0][1])
self.assertEqual(self.project_id, create_trust.call_args[0][2])
self.assertEqual(['my-roles'], create_trust.call_args[0][3])
def test_subscription_delete(self):
sub = self.boot.storage.subscription_controller.create(
'kitkat', '', 600, {}, project=self.project_id)

View File

@ -19,6 +19,7 @@ import falcon
import mock
from oslo_serialization import jsonutils
from zaqar.common import auth
from zaqar.storage import errors as storage_errors
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
@ -333,3 +334,22 @@ class TestSubscriptionsMongoDB(base.V2Base):
resp = self.simulate_get(self.subscription_path + '/' + sid,
headers=self.headers)
self.assertEqual(falcon.HTTP_404, self.srmock.status)
@mock.patch.object(auth, 'create_trust_id')
def test_create_with_trust(self, create_trust):
create_trust.return_value = 'trust_id'
self.headers['X-USER-ID'] = 'user-id'
self.headers['X-ROLES'] = 'my-roles'
self._create_subscription('trust+http://example.com')
self.assertEqual(falcon.HTTP_201, self.srmock.status)
self.assertEqual('user-id', create_trust.call_args[0][1])
self.assertEqual(self.project_id, create_trust.call_args[0][2])
self.assertEqual(['my-roles'], create_trust.call_args[0][3])
resp_list = self.simulate_get(self.subscription_path,
headers=self.headers)
resp_list_doc = jsonutils.loads(resp_list[0])
options = resp_list_doc['subscriptions'][0]['options']
self.assertEqual({'a': 1, 'trust_id': 'trust_id'}, options)

View File

@ -75,7 +75,8 @@ _TRANSPORT_LIMITS_OPTIONS = (
deprecated_group='limits:transport',
help='Defines the maximum message grace period in seconds.'),
cfg.ListOpt('subscriber_types', default=['http', 'https', 'mailto'],
cfg.ListOpt('subscriber_types', default=['http', 'https', 'mailto',
'trust+http', 'trust+https'],
help='Defines supported subscriber types.'),
)

View File

@ -63,6 +63,7 @@ class MessagingProtocol(websocket.WebSocketServerProtocol):
self._auth_strategy = auth_strategy
self._loop = loop
self._authentified = False
self._auth_env = None
self._auth_app = None
self._auth_in_binary = None
self._deauth_handle = None
@ -103,7 +104,7 @@ class MessagingProtocol(websocket.WebSocketServerProtocol):
resp = self._handler.create_response(400, body)
return self._send_response(resp, isBinary)
# Parse the request
req = self._handler.create_request(payload)
req = self._handler.create_request(payload, self._auth_env)
# Validate and process the request
resp = self._handler.validate_request(payload, req)
if resp is None:
@ -155,6 +156,9 @@ class MessagingProtocol(websocket.WebSocketServerProtocol):
def _auth_start(self, env, start_response):
self._authentified = True
self._auth_env = dict(
(self._env_var_to_header(key), value)
for key, value in env.items())
self._auth_app = None
expire = env['keystone.token_info']['token']['expires_at']
expire_time = timeutils.parse_isotime(expire)
@ -169,6 +173,7 @@ class MessagingProtocol(websocket.WebSocketServerProtocol):
def _deauthenticate(self):
self._authentified = False
self._auth_env = None
self.sendClose(4003, u'Authentication expired.')
def _auth_response(self, status, message):
@ -186,6 +191,12 @@ class MessagingProtocol(websocket.WebSocketServerProtocol):
def _header_to_env_var(self, key):
return 'HTTP_%s' % key.replace('-', '_').upper()
def _env_var_to_header(self, key):
if key.startswith("HTTP_"):
return key[5:].replace("_", "-")
else:
return key
def _send_response(self, resp, in_binary):
if in_binary:
pack_name = 'bin'

View File

@ -15,7 +15,9 @@
import falcon
from oslo_log import log as logging
from oslo_utils import netutils
import six
from stevedore import driver
from zaqar.common import decorators
from zaqar.i18n import _
@ -176,8 +178,15 @@ class CollectionResource(object):
self._queue_controller.create(queue_name, project=project_id)
self._validate.subscription_posting(document)
subscriber = document['subscriber']
ttl = document.get('ttl', self._default_subscription_ttl)
options = document.get('options', {})
url = netutils.urlsplit(subscriber)
ttl = document.get('ttl', self._default_subscription_ttl)
mgr = driver.DriverManager('zaqar.notification.tasks', url.scheme,
invoke_on_load=True)
req_data = req.headers.copy()
req_data.update(req.env)
mgr.driver.register(subscriber, options, ttl, project_id, req_data)
created = self._subscription_controller.create(queue_name,
subscriber,
ttl,