Implement db consistency logic in df local controller
Change-Id: Iafe306116d6ad738a75cae2a640135411169ac2a Closes-bug: #1596455 Partially-implements: blueprint keep-db-consistency
This commit is contained in:
parent
28e101ae16
commit
611e23c283
|
@ -53,6 +53,9 @@ DF_OPTS = [
|
|||
cfg.BoolOpt('enable_df_pub_sub',
|
||||
default=False,
|
||||
help=_("Enable use of Dragonflow built-in pub/sub")),
|
||||
cfg.BoolOpt('enable_df_db_consistency',
|
||||
default=True,
|
||||
help=_("Enable use of Dragonflow db consistency")),
|
||||
cfg.StrOpt('pub_sub_driver',
|
||||
default='zmq_pubsub_driver',
|
||||
help=_('Drivers to use for the Dragonflow pub/sub')),
|
||||
|
@ -95,6 +98,11 @@ DF_OPTS = [
|
|||
default=300,
|
||||
help=_('Publisher idle timeout before it is removed from the table')
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'db_sync_time',
|
||||
default=120,
|
||||
help=_('Min periodically db comparison time')
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'publisher_rate_limit_timeout',
|
||||
default=180,
|
||||
|
|
|
@ -34,6 +34,7 @@ from dragonflow.controller import df_db_objects_refresh
|
|||
from dragonflow.controller import ryu_base_app
|
||||
from dragonflow.controller import topology
|
||||
from dragonflow.db import api_nb
|
||||
from dragonflow.db import db_consistent
|
||||
from dragonflow.db import db_store
|
||||
from dragonflow.db import models
|
||||
from dragonflow.ovsdb import vswitch_impl
|
||||
|
@ -81,6 +82,8 @@ class DfLocalController(object):
|
|||
self.open_flow_app = app_mgr.instantiate(ryu_base_app.RyuDFAdapter,
|
||||
**kwargs)
|
||||
self.topology = None
|
||||
self.db_consistency_manager = None
|
||||
self.enable_db_consistency = cfg.CONF.df.enable_df_db_consistency
|
||||
self.enable_selective_topo_dist = \
|
||||
cfg.CONF.df.enable_selective_topology_distribution
|
||||
self.integration_bridge = cfg.CONF.df.integration_bridge
|
||||
|
@ -97,6 +100,11 @@ class DfLocalController(object):
|
|||
is_neutron_server=False)
|
||||
self.topology = topology.Topology(self,
|
||||
self.enable_selective_topo_dist)
|
||||
if self.enable_db_consistency:
|
||||
self.db_consistency_manager = \
|
||||
db_consistent.DBConsistencyManager(self)
|
||||
self.nb_api.set_db_consistency_manager(self.db_consistency_manager)
|
||||
self.db_consistency_manager.daemonize()
|
||||
|
||||
# both set_controller and del_controller will delete flows.
|
||||
# for reliability, here we should check if controller is set for OVS,
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import six
|
||||
|
||||
from dragonflow._i18n import _LI, _LE, _LW
|
||||
from dragonflow.common import constants
|
||||
|
@ -307,3 +308,38 @@ class Topology(object):
|
|||
lport = self.nb_api.get_logical_port(port_id, topic)
|
||||
|
||||
return lport
|
||||
|
||||
def check_topology_info(self):
|
||||
"""
|
||||
In order to prevent the situation that the connection between
|
||||
df controller and df db break down, we should recheck the local
|
||||
ovs ports to make sure all the topics of these ovs ports could
|
||||
be subscribed and all the vms could work well.
|
||||
"""
|
||||
new_ovs_to_lport_mapping = {}
|
||||
add_ovs_to_lport_mapping = {}
|
||||
delete_ovs_to_lport_mapping = self.ovs_to_lport_mapping
|
||||
for key, ovs_port in six.iteritems(self.ovs_ports):
|
||||
if ovs_port.get_type() == db_models.OvsPort.TYPE_VM:
|
||||
lport_id = ovs_port.get_iface_id()
|
||||
lport = self._get_lport(lport_id)
|
||||
if lport is None:
|
||||
LOG.warning(_LW("No logical port found for ovs port: %s"),
|
||||
ovs_port)
|
||||
continue
|
||||
topic = lport.get_topic()
|
||||
new_ovs_to_lport_mapping[key] = {
|
||||
'lport_id': lport_id, 'topic': topic}
|
||||
if not delete_ovs_to_lport_mapping.pop(key, None):
|
||||
add_ovs_to_lport_mapping[key] = {
|
||||
'lport_id': lport_id, 'topic': topic}
|
||||
self.ovs_to_lport_mapping = new_ovs_to_lport_mapping
|
||||
for value in add_ovs_to_lport_mapping.values():
|
||||
lport_id = value['lport_id']
|
||||
topic = value['topic']
|
||||
self._add_to_topic_subscribed(topic, lport_id)
|
||||
|
||||
for value in delete_ovs_to_lport_mapping.values():
|
||||
lport_id = value['lport_id']
|
||||
topic = value['topic']
|
||||
self._del_from_topic_subscribed(topic, lport_id)
|
||||
|
|
|
@ -33,7 +33,8 @@ LOG = log.getLogger(__name__)
|
|||
|
||||
|
||||
DB_ACTION_LIST = ['create', 'set', 'delete', 'log',
|
||||
'sync', 'sync_started', 'sync_finished', 'dbrestart']
|
||||
'sync', 'sync_started', 'sync_finished', 'dbrestart',
|
||||
'db_sync']
|
||||
|
||||
_nb_api = None
|
||||
|
||||
|
@ -48,6 +49,7 @@ class NbApi(object):
|
|||
self.use_pubsub = use_pubsub
|
||||
self.publisher = None
|
||||
self.subscriber = None
|
||||
self.db_consistency_manager = None
|
||||
self.is_neutron_server = is_neutron_server
|
||||
self.enable_selective_topo_dist = \
|
||||
cfg.CONF.df.enable_selective_topology_distribution
|
||||
|
@ -92,11 +94,16 @@ class NbApi(object):
|
|||
self.db_change_callback)
|
||||
self.subscriber.register_hamsg_for_db()
|
||||
|
||||
def set_db_consistency_manager(self, db_consistency_manager):
|
||||
self.db_consistency_manager = db_consistency_manager
|
||||
|
||||
def db_recover_callback(self):
|
||||
# only db with HA func can go in here
|
||||
self.driver.process_ha()
|
||||
self.publisher.process_ha()
|
||||
self.subscriber.process_ha()
|
||||
if self.db_consistency_manager and not self.is_neutron_server:
|
||||
self.db_consistency_manager.process(True)
|
||||
|
||||
def _get_publisher(self):
|
||||
if cfg.CONF.df.pub_sub_use_multiproc:
|
||||
|
@ -206,6 +213,9 @@ class NbApi(object):
|
|||
elif action == 'dbrestart':
|
||||
self.db_recover_callback()
|
||||
return
|
||||
elif action == 'db_sync':
|
||||
self.db_consistency_manager.process(False)
|
||||
return
|
||||
|
||||
if 'secgroup' == table:
|
||||
if action == 'set' or action == 'create':
|
||||
|
|
|
@ -0,0 +1,287 @@
|
|||
# Copyright (c) 2016 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from dragonflow._i18n import _LE, _LW
|
||||
from dragonflow.common import utils as df_utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
MIN_SYNC_INTERVAL_TIME = 60
|
||||
|
||||
|
||||
class CacheManager(object):
|
||||
def __init__(self):
|
||||
self._table_name_mapping = {
|
||||
'lswitch': {},
|
||||
'port': {},
|
||||
'router': {},
|
||||
'floatingip': {},
|
||||
'secgroup': {},
|
||||
'publisher': {}
|
||||
}
|
||||
|
||||
def get(self, table, key):
|
||||
return self._table_name_mapping[table].get(key)
|
||||
|
||||
def set(self, table, key, value):
|
||||
self._table_name_mapping[table][key] = value
|
||||
|
||||
def remove(self, table, key):
|
||||
del self._table_name_mapping[table][key]
|
||||
|
||||
def get_tables(self):
|
||||
return self._table_name_mapping.keys()
|
||||
|
||||
|
||||
class DBConsistencyManager(object):
|
||||
|
||||
def __init__(self, controller):
|
||||
self.topology = controller.topology
|
||||
self.nb_api = controller.nb_api
|
||||
self.db_store = controller.db_store
|
||||
self.controller = controller
|
||||
self.db_sync_time = cfg.CONF.df.db_sync_time
|
||||
if self.db_sync_time < MIN_SYNC_INTERVAL_TIME:
|
||||
self.db_sync_time = MIN_SYNC_INTERVAL_TIME
|
||||
self._daemon = df_utils.DFDaemon()
|
||||
self.cache_manager = CacheManager()
|
||||
|
||||
def process(self, direct):
|
||||
self.topology.check_topology_info()
|
||||
self._process_db_tables_comparison(direct)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
time.sleep(self.db_sync_time)
|
||||
self.nb_api.db_change_callback(None, None, "db_sync", "db_sync")
|
||||
LOG.debug("Enter db consistent processing")
|
||||
|
||||
def daemonize(self):
|
||||
return self._daemon.daemonize(self.run)
|
||||
|
||||
def stop(self):
|
||||
return self._daemon.stop()
|
||||
|
||||
def _process_db_tables_comparison(self, direct):
|
||||
"""Do the comparison and sync according to the difference between
|
||||
df db and local cache
|
||||
|
||||
:param direct: Indicate the process mode, if True, it will sync
|
||||
the data immediately once it found the difference,
|
||||
if False, it will do the sync job after twice data
|
||||
comparisons.
|
||||
"""
|
||||
self.controller.register_chassis()
|
||||
self.controller.create_tunnels()
|
||||
topics = self.topology.topic_subscribed.keys()
|
||||
for table in self.cache_manager.get_tables():
|
||||
try:
|
||||
self.handle_data_comparison(topics, table, direct)
|
||||
except Exception as e:
|
||||
LOG.exception(_LE("Exception occurred when"
|
||||
"handling db comparison: %s"), e)
|
||||
|
||||
def _process_object(self, table, action, df_object, local_object=None):
|
||||
if table == 'lswitch':
|
||||
if action == 'delete':
|
||||
self.controller.logical_switch_deleted(local_object.get_id())
|
||||
else:
|
||||
self.controller.logical_switch_updated(df_object)
|
||||
elif table == 'port':
|
||||
if action == 'create':
|
||||
self.controller.logical_port_created(df_object)
|
||||
elif action == 'update':
|
||||
self.controller.logical_port_updated(df_object)
|
||||
else:
|
||||
self.controller.logical_port_deleted(local_object.get_id())
|
||||
elif table == 'router':
|
||||
if action == 'delete':
|
||||
self.controller.router_deleted(local_object.get_id())
|
||||
else:
|
||||
self.controller.router_updated(df_object)
|
||||
elif table == 'secgroup':
|
||||
if action == 'delete':
|
||||
self.controller.security_group_deleted(local_object.get_id())
|
||||
else:
|
||||
self.controller.security_group_updated(df_object)
|
||||
elif table == 'floatingip':
|
||||
if action == 'delete':
|
||||
self.controller.floatingip_deleted(local_object.get_id())
|
||||
else:
|
||||
self.controller.floatingip_updated(df_object)
|
||||
elif table == 'publisher':
|
||||
if action == 'delete':
|
||||
self.controller.publisher_deleted(local_object.get_id())
|
||||
else:
|
||||
self.controller.publisher_updated(df_object)
|
||||
|
||||
def _verify_object(self, table, id, action, df_object, local_object=None):
|
||||
"""Verify the object status and judge whether to create/update/delete
|
||||
the object or not, we'll use twice comparison to verify the status,
|
||||
first comparison result will be stored in the cache and if second
|
||||
comparison result is still consistent with the cache, we can make
|
||||
sure the object status
|
||||
|
||||
:param table: Resource object type
|
||||
:param id: Resource object id
|
||||
:param action: Operate action(create/update/delete)
|
||||
:param df_object: Object from df db
|
||||
:param local_object: Object from local cache
|
||||
"""
|
||||
df_version = df_object.get_version() if df_object else None
|
||||
local_version = local_object.get_version() if local_object else None
|
||||
|
||||
old_cache_obj = self.cache_manager.get(table, id)
|
||||
if not old_cache_obj or old_cache_obj.get_action() != action:
|
||||
cache_obj = CacheObject(action, df_version, local_version)
|
||||
self.cache_manager.set(table, id, cache_obj)
|
||||
return
|
||||
|
||||
old_df_version = old_cache_obj.get_df_version()
|
||||
old_local_version = old_cache_obj.get_local_version()
|
||||
if action == 'create':
|
||||
if df_version >= old_df_version:
|
||||
self._process_object(table, 'create', df_object)
|
||||
self.cache_manager.remove(table, id)
|
||||
return
|
||||
elif action == 'update':
|
||||
if df_version < old_df_version:
|
||||
return
|
||||
if local_version <= old_local_version:
|
||||
self._process_object(table, 'update', df_object)
|
||||
self.cache_manager.remove(table, id)
|
||||
else:
|
||||
cache_obj = CacheObject(action, df_version, local_version)
|
||||
self.cache_manager.set(table, id, cache_obj)
|
||||
elif action == 'delete':
|
||||
self._process_object(table, 'delete', None, local_object)
|
||||
self.cache_manager.remove(table, id)
|
||||
else:
|
||||
LOG.warning(_LW('Unknown action %s in db consistent'), action)
|
||||
|
||||
def _get_df_and_local_objects(self, topic, table):
|
||||
df_objects = []
|
||||
local_objects = []
|
||||
if table == 'lswitch':
|
||||
df_objects = self.nb_api.get_all_logical_switches(topic)
|
||||
local_objects = self.db_store.get_lswitchs(topic)
|
||||
elif table == 'port':
|
||||
df_objects = self.nb_api.get_all_logical_ports(topic)
|
||||
local_objects = self.db_store.get_ports(topic)
|
||||
elif table == 'router':
|
||||
df_objects = self.nb_api.get_routers(topic)
|
||||
local_objects = self.db_store.get_routers(topic)
|
||||
elif table == 'secgroup':
|
||||
df_objects = self.nb_api.get_security_groups(topic)
|
||||
local_objects = self.db_store.get_security_groups(topic)
|
||||
elif table == 'floatingip':
|
||||
df_objects = self.nb_api.get_floatingips(topic)
|
||||
local_objects = self.db_store.get_floatingips(topic)
|
||||
elif table == 'publisher':
|
||||
df_objects = self.nb_api.get_publishers()
|
||||
local_objects = self.db_store.get_publishers()
|
||||
|
||||
return df_objects, local_objects
|
||||
|
||||
def _compare_df_and_local_data(
|
||||
self, table, df_objects, local_objects, direct):
|
||||
"""Compare specific resource type df objects and local objects
|
||||
one by one, we could judge whether to create/update/delete
|
||||
the corresponding object.
|
||||
|
||||
:param table: Resource object type
|
||||
:param df_object: Object from df db
|
||||
:param local_object: Object from local cache
|
||||
:param direct: the process model, if True, we'll do the operation
|
||||
directly after this comparison, if False, we'll go into the verify
|
||||
process which need twice comparision to do the operation.
|
||||
"""
|
||||
local_object_map = {}
|
||||
for local_object in local_objects:
|
||||
local_object_map[local_object.get_id()] = local_object
|
||||
for df_object in df_objects[:]:
|
||||
df_id = df_object.get_id()
|
||||
df_version = df_object.get_version()
|
||||
if not df_version:
|
||||
LOG.error(_LE("Version is None in df_object: %s"), df_object)
|
||||
continue
|
||||
local_object = local_object_map.pop(df_id, None)
|
||||
if local_object:
|
||||
local_version = local_object.get_version()
|
||||
if not local_version:
|
||||
LOG.debug("Version is None in local_object: %s",
|
||||
local_object)
|
||||
self._process_object(
|
||||
table, 'update', df_object)
|
||||
elif df_version > local_version:
|
||||
LOG.debug("Find a newer version df object: %s",
|
||||
df_object)
|
||||
if direct:
|
||||
self._process_object(
|
||||
table, 'update', df_object)
|
||||
else:
|
||||
self._verify_object(
|
||||
table, df_id, 'update',
|
||||
df_object, local_object)
|
||||
else:
|
||||
LOG.debug("Find an additional df object: %s", df_object)
|
||||
if direct:
|
||||
self._process_object(table, 'create', df_object)
|
||||
else:
|
||||
self._verify_object(table, df_id,
|
||||
'create', df_object)
|
||||
|
||||
for local_object in local_object_map.values():
|
||||
LOG.debug("Find a redundant local object: %s", local_object)
|
||||
if direct:
|
||||
self._process_object(table, 'delete', None, local_object)
|
||||
else:
|
||||
self._verify_object(
|
||||
table, local_object.get_id(),
|
||||
'delete', None, local_object)
|
||||
|
||||
def _get_and_compare_df_and_local_data(self, table, direct, topic=None):
|
||||
df_objects, local_objects = self._get_df_and_local_objects(
|
||||
topic, table)
|
||||
self._compare_df_and_local_data(
|
||||
table, df_objects, local_objects, direct)
|
||||
|
||||
def handle_data_comparison(self, tenants, table, direct):
|
||||
if table == 'publisher':
|
||||
self._get_and_compare_df_and_local_data(table, direct)
|
||||
return
|
||||
for topic in tenants:
|
||||
self._get_and_compare_df_and_local_data(table, direct, topic)
|
||||
|
||||
|
||||
class CacheObject(object):
|
||||
def __init__(self, action, df_version, local_version):
|
||||
self.action = action
|
||||
self.df_version = df_version
|
||||
self.local_version = local_version
|
||||
|
||||
def get_action(self):
|
||||
return self.action
|
||||
|
||||
def get_df_version(self):
|
||||
return self.df_version
|
||||
|
||||
def get_local_version(self):
|
||||
return self.local_version
|
|
@ -299,5 +299,8 @@ class DbStore(object):
|
|||
def get_publisher(self, uuid, topic=None):
|
||||
return self.get('publishers', uuid, topic)
|
||||
|
||||
def get_publishers(self, topic=None):
|
||||
return self.values('publishers', topic)
|
||||
|
||||
def delete_publisher(self, uuid, topic=None):
|
||||
self.delete('publishers', uuid, topic)
|
||||
|
|
|
@ -369,6 +369,9 @@ class Publisher(NbDbObject):
|
|||
def get_last_activity_timestamp(self):
|
||||
return self.inner_obj.get('last_activity_timestamp')
|
||||
|
||||
def get_version(self):
|
||||
return 0
|
||||
|
||||
|
||||
class OvsPort(object):
|
||||
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from dragonflow.db import db_consistent
|
||||
from dragonflow.db import models
|
||||
from dragonflow.tests.common import constants
|
||||
from dragonflow.tests.common import utils
|
||||
from dragonflow.tests.fullstack import test_base
|
||||
from dragonflow.tests.fullstack import test_objects as objects
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
|
||||
class TestDbConsistent(test_base.DFTestBase):
|
||||
|
||||
def _check_l2_lookup_rule(self, flows, mac):
|
||||
for flow in flows:
|
||||
if flow['table'] == '17' and 'goto_table:64' in flow['actions']:
|
||||
if 'dl_dst=' + mac in flow['match']:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _check_lswitch_dhcp_rule(self, flows, dhcp_ip):
|
||||
for flow in flows:
|
||||
if flow['table'] == '9' and flow['actions'] == 'goto_table:11':
|
||||
if ('nw_dst=' + dhcp_ip + ',tp_src=68,tp_dst=67'
|
||||
in flow['match']):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _check_no_lswitch_dhcp_rule(self, flows, dhcp_ip):
|
||||
if self._check_lswitch_dhcp_rule(flows, dhcp_ip):
|
||||
return False
|
||||
return True
|
||||
|
||||
def test_db_consistent(self):
|
||||
self.db_sync_time = self.conf.db_sync_time
|
||||
if self.db_sync_time < db_consistent.MIN_SYNC_INTERVAL_TIME:
|
||||
self.db_sync_time = db_consistent.MIN_SYNC_INTERVAL_TIME
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network_id = network.create(network={'name': 'private'})
|
||||
topic = network.get_topic()
|
||||
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
|
||||
network_id))
|
||||
subnet_body = {'network_id': network_id,
|
||||
'cidr': '10.50.0.0/24',
|
||||
'gateway_ip': '10.50.0.1',
|
||||
'ip_version': 4,
|
||||
'name': 'private',
|
||||
'enable_dhcp': True}
|
||||
subnet.create(subnet=subnet_body)
|
||||
time.sleep(constants.DEFAULT_RESOURCE_READY_TIMEOUT)
|
||||
self.assertTrue(network.exists())
|
||||
self.assertTrue(subnet.exists())
|
||||
|
||||
vm = self.store(objects.VMTestObj(self, self.neutron))
|
||||
vm.create(network=network)
|
||||
self.assertIsNotNone(vm.server.addresses['private'])
|
||||
mac = vm.server.addresses['private'][0]['OS-EXT-IPS-MAC:mac_addr']
|
||||
self.assertIsNotNone(mac)
|
||||
|
||||
ovs = utils.OvsFlowsParser()
|
||||
utils.wait_until_true(
|
||||
lambda: self._check_l2_lookup_rule(
|
||||
ovs.dump(self.integration_bridge), mac),
|
||||
timeout=10, sleep=1,
|
||||
exception=Exception('no rule for vm in l2 lookup table')
|
||||
)
|
||||
df_network = {}
|
||||
net_id = '11111111-1111-1111-1111-111111111111'
|
||||
df_network['id'] = net_id
|
||||
df_network['topic'] = topic
|
||||
df_network['name'] = 'df_nw1'
|
||||
df_network['network_type'] = 'vxlan'
|
||||
df_network['segmentation_id'] = 4000
|
||||
df_network['router_external'] = False
|
||||
df_network['mtu'] = 1500
|
||||
df_network[models.UNIQUE_KEY] = 1
|
||||
df_network['version'] = 1
|
||||
|
||||
df_subnet = {}
|
||||
df_subnet['id'] = '22222222-2222-2222-2222-222222222222'
|
||||
df_subnet['lswitch'] = net_id
|
||||
df_subnet['name'] = 'df_sn1'
|
||||
df_subnet['enable_dhcp'] = True
|
||||
df_subnet['cidr'] = '10.60.0.0/24'
|
||||
df_subnet['dhcp_ip'] = '10.60.0.2'
|
||||
df_subnet['gateway_ip'] = '10.60.0.1'
|
||||
df_subnet['dns_nameservers'] = []
|
||||
df_subnet['host_routes'] = []
|
||||
|
||||
df_network['subnets'] = [df_subnet]
|
||||
df_network_json = jsonutils.dumps(df_network)
|
||||
|
||||
self.nb_api.driver.create_key(
|
||||
'lswitch', net_id, df_network_json, topic)
|
||||
|
||||
time.sleep(self.db_sync_time)
|
||||
utils.wait_until_true(
|
||||
lambda: self._check_lswitch_dhcp_rule(
|
||||
ovs.dump(self.integration_bridge), '10.60.0.2'),
|
||||
timeout=self.db_sync_time + constants.DEFAULT_CMD_TIMEOUT, sleep=1,
|
||||
exception=Exception('no goto dhcp rule for lswitch')
|
||||
)
|
||||
|
||||
df_network['version'] = 2
|
||||
df_network['subnets'][0]['dhcp_ip'] = '10.60.0.3'
|
||||
df_network_json = jsonutils.dumps(df_network)
|
||||
self.nb_api.driver.set_key('lswitch', net_id, df_network_json, topic)
|
||||
|
||||
time.sleep(self.db_sync_time)
|
||||
utils.wait_until_true(
|
||||
lambda: self._check_lswitch_dhcp_rule(
|
||||
ovs.dump(self.integration_bridge), '10.60.0.3'),
|
||||
timeout=self.db_sync_time + constants.DEFAULT_CMD_TIMEOUT, sleep=1,
|
||||
exception=Exception('no goto dhcp rule for lswitch')
|
||||
)
|
||||
|
||||
self.nb_api.driver.delete_key('lswitch', net_id, topic)
|
||||
time.sleep(self.db_sync_time)
|
||||
utils.wait_until_true(
|
||||
lambda: self._check_no_lswitch_dhcp_rule(
|
||||
ovs.dump(self.integration_bridge), '10.60.0.3'),
|
||||
timeout=self.db_sync_time + constants.DEFAULT_CMD_TIMEOUT, sleep=1,
|
||||
exception=Exception('could not delete goto dhcp rule for lswitch')
|
||||
)
|
||||
|
||||
vm.close()
|
||||
subnet.close()
|
||||
network.close()
|
|
@ -0,0 +1,112 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from dragonflow.db import db_consistent
|
||||
from dragonflow.tests import base as tests_base
|
||||
|
||||
|
||||
class TestDBConsistent(tests_base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDBConsistent, self).setUp()
|
||||
self.controller = mock.Mock()
|
||||
self.topology = self.controller.topology
|
||||
self.nb_api = self.controller.nb_api
|
||||
self.db_store = self.controller.db_store
|
||||
|
||||
self.topic = '111-222-333'
|
||||
self.lport_id1 = '1'
|
||||
self.lport_id2 = '2'
|
||||
self.lport_id3 = '3'
|
||||
self.db_consistent = db_consistent.DBConsistencyManager(
|
||||
self.controller)
|
||||
|
||||
def test_db_comparison(self):
|
||||
df_obj1 = FakeDfLocalObj(self.lport_id1, 1)
|
||||
df_obj2 = FakeDfLocalObj(self.lport_id2, 2)
|
||||
|
||||
local_obj1 = FakeDfLocalObj(self.lport_id2, 1)
|
||||
local_obj2 = FakeDfLocalObj(self.lport_id3, 1)
|
||||
|
||||
self.nb_api.get_all_logical_switches.return_value = [df_obj1, df_obj2]
|
||||
self.db_store.get_lswitchs.return_value = [local_obj1, local_obj2]
|
||||
|
||||
self.nb_api.get_all_logical_ports.return_value = [df_obj1, df_obj2]
|
||||
self.db_store.get_ports.return_value = [local_obj1, local_obj2]
|
||||
|
||||
self.nb_api.get_routers.return_value = [df_obj1, df_obj2]
|
||||
self.db_store.get_routers.return_value = [local_obj1, local_obj2]
|
||||
|
||||
self.nb_api.get_security_groups.return_value = [df_obj1, df_obj2]
|
||||
self.db_store.get_security_groups.return_value = [
|
||||
local_obj1, local_obj2]
|
||||
|
||||
self.nb_api.get_floatingips.return_value = [df_obj1, df_obj2]
|
||||
self.db_store.get_floatingips.return_value = [local_obj1, local_obj2]
|
||||
|
||||
self.nb_api.get_publishers.return_value = [df_obj1, df_obj2]
|
||||
self.db_store.get_publishers.return_value = [local_obj1, local_obj2]
|
||||
|
||||
self.db_consistent.handle_data_comparison(
|
||||
[self.topic], 'publisher', True)
|
||||
self.controller.publisher_updated.assert_any_call(df_obj1)
|
||||
self.controller.publisher_updated.assert_any_call(df_obj2)
|
||||
self.controller.publisher_deleted.assert_any_call(self.lport_id3)
|
||||
|
||||
self.db_consistent.handle_data_comparison(
|
||||
[self.topic], 'lswitch', True)
|
||||
self.controller.logical_switch_updated.assert_any_call(df_obj1)
|
||||
self.controller.logical_switch_updated.assert_any_call(df_obj2)
|
||||
self.controller.logical_switch_deleted.assert_any_call(
|
||||
self.lport_id3)
|
||||
|
||||
self.db_consistent.handle_data_comparison(
|
||||
[self.topic], 'port', True)
|
||||
self.controller.logical_port_created.assert_any_call(df_obj1)
|
||||
self.controller.logical_port_updated.assert_any_call(df_obj2)
|
||||
self.controller.logical_port_deleted.assert_any_call(
|
||||
self.lport_id3)
|
||||
|
||||
self.db_consistent.handle_data_comparison(
|
||||
[self.topic], 'router', True)
|
||||
self.controller.router_updated.assert_any_call(df_obj1)
|
||||
self.controller.router_updated.assert_any_call(df_obj2)
|
||||
self.controller.router_deleted.assert_any_call(self.lport_id3)
|
||||
|
||||
self.db_consistent.handle_data_comparison(
|
||||
[self.topic], 'secgroup', True)
|
||||
self.controller.security_group_updated.assert_any_call(df_obj1)
|
||||
self.controller.security_group_updated.assert_any_call(df_obj2)
|
||||
self.controller.security_group_deleted.assert_any_call(
|
||||
self.lport_id3)
|
||||
|
||||
self.db_consistent.handle_data_comparison(
|
||||
[self.topic], 'floatingip', True)
|
||||
self.controller.floatingip_updated.assert_any_call(df_obj1)
|
||||
self.controller.floatingip_updated.assert_any_call(df_obj2)
|
||||
self.controller.floatingip_deleted.assert_any_call(
|
||||
self.lport_id3)
|
||||
|
||||
|
||||
class FakeDfLocalObj(object):
|
||||
"""To generate df_obj or local_obj for testing purposes only."""
|
||||
def __init__(self, id, version):
|
||||
self.id = id
|
||||
self.version = version
|
||||
|
||||
def get_id(self):
|
||||
return self.id
|
||||
|
||||
def get_version(self):
|
||||
return self.version
|
|
@ -117,3 +117,32 @@ class TestTopology(tests_base.BaseTestCase):
|
|||
self.lport1.get_id())
|
||||
self.mock_nb_api.subscriber.unregister_topic.assert_called_with(
|
||||
self.lport1.get_topic())
|
||||
|
||||
def test_check_topology_info(self):
|
||||
topic = '111-222-333'
|
||||
lport_id2 = '2'
|
||||
ovs_port_id2 = 'ovs_port2'
|
||||
lport_id3 = '3'
|
||||
ovs_port_id3 = 'ovs_port3'
|
||||
|
||||
self.topology.ovs_to_lport_mapping = {
|
||||
ovs_port_id2: {
|
||||
'lport_id': lport_id2,
|
||||
'topic': topic
|
||||
},
|
||||
ovs_port_id3: {
|
||||
'lport_id': lport_id3,
|
||||
'topic': topic
|
||||
}
|
||||
}
|
||||
self.topology.ovs_ports = {
|
||||
'ovs_port1': self.ovs_port1
|
||||
}
|
||||
self.topology.topic_subscribed = {
|
||||
topic: {lport_id2, lport_id3}
|
||||
}
|
||||
lport1 = Mock()
|
||||
lport1.get_topic.return_value = topic
|
||||
self.db_store.set_port('lport1', lport1, True, topic)
|
||||
self.topology.check_topology_info()
|
||||
self.assertEqual(len(self.topology.topic_subscribed[topic]), 1)
|
||||
|
|
Loading…
Reference in New Issue