Move publish logic outside specific drivers
Change-Id: I7351aa5868f2ddf4b233f19921ff8e414169b7fc
This commit is contained in:
parent
b1ff544c50
commit
f64be0ded7
|
@ -14,7 +14,7 @@
|
|||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
SEND_ALL_TOPIC = b'D'
|
||||
SEND_ALL_TOPIC = 'D'
|
||||
DB_SYNC_MINIMUM_INTERVAL = 180
|
||||
|
||||
|
||||
|
|
|
@ -102,13 +102,13 @@ class PublisherApi(object):
|
|||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_event(self, update, topic):
|
||||
"""Publish the update
|
||||
def _send_event(self, data, topic):
|
||||
"""Publish data to a topic
|
||||
|
||||
:param update: Encapsulates a Publisher update
|
||||
:type update: DbUpdate object
|
||||
:param data: Stream of data to publish
|
||||
:type data: bytes
|
||||
:param topic: topic to send event to
|
||||
:type topic: string
|
||||
:type topic: bytes
|
||||
:returns: None
|
||||
"""
|
||||
|
||||
|
@ -126,6 +126,27 @@ class PublisherApi(object):
|
|||
pass
|
||||
|
||||
|
||||
class PublisherAgentBase(PublisherApi):
|
||||
def send_event(self, update, topic=None):
|
||||
"""Publish the update
|
||||
|
||||
:param update: Encapsulates a Publisher update
|
||||
:type update: DbUpdate object
|
||||
:param topic: topic to send event to
|
||||
:type topic: string
|
||||
:returns: None
|
||||
"""
|
||||
if topic is None:
|
||||
topic = update.topic or db_common.SEND_ALL_TOPIC
|
||||
|
||||
topic = topic.encode('utf8')
|
||||
|
||||
LOG.debug("Sending %s to %s", update, topic)
|
||||
|
||||
data = pack_message(update.to_dict())
|
||||
self._send_event(data, topic)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class SubscriberApi(object):
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ class RedisPubSub(pub_sub_api.PubSubApi):
|
|||
return self.subscriber
|
||||
|
||||
|
||||
class RedisPublisherAgent(pub_sub_api.PublisherApi):
|
||||
class RedisPublisherAgent(pub_sub_api.PublisherAgentBase):
|
||||
|
||||
publish_retry_times = 5
|
||||
|
||||
|
@ -88,19 +88,14 @@ class RedisPublisherAgent(pub_sub_api.PublisherApi):
|
|||
if result:
|
||||
self._update_client()
|
||||
|
||||
def send_event(self, update, topic=None):
|
||||
if topic:
|
||||
update.topic = topic
|
||||
local_topic = update.topic
|
||||
local_topic = local_topic.encode('utf8')
|
||||
data = pub_sub_api.pack_message(update.to_dict())
|
||||
def _send_event(self, data, topic):
|
||||
ttl = self.publish_retry_times
|
||||
alreadysync = False
|
||||
while ttl > 0:
|
||||
ttl -= 1
|
||||
try:
|
||||
if self.client is not None:
|
||||
self.client.publish(local_topic, data)
|
||||
self.client.publish(topic, data)
|
||||
break
|
||||
except Exception:
|
||||
if not alreadysync:
|
||||
|
|
|
@ -18,7 +18,6 @@ from oslo_config import cfg
|
|||
from oslo_log import log as logging
|
||||
|
||||
from dragonflow.common import exceptions
|
||||
from dragonflow.db import db_common
|
||||
from dragonflow.db import pub_sub_api
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -61,7 +60,7 @@ class ZMQPubSubMultiproc(pub_sub_api.PubSubApi):
|
|||
return self.subscriber
|
||||
|
||||
|
||||
class ZMQPublisherAgentBase(pub_sub_api.PublisherApi):
|
||||
class ZMQPublisherAgentBase(pub_sub_api.PublisherAgentBase):
|
||||
def __init__(self):
|
||||
self.socket = None
|
||||
self.context = None
|
||||
|
@ -74,20 +73,11 @@ class ZMQPublisherAgentBase(pub_sub_api.PublisherApi):
|
|||
def _connect(self):
|
||||
pass
|
||||
|
||||
def send_event(self, update, topic=None):
|
||||
def _send_event(self, data, topic):
|
||||
if not self.socket:
|
||||
self._connect()
|
||||
|
||||
if topic:
|
||||
update.topic = topic
|
||||
elif update.topic:
|
||||
topic = update.topic.encode('utf-8')
|
||||
else:
|
||||
topic = db_common.SEND_ALL_TOPIC
|
||||
update.topic = topic
|
||||
data = pub_sub_api.pack_message(update.to_dict())
|
||||
self.socket.send_multipart([topic, data])
|
||||
LOG.debug("Sending %s", update)
|
||||
|
||||
def close(self):
|
||||
if self.socket:
|
||||
|
|
|
@ -52,7 +52,8 @@ class TestZMQPubSub(tests_base.BaseTestCase):
|
|||
log_debug.assert_called()
|
||||
args = self.ZMQPublisherAgent.socket.send_multipart.call_args
|
||||
self.ZMQPublisherAgent.socket.send_multipart.assert_called_once()
|
||||
self.assertEqual(db_common.SEND_ALL_TOPIC, args[0][0][0])
|
||||
self.assertEqual(db_common.SEND_ALL_TOPIC.encode('utf-8'),
|
||||
args[0][0][0])
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_publisher_reconnection(self):
|
||||
|
@ -67,7 +68,7 @@ class TestZMQPubSub(tests_base.BaseTestCase):
|
|||
self.ZMQPublisherAgent.socket.bind.assert_called_once()
|
||||
self.ZMQPublisherAgent.socket.send_multipart.assert_called_once()
|
||||
log_debug.assert_called()
|
||||
self.assertEqual(2, log_debug.call_count)
|
||||
self.assertEqual(1, log_debug.call_count)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_subscribe_success(self):
|
||||
|
|
Loading…
Reference in New Issue