Implement db consistency logic in df local controller

Change-Id: Iafe306116d6ad738a75cae2a640135411169ac2a
Closes-bug: #1596455
Partially-implements: blueprint keep-db-consistency
hujie 2016-11-25 19:12:37 +08:00
parent 28e101ae16
commit 611e23c283
10 changed files with 638 additions and 1 deletion


@@ -53,6 +53,9 @@ DF_OPTS = [
cfg.BoolOpt('enable_df_pub_sub',
default=False,
help=_("Enable use of Dragonflow built-in pub/sub")),
cfg.BoolOpt('enable_df_db_consistency',
default=True,
help=_("Enable use of Dragonflow db consistency")),
cfg.StrOpt('pub_sub_driver',
default='zmq_pubsub_driver',
help=_('Drivers to use for the Dragonflow pub/sub')),
@@ -95,6 +98,11 @@ DF_OPTS = [
default=300,
help=_('Publisher idle timeout before it is removed from the table')
),
cfg.IntOpt(
'db_sync_time',
default=120,
help=_('Minimum interval in seconds between periodic db comparisons')
),
cfg.IntOpt(
'publisher_rate_limit_timeout',
default=180,

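Taken together, the new knobs live in the [df] section of the Dragonflow configuration file. A minimal sketch of a deployment snippet (the file name and neighbouring options vary per installation; the values shown are the defaults added above):

[df]
# Periodically compare the DF DB with the local controller cache.
enable_df_db_consistency = True
# Seconds between comparisons; anything below the 60-second minimum
# (MIN_SYNC_INTERVAL_TIME) is raised to that minimum at startup.
db_sync_time = 120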

@@ -34,6 +34,7 @@ from dragonflow.controller import df_db_objects_refresh
from dragonflow.controller import ryu_base_app
from dragonflow.controller import topology
from dragonflow.db import api_nb
from dragonflow.db import db_consistent
from dragonflow.db import db_store
from dragonflow.db import models
from dragonflow.ovsdb import vswitch_impl
@@ -81,6 +82,8 @@ class DfLocalController(object):
self.open_flow_app = app_mgr.instantiate(ryu_base_app.RyuDFAdapter,
**kwargs)
self.topology = None
self.db_consistency_manager = None
self.enable_db_consistency = cfg.CONF.df.enable_df_db_consistency
self.enable_selective_topo_dist = \
cfg.CONF.df.enable_selective_topology_distribution
self.integration_bridge = cfg.CONF.df.integration_bridge
@@ -97,6 +100,11 @@ class DfLocalController(object):
is_neutron_server=False)
self.topology = topology.Topology(self,
self.enable_selective_topo_dist)
if self.enable_db_consistency:
self.db_consistency_manager = \
db_consistent.DBConsistencyManager(self)
self.nb_api.set_db_consistency_manager(self.db_consistency_manager)
self.db_consistency_manager.daemonize()
# both set_controller and del_controller will delete flows.
# for reliability, here we should check if controller is set for OVS,


@@ -12,6 +12,7 @@
from oslo_config import cfg
from oslo_log import log
import six
from dragonflow._i18n import _LI, _LE, _LW
from dragonflow.common import constants
@@ -307,3 +308,38 @@ class Topology(object):
lport = self.nb_api.get_logical_port(port_id, topic)
return lport
def check_topology_info(self):
"""
In order to prevent the situation that the connection between
df controller and df db break down, we should recheck the local
ovs ports to make sure all the topics of these ovs ports could
be subscribed and all the vms could work well.
"""
new_ovs_to_lport_mapping = {}
add_ovs_to_lport_mapping = {}
delete_ovs_to_lport_mapping = self.ovs_to_lport_mapping
for key, ovs_port in six.iteritems(self.ovs_ports):
if ovs_port.get_type() == db_models.OvsPort.TYPE_VM:
lport_id = ovs_port.get_iface_id()
lport = self._get_lport(lport_id)
if lport is None:
LOG.warning(_LW("No logical port found for ovs port: %s"),
ovs_port)
continue
topic = lport.get_topic()
new_ovs_to_lport_mapping[key] = {
'lport_id': lport_id, 'topic': topic}
if not delete_ovs_to_lport_mapping.pop(key, None):
add_ovs_to_lport_mapping[key] = {
'lport_id': lport_id, 'topic': topic}
self.ovs_to_lport_mapping = new_ovs_to_lport_mapping
for value in add_ovs_to_lport_mapping.values():
lport_id = value['lport_id']
topic = value['topic']
self._add_to_topic_subscribed(topic, lport_id)
for value in delete_ovs_to_lport_mapping.values():
lport_id = value['lport_id']
topic = value['topic']
self._del_from_topic_subscribed(topic, lport_id)
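A sketch of the intended effect with toy data (the unit test added at the end of this commit mocks the same scenario):

# Suppose ovs_to_lport_mapping previously knew ports 2 and 3, but only
# ovs port 1 is still present in self.ovs_ports. After
# check_topology_info():
#   - port 1's topic is (re)subscribed via _add_to_topic_subscribed(),
#   - ports 2 and 3 are unsubscribed via _del_from_topic_subscribed(),
#     since their ovs ports are gone.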


@@ -33,7 +33,8 @@ LOG = log.getLogger(__name__)
DB_ACTION_LIST = ['create', 'set', 'delete', 'log',
'sync', 'sync_started', 'sync_finished', 'dbrestart']
'sync', 'sync_started', 'sync_finished', 'dbrestart',
'db_sync']
_nb_api = None
@@ -48,6 +49,7 @@ class NbApi(object):
self.use_pubsub = use_pubsub
self.publisher = None
self.subscriber = None
self.db_consistency_manager = None
self.is_neutron_server = is_neutron_server
self.enable_selective_topo_dist = \
cfg.CONF.df.enable_selective_topology_distribution
@@ -92,11 +94,16 @@
self.db_change_callback)
self.subscriber.register_hamsg_for_db()
def set_db_consistency_manager(self, db_consistency_manager):
self.db_consistency_manager = db_consistency_manager
def db_recover_callback(self):
# only a db driver with HA support can get here
self.driver.process_ha()
self.publisher.process_ha()
self.subscriber.process_ha()
if self.db_consistency_manager and not self.is_neutron_server:
self.db_consistency_manager.process(True)
def _get_publisher(self):
if cfg.CONF.df.pub_sub_use_multiproc:
@@ -206,6 +213,9 @@ class NbApi(object):
elif action == 'dbrestart':
self.db_recover_callback()
return
elif action == 'db_sync':
self.db_consistency_manager.process(False)
return
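# Note: the 'db_sync' action is published by DBConsistencyManager.run()
# (added later in this commit), which fires
# db_change_callback(None, None, "db_sync", "db_sync") every
# db_sync_time seconds; process(False) then runs the deferred,
# two-pass comparison.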
if 'secgroup' == table:
if action == 'set' or action == 'create':


@@ -0,0 +1,287 @@
# Copyright (c) 2016 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_config import cfg
from oslo_log import log
from dragonflow._i18n import _LE, _LW
from dragonflow.common import utils as df_utils
LOG = log.getLogger(__name__)
MIN_SYNC_INTERVAL_TIME = 60
class CacheManager(object):
def __init__(self):
self._table_name_mapping = {
'lswitch': {},
'port': {},
'router': {},
'floatingip': {},
'secgroup': {},
'publisher': {}
}
def get(self, table, key):
return self._table_name_mapping[table].get(key)
def set(self, table, key, value):
self._table_name_mapping[table][key] = value
def remove(self, table, key):
del self._table_name_mapping[table][key]
def get_tables(self):
return self._table_name_mapping.keys()
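As a usage sketch (CacheObject is defined at the bottom of this module; the object id and versions are made up for illustration), the cache holds at most one pending verdict per object:

# Record a suspected update observed in a first comparison pass.
cache = CacheManager()
cache.set('port', 'fake-port-id', CacheObject('update', 5, 4))
pending = cache.get('port', 'fake-port-id')  # CacheObject, or None
if pending and pending.get_action() == 'update':
    cache.remove('port', 'fake-port-id')  # verdict applied, drop the entry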
class DBConsistencyManager(object):
def __init__(self, controller):
self.topology = controller.topology
self.nb_api = controller.nb_api
self.db_store = controller.db_store
self.controller = controller
self.db_sync_time = cfg.CONF.df.db_sync_time
if self.db_sync_time < MIN_SYNC_INTERVAL_TIME:
self.db_sync_time = MIN_SYNC_INTERVAL_TIME
self._daemon = df_utils.DFDaemon()
self.cache_manager = CacheManager()
def process(self, direct):
self.topology.check_topology_info()
self._process_db_tables_comparison(direct)
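# Hedged recap of the two call sites added in this commit:
#   - api_nb.py, db_recover_callback(): process(True), i.e. sync
#     immediately after the DB driver recovers;
#   - the periodic 'db_sync' event emitted by run() below:
#     process(False), i.e. the deferred, two-pass mode.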
def run(self):
while True:
time.sleep(self.db_sync_time)
self.nb_api.db_change_callback(None, None, "db_sync", "db_sync")
LOG.debug("Enter db consistent processing")
def daemonize(self):
return self._daemon.daemonize(self.run)
def stop(self):
return self._daemon.stop()
def _process_db_tables_comparison(self, direct):
"""Do the comparison and sync according to the difference between
df db and local cache
:param direct: Indicate the process mode, if True, it will sync
the data immediately once it found the difference,
if False, it will do the sync job after twice data
comparisons.
"""
self.controller.register_chassis()
self.controller.create_tunnels()
topics = self.topology.topic_subscribed.keys()
for table in self.cache_manager.get_tables():
try:
self.handle_data_comparison(topics, table, direct)
except Exception as e:
LOG.exception(_LE("Exception occurred when"
"handling db comparison: %s"), e)
def _process_object(self, table, action, df_object, local_object=None):
if table == 'lswitch':
if action == 'delete':
self.controller.logical_switch_deleted(local_object.get_id())
else:
self.controller.logical_switch_updated(df_object)
elif table == 'port':
if action == 'create':
self.controller.logical_port_created(df_object)
elif action == 'update':
self.controller.logical_port_updated(df_object)
else:
self.controller.logical_port_deleted(local_object.get_id())
elif table == 'router':
if action == 'delete':
self.controller.router_deleted(local_object.get_id())
else:
self.controller.router_updated(df_object)
elif table == 'secgroup':
if action == 'delete':
self.controller.security_group_deleted(local_object.get_id())
else:
self.controller.security_group_updated(df_object)
elif table == 'floatingip':
if action == 'delete':
self.controller.floatingip_deleted(local_object.get_id())
else:
self.controller.floatingip_updated(df_object)
elif table == 'publisher':
if action == 'delete':
self.controller.publisher_deleted(local_object.get_id())
else:
self.controller.publisher_updated(df_object)
def _verify_object(self, table, id, action, df_object, local_object=None):
"""Verify the object status and judge whether to create/update/delete
the object or not, we'll use twice comparison to verify the status,
first comparison result will be stored in the cache and if second
comparison result is still consistent with the cache, we can make
sure the object status
:param table: Resource object type
:param id: Resource object id
:param action: Operate action(create/update/delete)
:param df_object: Object from df db
:param local_object: Object from local cache
"""
df_version = df_object.get_version() if df_object else None
local_version = local_object.get_version() if local_object else None
old_cache_obj = self.cache_manager.get(table, id)
if not old_cache_obj or old_cache_obj.get_action() != action:
cache_obj = CacheObject(action, df_version, local_version)
self.cache_manager.set(table, id, cache_obj)
return
old_df_version = old_cache_obj.get_df_version()
old_local_version = old_cache_obj.get_local_version()
if action == 'create':
if df_version >= old_df_version:
self._process_object(table, 'create', df_object)
self.cache_manager.remove(table, id)
return
elif action == 'update':
if df_version < old_df_version:
return
if local_version <= old_local_version:
self._process_object(table, 'update', df_object)
self.cache_manager.remove(table, id)
else:
cache_obj = CacheObject(action, df_version, local_version)
self.cache_manager.set(table, id, cache_obj)
elif action == 'delete':
self._process_object(table, 'delete', None, local_object)
self.cache_manager.remove(table, id)
else:
LOG.warning(_LW('Unknown action %s in db consistency processing'), action)
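# To make the two-pass rule concrete, a hypothetical timeline for one
# port (version numbers invented for illustration):
#   Round 1: df version 5, local version 4 -> CacheObject('update', 5, 4)
#   is cached and the decision is deferred.
#   Round 2, db_sync_time seconds later, versions unchanged: df_version
#   >= old_df_version and local_version <= old_local_version, so
#   _process_object(table, 'update', df_object) runs and the cache
#   entry is removed. If the local cache caught up in between
#   (local_version rose), the entry is refreshed instead and the
#   decision waits another round.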
def _get_df_and_local_objects(self, topic, table):
df_objects = []
local_objects = []
if table == 'lswitch':
df_objects = self.nb_api.get_all_logical_switches(topic)
local_objects = self.db_store.get_lswitchs(topic)
elif table == 'port':
df_objects = self.nb_api.get_all_logical_ports(topic)
local_objects = self.db_store.get_ports(topic)
elif table == 'router':
df_objects = self.nb_api.get_routers(topic)
local_objects = self.db_store.get_routers(topic)
elif table == 'secgroup':
df_objects = self.nb_api.get_security_groups(topic)
local_objects = self.db_store.get_security_groups(topic)
elif table == 'floatingip':
df_objects = self.nb_api.get_floatingips(topic)
local_objects = self.db_store.get_floatingips(topic)
elif table == 'publisher':
df_objects = self.nb_api.get_publishers()
local_objects = self.db_store.get_publishers()
return df_objects, local_objects
def _compare_df_and_local_data(
self, table, df_objects, local_objects, direct):
"""Compare specific resource type df objects and local objects
one by one, we could judge whether to create/update/delete
the corresponding object.
:param table: Resource object type
:param df_object: Object from df db
:param local_object: Object from local cache
:param direct: the process model, if True, we'll do the operation
directly after this comparison, if False, we'll go into the verify
process which need twice comparision to do the operation.
"""
local_object_map = {}
for local_object in local_objects:
local_object_map[local_object.get_id()] = local_object
for df_object in df_objects[:]:
df_id = df_object.get_id()
df_version = df_object.get_version()
if not df_version:
LOG.error(_LE("Version is None in df_object: %s"), df_object)
continue
local_object = local_object_map.pop(df_id, None)
if local_object:
local_version = local_object.get_version()
if not local_version:
LOG.debug("Version is None in local_object: %s",
local_object)
self._process_object(
table, 'update', df_object)
elif df_version > local_version:
LOG.debug("Find a newer version df object: %s",
df_object)
if direct:
self._process_object(
table, 'update', df_object)
else:
self._verify_object(
table, df_id, 'update',
df_object, local_object)
else:
LOG.debug("Find an additional df object: %s", df_object)
if direct:
self._process_object(table, 'create', df_object)
else:
self._verify_object(table, df_id,
'create', df_object)
for local_object in local_object_map.values():
LOG.debug("Find a redundant local object: %s", local_object)
if direct:
self._process_object(table, 'delete', None, local_object)
else:
self._verify_object(
table, local_object.get_id(),
'delete', None, local_object)
def _get_and_compare_df_and_local_data(self, table, direct, topic=None):
df_objects, local_objects = self._get_df_and_local_objects(
topic, table)
self._compare_df_and_local_data(
table, df_objects, local_objects, direct)
def handle_data_comparison(self, tenants, table, direct):
if table == 'publisher':
self._get_and_compare_df_and_local_data(table, direct)
return
for topic in tenants:
self._get_and_compare_df_and_local_data(table, direct, topic)
class CacheObject(object):
def __init__(self, action, df_version, local_version):
self.action = action
self.df_version = df_version
self.local_version = local_version
def get_action(self):
return self.action
def get_df_version(self):
return self.df_version
def get_local_version(self):
return self.local_version


@@ -299,5 +299,8 @@ class DbStore(object):
def get_publisher(self, uuid, topic=None):
return self.get('publishers', uuid, topic)
def get_publishers(self, topic=None):
return self.values('publishers', topic)
def delete_publisher(self, uuid, topic=None):
self.delete('publishers', uuid, topic)


@@ -369,6 +369,9 @@ class Publisher(NbDbObject):
def get_last_activity_timestamp(self):
return self.inner_obj.get('last_activity_timestamp')
def get_version(self):
return 0
class OvsPort(object):


@@ -0,0 +1,141 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from dragonflow.db import db_consistent
from dragonflow.db import models
from dragonflow.tests.common import constants
from dragonflow.tests.common import utils
from dragonflow.tests.fullstack import test_base
from dragonflow.tests.fullstack import test_objects as objects
from oslo_serialization import jsonutils
class TestDbConsistent(test_base.DFTestBase):
def _check_l2_lookup_rule(self, flows, mac):
for flow in flows:
if flow['table'] == '17' and 'goto_table:64' in flow['actions']:
if 'dl_dst=' + mac in flow['match']:
return True
return False
def _check_lswitch_dhcp_rule(self, flows, dhcp_ip):
for flow in flows:
if flow['table'] == '9' and flow['actions'] == 'goto_table:11':
if ('nw_dst=' + dhcp_ip + ',tp_src=68,tp_dst=67'
in flow['match']):
return True
return False
def _check_no_lswitch_dhcp_rule(self, flows, dhcp_ip):
if self._check_lswitch_dhcp_rule(flows, dhcp_ip):
return False
return True
def test_db_consistent(self):
self.db_sync_time = self.conf.db_sync_time
if self.db_sync_time < db_consistent.MIN_SYNC_INTERVAL_TIME:
self.db_sync_time = db_consistent.MIN_SYNC_INTERVAL_TIME
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network_id = network.create(network={'name': 'private'})
topic = network.get_topic()
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
network_id))
subnet_body = {'network_id': network_id,
'cidr': '10.50.0.0/24',
'gateway_ip': '10.50.0.1',
'ip_version': 4,
'name': 'private',
'enable_dhcp': True}
subnet.create(subnet=subnet_body)
time.sleep(constants.DEFAULT_RESOURCE_READY_TIMEOUT)
self.assertTrue(network.exists())
self.assertTrue(subnet.exists())
vm = self.store(objects.VMTestObj(self, self.neutron))
vm.create(network=network)
self.assertIsNotNone(vm.server.addresses['private'])
mac = vm.server.addresses['private'][0]['OS-EXT-IPS-MAC:mac_addr']
self.assertIsNotNone(mac)
ovs = utils.OvsFlowsParser()
utils.wait_until_true(
lambda: self._check_l2_lookup_rule(
ovs.dump(self.integration_bridge), mac),
timeout=10, sleep=1,
exception=Exception('no rule for vm in l2 lookup table')
)
df_network = {}
net_id = '11111111-1111-1111-1111-111111111111'
df_network['id'] = net_id
df_network['topic'] = topic
df_network['name'] = 'df_nw1'
df_network['network_type'] = 'vxlan'
df_network['segmentation_id'] = 4000
df_network['router_external'] = False
df_network['mtu'] = 1500
df_network[models.UNIQUE_KEY] = 1
df_network['version'] = 1
df_subnet = {}
df_subnet['id'] = '22222222-2222-2222-2222-222222222222'
df_subnet['lswitch'] = net_id
df_subnet['name'] = 'df_sn1'
df_subnet['enable_dhcp'] = True
df_subnet['cidr'] = '10.60.0.0/24'
df_subnet['dhcp_ip'] = '10.60.0.2'
df_subnet['gateway_ip'] = '10.60.0.1'
df_subnet['dns_nameservers'] = []
df_subnet['host_routes'] = []
df_network['subnets'] = [df_subnet]
df_network_json = jsonutils.dumps(df_network)
self.nb_api.driver.create_key(
'lswitch', net_id, df_network_json, topic)
time.sleep(self.db_sync_time)
utils.wait_until_true(
lambda: self._check_lswitch_dhcp_rule(
ovs.dump(self.integration_bridge), '10.60.0.2'),
timeout=self.db_sync_time + constants.DEFAULT_CMD_TIMEOUT, sleep=1,
exception=Exception('no goto dhcp rule for lswitch')
)
df_network['version'] = 2
df_network['subnets'][0]['dhcp_ip'] = '10.60.0.3'
df_network_json = jsonutils.dumps(df_network)
self.nb_api.driver.set_key('lswitch', net_id, df_network_json, topic)
time.sleep(self.db_sync_time)
utils.wait_until_true(
lambda: self._check_lswitch_dhcp_rule(
ovs.dump(self.integration_bridge), '10.60.0.3'),
timeout=self.db_sync_time + constants.DEFAULT_CMD_TIMEOUT, sleep=1,
exception=Exception('no goto dhcp rule for lswitch')
)
self.nb_api.driver.delete_key('lswitch', net_id, topic)
time.sleep(self.db_sync_time)
utils.wait_until_true(
lambda: self._check_no_lswitch_dhcp_rule(
ovs.dump(self.integration_bridge), '10.60.0.3'),
timeout=self.db_sync_time + constants.DEFAULT_CMD_TIMEOUT, sleep=1,
exception=Exception('could not delete goto dhcp rule for lswitch')
)
vm.close()
subnet.close()
network.close()


@@ -0,0 +1,112 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from dragonflow.db import db_consistent
from dragonflow.tests import base as tests_base
class TestDBConsistent(tests_base.BaseTestCase):
def setUp(self):
super(TestDBConsistent, self).setUp()
self.controller = mock.Mock()
self.topology = self.controller.topology
self.nb_api = self.controller.nb_api
self.db_store = self.controller.db_store
self.topic = '111-222-333'
self.lport_id1 = '1'
self.lport_id2 = '2'
self.lport_id3 = '3'
self.db_consistent = db_consistent.DBConsistencyManager(
self.controller)
def test_db_comparison(self):
df_obj1 = FakeDfLocalObj(self.lport_id1, 1)
df_obj2 = FakeDfLocalObj(self.lport_id2, 2)
local_obj1 = FakeDfLocalObj(self.lport_id2, 1)
local_obj2 = FakeDfLocalObj(self.lport_id3, 1)
self.nb_api.get_all_logical_switches.return_value = [df_obj1, df_obj2]
self.db_store.get_lswitchs.return_value = [local_obj1, local_obj2]
self.nb_api.get_all_logical_ports.return_value = [df_obj1, df_obj2]
self.db_store.get_ports.return_value = [local_obj1, local_obj2]
self.nb_api.get_routers.return_value = [df_obj1, df_obj2]
self.db_store.get_routers.return_value = [local_obj1, local_obj2]
self.nb_api.get_security_groups.return_value = [df_obj1, df_obj2]
self.db_store.get_security_groups.return_value = [
local_obj1, local_obj2]
self.nb_api.get_floatingips.return_value = [df_obj1, df_obj2]
self.db_store.get_floatingips.return_value = [local_obj1, local_obj2]
self.nb_api.get_publishers.return_value = [df_obj1, df_obj2]
self.db_store.get_publishers.return_value = [local_obj1, local_obj2]
self.db_consistent.handle_data_comparison(
[self.topic], 'publisher', True)
self.controller.publisher_updated.assert_any_call(df_obj1)
self.controller.publisher_updated.assert_any_call(df_obj2)
self.controller.publisher_deleted.assert_any_call(self.lport_id3)
self.db_consistent.handle_data_comparison(
[self.topic], 'lswitch', True)
self.controller.logical_switch_updated.assert_any_call(df_obj1)
self.controller.logical_switch_updated.assert_any_call(df_obj2)
self.controller.logical_switch_deleted.assert_any_call(
self.lport_id3)
self.db_consistent.handle_data_comparison(
[self.topic], 'port', True)
self.controller.logical_port_created.assert_any_call(df_obj1)
self.controller.logical_port_updated.assert_any_call(df_obj2)
self.controller.logical_port_deleted.assert_any_call(
self.lport_id3)
self.db_consistent.handle_data_comparison(
[self.topic], 'router', True)
self.controller.router_updated.assert_any_call(df_obj1)
self.controller.router_updated.assert_any_call(df_obj2)
self.controller.router_deleted.assert_any_call(self.lport_id3)
self.db_consistent.handle_data_comparison(
[self.topic], 'secgroup', True)
self.controller.security_group_updated.assert_any_call(df_obj1)
self.controller.security_group_updated.assert_any_call(df_obj2)
self.controller.security_group_deleted.assert_any_call(
self.lport_id3)
self.db_consistent.handle_data_comparison(
[self.topic], 'floatingip', True)
self.controller.floatingip_updated.assert_any_call(df_obj1)
self.controller.floatingip_updated.assert_any_call(df_obj2)
self.controller.floatingip_deleted.assert_any_call(
self.lport_id3)
class FakeDfLocalObj(object):
"""To generate df_obj or local_obj for testing purposes only."""
def __init__(self, id, version):
self.id = id
self.version = version
def get_id(self):
return self.id
def get_version(self):
return self.version


@@ -117,3 +117,32 @@ class TestTopology(tests_base.BaseTestCase):
self.lport1.get_id())
self.mock_nb_api.subscriber.unregister_topic.assert_called_with(
self.lport1.get_topic())
def test_check_topology_info(self):
topic = '111-222-333'
lport_id2 = '2'
ovs_port_id2 = 'ovs_port2'
lport_id3 = '3'
ovs_port_id3 = 'ovs_port3'
self.topology.ovs_to_lport_mapping = {
ovs_port_id2: {
'lport_id': lport_id2,
'topic': topic
},
ovs_port_id3: {
'lport_id': lport_id3,
'topic': topic
}
}
self.topology.ovs_ports = {
'ovs_port1': self.ovs_port1
}
self.topology.topic_subscribed = {
topic: {lport_id2, lport_id3}
}
lport1 = Mock()
lport1.get_topic.return_value = topic
self.db_store.set_port('lport1', lport1, True, topic)
self.topology.check_topology_info()
self.assertEqual(len(self.topology.topic_subscribed[topic]), 1)