Merge "Migrate Listener to the new model framework"

This commit is contained in:
Jenkins 2017-05-24 02:10:36 +00:00 committed by Gerrit Code Review
commit 04c50c2218
7 changed files with 63 additions and 135 deletions

View File

@ -167,36 +167,6 @@ class NbApi(object):
self.publisher.send_event(update)
eventlet.sleep(0)
def get_neutron_listener(self, id):
try:
listener = self.driver.get_key(db_models.Listener.table_name, id)
return db_models.Listener(listener)
except Exception:
return None
def get_all_neutron_listeners(self):
listeners = self.driver.get_all_entries(db_models.Listener.table_name)
return [db_models.Listener(l) for l in listeners]
def create_neutron_listener(self, id, **columns):
listener = {
'id': id
}
listener.update(columns)
listener_json = jsonutils.dumps(listener)
self.driver.create_key(db_models.Listener.table_name, id,
listener_json)
def update_neutron_listener(self, id, **columns):
listener_json = self.driver.get_key(db_models.Listener.table_name, id)
listener = jsonutils.loads(listener_json)
listener.update(columns)
listener_json = jsonutils.dumps(listener)
self.driver.set_key(db_models.Listener.table_name, id, listener_json)
def delete_neutron_listener(self, host):
self.driver.delete_key(db_models.Listener.table_name, host)
def register_notification_callback(self, controller):
self.controller = controller
LOG.info("DB configuration sync finished, waiting for changes")

View File

@ -17,7 +17,6 @@ NbDbObject = legacy.NbDbObject
UniqueKeyMixin = legacy.UniqueKeyMixin
Floatingip = legacy.Floatingip
AllowedAddressPairsActivePort = legacy.AllowedAddressPairsActivePort
Listener = legacy.Listener
OvsPort = legacy.OvsPort
UNIQUE_KEY = legacy.UNIQUE_KEY

View File

@ -44,3 +44,27 @@ class Publisher(mf.ModelBase, mixins.Name):
@classmethod
def on_get_all_post(self, instances):
return [o for o in instances if not o.is_stale()]
@mf.register_model
@mf.construct_nb_db_model
class Listener(mf.ModelBase):
table_name = "listener"
timestamp = df_fields.TimestampField()
ppid = fields.IntField()
@property
def topic(self):
return 'listener_{id}'.format(id=self.id)
def update_timestamp(self):
self.timestamp = time.time()
def on_create_pre(self):
super(Listener, self).on_create_pre()
self.update_timestamp()
def on_update_pre(self):
super(Listener, self).on_update_pre()
self.update_timestamp()

View File

@ -166,21 +166,6 @@ class AllowedAddressPairsActivePort(NbDbObject):
return True
@register_model_class
class Listener(NbDbObject):
table_name = "listener"
def get_topic(self):
return 'listener' + '_' + self.inner_obj['id']
def get_timestamp(self):
return self.inner_obj['timestamp']
def get_ppid(self):
return self.inner_obj['ppid']
class OvsPort(object):
TYPE_VM = 'vm'

View File

