Merge "Fix health manager performance regression" into stable/queens

This commit is contained in:
Zuul 2019-02-05 23:33:44 +00:00 committed by Gerrit Code Review
commit de64db8e7b
4 changed files with 494 additions and 207 deletions

View File

@ -14,6 +14,7 @@
import datetime
import time
import timeit
from oslo_config import cfg
from oslo_log import log as logging
@ -46,43 +47,50 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
self.loadbalancer_repo = repo.LoadBalancerRepository()
self.member_repo = repo.MemberRepository()
self.pool_repo = repo.PoolRepository()
self.sync_prv_status = CONF.health_manager.sync_provisioning_status
def emit(self, info_type, info_id, info_obj):
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
self.event_streamer.emit(cnt)
def _update_status_and_emit_event(self, session, repo, entity_type,
entity_id, new_op_status, old_op_status,
current_prov_status):
entity_id, new_op_status, old_op_status):
message = {}
if old_op_status.lower() != new_op_status.lower():
LOG.debug("%s %s status has changed from %s to "
"%s. Updating db and sending event.",
"%s, updating db.",
entity_type, entity_id, old_op_status,
new_op_status)
repo.update(session, entity_id, operating_status=new_op_status)
# Map the status for neutron-lbaas
if new_op_status == constants.DRAINING:
new_op_status = constants.ONLINE
message.update({constants.OPERATING_STATUS: new_op_status})
if self.sync_prv_status:
LOG.debug("%s %s provisioning_status %s. "
"Sending event.",
entity_type, entity_id, current_prov_status)
message.update(
{constants.PROVISIONING_STATUS: current_prov_status})
if message:
self.emit(entity_type, entity_id, message)
if (CONF.health_manager.event_streamer_driver !=
constants.NOOP_EVENT_STREAMER):
if CONF.health_manager.sync_provisioning_status:
current_prov_status = repo.get(
session, id=entity_id).provisioning_status
LOG.debug("%s %s provisioning_status %s. "
"Sending event.",
entity_type, entity_id, current_prov_status)
message.update(
{constants.PROVISIONING_STATUS: current_prov_status})
if message:
self.emit(entity_type, entity_id, message)
def update_health(self, health, srcaddr):
# The executor will eat any exceptions from the update_health code
# so we need to wrap it and log the unhandled exception
start_time = timeit.default_timer()
try:
self._update_health(health, srcaddr)
except Exception:
LOG.exception('update_health encountered an unknown error '
'processing health message for amphora {0} with IP '
'{1}'.format(health['id'], srcaddr))
except Exception as e:
LOG.exception('Health update for amphora %(amp)s encountered '
'error %(err)s. Skipping health update.',
{'amp': health['id'], 'err': str(e)})
# TODO(johnsom) We need to set a warning threshold here
LOG.debug('Health Update finished in: {0} seconds'.format(
timeit.default_timer() - start_time))
def _update_health(self, health, srcaddr):
"""This function is to update db info based on amphora status
@ -110,13 +118,13 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
session = db_api.get_session()
# We need to see if all of the listeners are reporting in
db_lb = self.amphora_repo.get_lb_for_amphora(session, health['id'])
db_lb = self.amphora_repo.get_lb_for_health_update(session,
health['id'])
ignore_listener_count = False
listeners = health['listeners']
if db_lb:
expected_listener_count = len(db_lb.listeners)
if 'PENDING' in db_lb.provisioning_status:
expected_listener_count = len(db_lb.get('listeners', {}))
if 'PENDING' in db_lb['provisioning_status']:
ignore_listener_count = True
else:
# If this is not a spare amp, log and skip it.
@ -151,6 +159,8 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
health['id'], srcaddr), e)
expected_listener_count = 0
listeners = health['listeners']
# Do not update amphora health if the reporting listener count
# does not match the expected listener count
if len(listeners) == expected_listener_count or ignore_listener_count:
@ -160,6 +170,9 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
# if we're running too far behind, warn and bail
proc_delay = time.time() - health['recv_time']
hb_interval = CONF.health_manager.heartbeat_interval
# TODO(johnsom) We need to set a warning threshold here, and
# escalate to critical when it reaches the
# heartbeat_interval
if proc_delay >= hb_interval:
LOG.warning('Amphora %(id)s health message was processed too '
'slowly: %(delay)ss! The system may be overloaded '
@ -189,16 +202,17 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
return
processed_pools = []
potential_offline_pools = {}
# We got a heartbeat so lb is healthy until proven otherwise
if db_lb.enabled is False:
if db_lb['enabled'] is False:
lb_status = constants.OFFLINE
else:
lb_status = constants.ONLINE
for db_listener in db_lb.listeners:
for listener_id in db_lb.get('listeners', {}):
db_op_status = db_lb['listeners'][listener_id]['operating_status']
listener_status = None
listener_id = db_listener.id
listener = None
if listener_id not in listeners:
@ -221,13 +235,11 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
'status': listener.get('status')})
try:
if listener_status is not None:
if (listener_status is not None and
listener_status != db_op_status):
self._update_status_and_emit_event(
session, self.listener_repo, constants.LISTENER,
listener_id, listener_status,
db_listener.operating_status,
db_listener.provisioning_status
)
listener_id, listener_status, db_op_status)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Listener %s is not in DB", listener_id)
@ -236,36 +248,52 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
pools = listener['pools']
# Process pools bound to listeners
for db_pool in db_listener.pools:
for db_pool_id in db_lb.get('pools', {}):
# If we saw this pool already on another listener
# skip it.
if db_pool_id in processed_pools:
continue
db_pool_dict = db_lb['pools'][db_pool_id]
lb_status = self._process_pool_status(
session, db_pool, pools, lb_status, processed_pools)
session, db_pool_id, db_pool_dict, pools,
lb_status, processed_pools, potential_offline_pools)
# Process pools bound to the load balancer
for db_pool in db_lb.pools:
# Don't re-process pools shared with listeners
if db_pool.id in processed_pools:
for pool_id in potential_offline_pools:
# Skip if we eventually found a status for this pool
if pool_id in processed_pools:
continue
lb_status = self._process_pool_status(
session, db_pool, [], lb_status, processed_pools)
try:
# If the database doesn't already show the pool offline, update
if potential_offline_pools[pool_id] != constants.OFFLINE:
self._update_status_and_emit_event(
session, self.pool_repo, constants.POOL,
pool_id, constants.OFFLINE,
potential_offline_pools[pool_id])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Pool %s is not in DB", pool_id)
# Update the load balancer status last
try:
self._update_status_and_emit_event(
session, self.loadbalancer_repo,
constants.LOADBALANCER, db_lb.id, lb_status,
db_lb.operating_status, db_lb.provisioning_status
)
if lb_status != db_lb['operating_status']:
self._update_status_and_emit_event(
session, self.loadbalancer_repo,
constants.LOADBALANCER, db_lb['id'], lb_status,
db_lb[constants.OPERATING_STATUS])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Load balancer %s is not in DB", db_lb.id)
def _process_pool_status(self, session, db_pool, pools, lb_status,
processed_pools):
def _process_pool_status(
self, session, pool_id, db_pool_dict, pools, lb_status,
processed_pools, potential_offline_pools):
pool_status = None
pool_id = db_pool.id
if pool_id not in pools:
pool_status = constants.OFFLINE
# If we don't have a status update for this pool_id
# add it to the list of potential offline pools and continue.
# We will check the potential offline pool list after we
# finish processing the status updates from all of the listeners.
potential_offline_pools[pool_id] = db_pool_dict['operating_status']
return lb_status
else:
pool = pools[pool_id]
@ -287,10 +315,10 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
# Deal with the members that are reporting from
# the Amphora
members = pool['members']
for db_member in db_pool.members:
for member_id in db_pool_dict.get('members', {}):
member_status = None
member_db_status = db_member.operating_status
member_id = db_member.id
member_db_status = (
db_pool_dict['members'][member_id]['operating_status'])
if member_id not in members:
if member_db_status != constants.NO_MONITOR:
@ -323,25 +351,21 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
'status': status})
try:
if member_status is not None:
if (member_status is not None and
member_status != member_db_status):
self._update_status_and_emit_event(
session, self.member_repo,
constants.MEMBER,
member_id, member_status,
db_member.operating_status,
db_member.provisioning_status
)
session, self.member_repo, constants.MEMBER,
member_id, member_status, member_db_status)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Member %s is not able to update "
"in DB", member_id)
try:
if pool_status is not None:
if (pool_status is not None and
pool_status != db_pool_dict['operating_status']):
self._update_status_and_emit_event(
session, self.pool_repo, constants.POOL,
pool_id, pool_status, db_pool.operating_status,
db_pool.provisioning_status
)
pool_id, pool_status, db_pool_dict['operating_status'])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Pool %s is not in DB", pool_id)

View File

@ -988,6 +988,78 @@ class AmphoraRepository(BaseRepository):
return amp.to_data_model()
def get_lb_for_health_update(self, session, amphora_id):
"""This method is for the health manager status update process.
This is a time sensitive query that occurs often.
It is an explicit query as the ORM produces a poorly
optimized query.
Use extreme caution making any changes to this query
as it can impact the scalability of the health manager.
All changes should be analyzed using SQL "EXPLAIN" to
make sure only indexes are being used.
Changes should also be evaluated using the stressHM tool.
Note: The returned object is flat and not a graph representation
of the load balancer as it is not needed. This is on
purpose to optimize the processing time. This is not in
the normal data model objects.
:param session: A Sql Alchemy database session.
:param amphora_id: The amphora ID to lookup the load balancer for.
:returns: A dictionary containing the required load balancer details.
"""
rows = session.execute(
"SELECT load_balancer.id, load_balancer.enabled, "
"load_balancer.provisioning_status AS lb_prov_status, "
"load_balancer.operating_status AS lb_op_status, "
"listener.id AS list_id, "
"listener.operating_status AS list_op_status, "
"pool.id AS pool_id, "
"pool.operating_status AS pool_op_status, "
"member.id AS member_id, "
"member.operating_status AS mem_op_status from "
"amphora JOIN load_balancer ON "
"amphora.load_balancer_id = load_balancer.id LEFT JOIN "
"listener ON load_balancer.id = listener.load_balancer_id "
"LEFT JOIN pool ON load_balancer.id = pool.load_balancer_id "
"LEFT JOIN member ON pool.id = member.pool_id WHERE "
"amphora.id = :amp_id AND amphora.status != :deleted AND "
"load_balancer.provisioning_status != :deleted;",
{'amp_id': amphora_id, 'deleted': consts.DELETED}).fetchall()
lb = {}
listeners = {}
pools = {}
for row in rows:
if not lb:
lb['id'] = row['id']
lb['enabled'] = row['enabled'] == 1
lb['provisioning_status'] = row['lb_prov_status']
lb['operating_status'] = row['lb_op_status']
if row['list_id'] and row['list_id'] not in listeners:
listener = {'operating_status': row['list_op_status']}
listeners[row['list_id']] = listener
if row['pool_id']:
if row['pool_id'] in pools and row['member_id']:
member = {'operating_status': row['mem_op_status']}
pools[row['pool_id']]['members'][row['member_id']] = member
else:
pool = {'operating_status': row['pool_op_status'],
'members': {}}
if row['member_id']:
member = {'operating_status': row['mem_op_status']}
pool['members'][row['member_id']] = member
pools[row['pool_id']] = pool
if listeners:
lb['listeners'] = listeners
if pools:
lb['pools'] = pools
return lb
class AmphoraBuildReqRepository(BaseRepository):
model_class = models.AmphoraBuildRequest

View File

@ -40,6 +40,9 @@ class BaseRepositoryTest(base.OctaviaDBTestBase):
FAKE_UUID_2 = uuidutils.generate_uuid()
FAKE_UUID_3 = uuidutils.generate_uuid()
FAKE_UUID_4 = uuidutils.generate_uuid()
FAKE_UUID_5 = uuidutils.generate_uuid()
FAKE_UUID_6 = uuidutils.generate_uuid()
FAKE_UUID_7 = uuidutils.generate_uuid()
FAKE_EXP_AGE = 10
def setUp(self):
@ -3076,6 +3079,79 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
self.assertEqual(cert_expired_amphora.cert_expiration, expiration)
self.assertEqual(cert_expired_amphora.id, amphora2.id)
def test_get_lb_for_health_update(self):
amphora1 = self.create_amphora(self.FAKE_UUID_1)
amphora2 = self.create_amphora(self.FAKE_UUID_3)
self.amphora_repo.associate(self.session, self.lb.id, amphora1.id)
self.amphora_repo.associate(self.session, self.lb.id, amphora2.id)
lb_ref = {'enabled': True, 'id': self.lb.id,
'operating_status': constants.ONLINE,
'provisioning_status': constants.ACTIVE}
# Test with just a load balancer
lb = self.amphora_repo.get_lb_for_health_update(self.session,
self.FAKE_UUID_1)
self.assertEqual(lb_ref, lb)
pool = self.pool_repo.create(
self.session, id=self.FAKE_UUID_4, project_id=self.FAKE_UUID_2,
name="pool_test", description="pool_description",
protocol=constants.PROTOCOL_HTTP, load_balancer_id=self.lb.id,
lb_algorithm=constants.LB_ALGORITHM_ROUND_ROBIN,
provisioning_status=constants.ACTIVE,
operating_status=constants.ONLINE, enabled=True)
pool_ref = {pool.id: {'members': {},
'operating_status': constants.ONLINE}}
lb_ref['pools'] = pool_ref
# Test with an LB and a pool
lb = self.amphora_repo.get_lb_for_health_update(self.session,
self.FAKE_UUID_1)
self.assertEqual(lb_ref, lb)
listener = self.listener_repo.create(
self.session, id=self.FAKE_UUID_5, project_id=self.FAKE_UUID_2,
name="listener_name", description="listener_description",
protocol=constants.PROTOCOL_HTTP, protocol_port=80,
connection_limit=1, operating_status=constants.ONLINE,
load_balancer_id=self.lb.id, provisioning_status=constants.ACTIVE,
enabled=True, peer_port=1025, default_pool_id=pool.id)
listener_ref = {listener.id: {'operating_status': constants.ONLINE}}
lb_ref['listeners'] = listener_ref
# Test with an LB, pool, and listener (no members)
lb = self.amphora_repo.get_lb_for_health_update(self.session,
self.FAKE_UUID_1)
self.assertEqual(lb_ref, lb)
member1 = self.member_repo.create(self.session, id=self.FAKE_UUID_6,
project_id=self.FAKE_UUID_2,
pool_id=pool.id,
ip_address="192.0.2.1",
protocol_port=80, enabled=True,
provisioning_status=constants.ACTIVE,
operating_status=constants.ONLINE)
member2 = self.member_repo.create(self.session, id=self.FAKE_UUID_7,
project_id=self.FAKE_UUID_2,
pool_id=pool.id,
ip_address="192.0.2.21",
protocol_port=80, enabled=True,
provisioning_status=constants.ACTIVE,
operating_status=constants.OFFLINE)
member_ref = {member1.id: {'operating_status': constants.ONLINE},
member2.id: {'operating_status': constants.OFFLINE}}
lb_ref['pools'][pool.id]['members'] = member_ref
# Test with an LB, pool, listener, and members
lb = self.amphora_repo.get_lb_for_health_update(self.session,
self.FAKE_UUID_1)
self.assertEqual(lb_ref, lb)
class AmphoraHealthRepositoryTest(BaseRepositoryTest):
def setUp(self):

View File

@ -44,9 +44,9 @@ class TestUpdateHealthDb(base.TestCase):
def setUp(self):
super(TestUpdateHealthDb, self).setUp()
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
conf.config(group="health_manager",
event_streamer_driver='queue_event_streamer')
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
self.conf.config(group="health_manager",
event_streamer_driver='queue_event_streamer')
session_patch = mock.patch('octavia.db.api.get_session')
self.addCleanup(session_patch.stop)
@ -65,8 +65,6 @@ class TestUpdateHealthDb(base.TestCase):
self.pool_repo = mock.MagicMock()
self.hm.amphora_repo = self.amphora_repo
fake_lb = mock.MagicMock()
self.hm.amphora_repo.get_lb_for_amphora.return_value = fake_lb
self.hm.amphora_health_repo = self.amphora_health_repo
self.hm.listener_repo = self.listener_repo
self.hm.listener_repo.count.return_value = 1
@ -104,6 +102,7 @@ class TestUpdateHealthDb(base.TestCase):
mock_lb.pools = [mock_pool1]
if mock_listener1:
mock_listener1.pools = [mock_pool1]
mock_listener1.default_pool = mock_pool1
for i in range(members):
mock_member_x = mock.Mock()
mock_member_x.id = 'member-id-%s' % (i + 1)
@ -116,13 +115,45 @@ class TestUpdateHealthDb(base.TestCase):
return mock_lb, mock_listener1, mock_pool1, mock_members
def _make_fake_lb_health_dict(self, listener=True, pool=True,
health_monitor=True, members=1,
lb_prov_status=constants.ACTIVE):
lb_ref = {'enabled': True, 'id': self.FAKE_UUID_1,
constants.OPERATING_STATUS: 'bogus',
constants.PROVISIONING_STATUS: lb_prov_status}
if pool:
members_dict = {}
if health_monitor:
member_operating_status = 'NOTHING_MATCHABLE'
else:
member_operating_status = constants.NO_MONITOR
for i in range(members):
member_id = 'member-id-%s' % (i + 1)
members_dict[member_id] = {
constants.OPERATING_STATUS: member_operating_status}
pool_ref = {'pool-id-1': {'members': members_dict,
constants.OPERATING_STATUS: 'bogus'}}
lb_ref['pools'] = pool_ref
if listener:
listener_ref = {'listener-id-1': {
constants.OPERATING_STATUS: 'bogus'}}
lb_ref['listeners'] = listener_ref
return lb_ref
def test_update_health_event_stream(self):
health = {
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"status": constants.OPEN, "pools": {
"pool-id-1": {"status": constants.UP,
"members": {"member-id-1": constants.UP}
"members": {"member-id-1": constants.UP,
"member-id-2": constants.UP}
}
}
}
@ -130,9 +161,8 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.event_client.cast.assert_any_call(
@ -156,12 +186,11 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
self.assertTrue(self.loadbalancer_repo.update.called)
self.assertTrue(self.amphora_health_repo.replace.called)
@ -173,13 +202,12 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=True, pool=False,
lb_prov_status=constants.PENDING_UPDATE))
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict(
listener=True, pool=False, lb_prov_status=constants.PENDING_UPDATE)
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
self.assertTrue(self.loadbalancer_repo.update.called)
self.assertTrue(self.amphora_health_repo.replace.called)
@ -191,12 +219,11 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=True, pool=False))
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict(listener=True, pool=False)
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
self.assertTrue(self.loadbalancer_repo.update.called)
self.assertFalse(self.amphora_health_repo.replace.called)
@ -208,12 +235,12 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time() - hb_interval - 1 # extra -1 for buffer
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
# Receive time is stale, so we shouldn't see this called
self.assertFalse(self.loadbalancer_repo.update.called)
@ -234,10 +261,9 @@ class TestUpdateHealthDb(base.TestCase):
self.session_mock.commit.side_effect = TestException('boom')
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
self._make_mock_lb_tree())
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -258,9 +284,9 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -285,10 +311,9 @@ class TestUpdateHealthDb(base.TestCase):
operating_status=constants.ONLINE)
# If the listener count is wrong, make sure we don't update
mock_listener2 = mock.Mock()
mock_listener2.id = 'listener-id-2'
mock_listener2.pools = [mock_pool1]
mock_lb.listeners = [mock_listener1, mock_listener2]
lb_ref['listeners']['listener-id-2'] = {
constants.OPERATING_STATUS: 'bogus'}
self.amphora_health_repo.replace.reset_mock()
self.hm.update_health(health, '192.0.2.1')
@ -304,10 +329,9 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -319,7 +343,7 @@ class TestUpdateHealthDb(base.TestCase):
self.session_mock, listener_id,
operating_status=constants.ONLINE)
self.pool_repo.update.assert_any_call(
self.session_mock, mock_pool1.id,
self.session_mock, 'pool-id-1',
operating_status=constants.OFFLINE
)
@ -339,22 +363,16 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
lb_ref = self._make_fake_lb_health_dict()
mock_member2 = mock.Mock()
mock_member2.id = 'member-id-2'
mock_pool2 = mock.Mock()
mock_pool2.id = "pool-id-2"
mock_pool2.members = [mock_member2]
mock_listener2 = mock.Mock()
mock_listener2.id = 'listener-id-2'
mock_listener2.pools = [mock_pool2]
lb_ref['pools']['pool-id-2'] = {
constants.OPERATING_STATUS: 'bogus',
'members': {'member-id-2': {constants.OPERATING_STATUS: 'bogus'}}}
mock_lb.listeners.append(mock_listener2)
mock_lb.pools.append(mock_pool2)
lb_ref['listeners']['listener-id-2'] = {
constants.OPERATING_STATUS: 'bogus'}
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -368,11 +386,11 @@ class TestUpdateHealthDb(base.TestCase):
# Call count should be exactly 2, as each pool should be processed once
self.assertEqual(2, self.pool_repo.update.call_count)
self.pool_repo.update.assert_has_calls([
mock.call(self.session_mock, mock_pool1.id,
mock.call(self.session_mock, 'pool-id-1',
operating_status=constants.ERROR),
mock.call(self.session_mock, mock_pool2.id,
mock.call(self.session_mock, 'pool-id-2',
operating_status=constants.ONLINE)
])
], any_order=True)
def test_update_lb_and_list_pool_health_online(self):
@ -389,9 +407,8 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -431,9 +448,12 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
lb_ref['pools']['pool-id-2'] = {
constants.OPERATING_STATUS: constants.OFFLINE}
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -462,9 +482,8 @@ class TestUpdateHealthDb(base.TestCase):
"members": {"member-id-1": constants.DRAIN}}}}},
"recv_time": time.time()}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -502,9 +521,8 @@ class TestUpdateHealthDb(base.TestCase):
"members": {"member-id-1": constants.MAINT}}}}},
"recv_time": time.time()}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -542,9 +560,8 @@ class TestUpdateHealthDb(base.TestCase):
"members": {"member-id-1": "blah"}}}}},
"recv_time": time.time()}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -578,9 +595,8 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -621,9 +637,8 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(health_monitor=False))
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict(health_monitor=False)
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -659,11 +674,11 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(health_monitor=False))
mock_members[0].admin_state_up = False
mock_members[0].operating_status = constants.NO_MONITOR
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict(health_monitor=False)
member1 = lb_ref['pools']['pool-id-1']['members']['member-id-1']
member1[constants.OPERATING_STATUS] = constants.NO_MONITOR
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -683,7 +698,7 @@ class TestUpdateHealthDb(base.TestCase):
operating_status=constants.ONLINE)
self.member_repo.update.assert_any_call(
self.session_mock, mock_members[0].id,
self.session_mock, 'member-id-1',
operating_status=constants.OFFLINE)
def test_update_health_member_no_check(self):
@ -702,9 +717,8 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -743,11 +757,12 @@ class TestUpdateHealthDb(base.TestCase):
"member-id-1": constants.UP}}}}},
"recv_time": time.time()}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(members=2))
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict(members=2)
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
# test listener, member
for listener_id, listener in six.iteritems(
@ -770,7 +785,7 @@ class TestUpdateHealthDb(base.TestCase):
self.session_mock, member_id,
operating_status=constants.ONLINE)
self.member_repo.update.assert_any_call(
self.session_mock, mock_members[1].id,
self.session_mock, 'member-id-2',
operating_status=constants.OFFLINE)
def test_update_health_list_full_member_down(self):
@ -788,9 +803,8 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -815,10 +829,9 @@ class TestUpdateHealthDb(base.TestCase):
self.session_mock, member_id,
operating_status=constants.ERROR)
mock_listener2 = mock.Mock()
mock_listener2.id = 'listener-id-2'
mock_listener2.pools = [mock_pool1]
mock_lb.listeners.append(mock_listener2)
lb_ref['listeners']['listener-id-2'] = {
constants.OPERATING_STATUS: 'bogus'}
self.amphora_health_repo.replace.reset_mock()
self.hm.update_health(health, '192.0.2.1')
@ -839,9 +852,8 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -914,25 +926,24 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
lb_ref = self._make_fake_lb_health_dict()
# Build our own custom listeners/pools/members
for i in [1, 2, 3, 4, 5]:
mock_member = mock.Mock()
mock_member.id = 'member-id-%s' % i
mock_pool = mock.Mock()
mock_pool.id = 'pool-id-%s' % i
mock_pool.members = [mock_member]
if i == 3:
mock_member = mock.Mock()
mock_member.id = 'member-id-31'
mock_pool.members.append(mock_member)
mock_listener = mock.Mock()
mock_listener.id = 'listener-id-%s' % i
mock_listener.pools = [mock_pool]
mock_lb.listeners.append(mock_listener)
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref['listeners']['listener-id-%s' % i] = {
constants.OPERATING_STATUS: 'bogus'}
if i == 3:
members_dict = {'member-id-3': {
constants.OPERATING_STATUS: 'bogus'}, 'member-id-31': {
constants.OPERATING_STATUS: 'bogus'}}
else:
members_dict = {'member-id-%s' % i: {
constants.OPERATING_STATUS: 'bogus'}}
lb_ref['pools']['pool-id-%s' % i] = {
'members': members_dict, constants.OPERATING_STATUS: 'bogus'}
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
@ -977,13 +988,15 @@ class TestUpdateHealthDb(base.TestCase):
self.hm.member_repo.update.side_effect = (
[sqlalchemy.orm.exc.NoResultFound])
self.hm.pool_repo.update.side_effect = (
[sqlalchemy.orm.exc.NoResultFound])
sqlalchemy.orm.exc.NoResultFound)
self.hm.loadbalancer_repo.update.side_effect = (
[sqlalchemy.orm.exc.NoResultFound])
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict()
lb_ref['pools']['pool-id-2'] = {constants.OPERATING_STATUS: 'bogus'}
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_health_repo.replace.called)
@ -1013,7 +1026,7 @@ class TestUpdateHealthDb(base.TestCase):
def test_update_health_zombie(self, mock_driver):
health = {"id": self.FAKE_UUID_1, "listeners": {}}
self.amphora_repo.get_lb_for_amphora.return_value = None
self.amphora_repo.get_lb_for_health_update.return_value = None
amp_mock = mock.MagicMock()
self.amphora_repo.get.return_value = amp_mock
self.hm.update_health(health, '192.0.2.1')
@ -1037,15 +1050,18 @@ class TestUpdateHealthDb(base.TestCase):
"recv_time": time.time()
}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree())
lb_ref = self._make_fake_lb_health_dict()
# Start everything ONLINE
mock_members[0].operating_status = constants.ONLINE
mock_pool1.operating_status = constants.ONLINE
mock_listener1.operating_status = constants.ONLINE
mock_lb.operating_status = constants.ONLINE
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref[constants.OPERATING_STATUS] = constants.ONLINE
listener1 = lb_ref['listeners']['listener-id-1']
listener1[constants.OPERATING_STATUS] = constants.ONLINE
pool1 = lb_ref['pools']['pool-id-1']
pool1[constants.OPERATING_STATUS] = constants.ONLINE
member1 = pool1['members']['member-id-1']
member1[constants.OPERATING_STATUS] = constants.ONLINE
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.event_client.cast.assert_not_called()
@ -1060,45 +1076,108 @@ class TestUpdateHealthDb(base.TestCase):
"listeners": {},
"recv_time": time.time()}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
mock_lb.enabled = False
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
lb_ref['enabled'] = False
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
self.assertTrue(self.loadbalancer_repo.update.called)
self.loadbalancer_repo.update.assert_called_with(
self.mock_session(), mock_lb.id,
self.mock_session(), self.FAKE_UUID_1,
operating_status='OFFLINE')
def test_update_health_lb_admin_up(self):
health = {
"id": self.FAKE_UUID_1,
"listeners": {},
"recv_time": time.time()}
"recv_time": time.time(),
"ver": 1}
mock_lb, mock_listener1, mock_pool1, mock_members = (
self._make_mock_lb_tree(listener=False, pool=False))
mock_lb.enabled = True
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
self.assertTrue(self.loadbalancer_repo.update.called)
self.loadbalancer_repo.update.assert_called_with(
self.mock_session(), mock_lb.id,
self.mock_session(), self.FAKE_UUID_1,
operating_status='ONLINE')
def test_update_health_no_db_lb(self):
health = {
"id": self.FAKE_UUID_1,
"listeners": {},
"recv_time": time.time()
}
self.hm.amphora_repo.get_lb_for_health_update.return_value = {}
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
self.assertFalse(self.amphora_health_repo.replace.called)
# Test missing amp in addition to missing lb DB record
self.amphora_repo.get_lb_for_health_update.reset_mock()
self.amphora_health_repo.replace.reset_mock()
mock_amphora = mock.MagicMock()
mock_amphora.load_balancer_id = None
self.amphora_repo.get.return_value = mock_amphora
self.hm.update_health(health, '192.0.2.1')
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
self.assertTrue(self.amphora_repo.get.called)
self.assertTrue(self.amphora_health_repo.replace.called)
def test_update_status_and_emit_event(self):
# Test update with the same operating status
self.conf.config(group="health_manager",
event_streamer_driver=constants.NOOP_EVENT_STREAMER)
self.hm._update_status_and_emit_event(
'fake_session', self.loadbalancer_repo, constants.LOADBALANCER,
1, 'ONLINE', 'ONLINE')
self.assertFalse(self.loadbalancer_repo.update.called)
self.assertFalse(self.event_client.cast.called)
self.conf.config(group="health_manager",
event_streamer_driver='queue_event_streamer',
sync_provisioning_status=True)
self.loadbalancer_repo.update.reset_mock()
self.event_client.reset_mock()
# Test stream with provisioning sync
self.hm._update_status_and_emit_event(
'fake_session', self.loadbalancer_repo, constants.LOADBALANCER,
1, 'ONLINE', 'OFFLINE')
self.assertTrue(self.loadbalancer_repo.update.called)
self.assertTrue(self.event_client.cast.called)
self.conf.config(group="health_manager",
sync_provisioning_status=False)
self.loadbalancer_repo.update.reset_mock()
self.event_client.reset_mock()
# Test stream with no provisioning sync
self.hm._update_status_and_emit_event(
'fake_session', self.loadbalancer_repo, constants.LOADBALANCER,
1, 'ONLINE', 'ONLINE')
self.assertFalse(self.loadbalancer_repo.update.called)
self.assertFalse(self.event_client.cast.called)
class TestUpdateStatsDb(base.TestCase):
def setUp(self):
super(TestUpdateStatsDb, self).setUp()
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
conf.config(group="health_manager",
event_streamer_driver='queue_event_streamer')
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
self.conf.config(group="health_manager",
event_streamer_driver='queue_event_streamer')
self.sm = update_db.UpdateStatsDb()
self.event_client = mock.MagicMock()
@ -1141,7 +1220,7 @@ class TestUpdateStatsDb(base.TestCase):
self.loadbalancer_repo.get.return_value = self.loadbalancer
@mock.patch('octavia.db.api.get_session')
def test_update_stats(self, session):
def test_update_stats(self, mock_session):
health = {
"id": self.amphora_id,
@ -1165,7 +1244,7 @@ class TestUpdateStatsDb(base.TestCase):
}
}
session.return_value = 'blah'
mock_session.return_value = 'blah'
self.sm.update_stats(health, '192.0.2.1')
@ -1202,3 +1281,39 @@ class TestUpdateStatsDb(base.TestCase):
self.listener_stats.active_connections,
'bytes_out': self.listener_stats.bytes_out,
'request_errors': self.listener_stats.request_errors}})
# Test with noop streamer
self.event_client.cast.reset_mock()
self.conf.config(group="health_manager",
event_streamer_driver=constants.NOOP_EVENT_STREAMER)
self.sm.update_stats(health, '192.0.2.1')
self.conf.config(group="health_manager",
event_streamer_driver='queue_event_streamer')
self.assertFalse(self.event_client.cast.called)
# Test with missing DB listener
self.event_client.cast.reset_mock()
self.sm.repo_listener.get.return_value = None
self.sm.update_stats(health, '192.0.2.1')
self.event_client.cast.assert_called_once_with(
{}, 'update_info', container={
'info_type': 'listener_stats',
'info_id': self.listener_id,
'info_payload': {
'bytes_in': self.listener_stats.bytes_in,
'total_connections':
self.listener_stats.total_connections,
'active_connections':
self.listener_stats.active_connections,
'bytes_out': self.listener_stats.bytes_out,
'request_errors': self.listener_stats.request_errors}})
# Test with update failure
self.event_client.cast.reset_mock()
mock_session.side_effect = Exception
self.sm.update_stats(health, '192.0.2.1')
self.assertFalse(self.event_client.cast.called)