datasource driver framework to accept webhook
Add an api/webhook_model and datasource_driver framework so that a datasource driver can accept webhook notifications by defining the _webhook_handler method. Accepts webhook POST to the following URL scheme: /v1/data-sources/<data-source-name>/webhook Change-Id: I5d13ddcf6d355b6f2f498cfeca6ec7db2ff049b1
This commit is contained in:
parent
245c499fd1
commit
3c13cded2b
|
@ -143,6 +143,14 @@ class APIRouterV1(object):
|
|||
row_element_handler = webservice.ElementHandler(row_path, table_rows)
|
||||
resource_mgr.register_handler(row_element_handler)
|
||||
|
||||
# Setup /v1/data-sources/<ds_id>/webhook
|
||||
webhook = process_dict['api-webhook']
|
||||
webhook_path = "%s/webhook" % ds_path
|
||||
webhook_collection_handler = webservice.CollectionHandler(
|
||||
webhook_path,
|
||||
webhook)
|
||||
resource_mgr.register_handler(webhook_collection_handler)
|
||||
|
||||
# Setup /v1/system/datasource-drivers
|
||||
system = process_dict['api-system']
|
||||
# NOTE(arosen): start url out with datasource-drivers since we don't
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
# Copyright (c) 2018 VMware, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from __future__ import print_function
|
||||
from __future__ import division
|
||||
from __future__ import absolute_import
|
||||
|
||||
from congress.api import api_utils
|
||||
from congress.api import base
|
||||
from congress.api import webservice
|
||||
from congress import exception
|
||||
|
||||
|
||||
class WebhookModel(base.APIModel):
|
||||
"""Model for handling webhook notifications."""
|
||||
|
||||
def add_item(self, item, id_=None, context=None):
|
||||
"""POST webhook notification.
|
||||
|
||||
:param item: The webhook payload
|
||||
:param id_: not used in this case; should be None
|
||||
:param context: Key-values providing frame of reference of request
|
||||
"""
|
||||
caller, source_id = api_utils.get_id_from_context(context)
|
||||
try:
|
||||
args = {'payload': item}
|
||||
# Note(thread-safety): blocking call
|
||||
self.invoke_rpc(caller, 'process_webhook_notification', args)
|
||||
except exception.CongressException as e:
|
||||
raise webservice.DataModelException.create(e)
|
|
@ -1193,6 +1193,25 @@ class PushedDataSourceDriver(DataSourceDriver):
|
|||
else:
|
||||
raise Exception('Push-type datasource driver does not have ID')
|
||||
|
||||
# Note (thread-safety): blocking function
|
||||
def process_webhook_notification(self, payload):
|
||||
self.prior_state = dict(self.state)
|
||||
# call specific webhook handler of driver
|
||||
updated_tables = self._webhook_handler(payload)
|
||||
self.number_of_updates += 1
|
||||
self.last_updated_time = datetime.datetime.now()
|
||||
|
||||
# persist in DB
|
||||
if self.persist_data:
|
||||
for tablename in updated_tables:
|
||||
if self.ds_id is not None:
|
||||
# Note (thread-safety): blocking call
|
||||
db_ds_table_data.store_ds_table_data(
|
||||
self.ds_id, tablename, self.state[tablename])
|
||||
else:
|
||||
raise Exception(
|
||||
'Push-type datasource driver does not have ID')
|
||||
|
||||
|
||||
class PushedDataSourceDriverEndpoints(data_service.DataServiceEndPoints):
|
||||
def __init__(self, service):
|
||||
|
@ -1203,6 +1222,11 @@ class PushedDataSourceDriverEndpoints(data_service.DataServiceEndPoints):
|
|||
# Note (thread-safety): blocking call
|
||||
return self.service.replace_entire_table_data(table_id, objs)
|
||||
|
||||
# Note (thread-safety): blocking function
|
||||
def process_webhook_notification(self, context, payload):
|
||||
# Note (thread-safety): blocking call
|
||||
return self.service.process_webhook_notification(payload)
|
||||
|
||||
|
||||
class PollingDataSourceDriver(DataSourceDriver):
|
||||
def __init__(self, name='', args=None):
|
||||
|
|
|
@ -35,6 +35,7 @@ from congress.api import schema_model
|
|||
from congress.api import status_model
|
||||
from congress.api.system import driver_model
|
||||
from congress.api import table_model
|
||||
from congress.api import webhook_model
|
||||
from congress.db import datasources as db_datasources
|
||||
from congress.dse2 import datasource_manager as ds_manager
|
||||
from congress.dse2 import dse_node
|
||||
|
@ -129,6 +130,7 @@ def create_api_models(bus):
|
|||
'api-datasource', bus=bus)
|
||||
res['api-schema'] = schema_model.SchemaModel('api-schema', bus=bus)
|
||||
res['api-table'] = table_model.TableModel('api-table', bus=bus)
|
||||
res['api-webhook'] = webhook_model.WebhookModel('api-webhook', bus=bus)
|
||||
res['api-status'] = status_model.StatusModel('api-status', bus=bus)
|
||||
res['api-action'] = action_model.ActionsModel('api-action', bus=bus)
|
||||
res['api-system'] = driver_model.DatasourceDriverModel(
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
# Copyright (c) 2018 VMware Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import print_function
|
||||
from __future__ import division
|
||||
from __future__ import absolute_import
|
||||
|
||||
from congress.tests.api import base as api_base
|
||||
from congress.tests import base
|
||||
|
||||
|
||||
class TestWebhookModel(base.SqlTestCase):
|
||||
def setUp(self):
|
||||
super(TestWebhookModel, self).setUp()
|
||||
services = api_base.setup_config()
|
||||
self.webhook_model = services['api']['api-webhook']
|
||||
self.node = services['node']
|
||||
self.data = services['data']
|
||||
|
||||
def test_add_item(self):
|
||||
context = {'ds_id': self.data.service_id}
|
||||
payload = {'test_payload': 'test_payload'}
|
||||
self.webhook_model.add_item(item=payload, context=context)
|
||||
self.assertEqual(self.data.webhook_payload, payload)
|
|
@ -71,3 +71,6 @@ class FakeDataSource(datasource_driver.PollingDataSourceDriver,
|
|||
|
||||
def execute(self, action, action_args):
|
||||
self.exec_history.append((action, action_args))
|
||||
|
||||
def _webhook_handler(self, payload):
|
||||
self.webhook_payload = payload
|
||||
|
|
Loading…
Reference in New Issue