Rename port_status_notifier to neutron_notifier

The mechanism could be used to send updates from dragonflow controller
to neutron server. It was used as port status notifier, but it could
also been used as fip status notifier.

This patch rename the class to a neutral name "neutron_notifier".

Change-Id: I9aaf30810517ac7a53d120f9cb28b35737c76804
Related-Bug: #1682066
This commit is contained in:
Hong Hui Xiao 2017-04-13 21:40:34 +08:00
parent 16bc524181
commit 3ed9c29104
15 changed files with 77 additions and 73 deletions

View File

@ -52,8 +52,8 @@ OVS_DB_PID=$OVS_DIR"/"$OVS_DB_SERVICE".pid"
OVS_VSWITCHD_PID=$OVS_DIR"/"$OVS_VSWITCHD_SERVICE".pid"
OVS_VSWITCH_OCSSCHEMA_FILE=${OVS_VSWITCH_OCSSCHEMA_FILE:-"/usr/share/openvswitch/vswitch.ovsschema"}
# Port status notifier
ENABLE_PORT_STATUS_NOTIFIER=${ENABLE_PORT_STATUS_NOTIFIER:-"False"}
# Neutron notifier
ENABLE_NEUTRON_NOTIFIER=${ENABLE_NEUTRON_NOTIFIER:-"False"}
# Set value of TUNNEL_ENDPOINT_IP if unset
TUNNEL_ENDPOINT_IP=${TUNNEL_ENDPOINT_IP:-$HOST_IP}
@ -234,7 +234,7 @@ function configure_df_plugin {
iniset $DRAGONFLOW_CONF df remote_db_hosts "$REMOTE_DB_HOSTS"
iniset $DRAGONFLOW_CONF df nb_db_class "$NB_DRIVER_CLASS"
iniset $DRAGONFLOW_CONF df port_status_notifier "$PORT_STATUS_NOTIFIER"
iniset $DRAGONFLOW_CONF df enable_port_status_notifier "$ENABLE_PORT_STATUS_NOTIFIER"
iniset $DRAGONFLOW_CONF df enable_neutron_notifier "$ENABLE_NEUTRON_NOTIFIER"
iniset $DRAGONFLOW_CONF df enable_dpdk "$ENABLE_DPDK"
iniset $DRAGONFLOW_CONF df management_ip "$HOST_IP"
iniset $DRAGONFLOW_CONF df local_ip "$TUNNEL_ENDPOINT_IP"

View File

@ -5,7 +5,7 @@ LOGFILE=$DEST/logs/stack.sh.log
Q_ENABLE_DRAGONFLOW_LOCAL_CONTROLLER=True
DF_SELECTIVE_TOPO_DIST=True
DF_PUB_SUB=True
ENABLE_PORT_STATUS_NOTIFIER=False
ENABLE_NEUTRON_NOTIFIER=False
DATABASE_PASSWORD=password
RABBIT_PASSWORD=password

View File

@ -5,7 +5,7 @@ LOGFILE=$DEST/logs/stack.sh.log
Q_ENABLE_DRAGONFLOW_LOCAL_CONTROLLER=True
DF_SELECTIVE_TOPO_DIST=True
DF_PUB_SUB=True
ENABLE_PORT_STATUS_NOTIFIER=False
ENABLE_NEUTRON_NOTIFIER=False
DATABASE_PASSWORD=password
RABBIT_PASSWORD=password

View File

@ -8,7 +8,7 @@ LOGFILE=$DEST/logs/stack.sh.log
Q_ENABLE_DRAGONFLOW_LOCAL_CONTROLLER=True
DF_SELECTIVE_TOPO_DIST=True
DF_PUB_SUB=True
ENABLE_PORT_STATUS_NOTIFIER=False
ENABLE_NEUTRON_NOTIFIER=False
DATABASE_PASSWORD=password
RABBIT_PASSWORD=password

View File

@ -30,7 +30,7 @@ from dragonflow._i18n import _
DF_PUBSUB_DRIVER_NAMESPACE = 'dragonflow.pubsub_driver'
DF_NB_DB_DRIVER_NAMESPACE = 'dragonflow.nb_db_driver'
DF_PORT_STATUS_DRIVER_NAMESPACE = 'dragonflow.port_status_driver'
DF_NEUTRON_NOTIFIER_DRIVER_NAMESPACE = 'dragonflow.neutron_notifier_driver'
LOG = logging.getLogger(__name__)

View File

@ -72,12 +72,13 @@ df_opts = [
cfg.StrOpt('pub_sub_multiproc_driver',
default='zmq_pubsub_multiproc_driver',
help=_('Drivers to use for the Dragonflow pub/sub')),
cfg.BoolOpt('enable_port_status_notifier',
cfg.BoolOpt('enable_neutron_notifier',
default=False,
help=_('Enable notifier for the Dragonflow port status')),
cfg.StrOpt('port_status_notifier',
default='redis_port_status_notifier_driver',
help=_('Notifier for the Dragonflow port status')),
help=_('Enable notifier for Dragonflow controller sending '
'data to neutron server')),
cfg.StrOpt('neutron_notifier',
default='nb_api_neutron_notifier_driver',
help=_('Notifier for the Dragonflow controller events')),
cfg.ListOpt('publishers_ips',
default=['$local_ip'],
help=_('List of the Neutron Server Publisher IPs.')),

View File

@ -60,7 +60,7 @@ class DfLocalController(object):
# option tunnel_type
self.tunnel_types = [cfg.CONF.df.tunnel_type]
self.sync_finished = False
self.port_status_notifier = None
self.neutron_notifier = None
nb_driver = df_utils.load_driver(
cfg.CONF.df.nb_db_class,
df_utils.DF_NB_DB_DRIVER_NAMESPACE)
@ -68,15 +68,15 @@ class DfLocalController(object):
nb_driver,
use_pubsub=cfg.CONF.df.enable_df_pub_sub)
self.vswitch_api = vswitch_impl.OvsApi(self.mgt_ip)
if cfg.CONF.df.enable_port_status_notifier:
self.port_status_notifier = df_utils.load_driver(
cfg.CONF.df.port_status_notifier,
df_utils.DF_PORT_STATUS_DRIVER_NAMESPACE)
if cfg.CONF.df.enable_neutron_notifier:
self.neutron_notifier = df_utils.load_driver(
cfg.CONF.df.neutron_notifier,
df_utils.DF_NEUTRON_NOTIFIER_DRIVER_NAMESPACE)
kwargs = dict(
nb_api=self.nb_api,
vswitch_api=self.vswitch_api,
db_store=self.db_store,
neutron_server_notifier=self.port_status_notifier
neutron_server_notifier=self.neutron_notifier
)
app_mgr = app_manager.AppManager.get_instance()
self.open_flow_app = app_mgr.instantiate(ryu_base_app.RyuDFAdapter,
@ -92,9 +92,9 @@ class DfLocalController(object):
self.nb_api.initialize(db_ip=cfg.CONF.df.remote_db_ip,
db_port=cfg.CONF.df.remote_db_port)
self.vswitch_api.initialize(self.nb_api)
if cfg.CONF.df.enable_port_status_notifier:
self.port_status_notifier.initialize(nb_api=self.nb_api,
is_neutron_server=False)
if cfg.CONF.df.enable_neutron_notifier:
self.neutron_notifier.initialize(nb_api=self.nb_api,
is_neutron_server=False)
self.topology = topology.Topology(self,
self.enable_selective_topo_dist)
if self.enable_db_consistency:
@ -565,8 +565,8 @@ class DfLocalController(object):
return self.chassis_name
def notify_port_status(self, ovs_port, status):
if self.port_status_notifier:
self.port_status_notifier.notify_port_status(ovs_port, status)
if self.neutron_notifier:
self.neutron_notifier.notify_port_status(ovs_port, status)
def _get_delete_handler(self, table):
method_name = 'delete_{0}'.format(table)

View File

@ -16,25 +16,29 @@ import six
@six.add_metaclass(abc.ABCMeta)
class PortStatusDriver(object):
# PortStatus implements port status update southbound
# notification mechanism.
class NeutronNotifierDriver(object):
# NeutronNotifierDriver implements notification mechanism from
# Dragonflow controller to northbound neutron server.
@abc.abstractmethod
def initialize(self, nb_api, is_neutron_server):
"""Initialise the portstatus both in server
compute node
"""Initialise the NeutronNotifierDriver both in neutron server and
compute node.
:nb_api: nb_api driver
:is_neutron_server server or compute
:is_neutron_server Neutron server or compute
:return: None
"""
@abc.abstractmethod
def notify_port_status(self, ovs_port, status):
"""notify port status changes to server
def notify_neutron_server(self, table, key, action, value, topic):
"""Notify the change to neutron server. Note that this method
will run in neutron server.
:param ovs_port: which port status changed
:param status: notify port status up or down
:return: None
:param table: which db model
:param key: the id of db model data
:param action: the action of data, create/update/delete
:param value: the value of db model data
:param topic: the topic of neutron server's corresponding listener
:return: None
"""

View File

@ -28,14 +28,14 @@ from dragonflow.common import utils as df_utils
from dragonflow.db import db_common
from dragonflow.db import models
from dragonflow.db.neutron import lockedobjects_db as lock_db
from dragonflow.db import port_status_api
from dragonflow.db import neutron_notifier_api
LOG = log.getLogger(__name__)
class RedisPortStatusNotifier(port_status_api.PortStatusDriver):
# PortStatusNotifier implements port status update
# southbound notification mechanism based on redis
class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
# NbApiNeutronNotifier implements notification mechanism from
# Dragonflow controller to northbound neutron server, based on
# pub/sub driver at present.
def __init__(self):
self.nb_api = None
@ -46,7 +46,7 @@ class RedisPortStatusNotifier(port_status_api.PortStatusDriver):
self.create_heart_beat_reporter(cfg.CONF.host)
else:
if not cfg.CONF.df.enable_df_pub_sub:
LOG.warning("RedisPortStatusNotifier cannot "
LOG.warning("NbApiNeutronNotifier cannot "
"work when enable_df_pub_sub is disabled")
return
self.nb_api.publisher.initialize()
@ -69,7 +69,7 @@ class RedisPortStatusNotifier(port_status_api.PortStatusDriver):
self._create_heart_beat_reporter(host)
def _create_heart_beat_reporter(self, host):
self.nb_api.register_listener_callback(self.port_status_callback,
self.nb_api.register_listener_callback(self.notify_neutron_server,
'listener_' + host)
LOG.info("Register listener %s", host)
self.heart_beat_reporter = HeartBeatReporter(self.nb_api)
@ -109,7 +109,7 @@ class RedisPortStatusNotifier(port_status_api.PortStatusDriver):
LOG.info("Publish to neutron %s", topic)
self.nb_api.publisher.send_event(update)
def port_status_callback(self, table, key, action, value, topic=None):
def notify_neutron_server(self, table, key, action, value, topic=None):
if models.LogicalPort.table_name == table and 'update' == action:
LOG.info("Process port %s status update event", key)
core_plugin = directory.get_plugin()

View File

@ -74,13 +74,12 @@ class DFMechDriver(driver_api.MechanismDriver):
# plugin service, etc) and threads with network connections.
self.nb_api = api_nb.NbApi.get_instance(True)
df_qos.initialize(self.nb_api)
if cfg.CONF.df.enable_port_status_notifier:
port_status_notifier = df_utils.load_driver(
cfg.CONF.df.port_status_notifier,
df_utils.DF_PORT_STATUS_DRIVER_NAMESPACE)
port_status_notifier.initialize(self.nb_api,
is_neutron_server=True)
if cfg.CONF.df.enable_neutron_notifier:
neutron_notifier = df_utils.load_driver(
cfg.CONF.df.neutron_notifier,
df_utils.DF_NEUTRON_NOTIFIER_DRIVER_NAMESPACE)
neutron_notifier.initialize(self.nb_api,
is_neutron_server=True)
self.port_status = None
def subscribe_registries(self):
@ -556,7 +555,7 @@ class DFMechDriver(driver_api.MechanismDriver):
# Here we do not want port status update to trigger
# sending event to other compute node.
if (cfg.CONF.df.enable_port_status_notifier and
if (cfg.CONF.df.enable_neutron_notifier and
n_const.DEVICE_OWNER_COMPUTE_PREFIX
in updated_port['device_owner'] and
context.status != context.original_status and

View File

@ -60,7 +60,7 @@ class TestTopology(test_base.DFTestBase):
self.assertTrue(vm_mac is not None)
vm_flows = self._get_vm_flows(vm_mac)
self.assertTrue(any(vm_flows))
if cfg.CONF.df.enable_port_status_notifier:
if cfg.CONF.df.enable_neutron_notifier:
# test port status update
utils.wait_until_true(
lambda: self._is_VM_port_status(vm, 'ACTIVE'),
@ -71,7 +71,7 @@ class TestTopology(test_base.DFTestBase):
def _remove_vm(self, vm):
vm_mac = vm.get_first_mac()
if cfg.CONF.df.enable_port_status_notifier:
if cfg.CONF.df.enable_neutron_notifier:
# test port status update
vm.server.stop()
utils.wait_until_true(

View File

@ -166,12 +166,12 @@ class TestDFL3RouterPlugin(test_mech_driver.DFMechanismDriverTestCase):
self.assertIsNotNone(record['extra_attributes'])
def test_notify_update_fip_status(self):
cfg.CONF.set_override('port_status_notifier',
'redis_port_status_notifier_driver',
cfg.CONF.set_override('neutron_notifier',
'nb_api_neutron_notifier_driver',
group='df')
notifier = df_utils.load_driver(
cfg.CONF.df.port_status_notifier,
df_utils.DF_PORT_STATUS_DRIVER_NAMESPACE)
cfg.CONF.df.neutron_notifier,
df_utils.DF_NEUTRON_NOTIFIER_DRIVER_NAMESPACE)
kwargs = {'arg_list': ('router:external',),
'router:external': True}
@ -183,10 +183,10 @@ class TestDFL3RouterPlugin(test_mech_driver.DFMechanismDriverTestCase):
'tenant_id': n['network']['tenant_id']}})
self.assertEqual(n_const.FLOATINGIP_STATUS_DOWN, floatingip['status'])
notifier.port_status_callback(models.Floatingip.table_name,
floatingip['id'],
"update",
n_const.FLOATINGIP_STATUS_ACTIVE)
notifier.notify_neutron_server(models.Floatingip.table_name,
floatingip['id'],
"update",
n_const.FLOATINGIP_STATUS_ACTIVE)
floatingip = self.l3p.get_floatingip(self.context, floatingip['id'])
self.assertEqual(n_const.FLOATINGIP_STATUS_ACTIVE,
floatingip['status'])

View File

@ -22,18 +22,18 @@ from dragonflow.tests import base as tests_base
from dragonflow.tests.common import utils
class TestRedisPortStatus(tests_base.BaseTestCase):
class TestNbApiNeutronNotifier(tests_base.BaseTestCase):
def setUp(self):
cfg.CONF.set_override('port_status_notifier',
'redis_port_status_notifier_driver',
cfg.CONF.set_override('neutron_notifier',
'nb_api_neutron_notifier_driver',
group='df')
mock.patch('dragonflow.db.neutron.lockedobjects_db.wrap_db_lock',
side_effect=utils.empty_wrapper).start()
super(TestRedisPortStatus, self).setUp()
super(TestNbApiNeutronNotifier, self).setUp()
self.notifier = df_utils.load_driver(
cfg.CONF.df.port_status_notifier,
df_utils.DF_PORT_STATUS_DRIVER_NAMESPACE)
cfg.CONF.df.neutron_notifier,
df_utils.DF_NEUTRON_NOTIFIER_DRIVER_NAMESPACE)
def test_create_heart_beat_reporter(self):
nb_api = mock.Mock()
@ -56,13 +56,13 @@ class TestRedisPortStatus(tests_base.BaseTestCase):
self.notifier.create_heart_beat_reporter('fake_host')
self.assertFalse(nb_api.register_listener_callback.called)
def test_port_status_callback(self):
def test_notify_neutron_server(self):
core_plugin = mock.Mock()
with mock.patch("neutron_lib.plugins.directory.get_plugin",
return_value=core_plugin):
self.notifier.port_status_callback(models.LogicalPort.table_name,
"fake_port",
"update",
"up")
self.notifier.notify_neutron_server(models.LogicalPort.table_name,
"fake_port",
"update",
"up")
core_plugin.update_port_status.assert_called_once_with(
mock.ANY, "fake_port", "up")

View File

@ -55,7 +55,7 @@ class TestTopology(test_app_base.DFAppTestBase):
def setUp(self):
cfg.CONF.set_override('enable_selective_topology_distribution',
True, group='df')
cfg.CONF.set_override('enable_port_status_notifier', False, group='df')
cfg.CONF.set_override('enable_neutron_notifier', False, group='df')
cfg.CONF.set_override('enable_df_db_consistency', False, group='df')
super(TestTopology, self).setUp(enable_selective_topo_dist=True)
# By default, return empty value for all resources, each case can

View File

@ -68,8 +68,8 @@ dragonflow.nb_db_driver =
zookeeper_nb_db_driver = dragonflow.db.drivers.zookeeper_db_driver:ZookeeperDbDriver
redis_nb_db_driver = dragonflow.db.drivers.redis_db_driver:RedisDbDriver
cassandra_nb_db_driver = dragonflow.db.drivers.cassandra_db_driver:CassandraDbDriver
dragonflow.port_status_driver =
redis_port_status_notifier_driver = dragonflow.db.pubsub_drivers.redis_port_status_notifier:RedisPortStatusNotifier
dragonflow.neutron_notifier_driver =
nb_api_neutron_notifier_driver = dragonflow.db.pubsub_drivers.nb_api_neutron_notifier:NbApiNeutronNotifier
neutron.service_plugins =
df-l3 = dragonflow.neutron.services.l3_router_plugin:DFL3RouterPlugin
df-bgp = dragonflow.neutron.services.bgp.bgp_plugin:DFBgpPlugin