Route datasource add/delete request to datasource node
API request adding/deleting datasource should be routed to datasource node, and then the datasource node creates/removes the requested datasource during the request. It enables user to use the datasource just after recieving the response of request. Change-Id: If365c6d1db9ecc5c578421ad5c362326fc262a78 Closes-bug: #1638729 Closes-bug: #1639116
This commit is contained in:
parent
e5be6524d5
commit
22ce339f23
|
@ -25,6 +25,7 @@ from congress.api import api_utils
|
|||
from congress.api import base
|
||||
from congress.api import error_codes
|
||||
from congress.api import webservice
|
||||
from congress.dse2 import dse_node
|
||||
from congress import exception
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -75,7 +76,10 @@ class DatasourceModel(base.APIModel):
|
|||
obj = None
|
||||
try:
|
||||
# Note(thread-safety): blocking call
|
||||
obj = self.bus.add_datasource(item=item)
|
||||
obj = self.invoke_rpc(dse_node.DS_MANAGER_SERVICE_ID,
|
||||
'add_datasource',
|
||||
{'items': item},
|
||||
timeout=self.dse_long_timeout)
|
||||
# Let PE synchronizer take care of creating the policy.
|
||||
except (exception.BadConfig,
|
||||
exception.DatasourceNameInUse,
|
||||
|
@ -100,18 +104,10 @@ class DatasourceModel(base.APIModel):
|
|||
# delete a different datasource
|
||||
# Fix: check UUID of datasource before operating.
|
||||
# Abort if mismatch
|
||||
|
||||
# Note(thread-safety): blocking call
|
||||
# FIXME(thread-safety):
|
||||
# by the time greenthread resumes, the
|
||||
# returned datasource name could refer to a totally different
|
||||
# datasource, causing the rest of this code to unintentionally
|
||||
# delete a different datasource
|
||||
# Fix: check UUID of datasource before operating.
|
||||
# Abort if mismatch
|
||||
|
||||
# Note(thread-safety): blocking call
|
||||
self.bus.delete_datasource(datasource)
|
||||
self.invoke_rpc(dse_node.DS_MANAGER_SERVICE_ID,
|
||||
'delete_datasource',
|
||||
{'datasource': datasource},
|
||||
timeout=self.dse_long_timeout)
|
||||
# Let PE synchronizer takes care of deleting policy
|
||||
except (exception.DatasourceNotFound,
|
||||
exception.DanglingReference) as e:
|
||||
|
|
|
@ -205,16 +205,6 @@ class DataService(object):
|
|||
def get_datasource(self, datasource_id):
|
||||
return self.node.get_datasource(datasource_id)
|
||||
|
||||
# Will be removed once the reference of node exists in api
|
||||
# Note(thread-safety): blocking function
|
||||
def add_datasource(self, **kwargs):
|
||||
return self.node.add_datasource(**kwargs)
|
||||
|
||||
# Will be removed once the reference of node exists in api
|
||||
# Note(thread-safety): blocking function
|
||||
def delete_datasource(self, datasource):
|
||||
return self.node.delete_datasource(datasource)
|
||||
|
||||
# Will be removed once the reference of node exists in api
|
||||
# Note(thread-safety): blocking function
|
||||
def get_drivers_info(self, *args):
|
||||
|
|
|
@ -36,6 +36,7 @@ from congress.api import base as api_base
|
|||
from congress.datasources import constants
|
||||
from congress.db import datasources as datasources_db
|
||||
from congress.dse2 import control_bus
|
||||
from congress.dse2 import data_service
|
||||
from congress import exception
|
||||
|
||||
|
||||
|
@ -898,3 +899,25 @@ class DseNodeEndpoints (object):
|
|||
self.node.service_object(s).receive_data_sequenced(
|
||||
publisher=publisher, table=table, data=data, seqnum=seqnum,
|
||||
is_snapshot=is_snapshot)
|
||||
|
||||
|
||||
DS_MANAGER_SERVICE_ID = '_ds_manager'
|
||||
|
||||
|
||||
class DSManagerService(data_service.DataService):
|
||||
"""A proxy service to datasource managing methods in dse_node."""
|
||||
def __init__(self, service_id):
|
||||
super(DSManagerService, self).__init__(DS_MANAGER_SERVICE_ID)
|
||||
self.add_rpc_endpoint(DSManagerEndpoints(self))
|
||||
|
||||
|
||||
class DSManagerEndpoints(object):
|
||||
|
||||
def __init__(self, service):
|
||||
self.service = service
|
||||
|
||||
def add_datasource(self, context, items):
|
||||
return self.service.node.add_datasource(items)
|
||||
|
||||
def delete_datasource(self, context, datasource):
|
||||
return self.service.node.delete_datasource(datasource)
|
||||
|
|
|
@ -109,6 +109,8 @@ def create2(node_id=None, bus_id=None, existing_node=None,
|
|||
services[api_base.ENGINE_SERVICE_ID].start_policy_synchronizer()
|
||||
if datasources:
|
||||
node.start_periodic_tasks()
|
||||
node.register_service(
|
||||
dse_node.DSManagerService(dse_node.DS_MANAGER_SERVICE_ID))
|
||||
return services
|
||||
|
||||
|
||||
|
|
|
@ -72,10 +72,12 @@ class TestDatasourceModel(base.SqlTestCase):
|
|||
datasource3 = self._get_datasource_request()
|
||||
datasource3['name'] = 'datasource-test-3'
|
||||
self.datasource_model.add_item(datasource3, {})
|
||||
ds_obj = self.node.service_object('datasource-test-3')
|
||||
self.engine.synchronize_policies()
|
||||
obj = self.engine.policy_object('datasource-test-3')
|
||||
self.assertIsNotNone(obj.schema)
|
||||
self.assertEqual('datasource-test-3', obj.name)
|
||||
self.assertIsNotNone(ds_obj)
|
||||
|
||||
def test_add_item_duplicate(self):
|
||||
self.assertRaises(webservice.DataModelException,
|
||||
|
@ -90,6 +92,8 @@ class TestDatasourceModel(base.SqlTestCase):
|
|||
self.assertTrue(self.engine.assert_policy_exists('test-datasource'))
|
||||
context = {'ds_id': d_id}
|
||||
self.datasource_model.delete_item(None, {}, context=context)
|
||||
ds_obj = self.node.service_object('test-datasource')
|
||||
self.assertIsNone(ds_obj)
|
||||
self.engine.synchronize_policies()
|
||||
self.assertRaises(exception.PolicyRuntimeException,
|
||||
self.engine.assert_policy_exists, 'test-datasource')
|
||||
|
|
|
@ -14,11 +14,13 @@
|
|||
#
|
||||
|
||||
import eventlet
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_messaging import conffixture
|
||||
|
||||
from congress.dse2 import data_service
|
||||
from congress.dse2 import dse_node
|
||||
from congress.tests import base
|
||||
from congress.tests import helper
|
||||
|
||||
|
@ -293,6 +295,40 @@ class TestDseNode(base.TestCase):
|
|||
actual = set(node.get_global_service_names())
|
||||
self.assertEqual(actual, set())
|
||||
|
||||
|
||||
class TestDSManagerService(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDSManagerService, self).setUp()
|
||||
|
||||
def test_ds_manager_endpoints_add_ds(self):
|
||||
ds_manager_service = dse_node.DSManagerService('test_mgr')
|
||||
node_mock = mock.MagicMock()
|
||||
node_mock.add_datasource = mock.MagicMock()
|
||||
node_mock.add_datasource.return_value = 'add_datasource'
|
||||
ds_manager_service.node = node_mock
|
||||
endpoints = dse_node.DSManagerEndpoints(ds_manager_service)
|
||||
|
||||
expect_ret = 'add_datasource'
|
||||
self.assertEqual(expect_ret, endpoints.add_datasource('context', {}))
|
||||
|
||||
node_mock.add_datasource.assert_called_with({})
|
||||
|
||||
def test_ds_manager_endpoints_delete_ds(self):
|
||||
ds_manager_service = dse_node.DSManagerService('test_mgr')
|
||||
node_mock = mock.MagicMock()
|
||||
node_mock.delete_datasource = mock.MagicMock()
|
||||
node_mock.delete_datasource.return_value = 'delete_datasource'
|
||||
ds_manager_service.node = node_mock
|
||||
endpoints = dse_node.DSManagerEndpoints(ds_manager_service)
|
||||
|
||||
expect_ret = 'delete_datasource'
|
||||
self.assertEqual(expect_ret,
|
||||
endpoints.delete_datasource('context', 'ds-id'))
|
||||
|
||||
node_mock.delete_datasource.assert_called_with('ds-id')
|
||||
|
||||
|
||||
# Leave this to make manual testing with RabbitMQ easy
|
||||
# if __name__ == '__main__':
|
||||
# import unittest
|
||||
|
|
Loading…
Reference in New Issue