Route datasource add/delete request to datasource node

API request adding/deleting datasource should be routed to datasource
node, and then the datasource node creates/removes the requested
datasource during the request. It enables user to use the datasource
just after recieving the response of request.

Change-Id: If365c6d1db9ecc5c578421ad5c362326fc262a78
Closes-bug: #1638729
Closes-bug: #1639116
This commit is contained in:
Masahito Muroi 2016-11-14 19:01:10 +09:00 committed by Anusha Ramineni
parent e5be6524d5
commit 22ce339f23
6 changed files with 74 additions and 23 deletions

View File

@ -25,6 +25,7 @@ from congress.api import api_utils
from congress.api import base
from congress.api import error_codes
from congress.api import webservice
from congress.dse2 import dse_node
from congress import exception
LOG = logging.getLogger(__name__)
@ -75,7 +76,10 @@ class DatasourceModel(base.APIModel):
obj = None
try:
# Note(thread-safety): blocking call
obj = self.bus.add_datasource(item=item)
obj = self.invoke_rpc(dse_node.DS_MANAGER_SERVICE_ID,
'add_datasource',
{'items': item},
timeout=self.dse_long_timeout)
# Let PE synchronizer take care of creating the policy.
except (exception.BadConfig,
exception.DatasourceNameInUse,
@ -100,18 +104,10 @@ class DatasourceModel(base.APIModel):
# delete a different datasource
# Fix: check UUID of datasource before operating.
# Abort if mismatch
# Note(thread-safety): blocking call
# FIXME(thread-safety):
# by the time greenthread resumes, the
# returned datasource name could refer to a totally different
# datasource, causing the rest of this code to unintentionally
# delete a different datasource
# Fix: check UUID of datasource before operating.
# Abort if mismatch
# Note(thread-safety): blocking call
self.bus.delete_datasource(datasource)
self.invoke_rpc(dse_node.DS_MANAGER_SERVICE_ID,
'delete_datasource',
{'datasource': datasource},
timeout=self.dse_long_timeout)
# Let PE synchronizer takes care of deleting policy
except (exception.DatasourceNotFound,
exception.DanglingReference) as e:

View File

@ -205,16 +205,6 @@ class DataService(object):
def get_datasource(self, datasource_id):
return self.node.get_datasource(datasource_id)
# Will be removed once the reference of node exists in api
# Note(thread-safety): blocking function
def add_datasource(self, **kwargs):
return self.node.add_datasource(**kwargs)
# Will be removed once the reference of node exists in api
# Note(thread-safety): blocking function
def delete_datasource(self, datasource):
return self.node.delete_datasource(datasource)
# Will be removed once the reference of node exists in api
# Note(thread-safety): blocking function
def get_drivers_info(self, *args):

View File

@ -36,6 +36,7 @@ from congress.api import base as api_base
from congress.datasources import constants
from congress.db import datasources as datasources_db
from congress.dse2 import control_bus
from congress.dse2 import data_service
from congress import exception
@ -898,3 +899,25 @@ class DseNodeEndpoints (object):
self.node.service_object(s).receive_data_sequenced(
publisher=publisher, table=table, data=data, seqnum=seqnum,
is_snapshot=is_snapshot)
DS_MANAGER_SERVICE_ID = '_ds_manager'
class DSManagerService(data_service.DataService):
"""A proxy service to datasource managing methods in dse_node."""
def __init__(self, service_id):
super(DSManagerService, self).__init__(DS_MANAGER_SERVICE_ID)
self.add_rpc_endpoint(DSManagerEndpoints(self))
class DSManagerEndpoints(object):
def __init__(self, service):
self.service = service
def add_datasource(self, context, items):
return self.service.node.add_datasource(items)
def delete_datasource(self, context, datasource):
return self.service.node.delete_datasource(datasource)

View File

@ -109,6 +109,8 @@ def create2(node_id=None, bus_id=None, existing_node=None,
services[api_base.ENGINE_SERVICE_ID].start_policy_synchronizer()
if datasources:
node.start_periodic_tasks()
node.register_service(
dse_node.DSManagerService(dse_node.DS_MANAGER_SERVICE_ID))
return services

View File

@ -72,10 +72,12 @@ class TestDatasourceModel(base.SqlTestCase):
datasource3 = self._get_datasource_request()
datasource3['name'] = 'datasource-test-3'
self.datasource_model.add_item(datasource3, {})
ds_obj = self.node.service_object('datasource-test-3')
self.engine.synchronize_policies()
obj = self.engine.policy_object('datasource-test-3')
self.assertIsNotNone(obj.schema)
self.assertEqual('datasource-test-3', obj.name)
self.assertIsNotNone(ds_obj)
def test_add_item_duplicate(self):
self.assertRaises(webservice.DataModelException,
@ -90,6 +92,8 @@ class TestDatasourceModel(base.SqlTestCase):
self.assertTrue(self.engine.assert_policy_exists('test-datasource'))
context = {'ds_id': d_id}
self.datasource_model.delete_item(None, {}, context=context)
ds_obj = self.node.service_object('test-datasource')
self.assertIsNone(ds_obj)
self.engine.synchronize_policies()
self.assertRaises(exception.PolicyRuntimeException,
self.engine.assert_policy_exists, 'test-datasource')

View File

@ -14,11 +14,13 @@
#
import eventlet
import mock
from oslo_config import cfg
from oslo_messaging import conffixture
from congress.dse2 import data_service
from congress.dse2 import dse_node
from congress.tests import base
from congress.tests import helper
@ -293,6 +295,40 @@ class TestDseNode(base.TestCase):
actual = set(node.get_global_service_names())
self.assertEqual(actual, set())
class TestDSManagerService(base.TestCase):
def setUp(self):
super(TestDSManagerService, self).setUp()
def test_ds_manager_endpoints_add_ds(self):
ds_manager_service = dse_node.DSManagerService('test_mgr')
node_mock = mock.MagicMock()
node_mock.add_datasource = mock.MagicMock()
node_mock.add_datasource.return_value = 'add_datasource'
ds_manager_service.node = node_mock
endpoints = dse_node.DSManagerEndpoints(ds_manager_service)
expect_ret = 'add_datasource'
self.assertEqual(expect_ret, endpoints.add_datasource('context', {}))
node_mock.add_datasource.assert_called_with({})
def test_ds_manager_endpoints_delete_ds(self):
ds_manager_service = dse_node.DSManagerService('test_mgr')
node_mock = mock.MagicMock()
node_mock.delete_datasource = mock.MagicMock()
node_mock.delete_datasource.return_value = 'delete_datasource'
ds_manager_service.node = node_mock
endpoints = dse_node.DSManagerEndpoints(ds_manager_service)
expect_ret = 'delete_datasource'
self.assertEqual(expect_ret,
endpoints.delete_datasource('context', 'ds-id'))
node_mock.delete_datasource.assert_called_with('ds-id')
# Leave this to make manual testing with RabbitMQ easy
# if __name__ == '__main__':
# import unittest