@ -27,6 +27,7 @@ from oslo_log import log
from dragonflow.common import utils as df_utils
from dragonflow.db import db_common
from dragonflow.db import models
from dragonflow.db.models import core
from dragonflow.db.models import l2
from dragonflow.db.neutron import lockedobjects_db as lock_db
from dragonflow.db import neutron_notifier_api
@ -54,11 +55,11 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
@lock_db.wrap_db_lock(lock_db.RESOURCE_NEUTRON_LISTENER)
def create_heart_beat_reporter(self, host):
listener = self.nb_api.get_neutron_listener(host)
if not listener:
listener = self.nb_api.get(core.Listener(id=host))
if listener is None:
self._create_heart_beat_reporter(host)
else:
ppid = listener.get_ppid()
ppid = listener.ppid
my_ppid = os.getppid()
LOG.info("Listener %(l)s exists, my ppid is %(ppid)s",
{'l': listener, 'ppid': my_ppid})
@ -66,14 +67,18 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
# equal to my_ppid. I tried to set api_worker=1, still multiple
# neutron-server processes were created.
if ppid != my_ppid:
self.nb_api.delete_neutron_listener(host)
self.nb_api.delete(listener)
self._create_heart_beat_reporter(host)
def _create_heart_beat_reporter(self, host):
listener = core.Listener(
id=host,
ppid=os.getppid(),
)
self.nb_api.register_listener_callback(self.notify_neutron_server,
'listener_' + host)
LOG.info("Register listener %s", host)
self.heart_beat_reporter = HeartBeatReporter(self.nb_api)
listener.topic)
LOG.info("Register listener %s", listener.id)
self.heart_beat_reporter = HeartBeatReporter(self.nb_api, listener)
self.heart_beat_reporter.daemonize()
def notify_port_status(self, ovs_port, status):
@ -86,7 +91,7 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
fip.get_id(), 'update', status)
def _send_event(self, table, key, action, value):
listeners = self.nb_api.get_all_neutron_listeners()
listeners = self.nb_api.get_all(core.Listener)
listeners_num = len(listeners)
if listeners_num > 1:
# Sort by timestamp and choose from the latest ones randomly.
@ -98,14 +103,14 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
# one is chosen. For users, do not need to figure out what is
# the best report interval. A big interval increase the possility a
# dead one is chosen, while a small one may affect the performance
listeners.sort(key=lambda l: l.get_timestamp(), reverse=True)
listeners.sort(key=lambda l: l.timestamp, reverse=True)
selected = random.choice(listeners[:len(listeners) / 2])
elif listeners_num == 1:
selected = listeners[0]
else:
LOG.warning("No neutron listener found")
return
topic = selected.get_topic()
topic = selected.topic
update = db_common.DbUpdate(table, key, action, value, topic=topic)
LOG.info("Publish to neutron %s", topic)
self.nb_api.publisher.send_event(update)
@ -125,8 +130,9 @@ class NbApiNeutronNotifier(neutron_notifier_api.NeutronNotifierDriver):
class HeartBeatReporter(object):
"""Updates heartbeat timestamp periodically with a random delay."""
def __init__(self, api_nb):
def __init__(self, api_nb, listener):
self.api_nb = api_nb
self.listener = listener
self._daemon = df_utils.DFDaemon()
def daemonize(self):
@ -136,11 +142,7 @@ class HeartBeatReporter(object):
return self._daemon.stop()
def run(self):
listener = cfg.CONF.host
ppid = os.getppid()
self.api_nb.create_neutron_listener(listener,
timestamp=int(time.time()),
ppid=ppid)
self.api_nb.create(self.listener)
cfg_interval = cfg.CONF.df.neutron_listener_report_interval
delay = cfg.CONF.df.neutron_listener_report_delay
@ -151,9 +153,7 @@ class HeartBeatReporter(object):
# throughput and pressure for df-db in a big scale
interval = random.randint(cfg_interval, cfg_interval + delay)
time.sleep(interval)
timestamp = int(time.time())
self.api_nb.update_neutron_listener(listener,
timestamp=timestamp,
ppid=ppid)
self.api_nb.update(self.listener)
except Exception:
LOG.exception("Failed to report heart beat for %s", listener)
LOG.exception("Failed to report heart beat for %s",
self.listener)

View File

