From 3c13cded2b52e999a19e81f1eae9d5051bb02831 Mon Sep 17 00:00:00 2001 From: Eric K Date: Mon, 7 May 2018 15:55:58 -0700 Subject: [PATCH] datasource driver framework to accept webhook Add an api/webhook_model and datasource_driver framework so that a datasource driver can accept webhook notifications by defining the _webhook_handler method. Accepts webhook POST to the following URL scheme: /v1/data-sources//webhook Change-Id: I5d13ddcf6d355b6f2f498cfeca6ec7db2ff049b1 --- congress/api/router.py | 8 +++++ congress/api/webhook_model.py | 41 +++++++++++++++++++++++ congress/datasources/datasource_driver.py | 24 +++++++++++++ congress/harness.py | 2 ++ congress/tests/api/test_webhook_model.py | 36 ++++++++++++++++++++ congress/tests/fake_datasource.py | 3 ++ 6 files changed, 114 insertions(+) create mode 100644 congress/api/webhook_model.py create mode 100644 congress/tests/api/test_webhook_model.py diff --git a/congress/api/router.py b/congress/api/router.py index 98536d22e..d0a6f4705 100644 --- a/congress/api/router.py +++ b/congress/api/router.py @@ -143,6 +143,14 @@ class APIRouterV1(object): row_element_handler = webservice.ElementHandler(row_path, table_rows) resource_mgr.register_handler(row_element_handler) + # Setup /v1/data-sources//webhook + webhook = process_dict['api-webhook'] + webhook_path = "%s/webhook" % ds_path + webhook_collection_handler = webservice.CollectionHandler( + webhook_path, + webhook) + resource_mgr.register_handler(webhook_collection_handler) + # Setup /v1/system/datasource-drivers system = process_dict['api-system'] # NOTE(arosen): start url out with datasource-drivers since we don't diff --git a/congress/api/webhook_model.py b/congress/api/webhook_model.py new file mode 100644 index 000000000..bdec8c5d6 --- /dev/null +++ b/congress/api/webhook_model.py @@ -0,0 +1,41 @@ +# Copyright (c) 2018 VMware, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import print_function +from __future__ import division +from __future__ import absolute_import + +from congress.api import api_utils +from congress.api import base +from congress.api import webservice +from congress import exception + + +class WebhookModel(base.APIModel): + """Model for handling webhook notifications.""" + + def add_item(self, item, id_=None, context=None): + """POST webhook notification. + + :param item: The webhook payload + :param id_: not used in this case; should be None + :param context: Key-values providing frame of reference of request + """ + caller, source_id = api_utils.get_id_from_context(context) + try: + args = {'payload': item} + # Note(thread-safety): blocking call + self.invoke_rpc(caller, 'process_webhook_notification', args) + except exception.CongressException as e: + raise webservice.DataModelException.create(e) diff --git a/congress/datasources/datasource_driver.py b/congress/datasources/datasource_driver.py index 034c3a2bb..53ff8d6b6 100644 --- a/congress/datasources/datasource_driver.py +++ b/congress/datasources/datasource_driver.py @@ -1193,6 +1193,25 @@ class PushedDataSourceDriver(DataSourceDriver): else: raise Exception('Push-type datasource driver does not have ID') + # Note (thread-safety): blocking function + def process_webhook_notification(self, payload): + self.prior_state = dict(self.state) + # call specific webhook handler of driver + updated_tables = self._webhook_handler(payload) + self.number_of_updates += 1 + self.last_updated_time = datetime.datetime.now() + + # persist in DB + if self.persist_data: + for tablename in updated_tables: + if self.ds_id is not None: + # Note (thread-safety): blocking call + db_ds_table_data.store_ds_table_data( + self.ds_id, tablename, self.state[tablename]) + else: + raise Exception( + 'Push-type datasource driver does not have ID') + class PushedDataSourceDriverEndpoints(data_service.DataServiceEndPoints): def __init__(self, service): @@ -1203,6 +1222,11 @@ class PushedDataSourceDriverEndpoints(data_service.DataServiceEndPoints): # Note (thread-safety): blocking call return self.service.replace_entire_table_data(table_id, objs) + # Note (thread-safety): blocking function + def process_webhook_notification(self, context, payload): + # Note (thread-safety): blocking call + return self.service.process_webhook_notification(payload) + class PollingDataSourceDriver(DataSourceDriver): def __init__(self, name='', args=None): diff --git a/congress/harness.py b/congress/harness.py index b6dbf7ba2..0e9a16877 100644 --- a/congress/harness.py +++ b/congress/harness.py @@ -35,6 +35,7 @@ from congress.api import schema_model from congress.api import status_model from congress.api.system import driver_model from congress.api import table_model +from congress.api import webhook_model from congress.db import datasources as db_datasources from congress.dse2 import datasource_manager as ds_manager from congress.dse2 import dse_node @@ -129,6 +130,7 @@ def create_api_models(bus): 'api-datasource', bus=bus) res['api-schema'] = schema_model.SchemaModel('api-schema', bus=bus) res['api-table'] = table_model.TableModel('api-table', bus=bus) + res['api-webhook'] = webhook_model.WebhookModel('api-webhook', bus=bus) res['api-status'] = status_model.StatusModel('api-status', bus=bus) res['api-action'] = action_model.ActionsModel('api-action', bus=bus) res['api-system'] = driver_model.DatasourceDriverModel( diff --git a/congress/tests/api/test_webhook_model.py b/congress/tests/api/test_webhook_model.py new file mode 100644 index 000000000..363cc798a --- /dev/null +++ b/congress/tests/api/test_webhook_model.py @@ -0,0 +1,36 @@ +# Copyright (c) 2018 VMware Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +from __future__ import division +from __future__ import absolute_import + +from congress.tests.api import base as api_base +from congress.tests import base + + +class TestWebhookModel(base.SqlTestCase): + def setUp(self): + super(TestWebhookModel, self).setUp() + services = api_base.setup_config() + self.webhook_model = services['api']['api-webhook'] + self.node = services['node'] + self.data = services['data'] + + def test_add_item(self): + context = {'ds_id': self.data.service_id} + payload = {'test_payload': 'test_payload'} + self.webhook_model.add_item(item=payload, context=context) + self.assertEqual(self.data.webhook_payload, payload) diff --git a/congress/tests/fake_datasource.py b/congress/tests/fake_datasource.py index 2a500cb0c..a1316f3ed 100644 --- a/congress/tests/fake_datasource.py +++ b/congress/tests/fake_datasource.py @@ -71,3 +71,6 @@ class FakeDataSource(datasource_driver.PollingDataSourceDriver, def execute(self, action, action_args): self.exec_history.append((action, action_args)) + + def _webhook_handler(self, payload): + self.webhook_payload = payload