diff --git a/devstack/plugin.sh b/devstack/plugin.sh index bb8e6a428..a02a9c7f5 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -87,7 +87,7 @@ function configure_zaqar { iniset $ZAQAR_CONF signed_url secret_key notreallysecret if is_service_enabled key; then - iniset $ZAQAR_CONF DEFAULT auth_strategy keystone + iniset $ZAQAR_CONF DEFAULT auth_strategy keystone fi iniset $ZAQAR_CONF storage message_pipeline zaqar.notification.notifier @@ -100,6 +100,12 @@ function configure_zaqar { configure_auth_token_middleware $ZAQAR_CONF zaqar $ZAQAR_AUTH_CACHE_DIR + iniset $ZAQAR_CONF trustee auth_plugin password + iniset $ZAQAR_CONF trustee auth_url $KEYSTONE_AUTH_URI + iniset $ZAQAR_CONF trustee username $ZAQAR_TRUSTEE_USER + iniset $ZAQAR_CONF trustee password $ZAQAR_TRUSTEE_PASSWORD + iniset $ZAQAR_CONF trustee user_domain_id $ZAQAR_TRUSTEE_DOMAIN + iniset $ZAQAR_CONF DEFAULT pooling True iniset $ZAQAR_CONF 'pooling:catalog' enable_virtual_pool True diff --git a/devstack/settings b/devstack/settings index 65a2319fc..a7da058d5 100644 --- a/devstack/settings +++ b/devstack/settings @@ -32,6 +32,11 @@ ZAQAR_SERVICE_PORT=${ZAQAR_SERVICE_PORT:-8888} ZAQAR_WEBSOCKET_PORT=${ZAQAR_WEBSOCKET_PORT:-9000} ZAQAR_SERVICE_PROTOCOL=${ZAQAR_SERVICE_PROTOCOL:-$SERVICE_PROTOCOL} +# Set Zaqar trust configuration +ZAQAR_TRUSTEE_USER=${ZAQAR_TRUSTEE_USER:-zaqar} +ZAQAR_TRUSTEE_PASSWORD=${ZAQAR_TRUSTEE_PASSWORD:-$SERVICE_PASSWORD} +ZAQAR_TRUSTEE_DOMAIN=${ZAQAR_TRUSTEE_DOMAIN:-default} + # Tell Tempest this project is present TEMPEST_SERVICES+=,zaqar diff --git a/etc/oslo-config-generator/zaqar.conf b/etc/oslo-config-generator/zaqar.conf index e8a56c309..02324b05d 100644 --- a/etc/oslo-config-generator/zaqar.conf +++ b/etc/oslo-config-generator/zaqar.conf @@ -1,6 +1,7 @@ [DEFAULT] output_file = etc/zaqar.conf.sample namespace = zaqar.bootstrap +namespace = zaqar.common.auth namespace = zaqar.common.configs namespace = zaqar.storage.pipeline namespace = zaqar.storage.pooling diff --git a/releasenotes/notes/add-a-notifier-using-trust-271d9cd1d2b4cdeb.yaml b/releasenotes/notes/add-a-notifier-using-trust-271d9cd1d2b4cdeb.yaml new file mode 100644 index 000000000..1f6ddb508 --- /dev/null +++ b/releasenotes/notes/add-a-notifier-using-trust-271d9cd1d2b4cdeb.yaml @@ -0,0 +1,10 @@ +--- +features: + - Add a new webhook notifier using trust authentication. When using the + 'trust+' URL prefix, Zaqar will create a Keystone trust for the user, and + then use it when a notification happens to authenticate against Keystone + and send the token to the endpoint. + - Support 'post_data' and 'post_headers' options on subscribers, allowing + customization of the payload when having a webhook subscriber. The + 'post_data' option supports the '$zaqar_message$' string template, which + will be replaced by the serialized JSON message if specified. diff --git a/setup.cfg b/setup.cfg index 85bafbd8b..18877fee1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -48,6 +48,7 @@ zaqar.transport = websocket = zaqar.transport.websocket.driver:Driver oslo.config.opts = + zaqar.common.auth = zaqar.common.auth:_config_options zaqar.common.configs = zaqar.common.configs:_config_options zaqar.storage.pipeline = zaqar.storage.pipeline:_config_options zaqar.storage.pooling = zaqar.storage.pooling:_config_options @@ -72,6 +73,8 @@ zaqar.notification.tasks = http = zaqar.notification.tasks.webhook:WebhookTask https = zaqar.notification.tasks.webhook:WebhookTask mailto = zaqar.notification.tasks.mailto:MailtoTask + trust+http = zaqar.notification.tasks.trust:TrustTask + trust+https = zaqar.notification.tasks.trust:TrustTask tempest.test_plugins = zaqar_tests = zaqar.tests.tempest_plugin.plugin:ZaqarTempestPlugin diff --git a/zaqar/api/handler.py b/zaqar/api/handler.py index dd792b6e2..fecbc0d36 100644 --- a/zaqar/api/handler.py +++ b/zaqar/api/handler.py @@ -91,7 +91,7 @@ class Handler(object): return response.Response(req, body, headers) @staticmethod - def create_request(payload=None): + def create_request(payload=None, env=None): if payload is None: payload = {} action = payload.get('action') @@ -99,7 +99,7 @@ class Handler(object): headers = payload.get('headers') return request.Request(action=action, body=body, - headers=headers, api="v2") + headers=headers, api="v2", env=env) def get_defaults(self): return self.v2_endpoints._defaults diff --git a/zaqar/api/v2/endpoints.py b/zaqar/api/v2/endpoints.py index 547b97dd5..874cdb6eb 100644 --- a/zaqar/api/v2/endpoints.py +++ b/zaqar/api/v2/endpoints.py @@ -12,7 +12,10 @@ # License for the specific language governing permissions and limitations under # the License. +from stevedore import driver + from oslo_log import log as logging +from oslo_utils import netutils from zaqar.common.api import errors as api_errors from zaqar.common.api import response @@ -798,6 +801,8 @@ class Endpoints(object): """ project_id = req._headers.get('X-Project-ID') queue_name = req._body.get('queue_name') + options = req._body.get('options', {}) + ttl = req._body.get('ttl', self._defaults.subscription_ttl) LOG.debug( u'Subscription create - queue: %(queue)s, project: %(project)s', @@ -805,9 +810,15 @@ class Endpoints(object): 'project': project_id}) try: + url = netutils.urlsplit(subscriber) + mgr = driver.DriverManager('zaqar.notification.tasks', url.scheme, + invoke_on_load=True) + req_data = req._env.copy() + mgr.driver.register(subscriber, options, ttl, project_id, req_data) + data = {'subscriber': subscriber, - 'options': req._body.get('options'), - 'ttl': req._body.get('ttl')} + 'options': options, + 'ttl': ttl} self._validate.subscription_posting(data) self._validate.queue_identification(queue_name, project_id) if not self._queue_controller.exists(queue_name, project_id): diff --git a/zaqar/common/api/request.py b/zaqar/common/api/request.py index 3d4d8c117..3760281f8 100644 --- a/zaqar/common/api/request.py +++ b/zaqar/common/api/request.py @@ -14,10 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json - -from zaqar.common import decorators - class Request(object): """General data for a Zaqar request @@ -33,20 +29,17 @@ class Request(object): :type headers: dict :param api: Api entry point. i.e: 'queues.v1' :type api: `six.text_type`. + :param env: Request environment. Default: None + :type env: dict """ def __init__(self, action, - body=None, headers=None, api=None): + body=None, headers=None, api=None, env=None): self._action = action self._body = body self._headers = headers or {} self._api = api - - @decorators.lazy_property() - def deserialized_content(self): - if self._body is not None: - return json.loads(self._body) - return None + self._env = env or {} def get_request(self): return {'action': self._action, diff --git a/zaqar/common/auth.py b/zaqar/common/auth.py new file mode 100644 index 000000000..3ab9196ab --- /dev/null +++ b/zaqar/common/auth.py @@ -0,0 +1,66 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from keystoneauth1 import loading +from keystoneauth1 import session +from keystoneclient.v3 import client +from oslo_config import cfg + + +PASSWORD_PLUGIN = 'password' +TRUSTEE_CONF_GROUP = 'trustee' +loading.register_auth_conf_options(cfg.CONF, TRUSTEE_CONF_GROUP) + + +def _config_options(): + trustee_opts = loading.get_auth_common_conf_options() + trustee_opts.extend(loading.get_auth_plugin_conf_options(PASSWORD_PLUGIN)) + yield TRUSTEE_CONF_GROUP, trustee_opts + + +def get_trusted_token(trust_id): + """Return a Keystone token using the given trust_id.""" + auth_plugin = loading.load_auth_from_conf_options( + cfg.CONF, TRUSTEE_CONF_GROUP, trust_id=trust_id) + + trust_session = session.Session(auth=auth_plugin) + return trust_session.auth.get_access(trust_session).auth_token + + +def _get_admin_session(): + auth_plugin = loading.load_auth_from_conf_options( + cfg.CONF, TRUSTEE_CONF_GROUP) + return session.Session(auth=auth_plugin) + + +def _get_user_client(auth_plugin): + sess = session.Session(auth=auth_plugin) + return client.Client(session=sess) + + +def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles, + expires_at): + """Create a trust with the given user for the configured trustee user.""" + admin_session = _get_admin_session() + trustee_user_id = admin_session.get_user_id() + + client = _get_user_client(auth_plugin) + trust = client.trusts.create(trustor_user=trustor_user_id, + trustee_user=trustee_user_id, + project=trustor_project_id, + impersonation=True, + role_names=roles, + expires_at=expires_at) + return trust.id diff --git a/zaqar/notification/tasks/mailto.py b/zaqar/notification/tasks/mailto.py index f13ec3af8..1b7a5169a 100644 --- a/zaqar/notification/tasks/mailto.py +++ b/zaqar/notification/tasks/mailto.py @@ -51,3 +51,6 @@ class MailtoTask(object): 'because %s.') % str(err)) except Exception as exc: LOG.exception(_LE('Failed to send email because %s.') % str(exc)) + + def register(self, subscriber, options, ttl, project_id, request_data): + pass diff --git a/zaqar/notification/tasks/trust.py b/zaqar/notification/tasks/trust.py new file mode 100644 index 000000000..7989cfe25 --- /dev/null +++ b/zaqar/notification/tasks/trust.py @@ -0,0 +1,63 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import datetime + +from oslo_utils import timeutils + +from zaqar.common import auth +from zaqar.notification.tasks import webhook + + +class TrustTask(webhook.WebhookTask): + """A webhook using trust authentication. + + This webhook will use the trust stored in the subscription to ask for a + token, which will then be passed to the notified service. + """ + + def execute(self, subscription, messages, **kwargs): + subscription = copy.deepcopy(subscription) + subscriber = subscription['subscriber'] + + trust_id = subscription['options']['trust_id'] + token = auth.get_trusted_token(trust_id) + + subscription['subscriber'] = subscriber[6:] + headers = {'X-Auth-Token': token, + 'Content-Type': 'application/json'} + super(TrustTask, self).execute(subscription, messages, headers, + **kwargs) + + def register(self, subscriber, options, ttl, project_id, request_data): + if 'trust_id' not in options: + # We have a trust subscriber without a trust ID, + # create it + trustor_user_id = request_data.get('X-USER-ID') + roles = request_data.get('X-ROLES', '') + if roles: + roles = roles.split(',') + else: + roles = [] + auth_plugin = request_data.get('keystone.token_auth') + expires_at = None + if ttl: + expires_at = timeutils.utcnow() + datetime.timedelta( + seconds=ttl) + + trust_id = auth.create_trust_id( + auth_plugin, trustor_user_id, project_id, roles, + expires_at) + options['trust_id'] = trust_id diff --git a/zaqar/notification/tasks/webhook.py b/zaqar/notification/tasks/webhook.py index e2e0e7da3..ecc66cfb3 100644 --- a/zaqar/notification/tasks/webhook.py +++ b/zaqar/notification/tasks/webhook.py @@ -24,15 +24,26 @@ LOG = logging.getLogger(__name__) class WebhookTask(object): - def execute(self, subscription, messages, **kwargs): + def execute(self, subscription, messages, headers=None, **kwargs): + if headers is None: + headers = {'Content-Type': 'application/json'} + headers.update(subscription['options'].get('post_headers', {})) try: for msg in messages: # NOTE(Eva-i): Unfortunately this will add 'queue_name' key to # our original messages(dicts) which will be later consumed in # the storage controller. It seems safe though. msg['queue_name'] = subscription['source'] + if 'post_data' in subscription['options']: + data = subscription['options']['post_data'] + data = data.replace('"$zaqar_message$"', json.dumps(msg)) + else: + data = json.dumps(msg) requests.post(subscription['subscriber'], - data=json.dumps(msg), - headers={'Content-Type': 'application/json'}) + data=data, + headers=headers) except Exception as e: LOG.exception(_LE('webhook task got exception: %s.') % str(e)) + + def register(self, subscriber, options, ttl, project_id, request_data): + pass diff --git a/zaqar/tests/tempest_plugin/api_schema/response/v2/queues.py b/zaqar/tests/tempest_plugin/api_schema/response/v2/queues.py index 4a10efe7e..e22ffaac7 100644 --- a/zaqar/tests/tempest_plugin/api_schema/response/v2/queues.py +++ b/zaqar/tests/tempest_plugin/api_schema/response/v2/queues.py @@ -130,7 +130,7 @@ message_ttl = { list_messages_links = { 'type': 'array', 'maxItems': 1, - 'minItems': 1, + 'minItems': 0, 'items': { 'type': 'object', 'properties': { @@ -143,7 +143,7 @@ list_messages_links = { list_messages_response = { 'type': 'array', - 'minItems': 1, + 'minItems': 0, 'items': { 'type': 'object', 'properties': { diff --git a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py index 47c6f34f2..303af388e 100644 --- a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py +++ b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py @@ -13,9 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json +import uuid from tempest.common.utils import data_utils from tempest.lib import decorators +from tempest import test from zaqar.tests.tempest_plugin.tests import base @@ -91,6 +94,32 @@ class TestSubscriptions(base.BaseV2MessagingTest): subscription_id = result[1]["subscription_id"] self.delete_subscription(self.queue_name, subscription_id) + @decorators.idempotent_id('ff4344b4-ba78-44c5-9ffc-44e53e484f76') + def test_trust_subscription(self): + sub_queue = data_utils.rand_name('Queues-Test') + self.addCleanup(self.client.delete_queue, sub_queue) + subscriber = 'trust+{0}/{1}/queues/{2}/messages'.format( + self.client.base_url, self.client.uri_prefix, sub_queue) + post_body = json.dumps( + {'messages': [{'body': '$zaqar_message$', 'ttl': 60}]}) + post_headers = {'X-Project-ID': self.client.tenant_id, + 'Client-ID': str(uuid.uuid4())} + sub_body = {'ttl': 1200, 'subscriber': subscriber, + 'options': {'post_data': post_body, + 'post_headers': post_headers}} + + self.create_subscription(queue_name=self.queue_name, rbody=sub_body) + message_body = self.generate_message_body() + self.post_messages(queue_name=self.queue_name, rbody=message_body) + + if not test.call_until_true( + lambda: self.list_messages(sub_queue)[1]['messages'], 10, 1): + self.fail("Couldn't get messages") + messages = self.list_messages(sub_queue) + expected = message_body['messages'][0] + expected['queue_name'] = self.queue_name + self.assertEqual(expected, messages[1]['messages'][0]['body']) + @classmethod def resource_cleanup(cls): cls.delete_queue(cls.queue_name) diff --git a/zaqar/tests/unit/common/test_request.py b/zaqar/tests/unit/common/test_request.py index 25391bade..070d72566 100644 --- a/zaqar/tests/unit/common/test_request.py +++ b/zaqar/tests/unit/common/test_request.py @@ -21,8 +21,11 @@ from zaqar.tests import base class TestRequest(base.TestBase): - def test_request_deserialized(self): + def test_request(self): action = 'message_post' - data = '{"data": "tons of GBs"}' - req = request.Request(action=action, body=data) - self.assertIsInstance(req.deserialized_content, dict) + data = 'body' + env = {'foo': 'bar'} + req = request.Request(action=action, body=data, env=env) + self.assertEqual({'foo': 'bar'}, req._env) + self.assertEqual('body', req._body) + self.assertEqual('message_post', req._action) diff --git a/zaqar/tests/unit/notification/test_notifier.py b/zaqar/tests/unit/notification/test_notifier.py index cd9656bc8..eb11ce647 100644 --- a/zaqar/tests/unit/notification/test_notifier.py +++ b/zaqar/tests/unit/notification/test_notifier.py @@ -54,11 +54,14 @@ class NotifierTest(testing.TestBase): def test_webhook(self): subscription = [{'subscriber': 'http://trigger_me', - 'source': 'fake_queue'}, + 'source': 'fake_queue', + 'options': {}}, {'subscriber': 'http://call_me', - 'source': 'fake_queue'}, + 'source': 'fake_queue', + 'options': {}}, {'subscriber': 'http://ping_me', - 'source': 'fake_queue'}] + 'source': 'fake_queue', + 'options': {}}] ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([subscription, {}])) driver = notifier.NotifierDriver(subscription_controller=ctlr) @@ -98,11 +101,45 @@ class NotifierTest(testing.TestBase): ], any_order=True) self.assertEqual(6, len(mock_post.mock_calls)) + def test_webhook_post_data(self): + post_data = {'foo': 'bar', 'egg': '$zaqar_message$'} + subscription = [{'subscriber': 'http://trigger_me', + 'source': 'fake_queue', + 'options': {'post_data': json.dumps(post_data)}}] + ctlr = mock.MagicMock() + ctlr.list = mock.Mock(return_value=iter([subscription, {}])) + driver = notifier.NotifierDriver(subscription_controller=ctlr) + headers = {'Content-Type': 'application/json'} + with mock.patch('requests.post') as mock_post: + driver.post('fake_queue', self.messages, self.client_id, + self.project) + driver.executor.shutdown() + # Let's deserialize "data" from JSON string to dict in each mock + # call, so we can do dict comparisons. JSON string comparisons + # often fail, because dict keys can be serialized in different + # order inside the string. + for call in mock_post.call_args_list: + call[1]['data'] = json.loads(call[1]['data']) + # These are not real calls. In real calls each "data" argument is + # serialized by json.dumps. But we made a substitution before, + # so it will work. + mock_post.assert_has_calls([ + mock.call(subscription[0]['subscriber'], + data={'foo': 'bar', 'egg': self.notifications[0]}, + headers=headers), + mock.call(subscription[0]['subscriber'], + data={'foo': 'bar', 'egg': self.notifications[1]}, + headers=headers), + ], any_order=True) + self.assertEqual(2, len(mock_post.mock_calls)) + def test_marker(self): subscription1 = [{'subscriber': 'http://trigger_me1', - 'source': 'fake_queue'}] + 'source': 'fake_queue', + 'options': {}}] subscription2 = [{'subscriber': 'http://trigger_me2', - 'source': 'fake_queue'}] + 'source': 'fake_queue', + 'options': {}}] ctlr = mock.MagicMock() def mock_list(queue, project, marker): @@ -141,12 +178,12 @@ class NotifierTest(testing.TestBase): def test_mailto(self, mock_popen): subscription = [{'subscriber': 'mailto:aaa@example.com', 'source': 'fake_queue', - 'options': {'subject': 'Hello', - 'from': 'zaqar@example.com'}}, + 'options': {'subject': 'Hello', + 'from': 'zaqar@example.com'}}, {'subscriber': 'mailto:bbb@example.com', 'source': 'fake_queue', - 'options': {'subject': 'Hello', - 'from': 'zaqar@example.com'}}] + 'options': {'subject': 'Hello', + 'from': 'zaqar@example.com'}}] ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([subscription, {}])) driver = notifier.NotifierDriver(subscription_controller=ctlr) @@ -208,7 +245,8 @@ class NotifierTest(testing.TestBase): def test_proper_notification_data(self): subscription = [{'subscriber': 'http://trigger_me', - 'source': 'fake_queue'}] + 'source': 'fake_queue', + 'options': {}}] ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([subscription, {}])) driver = notifier.NotifierDriver(subscription_controller=ctlr) diff --git a/zaqar/tests/unit/transport/websocket/v2/test_auth.py b/zaqar/tests/unit/transport/websocket/v2/test_auth.py index a2c7fa3ec..c9718f9b7 100644 --- a/zaqar/tests/unit/transport/websocket/v2/test_auth.py +++ b/zaqar/tests/unit/transport/websocket/v2/test_auth.py @@ -67,6 +67,18 @@ class AuthTest(base.V2Base): self.assertEqual(1, len(responses)) self.assertEqual('200 OK', responses[0]) + # Check that the env is available to future requests + req = json.dumps({'action': 'message_list', + 'body': {'queue_name': 'myqueue'}, + 'headers': self.headers}) + process_request = mock.patch.object(self.protocol._handler, + 'process_request').start() + process_request.return_value = self.protocol._handler.create_response( + 200, {}) + self.protocol.onMessage(req, False) + self.assertEqual(1, process_request.call_count) + self.assertEqual(self.env, process_request.call_args[0][0]._env) + def test_post_between_auth(self): headers = self.headers.copy() headers['X-Auth-Token'] = 'mytoken1' diff --git a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py index f9e45db1c..5ac39b81d 100644 --- a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py +++ b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py @@ -20,6 +20,7 @@ import uuid import mock import msgpack +from zaqar.common import auth from zaqar.storage import errors as storage_errors from zaqar.tests.unit.transport.websocket import base from zaqar.tests.unit.transport.websocket import utils as test_utils @@ -118,6 +119,37 @@ class SubscriptionTest(base.V1_1Base): 'kitkat', self.project_id))) self.assertEqual([], subscribers) + @mock.patch.object(auth, 'create_trust_id') + def test_subscription_create_trust(self, create_trust): + create_trust.return_value = 'trust_id' + action = 'subscription_create' + body = {'queue_name': 'kitkat', 'ttl': 600, + 'subscriber': 'trust+http://example.com'} + self.protocol._auth_env = {} + self.protocol._auth_env['X-USER-ID'] = 'user-id' + self.protocol._auth_env['X-ROLES'] = 'my-roles' + + send_mock = mock.patch.object(self.protocol, 'sendMessage') + self.addCleanup(send_mock.stop) + send_mock.start() + + req = test_utils.create_request(action, body, self.headers) + self.protocol.onMessage(req, False) + [subscriber] = list( + next( + self.boot.storage.subscription_controller.list( + 'kitkat', self.project_id))) + self.addCleanup( + self.boot.storage.subscription_controller.delete, 'kitkat', + subscriber['id'], project=self.project_id) + self.assertEqual('trust+http://example.com', + subscriber['subscriber']) + self.assertEqual({'trust_id': 'trust_id'}, subscriber['options']) + + self.assertEqual('user-id', create_trust.call_args[0][1]) + self.assertEqual(self.project_id, create_trust.call_args[0][2]) + self.assertEqual(['my-roles'], create_trust.call_args[0][3]) + def test_subscription_delete(self): sub = self.boot.storage.subscription_controller.create( 'kitkat', '', 600, {}, project=self.project_id) diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py index 9cf6fec51..0f929601f 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py @@ -19,6 +19,7 @@ import falcon import mock from oslo_serialization import jsonutils +from zaqar.common import auth from zaqar.storage import errors as storage_errors from zaqar import tests as testing from zaqar.tests.unit.transport.wsgi import base @@ -333,3 +334,22 @@ class TestSubscriptionsMongoDB(base.V2Base): resp = self.simulate_get(self.subscription_path + '/' + sid, headers=self.headers) self.assertEqual(falcon.HTTP_404, self.srmock.status) + + @mock.patch.object(auth, 'create_trust_id') + def test_create_with_trust(self, create_trust): + create_trust.return_value = 'trust_id' + self.headers['X-USER-ID'] = 'user-id' + self.headers['X-ROLES'] = 'my-roles' + self._create_subscription('trust+http://example.com') + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + self.assertEqual('user-id', create_trust.call_args[0][1]) + self.assertEqual(self.project_id, create_trust.call_args[0][2]) + self.assertEqual(['my-roles'], create_trust.call_args[0][3]) + + resp_list = self.simulate_get(self.subscription_path, + headers=self.headers) + resp_list_doc = jsonutils.loads(resp_list[0]) + options = resp_list_doc['subscriptions'][0]['options'] + + self.assertEqual({'a': 1, 'trust_id': 'trust_id'}, options) diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index 43bf3359b..dd539f3aa 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -75,7 +75,8 @@ _TRANSPORT_LIMITS_OPTIONS = ( deprecated_group='limits:transport', help='Defines the maximum message grace period in seconds.'), - cfg.ListOpt('subscriber_types', default=['http', 'https', 'mailto'], + cfg.ListOpt('subscriber_types', default=['http', 'https', 'mailto', + 'trust+http', 'trust+https'], help='Defines supported subscriber types.'), ) diff --git a/zaqar/transport/websocket/protocol.py b/zaqar/transport/websocket/protocol.py index b786ecabc..e1d091c21 100644 --- a/zaqar/transport/websocket/protocol.py +++ b/zaqar/transport/websocket/protocol.py @@ -63,6 +63,7 @@ class MessagingProtocol(websocket.WebSocketServerProtocol): self._auth_strategy = auth_strategy self._loop = loop self._authentified = False + self._auth_env = None self._auth_app = None self._auth_in_binary = None self._deauth_handle = None @@ -103,7 +104,7 @@ class MessagingProtocol(websocket.WebSocketServerProtocol): resp = self._handler.create_response(400, body) return self._send_response(resp, isBinary) # Parse the request - req = self._handler.create_request(payload) + req = self._handler.create_request(payload, self._auth_env) # Validate and process the request resp = self._handler.validate_request(payload, req) if resp is None: @@ -155,6 +156,9 @@ class MessagingProtocol(websocket.WebSocketServerProtocol): def _auth_start(self, env, start_response): self._authentified = True + self._auth_env = dict( + (self._env_var_to_header(key), value) + for key, value in env.items()) self._auth_app = None expire = env['keystone.token_info']['token']['expires_at'] expire_time = timeutils.parse_isotime(expire) @@ -169,6 +173,7 @@ class MessagingProtocol(websocket.WebSocketServerProtocol): def _deauthenticate(self): self._authentified = False + self._auth_env = None self.sendClose(4003, u'Authentication expired.') def _auth_response(self, status, message): @@ -186,6 +191,12 @@ class MessagingProtocol(websocket.WebSocketServerProtocol): def _header_to_env_var(self, key): return 'HTTP_%s' % key.replace('-', '_').upper() + def _env_var_to_header(self, key): + if key.startswith("HTTP_"): + return key[5:].replace("_", "-") + else: + return key + def _send_response(self, resp, in_binary): if in_binary: pack_name = 'bin' diff --git a/zaqar/transport/wsgi/v2_0/subscriptions.py b/zaqar/transport/wsgi/v2_0/subscriptions.py index 0510a36a3..6b65ecd51 100644 --- a/zaqar/transport/wsgi/v2_0/subscriptions.py +++ b/zaqar/transport/wsgi/v2_0/subscriptions.py @@ -15,7 +15,9 @@ import falcon from oslo_log import log as logging +from oslo_utils import netutils import six +from stevedore import driver from zaqar.common import decorators from zaqar.i18n import _ @@ -176,8 +178,15 @@ class CollectionResource(object): self._queue_controller.create(queue_name, project=project_id) self._validate.subscription_posting(document) subscriber = document['subscriber'] - ttl = document.get('ttl', self._default_subscription_ttl) options = document.get('options', {}) + url = netutils.urlsplit(subscriber) + ttl = document.get('ttl', self._default_subscription_ttl) + mgr = driver.DriverManager('zaqar.notification.tasks', url.scheme, + invoke_on_load=True) + req_data = req.headers.copy() + req_data.update(req.env) + mgr.driver.register(subscriber, options, ttl, project_id, req_data) + created = self._subscription_controller.create(queue_name, subscriber, ttl,