@ -14,7 +14,6 @@ import copy
import mock
from dragonflow.db import models as db_models
from dragonflow.db.models import l2
from dragonflow.db.models import l3
from dragonflow.tests.fullstack import test_base
@ -91,50 +90,3 @@ class Test_API_NB(test_base.DFTestBase):
self.assertIsNotNone(lrouter1.unique_key)
self.assertNotEqual(lrouter.unique_key, lrouter1.unique_key)
def test_create_listener(self):
# prepare
fake_listener1 = db_models.Listener("{}")
fake_listener1.inner_obj = {"id": "fake_host1",
"timestamp": 1,
"ppid": -1}
fake_listener2 = db_models.Listener("{}")
fake_listener2.inner_obj = {"id": "fake_host2",
"timestamp": 2,
"ppid": -2}
# test creating
self.nb_api.create_neutron_listener('fake_host1',
timestamp=1,
ppid=-1)
self.nb_api.create_neutron_listener('fake_host2',
timestamp=2,
ppid=-2)
listeners = self.nb_api.get_all_neutron_listeners()
self.assertIn(fake_listener1, listeners)
self.assertIn(fake_listener2, listeners)
# test updating timestamp
self.nb_api.update_neutron_listener('fake_host1',
timestamp=11)
listener1 = self.nb_api.get_neutron_listener('fake_host1')
self.assertEqual(listener1.get_timestamp(), 11)
self.assertEqual(listener1.get_ppid(), -1)
# test updating timestamp and ppid
self.nb_api.update_neutron_listener('fake_host2',
timestamp=22,
ppid=-22)
listener2 = self.nb_api.get_neutron_listener('fake_host2')
self.assertEqual(listener2.get_timestamp(), 22)
self.assertEqual(listener2.get_ppid(), -22)
# test deleting
self.nb_api.delete_neutron_listener('fake_host1')
self.nb_api.delete_neutron_listener('fake_host2')
listener1 = self.nb_api.get_neutron_listener('fake_host1')
listener2 = self.nb_api.get_neutron_listener('fake_host2')
self.assertIsNone(listener1)
self.assertIsNone(listener2)

View File

@ -10,14 +10,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
from dragonflow.common import utils as df_utils
from dragonflow.db import models
from dragonflow.db.models import core
from dragonflow.db.models import l2
from dragonflow.tests import base as tests_base
from dragonflow.tests.common import utils
@ -35,27 +32,28 @@ class TestNbApiNeutronNotifier(tests_base.BaseTestCase):
self.notifier = df_utils.load_driver(
cfg.CONF.df.neutron_notifier,
df_utils.DF_NEUTRON_NOTIFIER_DRIVER_NAMESPACE)
self.notifier.nb_api = mock.Mock()
def test_create_heart_beat_reporter(self):
nb_api = mock.Mock()
self.notifier.nb_api = nb_api
nb_api.get_neutron_listener.return_value = None
self.notifier.create_heart_beat_reporter('fake_host')
self.assertTrue(nb_api.register_listener_callback.called)
getppid_patch = mock.patch('os.getppid', return_value=1)
self.addCleanup(getppid_patch.stop)
getppid_patch.start()
nb_api.reset_mock()
listener = {'id': 'fake_host', 'ppid': 'fake_ppid'}
nb_api.get_neutron_listener.return_value = models.Listener(
jsonutils.dumps(listener))
def test_create_new_heart_beat_reporter(self):
self.notifier.nb_api.get.return_value = None
self.notifier.create_heart_beat_reporter('fake_host')
self.assertTrue(nb_api.register_listener_callback.called)
self.notifier.nb_api.register_listener_callback.assert_called_once()
nb_api.reset_mock()
listener = {'id': 'fake_host', 'ppid': os.getppid()}
nb_api.get_neutron_listener.return_value = models.Listener(
jsonutils.dumps(listener))
def test_replace_heart_beat_reporter(self):
listener = core.Listener(id='fake_host', ppid=6)
self.notifier.nb_api.get.return_value = listener
self.notifier.create_heart_beat_reporter('fake_host')
self.assertFalse(nb_api.register_listener_callback.called)
self.notifier.nb_api.register_listener_callback.assert_called_once()
def test_valid_heart_beat_reporter_exists(self):
listener = core.Listener(id='fake_host', ppid=1)
self.notifier.nb_api.get.return_value = listener
self.notifier.create_heart_beat_reporter('fake_host')
self.notifier.nb_api.register_listener_callback.assert_not_called()
def test_notify_neutron_server(self):
core_plugin = mock.Mock()