datasource driver framework to accept webhook

Add an api/webhook_model and datasource_driver framework
so that a datasource driver can accept webhook
notifications by defining the _webhook_handler method.

Accepts webhook POST to the following URL scheme:
/v1/data-sources/<data-source-name>/webhook

Change-Id: I5d13ddcf6d355b6f2f498cfeca6ec7db2ff049b1
This commit is contained in:
Eric K 2018-05-07 15:55:58 -07:00
parent 245c499fd1
commit 3c13cded2b
6 changed files with 114 additions and 0 deletions

View File

@ -143,6 +143,14 @@ class APIRouterV1(object):
row_element_handler = webservice.ElementHandler(row_path, table_rows)
resource_mgr.register_handler(row_element_handler)
# Setup /v1/data-sources/<ds_id>/webhook
webhook = process_dict['api-webhook']
webhook_path = "%s/webhook" % ds_path
webhook_collection_handler = webservice.CollectionHandler(
webhook_path,
webhook)
resource_mgr.register_handler(webhook_collection_handler)
# Setup /v1/system/datasource-drivers
system = process_dict['api-system']
# NOTE(arosen): start url out with datasource-drivers since we don't

View File

@ -0,0 +1,41 @@
# Copyright (c) 2018 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from congress.api import api_utils
from congress.api import base
from congress.api import webservice
from congress import exception
class WebhookModel(base.APIModel):
"""Model for handling webhook notifications."""
def add_item(self, item, id_=None, context=None):
"""POST webhook notification.
:param item: The webhook payload
:param id_: not used in this case; should be None
:param context: Key-values providing frame of reference of request
"""
caller, source_id = api_utils.get_id_from_context(context)
try:
args = {'payload': item}
# Note(thread-safety): blocking call
self.invoke_rpc(caller, 'process_webhook_notification', args)
except exception.CongressException as e:
raise webservice.DataModelException.create(e)

View File

@ -1193,6 +1193,25 @@ class PushedDataSourceDriver(DataSourceDriver):
else:
raise Exception('Push-type datasource driver does not have ID')
# Note (thread-safety): blocking function
def process_webhook_notification(self, payload):
self.prior_state = dict(self.state)
# call specific webhook handler of driver
updated_tables = self._webhook_handler(payload)
self.number_of_updates += 1
self.last_updated_time = datetime.datetime.now()
# persist in DB
if self.persist_data:
for tablename in updated_tables:
if self.ds_id is not None:
# Note (thread-safety): blocking call
db_ds_table_data.store_ds_table_data(
self.ds_id, tablename, self.state[tablename])
else:
raise Exception(
'Push-type datasource driver does not have ID')
class PushedDataSourceDriverEndpoints(data_service.DataServiceEndPoints):
def __init__(self, service):
@ -1203,6 +1222,11 @@ class PushedDataSourceDriverEndpoints(data_service.DataServiceEndPoints):
# Note (thread-safety): blocking call
return self.service.replace_entire_table_data(table_id, objs)
# Note (thread-safety): blocking function
def process_webhook_notification(self, context, payload):
# Note (thread-safety): blocking call
return self.service.process_webhook_notification(payload)
class PollingDataSourceDriver(DataSourceDriver):
def __init__(self, name='', args=None):

View File

@ -35,6 +35,7 @@ from congress.api import schema_model
from congress.api import status_model
from congress.api.system import driver_model
from congress.api import table_model
from congress.api import webhook_model
from congress.db import datasources as db_datasources
from congress.dse2 import datasource_manager as ds_manager
from congress.dse2 import dse_node
@ -129,6 +130,7 @@ def create_api_models(bus):
'api-datasource', bus=bus)
res['api-schema'] = schema_model.SchemaModel('api-schema', bus=bus)
res['api-table'] = table_model.TableModel('api-table', bus=bus)
res['api-webhook'] = webhook_model.WebhookModel('api-webhook', bus=bus)
res['api-status'] = status_model.StatusModel('api-status', bus=bus)
res['api-action'] = action_model.ActionsModel('api-action', bus=bus)
res['api-system'] = driver_model.DatasourceDriverModel(

View File

@ -0,0 +1,36 @@
# Copyright (c) 2018 VMware Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from congress.tests.api import base as api_base
from congress.tests import base
class TestWebhookModel(base.SqlTestCase):
def setUp(self):
super(TestWebhookModel, self).setUp()
services = api_base.setup_config()
self.webhook_model = services['api']['api-webhook']
self.node = services['node']
self.data = services['data']
def test_add_item(self):
context = {'ds_id': self.data.service_id}
payload = {'test_payload': 'test_payload'}
self.webhook_model.add_item(item=payload, context=context)
self.assertEqual(self.data.webhook_payload, payload)

View File

@ -71,3 +71,6 @@ class FakeDataSource(datasource_driver.PollingDataSourceDriver,
def execute(self, action, action_args):
self.exec_history.append((action, action_args))
def _webhook_handler(self, payload):
self.webhook_payload = payload