diff --git a/devstack/override-defaults b/devstack/override-defaults index 27afb493c..ce7716741 100644 --- a/devstack/override-defaults +++ b/devstack/override-defaults @@ -28,6 +28,7 @@ if is_service_enabled df-metadata ; then fi DRAGONFLOW_CONF=/etc/neutron/dragonflow.ini +DRAGONFLOW_PUBLISHER_CONF=/etc/neutron/dragonflow_publisher.ini DRAGONFLOW_DATAPATH=/etc/neutron/dragonflow_datapath_layout.yaml Q_PLUGIN_EXTRA_CONF_PATH=/etc/neutron Q_PLUGIN_EXTRA_CONF_FILES=(dragonflow.ini) diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 9f47d3acd..c18a56465 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -155,13 +155,11 @@ fi if is_service_enabled df-etcd-pubsub-service ; then init_pubsub - DF_PUB_SUB_USE_MULTIPROC="False" source $DEST/dragonflow/devstack/etcd_pubsub_driver fi if [[ "$DF_REDIS_PUBSUB" == "True" ]]; then init_pubsub - DF_PUB_SUB_USE_MULTIPROC="False" source $DEST/dragonflow/devstack/redis_pubsub_driver fi @@ -316,8 +314,7 @@ function configure_df_plugin { iniset $DRAGONFLOW_CONF df apps_list "$DF_APPS_LIST" iniset $DRAGONFLOW_CONF df_l2_app l2_responder "$DF_L2_RESPONDER" iniset $DRAGONFLOW_CONF df enable_df_pub_sub "$DF_PUB_SUB" - iniset $DRAGONFLOW_CONF df pub_sub_use_multiproc "$DF_PUB_SUB_USE_MULTIPROC" - iniset $DRAGONFLOW_CONF df publisher_multiproc_socket "$DF_PUBLISHER_MULTIPROC_SOCKET" + iniset $DRAGONFLOW_CONF df_zmq ipc_socket "$DF_ZMQ_IPC_SOCKET" if [[ ! -z ${EXTERNAL_HOST_IP} ]]; then iniset $DRAGONFLOW_CONF df external_host_ip "$EXTERNAL_HOST_IP" iniset $DRAGONFLOW_CONF df_snat_app external_network_bridge "$PUBLIC_BRIDGE" @@ -483,9 +480,9 @@ function verify_ryu_version { } function start_pubsub_service { - echo "Starting Dragonflow publisher service" if is_service_enabled df-publisher-service ; then - run_process df-publisher-service "$DF_PUBLISHER_SERVICE_BINARY --config-file $NEUTRON_CONF --config-file $DRAGONFLOW_CONF" + echo "Starting Dragonflow publisher service" + run_process df-publisher-service "$DF_PUBLISHER_SERVICE_BINARY --config-file $NEUTRON_CONF --config-file $DRAGONFLOW_CONF --config-file $DRAGONFLOW_PUBLISHER_CONF" fi } @@ -587,7 +584,6 @@ function handle_df_stack_post_install { if [ -z $PUB_SUB_DRIVER ]; then die $LINENO "pub-sub enabled, but no pub-sub driver selected" fi - PUB_SUB_MULTIPROC_DRIVER=${PUB_SUB_MULTIPROC_DRIVER:-$PUB_SUB_DRIVER} fi if is_service_enabled nova; then diff --git a/devstack/settings b/devstack/settings index 0ec5a4e87..e0a41c766 100644 --- a/devstack/settings +++ b/devstack/settings @@ -11,7 +11,7 @@ DF_INSTALL_DEBUG_ROOTWRAP_CONF=${DF_INSTALL_DEBUG_ROOTWRAP_CONF:-"True"} DF_L3_BINARY=$NEUTRON_BIN_DIR/df-l3-agent DF_LOCAL_CONTROLLER_BINARY=$NEUTRON_BIN_DIR/df-local-controller DF_PUBLISHER_SERVICE_BINARY=$NEUTRON_BIN_DIR/df-publisher-service -DF_PUBLISHER_MULTIPROC_SOCKET=${DF_PUBLISHER_MULTIPROC_SOCKET:-"/var/run/zmq_pubsub/zmq-publisher-socket"} +DF_ZMQ_IPC_SOCKET=${DF_ZMQ_IPC_SOCKET:-"/var/run/zmq_pubsub/zmq-publisher-socket"} DF_AUTO_DETECT_PORT_BEHIND_PORT=${DF_AUTO_DETECT_PORT_BEHIND_PORT:-"False"} DF_LBAAS_AUTO_ENABLE_VIP_PORTS=${DF_LBAAS_AUTO_ENABLE_VIP_PORTS:-"True"} @@ -36,7 +36,6 @@ DF_L2_RESPONDER=${DF_L2_RESPONDER:-'True'} DF_MONITOR_TABLE_POLL_TIME=${DF_MONITOR_TABLE_POLL_TIME:-30} DF_PUB_SUB=${DF_PUB_SUB:-"False"} -DF_PUB_SUB_USE_MULTIPROC=${DF_PUB_SUB_USE_MULTIPROC:-"True"} DF_Q_SVC_MASTER=${DF_Q_SVC_MASTER:-"True"} PUBLISHER_RATE_LIMIT_TIMEOUT=${PUBLISHER_RATE_LIMIT_TIMEOUT:-180} diff --git a/devstack/zmq_pubsub_driver b/devstack/zmq_pubsub_driver index de515aa2b..4816ee354 100644 --- a/devstack/zmq_pubsub_driver +++ b/devstack/zmq_pubsub_driver @@ -1,15 +1,13 @@ #!/bin/bash -ZMQ_IPC_SOCKET=$DF_PUBLISHER_MULTIPROC_SOCKET - function configure_pubsub_service_plugin { NEUTRON_CONF=${NEUTRON_CONF:-"/etc/neutron/neutron.conf"} PUB_SUB_DRIVER=${PUB_SUB_DRIVER:-"zmq_pubsub_driver"} - PUB_SUB_MULTIPROC_DRIVER=${PUB_SUB_MULTIPROC_DRIVER:-"zmq_pubsub_multiproc_driver"} iniset $DRAGONFLOW_CONF df pub_sub_driver $PUB_SUB_DRIVER - iniset $DRAGONFLOW_CONF df pub_sub_multiproc_driver $PUB_SUB_MULTIPROC_DRIVER + DF_PUBLISHER_DRIVER=${DF_PUBLISHER_DRIVER:-"zmq_bind_pubsub_driver"} + iniset $DRAGONFLOW_PUBLISHER_CONF df pub_sub_driver $DF_PUBLISHER_DRIVER - ZMQ_IPC_SOCKET_DIR=`dirname $ZMQ_IPC_SOCKET` + ZMQ_IPC_SOCKET_DIR=`dirname $DF_ZMQ_IPC_SOCKET` sudo mkdir -p $ZMQ_IPC_SOCKET_DIR sudo chown $STACK_USER $ZMQ_IPC_SOCKET_DIR } diff --git a/doc/source/manual_deployment.rst b/doc/source/manual_deployment.rst index 8ee078a7a..2b87d1ddd 100644 --- a/doc/source/manual_deployment.rst +++ b/doc/source/manual_deployment.rst @@ -130,7 +130,7 @@ Next you need to change the configuration, for example, etcd: Pub/Sub Driver -------------- -Dragonflow supports zeromq and redis. You need to change the configuration, for example, zeromq: +Dragonflow supports etcd, redis and zeromq. You need to change the configuration, for example, etcd: /etc/neutron/dragonflow.ini: @@ -138,10 +138,7 @@ Dragonflow supports zeromq and redis. You need to change the configuration, for [df] enable_df_pub_sub = True - pub_sub_driver = zmq_pubsub_driver - publisher_multiproc_socket = /var/run/zmq_pubsub/zmq-publisher-socket - pub_sub_multiproc_driver = zmq_pubsub_multiproc_driver - pub_sub_use_multiproc = True + pub_sub_driver = etcd_pubsub_driver publisher_rate_limit_count = 1 publisher_rate_limit_timeout = 180 monitor_table_poll_time = 30 diff --git a/doc/source/pluggable_pubsub.rst b/doc/source/pluggable_pubsub.rst index dfb9e07f0..5ec9934bc 100644 --- a/doc/source/pluggable_pubsub.rst +++ b/doc/source/pluggable_pubsub.rst @@ -76,36 +76,21 @@ __ _COMMON_PARAMS 1. pub_sub_driver - The alias to the class implementing ``PubSubApi`` for network-based pub/sub. - 2. pub_sub_multiproc_driver - The alias to the class implementing ``PubSubApi`` - for IPC-based pub/sub. - - 3. publisher_port - The port to which the network publisher should bind. It is + 2. publisher_port - The port to which the network publisher should bind. It is also the port the network subscribers connect. - 4. publisher_transport - The transport protocol (e.g. TCP, UDP) over which + 3. publisher_transport - The transport protocol (e.g. TCP, UDP) over which pub/sub netwrok communication is passed. - 5. publisher_bind_address - The local address to which the network publisher + 4. publisher_bind_address - The local address to which the network publisher should bind. '*' means all addresses. - 6. publisher_multiproc_socket - The local socket over which the multi-proc - pub/sub implementation should communicate. The actual value is - implementation specific, since different implementations may use different - IPC mechanisms. - Some publish-subscribe drivers do not need to use a publisher service. This can be the case if e.g. the publisher does not bind to the communication socket. -In this case, the pub_sub_multiproc_driver and publisher_multiproc_socket -options are ignored. All publishers are created using the pub_sub_driver. - -In case this is what you want, disable the following option. - - 1. pub_sub_use_multiproc - Use inter-process publish/subscribe. Publishers - send events via the publisher service. When disabled, publishers send - events directly to the network. +All publishers are created using the pub_sub_driver. ======================== Reference Implementation diff --git a/dragonflow/cli/df_db.py b/dragonflow/cli/df_db.py index 2959a0055..7b31e05c8 100644 --- a/dragonflow/cli/df_db.py +++ b/dragonflow/cli/df_db.py @@ -430,7 +430,7 @@ def main(): df_utils.config_parse() global nb_api - nb_api = api_nb.NbApi.get_instance(False) + nb_api = api_nb.NbApi.get_instance() args.handle(args) diff --git a/dragonflow/cmd/df_skydive_service.py b/dragonflow/cmd/df_skydive_service.py index 14c1e0a91..2ea216e21 100644 --- a/dragonflow/cmd/df_skydive_service.py +++ b/dragonflow/cmd/df_skydive_service.py @@ -279,7 +279,7 @@ def start(is_service): """main method""" df_config.init(sys.argv) df_utils.config_parse() - nb_api = api_nb.NbApi.get_instance(False) + nb_api = api_nb.NbApi.get_instance() if is_service: df_service.register_service('df-skydive-service', nb_api) service_manager = cotyledon.ServiceManager() diff --git a/dragonflow/cmd/eventlet/df_metadata_service.py b/dragonflow/cmd/eventlet/df_metadata_service.py index 1851e7cbc..575079b61 100644 --- a/dragonflow/cmd/eventlet/df_metadata_service.py +++ b/dragonflow/cmd/eventlet/df_metadata_service.py @@ -81,7 +81,7 @@ def main(): config.init(sys.argv[1:]) config.setup_logging() environment_setup() - nb_api = api_nb.NbApi.get_instance(False) + nb_api = api_nb.NbApi.get_instance() service_instance = metadata_service.DFMetadataProxyHandler( cfg.CONF, nb_api) df_service.register_service('df-metadata-service', nb_api) diff --git a/dragonflow/conf/__init__.py b/dragonflow/conf/__init__.py index 4616d7c09..83f19decc 100644 --- a/dragonflow/conf/__init__.py +++ b/dragonflow/conf/__init__.py @@ -27,6 +27,7 @@ from dragonflow.conf import df_redis from dragonflow.conf import df_ryu from dragonflow.conf import df_skydive from dragonflow.conf import df_snat +from dragonflow.conf import df_zmq CONF = cfg.CONF @@ -42,6 +43,7 @@ df_l2.register_opts() df_l3.register_opts() df_dnat.register_opts() df_redis.register_opts() +df_zmq.register_opts() df_ryu.register_opts() df_provider_networks.register_opts() df_snat.register_opts() diff --git a/dragonflow/conf/df_common_params.py b/dragonflow/conf/df_common_params.py index c536c9edc..e664c6e92 100644 --- a/dragonflow/conf/df_common_params.py +++ b/dragonflow/conf/df_common_params.py @@ -62,9 +62,6 @@ df_opts = [ cfg.StrOpt('pub_sub_driver', default='zmq_pubsub_driver', help=_('Drivers to use for the Dragonflow pub/sub')), - cfg.StrOpt('pub_sub_multiproc_driver', - default='zmq_pubsub_multiproc_driver', - help=_('Drivers to use for the Dragonflow pub/sub')), cfg.BoolOpt('enable_neutron_notifier', default=False, help=_('Enable notifier for Dragonflow controller sending ' @@ -84,19 +81,6 @@ df_opts = [ cfg.StrOpt('publisher_bind_address', default='*', help=_('Neutron Server Publishers bind address')), - cfg.BoolOpt( - 'pub_sub_use_multiproc', - default=True, - help=_( - 'Use inter-process publish/subscribe. ' - 'Publishers send events via the publisher service.' - ) - ), - cfg.StrOpt( - 'publisher_multiproc_socket', - default='/var/run/zmq_pubsub/zmq-publisher-socket', - help=_('Neutron Server Publisher inter-process socket address') - ), cfg.IntOpt( 'publisher_timeout', default=300, diff --git a/dragonflow/conf/df_zmq.py b/dragonflow/conf/df_zmq.py new file mode 100644 index 000000000..0df1e6085 --- /dev/null +++ b/dragonflow/conf/df_zmq.py @@ -0,0 +1,36 @@ +# Copyright (c) 2016 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_config import cfg + +from dragonflow._i18n import _ + + +df_zmq_opts = [ + cfg.StrOpt( + 'ipc_socket', + default='/var/run/zmq_pubsub/zmq-publisher-socket', + help=_('Neutron Server Publisher inter-process socket address') + ), +] + + +def register_opts(): + cfg.CONF.register_opts(df_zmq_opts, group='df_zmq') + + +def list_opts(): + return {'df_zmq': df_zmq_opts} diff --git a/dragonflow/controller/df_bgp_service.py b/dragonflow/controller/df_bgp_service.py index 070bca6e8..89259d60c 100644 --- a/dragonflow/controller/df_bgp_service.py +++ b/dragonflow/controller/df_bgp_service.py @@ -165,7 +165,7 @@ class BGPService(service.Service): def main(): df_config.init(sys.argv) - nb_api = api_nb.NbApi.get_instance(False) + nb_api = api_nb.NbApi.get_instance() server = BGPService(nb_api) df_service.register_service('df-bgp-service', nb_api) service.launch(cfg.CONF, server).wait() diff --git a/dragonflow/controller/df_local_controller.py b/dragonflow/controller/df_local_controller.py index 58150a92b..6f6042ff8 100644 --- a/dragonflow/controller/df_local_controller.py +++ b/dragonflow/controller/df_local_controller.py @@ -473,7 +473,7 @@ def main(): df_config.init(sys.argv) init_ryu_config() - nb_api = api_nb.NbApi.get_instance(False) + nb_api = api_nb.NbApi.get_instance() controller = DfLocalController(chassis_name, nb_api) service.register_service('df-local-controller', nb_api) controller.run() diff --git a/dragonflow/controller/df_publisher_service.py b/dragonflow/controller/df_publisher_service.py index 569b89011..23f215d40 100644 --- a/dragonflow/controller/df_publisher_service.py +++ b/dragonflow/controller/df_publisher_service.py @@ -42,7 +42,7 @@ class PublisherService(object): def __init__(self, nb_api): self._queue = queue.Queue() self.publisher = _get_publisher() - self.multiproc_subscriber = self._get_multiproc_subscriber() + self.subscriber = self._get_subscriber() self.nb_api = nb_api self.db = self.nb_api.driver self.uuid = pub_sub_api.generate_publisher_uuid() @@ -51,21 +51,18 @@ class PublisherService(object): cfg.CONF.df.publisher_rate_limit_timeout, ) - def _get_multiproc_subscriber(self): + def _get_subscriber(self): """ Return the subscriber for inter-process communication. If multi-proc communication is not use (i.e. disabled from config), return None. """ - if not cfg.CONF.df.pub_sub_use_multiproc: - return None pub_sub_driver = df_utils.load_driver( - cfg.CONF.df.pub_sub_multiproc_driver, + cfg.CONF.df.pub_sub_driver, df_utils.DF_PUBSUB_DRIVER_NAMESPACE) return pub_sub_driver.get_subscriber() def initialize(self): - if self.multiproc_subscriber: - self.multiproc_subscriber.initialize(self._append_event_to_queue) + self.subscriber.initialize(self._append_event_to_queue) self.publisher.initialize() def _append_event_to_queue(self, table, key, action, value, topic): @@ -74,8 +71,7 @@ class PublisherService(object): time.sleep(0) def run(self): - if self.multiproc_subscriber: - self.multiproc_subscriber.daemonize() + self.subscriber.daemonize() self._register_as_publisher() self._start_db_table_monitors() while True: @@ -155,7 +151,7 @@ def main(): # PATCH(snapiri): Disable pub_sub as it creates a publisher in nb_api # which collides with the publisher we create here. cfg.CONF.set_override('enable_df_pub_sub', False, group='df') - nb_api = api_nb.NbApi.get_instance(False) + nb_api = api_nb.NbApi.get_instance() service = PublisherService(nb_api) df_service.register_service('df-publisher-service', nb_api) service.initialize() diff --git a/dragonflow/db/api_nb.py b/dragonflow/db/api_nb.py index 8fe252062..859936cf7 100644 --- a/dragonflow/db/api_nb.py +++ b/dragonflow/db/api_nb.py @@ -55,35 +55,24 @@ def _get_topic(obj): class NbApi(object): - def __init__(self, db_driver, use_pubsub=False, is_neutron_server=False): + def __init__(self, db_driver): super(NbApi, self).__init__() self.driver = db_driver self.controller = None - self.use_pubsub = use_pubsub + self.use_pubsub = cfg.CONF.df.enable_df_pub_sub self.publisher = None self.subscriber = None - self.is_neutron_server = is_neutron_server self.enable_selective_topo_dist = \ cfg.CONF.df.enable_selective_topology_distribution - self.pub_sub_use_multiproc = False - if self.is_neutron_server: - # multiproc pub/sub is only supported in neutron server - self.pub_sub_use_multiproc = cfg.CONF.df.pub_sub_use_multiproc @staticmethod - def get_instance(is_neutron_server): + def get_instance(): global _nb_api if _nb_api is None: nb_driver = df_utils.load_driver( cfg.CONF.df.nb_db_class, df_utils.DF_NB_DB_DRIVER_NAMESPACE) - # Do not use pubsub for external apps - this causes issues with - # threads and other issues. - use_pubsub = cfg.CONF.df.enable_df_pub_sub - nb_api = NbApi( - nb_driver, - use_pubsub=use_pubsub, - is_neutron_server=is_neutron_server) + nb_api = NbApi(nb_driver) ip, port = get_db_ip_port() nb_api._initialize(db_ip=ip, db_port=port) _nb_api = nb_api @@ -94,25 +83,17 @@ class NbApi(object): if self.use_pubsub: self.publisher = self._get_publisher() self.subscriber = self._get_subscriber() - if self.is_neutron_server: - self.publisher.initialize() - # Start a thread to detect DB failover in Plugin - self.publisher.set_publisher_for_failover( - self.publisher, - self.db_recover_callback) - self.publisher.start_detect_for_failover() - self.driver.set_neutron_server(True) - else: - # FIXME(nick-ma-z): if active-detection is enabled, - # we initialize the publisher here. Make sure it - # only supports redis-based pub/sub driver. - if "active_port_detection" in cfg.CONF.df.apps_list: - self.publisher.initialize() + self.publisher.initialize() + # Start a thread to detect DB failover in Plugin + self.publisher.set_publisher_for_failover( + self.publisher, + self.db_recover_callback) + self.publisher.start_detect_for_failover() def set_db_change_callback(self, db_change_callback): - if self.use_pubsub and not self.is_neutron_server: - # NOTE(gampel) we want to start queuing event as soon - # as possible + if self.use_pubsub: + # This is here to not allow multiple subscribers to be started + # under the same process. One should be more than enough. if not self.subscriber.is_running: self._start_subscriber(db_change_callback) # Register for DB Failover detection in NB Plugin @@ -134,16 +115,11 @@ class NbApi(object): self.driver.process_ha() self.publisher.process_ha() self.subscriber.process_ha() - if not self.is_neutron_server: - self.controller.sync() + self.controller.sync() def _get_publisher(self): - if self.pub_sub_use_multiproc: - pubsub_driver_name = cfg.CONF.df.pub_sub_multiproc_driver - else: - pubsub_driver_name = cfg.CONF.df.pub_sub_driver pub_sub_driver = df_utils.load_driver( - pubsub_driver_name, + cfg.CONF.df.pub_sub_driver, df_utils.DF_PUBSUB_DRIVER_NAMESPACE) return pub_sub_driver.get_publisher() diff --git a/dragonflow/db/db_api.py b/dragonflow/db/db_api.py index eb82dc3bc..f4d663701 100644 --- a/dragonflow/db/db_api.py +++ b/dragonflow/db/db_api.py @@ -163,10 +163,3 @@ class DbApi(object): :returns: None """ - - @abc.abstractmethod - def set_neutron_server(self, is_neutron_server): - """Set neutron server flag - - :returns: None - """ diff --git a/dragonflow/db/drivers/cassandra_db_driver.py b/dragonflow/db/drivers/cassandra_db_driver.py index 333a8c46d..453cbac97 100644 --- a/dragonflow/db/drivers/cassandra_db_driver.py +++ b/dragonflow/db/drivers/cassandra_db_driver.py @@ -193,6 +193,3 @@ class CassandraDbDriver(db_api.DbApi): def process_ha(self): pass - - def set_neutron_server(self, is_neutron_server): - pass diff --git a/dragonflow/db/drivers/etcd_db_driver.py b/dragonflow/db/drivers/etcd_db_driver.py index 88f274d41..c08df0d25 100644 --- a/dragonflow/db/drivers/etcd_db_driver.py +++ b/dragonflow/db/drivers/etcd_db_driver.py @@ -178,7 +178,3 @@ class EtcdDbDriver(db_api.DbApi): def process_ha(self): # Not needed in etcd pass - - def set_neutron_server(self, is_neutron_server): - # Not needed in etcd - pass diff --git a/dragonflow/db/drivers/ramcloud_db_driver.py b/dragonflow/db/drivers/ramcloud_db_driver.py index f11f64748..366c4d4bc 100644 --- a/dragonflow/db/drivers/ramcloud_db_driver.py +++ b/dragonflow/db/drivers/ramcloud_db_driver.py @@ -97,7 +97,3 @@ class RamCloudDbDriver(db_api.DbApi): def process_ha(self): # Not needed in rmc pass - - def set_neutron_server(self, is_neutron_server): - # Not needed in rmc - pass diff --git a/dragonflow/db/drivers/redis_db_driver.py b/dragonflow/db/drivers/redis_db_driver.py index f178621d6..7fb723aaa 100644 --- a/dragonflow/db/drivers/redis_db_driver.py +++ b/dragonflow/db/drivers/redis_db_driver.py @@ -394,6 +394,3 @@ class RedisDbDriver(db_api.DbApi): def process_ha(self): pass - - def set_neutron_server(self, is_neutron_server): - pass diff --git a/dragonflow/db/drivers/rethink_db_driver.py b/dragonflow/db/drivers/rethink_db_driver.py index b47d44227..3cddc48a1 100644 --- a/dragonflow/db/drivers/rethink_db_driver.py +++ b/dragonflow/db/drivers/rethink_db_driver.py @@ -134,6 +134,3 @@ class RethinkDbDriver(db_api.DbApi): def process_ha(self): pass - - def set_neutron_server(self, is_neutron_server): - pass # Not implemented diff --git a/dragonflow/db/drivers/zookeeper_db_driver.py b/dragonflow/db/drivers/zookeeper_db_driver.py index e8b4293a1..a9d7ef4a6 100644 --- a/dragonflow/db/drivers/zookeeper_db_driver.py +++ b/dragonflow/db/drivers/zookeeper_db_driver.py @@ -187,7 +187,3 @@ class ZookeeperDbDriver(db_api.DbApi): def process_ha(self): # Not needed in zookeeper pass - - def set_neutron_server(self, is_neutron_server): - # Not needed in zookeeper - pass diff --git a/dragonflow/db/models/mixins.py b/dragonflow/db/models/mixins.py index c20013a42..f80fb962c 100644 --- a/dragonflow/db/models/mixins.py +++ b/dragonflow/db/models/mixins.py @@ -55,5 +55,5 @@ class UniqueKey(mf.MixinBase): # Relevant bp: # https://blueprints.launchpad.net/dragonflow/+spec/pub-sub-v2 from dragonflow.db import api_nb - nb_api = api_nb.NbApi.get_instance(True) + nb_api = api_nb.NbApi.get_instance() self.unique_key = nb_api.driver.allocate_unique_key(self.table_name) diff --git a/dragonflow/db/pubsub_drivers/zmq_pubsub_driver.py b/dragonflow/db/pubsub_drivers/zmq_pubsub_driver.py index 3b2dcbb12..dea677d50 100644 --- a/dragonflow/db/pubsub_drivers/zmq_pubsub_driver.py +++ b/dragonflow/db/pubsub_drivers/zmq_pubsub_driver.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +import abc +import six import traceback import eventlet @@ -27,41 +29,6 @@ LOG = logging.getLogger(__name__) SUPPORTED_TRANSPORTS = set(['tcp', 'epgm']) -class ZMQPubSub(pub_sub_api.PubSubApi): - def __init__(self): - super(ZMQPubSub, self).__init__() - transport = cfg.CONF.df.publisher_transport - if transport not in SUPPORTED_TRANSPORTS: - message = ("zmq_pub_sub: Unsupported publisher_transport value " - "%(transport)s, expected %(expected)s") - LOG.error(message, { - 'transport': transport, - 'expected': SUPPORTED_TRANSPORTS - }) - raise exceptions.UnsupportedTransportException(transport=transport) - self.subscriber = ZMQSubscriberAgent() - self.publisher = ZMQPublisherAgent() - - def get_publisher(self): - return self.publisher - - def get_subscriber(self): - return self.subscriber - - -class ZMQPubSubMultiproc(pub_sub_api.PubSubApi): - def __init__(self): - super(ZMQPubSubMultiproc, self).__init__() - self.subscriber = ZMQSubscriberMultiprocAgent() - self.publisher = ZMQPublisherMultiprocAgent() - - def get_publisher(self): - return self.publisher - - def get_subscriber(self): - return self.subscriber - - class ZMQPublisherAgentBase(pub_sub_api.PublisherAgentBase): def __init__(self): self.socket = None @@ -111,7 +78,7 @@ class ZMQPublisherMultiprocAgent(ZMQPublisherAgentBase): def _connect(self): self.socket = self.context.socket(zmq.PUSH) - ipc_socket = cfg.CONF.df.publisher_multiproc_socket + ipc_socket = cfg.CONF.df_zmq.ipc_socket LOG.debug("About to connect to IPC socket: %s", ipc_socket) self.socket.connect('ipc://%s' % ipc_socket) @@ -173,7 +140,7 @@ class ZMQSubscriberAgentBase(pub_sub_api.SubscriberAgentBase): class ZMQSubscriberMultiprocAgent(ZMQSubscriberAgentBase): def connect(self): self.sub_socket = self.context.socket(zmq.PULL) - ipc_socket = cfg.CONF.df.publisher_multiproc_socket + ipc_socket = cfg.CONF.df_zmq.ipc_socket LOG.debug("About to bind to IPC socket: %s", ipc_socket) self.sub_socket.bind('ipc://%s' % ipc_socket) @@ -187,3 +154,42 @@ class ZMQSubscriberAgent(ZMQSubscriberAgentBase): self.sub_socket.connect(uri) for topic in self.topic_list: self.sub_socket.setsockopt(zmq.SUBSCRIBE, topic) + + +@six.add_metaclass(abc.ABCMeta) +class ZMQPubSubBase(pub_sub_api.PubSubApi): + def __init__(self): + super(ZMQPubSubBase, self).__init__() + transport = cfg.CONF.df.publisher_transport + if transport not in SUPPORTED_TRANSPORTS: + message = ("zmq_pub_sub: Unsupported publisher_transport value " + "%(transport)s, expected %(expected)s") + LOG.error(message, { + 'transport': transport, + 'expected': SUPPORTED_TRANSPORTS + }) + raise exceptions.UnsupportedTransportException(transport=transport) + self.subscriber = None + self.publisher = None + + def get_publisher(self): + return self.publisher + + def get_subscriber(self): + return self.subscriber + + +class ZMQPubSubBind(ZMQPubSubBase): + """Has IPC subscriber and TCP/PGM publisher""" + def __init__(self): + super(ZMQPubSubBind, self).__init__() + self.subscriber = ZMQSubscriberMultiprocAgent() + self.publisher = ZMQPublisherAgent() + + +class ZMQPubSubConnect(ZMQPubSubBase): + """Has TCP/PGM subscriber and IPC publisher""" + def __init__(self): + super(ZMQPubSubConnect, self).__init__() + self.subscriber = ZMQSubscriberAgent() + self.publisher = ZMQPublisherMultiprocAgent() diff --git a/dragonflow/neutron/ml2/mech_driver.py b/dragonflow/neutron/ml2/mech_driver.py index 8966f9af2..8b63c7754 100644 --- a/dragonflow/neutron/ml2/mech_driver.py +++ b/dragonflow/neutron/ml2/mech_driver.py @@ -79,7 +79,7 @@ class DFMechDriver(api.MechanismDriver): def post_fork_initialize(self, resource, event, trigger, **kwargs): # NOTE(nick-ma-z): This will initialize all workers (API, RPC, # plugin service, etc) and threads with network connections. - self.nb_api = api_nb.NbApi.get_instance(True) + self.nb_api = api_nb.NbApi.get_instance() df_qos.initialize(self.nb_api) if cfg.CONF.df.enable_neutron_notifier: neutron_notifier = df_utils.load_driver( diff --git a/dragonflow/tests/database/_dummy_db_driver.py b/dragonflow/tests/database/_dummy_db_driver.py index fe3f4d561..3f393965f 100644 --- a/dragonflow/tests/database/_dummy_db_driver.py +++ b/dragonflow/tests/database/_dummy_db_driver.py @@ -79,7 +79,3 @@ class _DummyDbDriver(db_api.DbApi): def process_ha(self): # Do nothing pass - - def set_neutron_server(self, is_neutron_server): - # Do nothing - pass diff --git a/dragonflow/tests/database/test_db.py b/dragonflow/tests/database/test_db.py index cd76edb2a..bd26105ba 100644 --- a/dragonflow/tests/database/test_db.py +++ b/dragonflow/tests/database/test_db.py @@ -156,7 +156,7 @@ def main(): elif sys.argv[1] != 'client': raise Exception('Bad parameter #1: Expected \'server\' or \'client\',' ' found: %s' % sys.argv[1]) - nb_api = api_nb.NbApi.get_instance(is_server) + nb_api = api_nb.NbApi.get_instance() if is_server: run_server(nb_api) else: diff --git a/dragonflow/tests/fullstack/test_base.py b/dragonflow/tests/fullstack/test_base.py index a43a274ff..2610be5d5 100644 --- a/dragonflow/tests/fullstack/test_base.py +++ b/dragonflow/tests/fullstack/test_base.py @@ -61,7 +61,7 @@ class DFTestBase(base.BaseTestCase): self.integration_bridge = self.conf.integration_bridge self._queue = queue.PriorityQueue() - self.nb_api = api_nb.NbApi.get_instance(False) + self.nb_api = api_nb.NbApi.get_instance() # As we are running in the same process over and over, # do not perform redundant calls to the subscriber if not self.nb_api.subscriber.is_running: @@ -112,10 +112,7 @@ class DFTestBase(base.BaseTestCase): return publisher def get_publisher(self, port=None): - if cfg.CONF.df.pub_sub_use_multiproc: - pubsub_driver_name = cfg.CONF.df.pub_sub_multiproc_driver - else: - pubsub_driver_name = cfg.CONF.df.pub_sub_driver + pubsub_driver_name = cfg.CONF.df.pub_sub_driver if port is not None: cfg.CONF.set_override('publisher_port', port, group='df') return self._get_publisher(pubsub_driver_name) diff --git a/dragonflow/tests/fullstack/test_pub_sub.py b/dragonflow/tests/fullstack/test_pub_sub.py index 20bca6cd9..5e4997e31 100644 --- a/dragonflow/tests/fullstack/test_pub_sub.py +++ b/dragonflow/tests/fullstack/test_pub_sub.py @@ -22,7 +22,6 @@ from dragonflow.db import db_common from dragonflow.db.models import core from dragonflow.db import pub_sub_api from dragonflow.tests.common import constants as const -from dragonflow.tests.common import utils as test_utils from dragonflow.tests.fullstack import test_base from dragonflow.tests.fullstack import test_objects as objects @@ -302,66 +301,6 @@ class TestPubSub(PubSubTestBase): self.assertEqual(ns.events_action, action) -class TestMultiprocPubSub(PubSubTestBase): - - def setUp(self): - super(TestMultiprocPubSub, self).setUp() - self.do_test = cfg.CONF.df.pub_sub_use_multiproc - self.key = 'key-{}'.format(random.random()) - self.event = db_common.DbUpdate( - 'info', - None, - "log", - "TestMultiprocPubSub value", - topic=db_common.SEND_ALL_TOPIC, - ) - self.publisher = None - self.subscriber = None - - def tearDown(self): - if self.subscriber: - self.subscriber.close() - self._stop_publisher(self.publisher) - super(TestMultiprocPubSub, self).tearDown() - - def _handle_received_event(self, table, key, action, value, topic): - self.event_received_info = db_common.DbUpdate( - table, - key, - action, - value, - topic=topic) - self.event_received = True - - def test_multiproc_pub_sub(self): - if not self.do_test: - self.skipTest('pub/sub is not enabled') - return - self.event_received = False - cfg.CONF.set_override('publisher_multiproc_socket', - '/tmp/ipc_test_socket', group='df') - pub_sub_driver = df_utils.load_driver( - cfg.CONF.df.pub_sub_multiproc_driver, - df_utils.DF_PUBSUB_DRIVER_NAMESPACE) - publisher = pub_sub_driver.get_publisher() - publisher.initialize() - self.subscriber = pub_sub_driver.get_subscriber() - self.subscriber.initialize(self._handle_received_event) - self.subscriber.daemonize() - publisher.send_event(self.event) - test_utils.wait_until_true(lambda: self.event_received) - self.subscriber.close() - self.subscriber = None - - # Check that we received the same event - self.assertEqual(self.event.table, self.event_received_info.table) - self.assertEqual(self.event.key, self.event_received_info.key) - self.assertEqual(self.event.action, self.event_received_info.action) - # Value is not tested, since it's currently set to None - # self.assertEqual(self.event.value, self.event_received_info.value) - self.assertEqual(self.event.topic, self.event_received_info.topic) - - class TestDbTableMonitors(PubSubTestBase): def setUp(self): super(TestDbTableMonitors, self).setUp() diff --git a/dragonflow/tests/unit/test_api_nb.py b/dragonflow/tests/unit/test_api_nb.py index 05073ac0e..a723159ca 100644 --- a/dragonflow/tests/unit/test_api_nb.py +++ b/dragonflow/tests/unit/test_api_nb.py @@ -12,6 +12,8 @@ from jsonmodels import fields import mock +from oslo_config import cfg + from dragonflow.common import exceptions from dragonflow.db import api_nb from dragonflow.db import db_common @@ -40,11 +42,8 @@ class TopicModelTest(mf.ModelBase, mixins.Topic): class TestNbApi(tests_base.BaseTestCase): def setUp(self): super(TestNbApi, self).setUp() - self.api_nb = api_nb.NbApi( - db_driver=mock.Mock(), - use_pubsub=True, - is_neutron_server=True - ) + cfg.CONF.set_override('enable_df_pub_sub', True, group='df') + self.api_nb = api_nb.NbApi(db_driver=mock.Mock()) self.api_nb.publisher = mock.Mock() self.api_nb.enable_selective_topo_dist = True diff --git a/dragonflow/tests/unit/test_app_base.py b/dragonflow/tests/unit/test_app_base.py index e855f55d6..d2d57d5e5 100644 --- a/dragonflow/tests/unit/test_app_base.py +++ b/dragonflow/tests/unit/test_app_base.py @@ -80,7 +80,7 @@ class DFAppTestBase(tests_base.BaseTestCase): # CLear old objects from cache db_store.get_instance().clear() - self.nb_api = api_nb.NbApi.get_instance(False) + self.nb_api = api_nb.NbApi.get_instance() self.controller = df_local_controller.DfLocalController( fake_chassis1.id, self.nb_api) self.vswitch_api = self.controller.vswitch_api = mock.MagicMock() diff --git a/dragonflow/tests/unit/test_df_bgp_service.py b/dragonflow/tests/unit/test_df_bgp_service.py index 4d94bfbc1..e4af082ad 100644 --- a/dragonflow/tests/unit/test_df_bgp_service.py +++ b/dragonflow/tests/unit/test_df_bgp_service.py @@ -77,7 +77,7 @@ class TestDFBGPService(tests_base.BaseTestCase): mock_nb_api = mock.patch('dragonflow.db.api_nb.NbApi.get_instance') mock_nb_api.start() self.addCleanup(mock_nb_api.stop) - nb_api = api_nb.NbApi.get_instance(False) + nb_api = api_nb.NbApi.get_instance() self.bgp_service = df_bgp_service.BGPService(nb_api) self.bgp_service.bgp_driver = mock.Mock() self.bgp_service.bgp_pulse = LoopingCallByEvent( diff --git a/dragonflow/tests/unit/test_sync.py b/dragonflow/tests/unit/test_sync.py index c373b9477..827f8332e 100644 --- a/dragonflow/tests/unit/test_sync.py +++ b/dragonflow/tests/unit/test_sync.py @@ -58,11 +58,7 @@ class TestSync(tests_base.BaseTestCase): self._db_store = db_store.get_instance() self._db_store.clear() - self.nb_api = api_nb.NbApi( - db_driver=mock.Mock(), - use_pubsub=True, - is_neutron_server=True - ) + self.nb_api = api_nb.NbApi(db_driver=mock.Mock()) self.nb_api.publisher = mock.Mock() self.nb_api.enable_selective_topo_dist = True self._update = mock.Mock(side_effect=self._db_store.update) diff --git a/setup.cfg b/setup.cfg index 75ac72671..de525ab89 100644 --- a/setup.cfg +++ b/setup.cfg @@ -68,8 +68,8 @@ console_scripts = df-bgp-service = dragonflow.cmd.eventlet.df_bgp_service:main df-skydive-service= dragonflow.cmd.df_skydive_service:service_main dragonflow.pubsub_driver = - zmq_pubsub_driver = dragonflow.db.pubsub_drivers.zmq_pubsub_driver:ZMQPubSub - zmq_pubsub_multiproc_driver = dragonflow.db.pubsub_drivers.zmq_pubsub_driver:ZMQPubSubMultiproc + zmq_pubsub_driver = dragonflow.db.pubsub_drivers.zmq_pubsub_driver:ZMQPubSubConnect + zmq_bind_pubsub_driver = dragonflow.db.pubsub_drivers.zmq_pubsub_driver:ZMQPubSubBind redis_db_pubsub_driver = dragonflow.db.pubsub_drivers.redis_db_pubsub_driver:RedisPubSub etcd_pubsub_driver = dragonflow.db.pubsub_drivers.etcd_pubsub_driver:EtcdPubSub dragonflow.nb_db_driver =