diff --git a/devstack/etcd_pubsub_driver b/devstack/etcd_pubsub_driver new file mode 100644 index 000000000..aaa814838 --- /dev/null +++ b/devstack/etcd_pubsub_driver @@ -0,0 +1,7 @@ +#!/bin/bash + +function configure_pubsub_service_plugin { + NEUTRON_CONF=${NEUTRON_CONF:-"/etc/neutron/neutron.conf"} + PUB_SUB_DRIVER=${PUB_SUB_DRIVER:-"etcd_pubsub_driver"} + iniset $DRAGONFLOW_CONF df pub_sub_driver $PUB_SUB_DRIVER +} diff --git a/devstack/plugin.sh b/devstack/plugin.sh index a38e8c617..de929e2c7 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -148,6 +148,12 @@ if is_service_enabled df-zmq-publisher-service ; then source $DEST/dragonflow/devstack/zmq_pubsub_driver fi +if is_service_enabled df-etcd-pubsub-service ; then + DF_PUB_SUB="True" + DF_PUB_SUB_USE_MULTIPROC="False" + source $DEST/dragonflow/devstack/etcd_pubsub_driver +fi + if [[ "$DF_REDIS_PUBSUB" == "True" ]]; then DF_PUB_SUB="True" DF_PUB_SUB_USE_MULTIPROC="False" diff --git a/dragonflow/db/pubsub_drivers/etcd_pubsub_driver.py b/dragonflow/db/pubsub_drivers/etcd_pubsub_driver.py new file mode 100644 index 000000000..eaceba396 --- /dev/null +++ b/dragonflow/db/pubsub_drivers/etcd_pubsub_driver.py @@ -0,0 +1,160 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import etcd3gw +import threading + +from oslo_config import cfg +from oslo_log import log as logging + +from dragonflow.db import pub_sub_api + + +LOG = logging.getLogger(__name__) + +PUBSUB_DB_PREFIX = "pubsub" + + +def _get_topic_watch_prefix(topic): + topic_prefix = "/{}/{}".format(PUBSUB_DB_PREFIX, topic) + return topic_prefix + + +class EtcdPubSub(pub_sub_api.PubSubApi): + def __init__(self): + super(EtcdPubSub, self).__init__() + self.subscriber = EtcdSubscriberAgent() + self.publisher = EtcdPublisherAgent() + + def get_publisher(self): + return self.publisher + + def get_subscriber(self): + return self.subscriber + + +class EtcdPublisherAgent(pub_sub_api.PublisherAgentBase): + def __init__(self): + super(EtcdPublisherAgent, self).__init__() + self.client = None + + def initialize(self): + super(EtcdPublisherAgent, self).__init__() + self.client = etcd3gw.client(host=cfg.CONF.df.remote_db_ip, + port=cfg.CONF.df.remote_db_port) + + def _send_event(self, data, topic): + topic_prefix = _get_topic_watch_prefix(topic) + self.client.put(topic_prefix, data) + + def close(self): + pass + + +class WatcherThread(threading.Thread): + def __init__(self, etcd_client, kwargs): + super(WatcherThread, self).__init__(target=self.startWatch, + kwargs=kwargs) + self.daemon = True + self.client = etcd_client + self._cancel = None # Updated in startWatch + + def startWatch(self, topic, handle_event): + topic_prefix = _get_topic_watch_prefix(topic) + events, self._cancel = self.client.watch(topic_prefix) + for event in events: + handle_event(event) + + def cancel(self): + if self._cancel: + self._cancel() + + +class EtcdSubscriberAgent(pub_sub_api.SubscriberApi): + def __init__(self): + self.topic_list = {} + self.uri_list = [] + self.running = False + self.client = None + self.db_changes_callback = None + self.stop_event = None + super(EtcdSubscriberAgent, self).__init__() + + def initialize(self, callback): + self.db_changes_callback = callback + self.stop_event = threading.Event() + self.client = etcd3gw.client(host=cfg.CONF.df.remote_db_ip, + port=cfg.CONF.df.remote_db_port) + + def _create_topic_thread(self, topic): + topic_thread = WatcherThread( + etcd_client=self.client, + kwargs={'topic': topic, + 'handle_event': self.handle_event}) + return topic_thread + + def daemonize(self): + # Start watching + self.running = True + for topic in self.topic_list: + self.topic_list[topic].start() + + def close(self): + self.running = False + for topic in self.topic_list: + self._stop_topic_thread(self.topic_list[topic]) + + def register_topic(self, topic): + LOG.info('Register topic %s', topic) + if topic not in self.topic_list: + topic_thread = self._create_topic_thread(topic) + self.topic_list[topic] = topic_thread + if self.running: + topic_thread.start() + return True + return False + + def unregister_topic(self, topic): + LOG.info('Unregister topic %s', topic) + if self.running: + self._stop_topic_thread(self.topic_list[topic]) + del self.topic_list[topic] + + def _stop_topic_thread(self, topic_thread): + topic_thread.cancel() + + def handle_event(self, event): + unpacked_event = pub_sub_api.unpack_message(event["kv"]["value"]) + self.db_changes_callback( + unpacked_event['table'], + unpacked_event['key'], + unpacked_event['action'], + unpacked_event['value'], + unpacked_event['topic'], + ) + + def process_ha(self): + pass + + def register_hamsg_for_db(self): + pass + + def set_subscriber_for_failover(self, sub, callback): + pass + + def register_listen_address(self, uri): + pass + + def unregister_listen_address(self, uri): + pass diff --git a/dragonflow/tests/fullstack/test_pub_sub.py b/dragonflow/tests/fullstack/test_pub_sub.py index 275d4be05..ca414ca17 100644 --- a/dragonflow/tests/fullstack/test_pub_sub.py +++ b/dragonflow/tests/fullstack/test_pub_sub.py @@ -55,7 +55,7 @@ class PubSubTestBase(test_base.DFTestBase): subscriber.register_topic(db_common.SEND_ALL_TOPIC) uri = '%s://%s:%s' % ( cfg.CONF.df.publisher_transport, - '127.0.0.1', + cfg.CONF.host, cfg.CONF.df.publisher_port ) subscriber.register_listen_address(uri) diff --git a/setup.cfg b/setup.cfg index 64621505c..5ad42d30c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -66,6 +66,7 @@ dragonflow.pubsub_driver = zmq_pubsub_driver = dragonflow.db.pubsub_drivers.zmq_pubsub_driver:ZMQPubSub zmq_pubsub_multiproc_driver = dragonflow.db.pubsub_drivers.zmq_pubsub_driver:ZMQPubSubMultiproc redis_db_pubsub_driver = dragonflow.db.pubsub_drivers.redis_db_pubsub_driver:RedisPubSub + etcd_pubsub_driver = dragonflow.db.pubsub_drivers.etcd_pubsub_driver:EtcdPubSub dragonflow.nb_db_driver = etcd_nb_db_driver = dragonflow.db.drivers.etcd_db_driver:EtcdDbDriver ramcloud_nb_db_driver = dragonflow.db.drivers.ramcloud_db_driver:RamCloudDbDriver