Merge "Fix health manager performance regression" into stable/queens
This commit is contained in:
commit
de64db8e7b
|
@ -14,6 +14,7 @@
|
|||
|
||||
import datetime
|
||||
import time
|
||||
import timeit
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
@ -46,43 +47,50 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
self.loadbalancer_repo = repo.LoadBalancerRepository()
|
||||
self.member_repo = repo.MemberRepository()
|
||||
self.pool_repo = repo.PoolRepository()
|
||||
self.sync_prv_status = CONF.health_manager.sync_provisioning_status
|
||||
|
||||
def emit(self, info_type, info_id, info_obj):
|
||||
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
|
||||
self.event_streamer.emit(cnt)
|
||||
|
||||
def _update_status_and_emit_event(self, session, repo, entity_type,
|
||||
entity_id, new_op_status, old_op_status,
|
||||
current_prov_status):
|
||||
entity_id, new_op_status, old_op_status):
|
||||
message = {}
|
||||
if old_op_status.lower() != new_op_status.lower():
|
||||
LOG.debug("%s %s status has changed from %s to "
|
||||
"%s. Updating db and sending event.",
|
||||
"%s, updating db.",
|
||||
entity_type, entity_id, old_op_status,
|
||||
new_op_status)
|
||||
repo.update(session, entity_id, operating_status=new_op_status)
|
||||
# Map the status for neutron-lbaas
|
||||
if new_op_status == constants.DRAINING:
|
||||
new_op_status = constants.ONLINE
|
||||
message.update({constants.OPERATING_STATUS: new_op_status})
|
||||
if self.sync_prv_status:
|
||||
LOG.debug("%s %s provisioning_status %s. "
|
||||
"Sending event.",
|
||||
entity_type, entity_id, current_prov_status)
|
||||
message.update(
|
||||
{constants.PROVISIONING_STATUS: current_prov_status})
|
||||
if message:
|
||||
self.emit(entity_type, entity_id, message)
|
||||
if (CONF.health_manager.event_streamer_driver !=
|
||||
constants.NOOP_EVENT_STREAMER):
|
||||
if CONF.health_manager.sync_provisioning_status:
|
||||
current_prov_status = repo.get(
|
||||
session, id=entity_id).provisioning_status
|
||||
LOG.debug("%s %s provisioning_status %s. "
|
||||
"Sending event.",
|
||||
entity_type, entity_id, current_prov_status)
|
||||
message.update(
|
||||
{constants.PROVISIONING_STATUS: current_prov_status})
|
||||
if message:
|
||||
self.emit(entity_type, entity_id, message)
|
||||
|
||||
def update_health(self, health, srcaddr):
|
||||
# The executor will eat any exceptions from the update_health code
|
||||
# so we need to wrap it and log the unhandled exception
|
||||
start_time = timeit.default_timer()
|
||||
try:
|
||||
self._update_health(health, srcaddr)
|
||||
except Exception:
|
||||
LOG.exception('update_health encountered an unknown error '
|
||||
'processing health message for amphora {0} with IP '
|
||||
'{1}'.format(health['id'], srcaddr))
|
||||
except Exception as e:
|
||||
LOG.exception('Health update for amphora %(amp)s encountered '
|
||||
'error %(err)s. Skipping health update.',
|
||||
{'amp': health['id'], 'err': str(e)})
|
||||
# TODO(johnsom) We need to set a warning threshold here
|
||||
LOG.debug('Health Update finished in: {0} seconds'.format(
|
||||
timeit.default_timer() - start_time))
|
||||
|
||||
def _update_health(self, health, srcaddr):
|
||||
"""This function is to update db info based on amphora status
|
||||
|
@ -110,13 +118,13 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
session = db_api.get_session()
|
||||
|
||||
# We need to see if all of the listeners are reporting in
|
||||
db_lb = self.amphora_repo.get_lb_for_amphora(session, health['id'])
|
||||
db_lb = self.amphora_repo.get_lb_for_health_update(session,
|
||||
health['id'])
|
||||
ignore_listener_count = False
|
||||
listeners = health['listeners']
|
||||
|
||||
if db_lb:
|
||||
expected_listener_count = len(db_lb.listeners)
|
||||
if 'PENDING' in db_lb.provisioning_status:
|
||||
expected_listener_count = len(db_lb.get('listeners', {}))
|
||||
if 'PENDING' in db_lb['provisioning_status']:
|
||||
ignore_listener_count = True
|
||||
else:
|
||||
# If this is not a spare amp, log and skip it.
|
||||
|
@ -151,6 +159,8 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
health['id'], srcaddr), e)
|
||||
expected_listener_count = 0
|
||||
|
||||
listeners = health['listeners']
|
||||
|
||||
# Do not update amphora health if the reporting listener count
|
||||
# does not match the expected listener count
|
||||
if len(listeners) == expected_listener_count or ignore_listener_count:
|
||||
|
@ -160,6 +170,9 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
# if we're running too far behind, warn and bail
|
||||
proc_delay = time.time() - health['recv_time']
|
||||
hb_interval = CONF.health_manager.heartbeat_interval
|
||||
# TODO(johnsom) We need to set a warning threshold here, and
|
||||
# escalate to critical when it reaches the
|
||||
# heartbeat_interval
|
||||
if proc_delay >= hb_interval:
|
||||
LOG.warning('Amphora %(id)s health message was processed too '
|
||||
'slowly: %(delay)ss! The system may be overloaded '
|
||||
|
@ -189,16 +202,17 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
return
|
||||
|
||||
processed_pools = []
|
||||
potential_offline_pools = {}
|
||||
|
||||
# We got a heartbeat so lb is healthy until proven otherwise
|
||||
if db_lb.enabled is False:
|
||||
if db_lb['enabled'] is False:
|
||||
lb_status = constants.OFFLINE
|
||||
else:
|
||||
lb_status = constants.ONLINE
|
||||
|
||||
for db_listener in db_lb.listeners:
|
||||
for listener_id in db_lb.get('listeners', {}):
|
||||
db_op_status = db_lb['listeners'][listener_id]['operating_status']
|
||||
listener_status = None
|
||||
listener_id = db_listener.id
|
||||
listener = None
|
||||
|
||||
if listener_id not in listeners:
|
||||
|
@ -221,13 +235,11 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
'status': listener.get('status')})
|
||||
|
||||
try:
|
||||
if listener_status is not None:
|
||||
if (listener_status is not None and
|
||||
listener_status != db_op_status):
|
||||
self._update_status_and_emit_event(
|
||||
session, self.listener_repo, constants.LISTENER,
|
||||
listener_id, listener_status,
|
||||
db_listener.operating_status,
|
||||
db_listener.provisioning_status
|
||||
)
|
||||
listener_id, listener_status, db_op_status)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error("Listener %s is not in DB", listener_id)
|
||||
|
||||
|
@ -236,36 +248,52 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
|
||||
pools = listener['pools']
|
||||
|
||||
# Process pools bound to listeners
|
||||
for db_pool in db_listener.pools:
|
||||
for db_pool_id in db_lb.get('pools', {}):
|
||||
# If we saw this pool already on another listener
|
||||
# skip it.
|
||||
if db_pool_id in processed_pools:
|
||||
continue
|
||||
db_pool_dict = db_lb['pools'][db_pool_id]
|
||||
lb_status = self._process_pool_status(
|
||||
session, db_pool, pools, lb_status, processed_pools)
|
||||
session, db_pool_id, db_pool_dict, pools,
|
||||
lb_status, processed_pools, potential_offline_pools)
|
||||
|
||||
# Process pools bound to the load balancer
|
||||
for db_pool in db_lb.pools:
|
||||
# Don't re-process pools shared with listeners
|
||||
if db_pool.id in processed_pools:
|
||||
for pool_id in potential_offline_pools:
|
||||
# Skip if we eventually found a status for this pool
|
||||
if pool_id in processed_pools:
|
||||
continue
|
||||
lb_status = self._process_pool_status(
|
||||
session, db_pool, [], lb_status, processed_pools)
|
||||
try:
|
||||
# If the database doesn't already show the pool offline, update
|
||||
if potential_offline_pools[pool_id] != constants.OFFLINE:
|
||||
self._update_status_and_emit_event(
|
||||
session, self.pool_repo, constants.POOL,
|
||||
pool_id, constants.OFFLINE,
|
||||
potential_offline_pools[pool_id])
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error("Pool %s is not in DB", pool_id)
|
||||
|
||||
# Update the load balancer status last
|
||||
try:
|
||||
self._update_status_and_emit_event(
|
||||
session, self.loadbalancer_repo,
|
||||
constants.LOADBALANCER, db_lb.id, lb_status,
|
||||
db_lb.operating_status, db_lb.provisioning_status
|
||||
)
|
||||
if lb_status != db_lb['operating_status']:
|
||||
self._update_status_and_emit_event(
|
||||
session, self.loadbalancer_repo,
|
||||
constants.LOADBALANCER, db_lb['id'], lb_status,
|
||||
db_lb[constants.OPERATING_STATUS])
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error("Load balancer %s is not in DB", db_lb.id)
|
||||
|
||||
def _process_pool_status(self, session, db_pool, pools, lb_status,
|
||||
processed_pools):
|
||||
def _process_pool_status(
|
||||
self, session, pool_id, db_pool_dict, pools, lb_status,
|
||||
processed_pools, potential_offline_pools):
|
||||
pool_status = None
|
||||
pool_id = db_pool.id
|
||||
|
||||
if pool_id not in pools:
|
||||
pool_status = constants.OFFLINE
|
||||
# If we don't have a status update for this pool_id
|
||||
# add it to the list of potential offline pools and continue.
|
||||
# We will check the potential offline pool list after we
|
||||
# finish processing the status updates from all of the listeners.
|
||||
potential_offline_pools[pool_id] = db_pool_dict['operating_status']
|
||||
return lb_status
|
||||
else:
|
||||
pool = pools[pool_id]
|
||||
|
||||
|
@ -287,10 +315,10 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
# Deal with the members that are reporting from
|
||||
# the Amphora
|
||||
members = pool['members']
|
||||
for db_member in db_pool.members:
|
||||
for member_id in db_pool_dict.get('members', {}):
|
||||
member_status = None
|
||||
member_db_status = db_member.operating_status
|
||||
member_id = db_member.id
|
||||
member_db_status = (
|
||||
db_pool_dict['members'][member_id]['operating_status'])
|
||||
|
||||
if member_id not in members:
|
||||
if member_db_status != constants.NO_MONITOR:
|
||||
|
@ -323,25 +351,21 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
|
|||
'status': status})
|
||||
|
||||
try:
|
||||
if member_status is not None:
|
||||
if (member_status is not None and
|
||||
member_status != member_db_status):
|
||||
self._update_status_and_emit_event(
|
||||
session, self.member_repo,
|
||||
constants.MEMBER,
|
||||
member_id, member_status,
|
||||
db_member.operating_status,
|
||||
db_member.provisioning_status
|
||||
)
|
||||
session, self.member_repo, constants.MEMBER,
|
||||
member_id, member_status, member_db_status)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error("Member %s is not able to update "
|
||||
"in DB", member_id)
|
||||
|
||||
try:
|
||||
if pool_status is not None:
|
||||
if (pool_status is not None and
|
||||
pool_status != db_pool_dict['operating_status']):
|
||||
self._update_status_and_emit_event(
|
||||
session, self.pool_repo, constants.POOL,
|
||||
pool_id, pool_status, db_pool.operating_status,
|
||||
db_pool.provisioning_status
|
||||
)
|
||||
pool_id, pool_status, db_pool_dict['operating_status'])
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error("Pool %s is not in DB", pool_id)
|
||||
|
||||
|
|
|
@ -988,6 +988,78 @@ class AmphoraRepository(BaseRepository):
|
|||
|
||||
return amp.to_data_model()
|
||||
|
||||
def get_lb_for_health_update(self, session, amphora_id):
|
||||
"""This method is for the health manager status update process.
|
||||
|
||||
This is a time sensitive query that occurs often.
|
||||
It is an explicit query as the ORM produces a poorly
|
||||
optimized query.
|
||||
|
||||
Use extreme caution making any changes to this query
|
||||
as it can impact the scalability of the health manager.
|
||||
All changes should be analyzed using SQL "EXPLAIN" to
|
||||
make sure only indexes are being used.
|
||||
Changes should also be evaluated using the stressHM tool.
|
||||
|
||||
Note: The returned object is flat and not a graph representation
|
||||
of the load balancer as it is not needed. This is on
|
||||
purpose to optimize the processing time. This is not in
|
||||
the normal data model objects.
|
||||
|
||||
:param session: A Sql Alchemy database session.
|
||||
:param amphora_id: The amphora ID to lookup the load balancer for.
|
||||
:returns: A dictionary containing the required load balancer details.
|
||||
"""
|
||||
rows = session.execute(
|
||||
"SELECT load_balancer.id, load_balancer.enabled, "
|
||||
"load_balancer.provisioning_status AS lb_prov_status, "
|
||||
"load_balancer.operating_status AS lb_op_status, "
|
||||
"listener.id AS list_id, "
|
||||
"listener.operating_status AS list_op_status, "
|
||||
"pool.id AS pool_id, "
|
||||
"pool.operating_status AS pool_op_status, "
|
||||
"member.id AS member_id, "
|
||||
"member.operating_status AS mem_op_status from "
|
||||
"amphora JOIN load_balancer ON "
|
||||
"amphora.load_balancer_id = load_balancer.id LEFT JOIN "
|
||||
"listener ON load_balancer.id = listener.load_balancer_id "
|
||||
"LEFT JOIN pool ON load_balancer.id = pool.load_balancer_id "
|
||||
"LEFT JOIN member ON pool.id = member.pool_id WHERE "
|
||||
"amphora.id = :amp_id AND amphora.status != :deleted AND "
|
||||
"load_balancer.provisioning_status != :deleted;",
|
||||
{'amp_id': amphora_id, 'deleted': consts.DELETED}).fetchall()
|
||||
|
||||
lb = {}
|
||||
listeners = {}
|
||||
pools = {}
|
||||
for row in rows:
|
||||
if not lb:
|
||||
lb['id'] = row['id']
|
||||
lb['enabled'] = row['enabled'] == 1
|
||||
lb['provisioning_status'] = row['lb_prov_status']
|
||||
lb['operating_status'] = row['lb_op_status']
|
||||
if row['list_id'] and row['list_id'] not in listeners:
|
||||
listener = {'operating_status': row['list_op_status']}
|
||||
listeners[row['list_id']] = listener
|
||||
if row['pool_id']:
|
||||
if row['pool_id'] in pools and row['member_id']:
|
||||
member = {'operating_status': row['mem_op_status']}
|
||||
pools[row['pool_id']]['members'][row['member_id']] = member
|
||||
else:
|
||||
pool = {'operating_status': row['pool_op_status'],
|
||||
'members': {}}
|
||||
if row['member_id']:
|
||||
member = {'operating_status': row['mem_op_status']}
|
||||
pool['members'][row['member_id']] = member
|
||||
pools[row['pool_id']] = pool
|
||||
|
||||
if listeners:
|
||||
lb['listeners'] = listeners
|
||||
if pools:
|
||||
lb['pools'] = pools
|
||||
|
||||
return lb
|
||||
|
||||
|
||||
class AmphoraBuildReqRepository(BaseRepository):
|
||||
model_class = models.AmphoraBuildRequest
|
||||
|
|
|
@ -40,6 +40,9 @@ class BaseRepositoryTest(base.OctaviaDBTestBase):
|
|||
FAKE_UUID_2 = uuidutils.generate_uuid()
|
||||
FAKE_UUID_3 = uuidutils.generate_uuid()
|
||||
FAKE_UUID_4 = uuidutils.generate_uuid()
|
||||
FAKE_UUID_5 = uuidutils.generate_uuid()
|
||||
FAKE_UUID_6 = uuidutils.generate_uuid()
|
||||
FAKE_UUID_7 = uuidutils.generate_uuid()
|
||||
FAKE_EXP_AGE = 10
|
||||
|
||||
def setUp(self):
|
||||
|
@ -3076,6 +3079,79 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
|
|||
self.assertEqual(cert_expired_amphora.cert_expiration, expiration)
|
||||
self.assertEqual(cert_expired_amphora.id, amphora2.id)
|
||||
|
||||
def test_get_lb_for_health_update(self):
|
||||
amphora1 = self.create_amphora(self.FAKE_UUID_1)
|
||||
amphora2 = self.create_amphora(self.FAKE_UUID_3)
|
||||
self.amphora_repo.associate(self.session, self.lb.id, amphora1.id)
|
||||
self.amphora_repo.associate(self.session, self.lb.id, amphora2.id)
|
||||
|
||||
lb_ref = {'enabled': True, 'id': self.lb.id,
|
||||
'operating_status': constants.ONLINE,
|
||||
'provisioning_status': constants.ACTIVE}
|
||||
|
||||
# Test with just a load balancer
|
||||
lb = self.amphora_repo.get_lb_for_health_update(self.session,
|
||||
self.FAKE_UUID_1)
|
||||
self.assertEqual(lb_ref, lb)
|
||||
|
||||
pool = self.pool_repo.create(
|
||||
self.session, id=self.FAKE_UUID_4, project_id=self.FAKE_UUID_2,
|
||||
name="pool_test", description="pool_description",
|
||||
protocol=constants.PROTOCOL_HTTP, load_balancer_id=self.lb.id,
|
||||
lb_algorithm=constants.LB_ALGORITHM_ROUND_ROBIN,
|
||||
provisioning_status=constants.ACTIVE,
|
||||
operating_status=constants.ONLINE, enabled=True)
|
||||
|
||||
pool_ref = {pool.id: {'members': {},
|
||||
'operating_status': constants.ONLINE}}
|
||||
lb_ref['pools'] = pool_ref
|
||||
|
||||
# Test with an LB and a pool
|
||||
lb = self.amphora_repo.get_lb_for_health_update(self.session,
|
||||
self.FAKE_UUID_1)
|
||||
self.assertEqual(lb_ref, lb)
|
||||
|
||||
listener = self.listener_repo.create(
|
||||
self.session, id=self.FAKE_UUID_5, project_id=self.FAKE_UUID_2,
|
||||
name="listener_name", description="listener_description",
|
||||
protocol=constants.PROTOCOL_HTTP, protocol_port=80,
|
||||
connection_limit=1, operating_status=constants.ONLINE,
|
||||
load_balancer_id=self.lb.id, provisioning_status=constants.ACTIVE,
|
||||
enabled=True, peer_port=1025, default_pool_id=pool.id)
|
||||
|
||||
listener_ref = {listener.id: {'operating_status': constants.ONLINE}}
|
||||
lb_ref['listeners'] = listener_ref
|
||||
|
||||
# Test with an LB, pool, and listener (no members)
|
||||
lb = self.amphora_repo.get_lb_for_health_update(self.session,
|
||||
self.FAKE_UUID_1)
|
||||
self.assertEqual(lb_ref, lb)
|
||||
|
||||
member1 = self.member_repo.create(self.session, id=self.FAKE_UUID_6,
|
||||
project_id=self.FAKE_UUID_2,
|
||||
pool_id=pool.id,
|
||||
ip_address="192.0.2.1",
|
||||
protocol_port=80, enabled=True,
|
||||
provisioning_status=constants.ACTIVE,
|
||||
operating_status=constants.ONLINE)
|
||||
|
||||
member2 = self.member_repo.create(self.session, id=self.FAKE_UUID_7,
|
||||
project_id=self.FAKE_UUID_2,
|
||||
pool_id=pool.id,
|
||||
ip_address="192.0.2.21",
|
||||
protocol_port=80, enabled=True,
|
||||
provisioning_status=constants.ACTIVE,
|
||||
operating_status=constants.OFFLINE)
|
||||
|
||||
member_ref = {member1.id: {'operating_status': constants.ONLINE},
|
||||
member2.id: {'operating_status': constants.OFFLINE}}
|
||||
lb_ref['pools'][pool.id]['members'] = member_ref
|
||||
|
||||
# Test with an LB, pool, listener, and members
|
||||
lb = self.amphora_repo.get_lb_for_health_update(self.session,
|
||||
self.FAKE_UUID_1)
|
||||
self.assertEqual(lb_ref, lb)
|
||||
|
||||
|
||||
class AmphoraHealthRepositoryTest(BaseRepositoryTest):
|
||||
def setUp(self):
|
||||
|
|
|
@ -44,9 +44,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
def setUp(self):
|
||||
super(TestUpdateHealthDb, self).setUp()
|
||||
|
||||
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer')
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer')
|
||||
|
||||
session_patch = mock.patch('octavia.db.api.get_session')
|
||||
self.addCleanup(session_patch.stop)
|
||||
|
@ -65,8 +65,6 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
self.pool_repo = mock.MagicMock()
|
||||
|
||||
self.hm.amphora_repo = self.amphora_repo
|
||||
fake_lb = mock.MagicMock()
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = fake_lb
|
||||
self.hm.amphora_health_repo = self.amphora_health_repo
|
||||
self.hm.listener_repo = self.listener_repo
|
||||
self.hm.listener_repo.count.return_value = 1
|
||||
|
@ -104,6 +102,7 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
mock_lb.pools = [mock_pool1]
|
||||
if mock_listener1:
|
||||
mock_listener1.pools = [mock_pool1]
|
||||
mock_listener1.default_pool = mock_pool1
|
||||
for i in range(members):
|
||||
mock_member_x = mock.Mock()
|
||||
mock_member_x.id = 'member-id-%s' % (i + 1)
|
||||
|
@ -116,13 +115,45 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
|
||||
return mock_lb, mock_listener1, mock_pool1, mock_members
|
||||
|
||||
def _make_fake_lb_health_dict(self, listener=True, pool=True,
|
||||
health_monitor=True, members=1,
|
||||
lb_prov_status=constants.ACTIVE):
|
||||
|
||||
lb_ref = {'enabled': True, 'id': self.FAKE_UUID_1,
|
||||
constants.OPERATING_STATUS: 'bogus',
|
||||
constants.PROVISIONING_STATUS: lb_prov_status}
|
||||
|
||||
if pool:
|
||||
members_dict = {}
|
||||
if health_monitor:
|
||||
member_operating_status = 'NOTHING_MATCHABLE'
|
||||
else:
|
||||
member_operating_status = constants.NO_MONITOR
|
||||
|
||||
for i in range(members):
|
||||
member_id = 'member-id-%s' % (i + 1)
|
||||
members_dict[member_id] = {
|
||||
constants.OPERATING_STATUS: member_operating_status}
|
||||
|
||||
pool_ref = {'pool-id-1': {'members': members_dict,
|
||||
constants.OPERATING_STATUS: 'bogus'}}
|
||||
lb_ref['pools'] = pool_ref
|
||||
|
||||
if listener:
|
||||
listener_ref = {'listener-id-1': {
|
||||
constants.OPERATING_STATUS: 'bogus'}}
|
||||
lb_ref['listeners'] = listener_ref
|
||||
|
||||
return lb_ref
|
||||
|
||||
def test_update_health_event_stream(self):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {
|
||||
"listener-id-1": {"status": constants.OPEN, "pools": {
|
||||
"pool-id-1": {"status": constants.UP,
|
||||
"members": {"member-id-1": constants.UP}
|
||||
"members": {"member-id-1": constants.UP,
|
||||
"member-id-2": constants.UP}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -130,9 +161,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.event_client.cast.assert_any_call(
|
||||
|
@ -156,12 +186,11 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -173,13 +202,12 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=True, pool=False,
|
||||
lb_prov_status=constants.PENDING_UPDATE))
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(
|
||||
listener=True, pool=False, lb_prov_status=constants.PENDING_UPDATE)
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -191,12 +219,11 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=True, pool=False))
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(listener=True, pool=False)
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.assertFalse(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -208,12 +235,12 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time() - hb_interval - 1 # extra -1 for buffer
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
|
||||
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
# Receive time is stale, so we shouldn't see this called
|
||||
self.assertFalse(self.loadbalancer_repo.update.called)
|
||||
|
||||
|
@ -234,10 +261,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
|
||||
self.session_mock.commit.side_effect = TestException('boom')
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_member1 = (
|
||||
self._make_mock_lb_tree())
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -258,9 +284,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -285,10 +311,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
operating_status=constants.ONLINE)
|
||||
|
||||
# If the listener count is wrong, make sure we don't update
|
||||
mock_listener2 = mock.Mock()
|
||||
mock_listener2.id = 'listener-id-2'
|
||||
mock_listener2.pools = [mock_pool1]
|
||||
mock_lb.listeners = [mock_listener1, mock_listener2]
|
||||
lb_ref['listeners']['listener-id-2'] = {
|
||||
constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
self.amphora_health_repo.replace.reset_mock()
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
|
@ -304,10 +329,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -319,7 +343,7 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
self.session_mock, listener_id,
|
||||
operating_status=constants.ONLINE)
|
||||
self.pool_repo.update.assert_any_call(
|
||||
self.session_mock, mock_pool1.id,
|
||||
self.session_mock, 'pool-id-1',
|
||||
operating_status=constants.OFFLINE
|
||||
)
|
||||
|
||||
|
@ -339,22 +363,16 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
mock_member2 = mock.Mock()
|
||||
mock_member2.id = 'member-id-2'
|
||||
mock_pool2 = mock.Mock()
|
||||
mock_pool2.id = "pool-id-2"
|
||||
mock_pool2.members = [mock_member2]
|
||||
mock_listener2 = mock.Mock()
|
||||
mock_listener2.id = 'listener-id-2'
|
||||
mock_listener2.pools = [mock_pool2]
|
||||
lb_ref['pools']['pool-id-2'] = {
|
||||
constants.OPERATING_STATUS: 'bogus',
|
||||
'members': {'member-id-2': {constants.OPERATING_STATUS: 'bogus'}}}
|
||||
|
||||
mock_lb.listeners.append(mock_listener2)
|
||||
mock_lb.pools.append(mock_pool2)
|
||||
lb_ref['listeners']['listener-id-2'] = {
|
||||
constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -368,11 +386,11 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
# Call count should be exactly 2, as each pool should be processed once
|
||||
self.assertEqual(2, self.pool_repo.update.call_count)
|
||||
self.pool_repo.update.assert_has_calls([
|
||||
mock.call(self.session_mock, mock_pool1.id,
|
||||
mock.call(self.session_mock, 'pool-id-1',
|
||||
operating_status=constants.ERROR),
|
||||
mock.call(self.session_mock, mock_pool2.id,
|
||||
mock.call(self.session_mock, 'pool-id-2',
|
||||
operating_status=constants.ONLINE)
|
||||
])
|
||||
], any_order=True)
|
||||
|
||||
def test_update_lb_and_list_pool_health_online(self):
|
||||
|
||||
|
@ -389,9 +407,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -431,9 +448,12 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
lb_ref['pools']['pool-id-2'] = {
|
||||
constants.OPERATING_STATUS: constants.OFFLINE}
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -462,9 +482,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"members": {"member-id-1": constants.DRAIN}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -502,9 +521,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"members": {"member-id-1": constants.MAINT}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -542,9 +560,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"members": {"member-id-1": "blah"}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -578,9 +595,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -621,9 +637,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(health_monitor=False))
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(health_monitor=False)
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -659,11 +674,11 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(health_monitor=False))
|
||||
mock_members[0].admin_state_up = False
|
||||
mock_members[0].operating_status = constants.NO_MONITOR
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(health_monitor=False)
|
||||
member1 = lb_ref['pools']['pool-id-1']['members']['member-id-1']
|
||||
member1[constants.OPERATING_STATUS] = constants.NO_MONITOR
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -683,7 +698,7 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
operating_status=constants.ONLINE)
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
self.session_mock, mock_members[0].id,
|
||||
self.session_mock, 'member-id-1',
|
||||
operating_status=constants.OFFLINE)
|
||||
|
||||
def test_update_health_member_no_check(self):
|
||||
|
@ -702,9 +717,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -743,11 +757,12 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"member-id-1": constants.UP}}}}},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(members=2))
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(members=2)
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
|
||||
# test listener, member
|
||||
for listener_id, listener in six.iteritems(
|
||||
|
@ -770,7 +785,7 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
self.session_mock, member_id,
|
||||
operating_status=constants.ONLINE)
|
||||
self.member_repo.update.assert_any_call(
|
||||
self.session_mock, mock_members[1].id,
|
||||
self.session_mock, 'member-id-2',
|
||||
operating_status=constants.OFFLINE)
|
||||
|
||||
def test_update_health_list_full_member_down(self):
|
||||
|
@ -788,9 +803,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -815,10 +829,9 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
self.session_mock, member_id,
|
||||
operating_status=constants.ERROR)
|
||||
|
||||
mock_listener2 = mock.Mock()
|
||||
mock_listener2.id = 'listener-id-2'
|
||||
mock_listener2.pools = [mock_pool1]
|
||||
mock_lb.listeners.append(mock_listener2)
|
||||
lb_ref['listeners']['listener-id-2'] = {
|
||||
constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
self.amphora_health_repo.replace.reset_mock()
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
|
@ -839,9 +852,8 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -914,25 +926,24 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
# Build our own custom listeners/pools/members
|
||||
for i in [1, 2, 3, 4, 5]:
|
||||
mock_member = mock.Mock()
|
||||
mock_member.id = 'member-id-%s' % i
|
||||
mock_pool = mock.Mock()
|
||||
mock_pool.id = 'pool-id-%s' % i
|
||||
mock_pool.members = [mock_member]
|
||||
if i == 3:
|
||||
mock_member = mock.Mock()
|
||||
mock_member.id = 'member-id-31'
|
||||
mock_pool.members.append(mock_member)
|
||||
mock_listener = mock.Mock()
|
||||
mock_listener.id = 'listener-id-%s' % i
|
||||
mock_listener.pools = [mock_pool]
|
||||
mock_lb.listeners.append(mock_listener)
|
||||
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref['listeners']['listener-id-%s' % i] = {
|
||||
constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
if i == 3:
|
||||
members_dict = {'member-id-3': {
|
||||
constants.OPERATING_STATUS: 'bogus'}, 'member-id-31': {
|
||||
constants.OPERATING_STATUS: 'bogus'}}
|
||||
else:
|
||||
members_dict = {'member-id-%s' % i: {
|
||||
constants.OPERATING_STATUS: 'bogus'}}
|
||||
lb_ref['pools']['pool-id-%s' % i] = {
|
||||
'members': members_dict, constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
|
||||
|
@ -977,13 +988,15 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
self.hm.member_repo.update.side_effect = (
|
||||
[sqlalchemy.orm.exc.NoResultFound])
|
||||
self.hm.pool_repo.update.side_effect = (
|
||||
[sqlalchemy.orm.exc.NoResultFound])
|
||||
sqlalchemy.orm.exc.NoResultFound)
|
||||
self.hm.loadbalancer_repo.update.side_effect = (
|
||||
[sqlalchemy.orm.exc.NoResultFound])
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
lb_ref['pools']['pool-id-2'] = {constants.OPERATING_STATUS: 'bogus'}
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
@ -1013,7 +1026,7 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
def test_update_health_zombie(self, mock_driver):
|
||||
health = {"id": self.FAKE_UUID_1, "listeners": {}}
|
||||
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = None
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = None
|
||||
amp_mock = mock.MagicMock()
|
||||
self.amphora_repo.get.return_value = amp_mock
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
|
@ -1037,15 +1050,18 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"recv_time": time.time()
|
||||
}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree())
|
||||
lb_ref = self._make_fake_lb_health_dict()
|
||||
|
||||
# Start everything ONLINE
|
||||
mock_members[0].operating_status = constants.ONLINE
|
||||
mock_pool1.operating_status = constants.ONLINE
|
||||
mock_listener1.operating_status = constants.ONLINE
|
||||
mock_lb.operating_status = constants.ONLINE
|
||||
self.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref[constants.OPERATING_STATUS] = constants.ONLINE
|
||||
listener1 = lb_ref['listeners']['listener-id-1']
|
||||
listener1[constants.OPERATING_STATUS] = constants.ONLINE
|
||||
pool1 = lb_ref['pools']['pool-id-1']
|
||||
pool1[constants.OPERATING_STATUS] = constants.ONLINE
|
||||
member1 = pool1['members']['member-id-1']
|
||||
member1[constants.OPERATING_STATUS] = constants.ONLINE
|
||||
|
||||
self.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.event_client.cast.assert_not_called()
|
||||
|
@ -1060,45 +1076,108 @@ class TestUpdateHealthDb(base.TestCase):
|
|||
"listeners": {},
|
||||
"recv_time": time.time()}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
mock_lb.enabled = False
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
|
||||
lb_ref['enabled'] = False
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.loadbalancer_repo.update.assert_called_with(
|
||||
self.mock_session(), mock_lb.id,
|
||||
self.mock_session(), self.FAKE_UUID_1,
|
||||
operating_status='OFFLINE')
|
||||
|
||||
def test_update_health_lb_admin_up(self):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {},
|
||||
"recv_time": time.time()}
|
||||
"recv_time": time.time(),
|
||||
"ver": 1}
|
||||
|
||||
mock_lb, mock_listener1, mock_pool1, mock_members = (
|
||||
self._make_mock_lb_tree(listener=False, pool=False))
|
||||
mock_lb.enabled = True
|
||||
self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb
|
||||
lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False)
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_amphora.called)
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.loadbalancer_repo.update.assert_called_with(
|
||||
self.mock_session(), mock_lb.id,
|
||||
self.mock_session(), self.FAKE_UUID_1,
|
||||
operating_status='ONLINE')
|
||||
|
||||
def test_update_health_no_db_lb(self):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {},
|
||||
"recv_time": time.time()
|
||||
}
|
||||
self.hm.amphora_repo.get_lb_for_health_update.return_value = {}
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertFalse(self.amphora_health_repo.replace.called)
|
||||
|
||||
# Test missing amp in addition to missing lb DB record
|
||||
self.amphora_repo.get_lb_for_health_update.reset_mock()
|
||||
self.amphora_health_repo.replace.reset_mock()
|
||||
|
||||
mock_amphora = mock.MagicMock()
|
||||
mock_amphora.load_balancer_id = None
|
||||
self.amphora_repo.get.return_value = mock_amphora
|
||||
|
||||
self.hm.update_health(health, '192.0.2.1')
|
||||
|
||||
self.assertTrue(self.amphora_repo.get_lb_for_health_update.called)
|
||||
self.assertTrue(self.amphora_repo.get.called)
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
def test_update_status_and_emit_event(self):
|
||||
|
||||
# Test update with the same operating status
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver=constants.NOOP_EVENT_STREAMER)
|
||||
self.hm._update_status_and_emit_event(
|
||||
'fake_session', self.loadbalancer_repo, constants.LOADBALANCER,
|
||||
1, 'ONLINE', 'ONLINE')
|
||||
self.assertFalse(self.loadbalancer_repo.update.called)
|
||||
self.assertFalse(self.event_client.cast.called)
|
||||
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer',
|
||||
sync_provisioning_status=True)
|
||||
|
||||
self.loadbalancer_repo.update.reset_mock()
|
||||
self.event_client.reset_mock()
|
||||
|
||||
# Test stream with provisioning sync
|
||||
self.hm._update_status_and_emit_event(
|
||||
'fake_session', self.loadbalancer_repo, constants.LOADBALANCER,
|
||||
1, 'ONLINE', 'OFFLINE')
|
||||
self.assertTrue(self.loadbalancer_repo.update.called)
|
||||
self.assertTrue(self.event_client.cast.called)
|
||||
|
||||
self.conf.config(group="health_manager",
|
||||
sync_provisioning_status=False)
|
||||
|
||||
self.loadbalancer_repo.update.reset_mock()
|
||||
self.event_client.reset_mock()
|
||||
|
||||
# Test stream with no provisioning sync
|
||||
self.hm._update_status_and_emit_event(
|
||||
'fake_session', self.loadbalancer_repo, constants.LOADBALANCER,
|
||||
1, 'ONLINE', 'ONLINE')
|
||||
self.assertFalse(self.loadbalancer_repo.update.called)
|
||||
self.assertFalse(self.event_client.cast.called)
|
||||
|
||||
|
||||
class TestUpdateStatsDb(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestUpdateStatsDb, self).setUp()
|
||||
|
||||
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer')
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer')
|
||||
|
||||
self.sm = update_db.UpdateStatsDb()
|
||||
self.event_client = mock.MagicMock()
|
||||
|
@ -1141,7 +1220,7 @@ class TestUpdateStatsDb(base.TestCase):
|
|||
self.loadbalancer_repo.get.return_value = self.loadbalancer
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
def test_update_stats(self, session):
|
||||
def test_update_stats(self, mock_session):
|
||||
|
||||
health = {
|
||||
"id": self.amphora_id,
|
||||
|
@ -1165,7 +1244,7 @@ class TestUpdateStatsDb(base.TestCase):
|
|||
}
|
||||
}
|
||||
|
||||
session.return_value = 'blah'
|
||||
mock_session.return_value = 'blah'
|
||||
|
||||
self.sm.update_stats(health, '192.0.2.1')
|
||||
|
||||
|
@ -1202,3 +1281,39 @@ class TestUpdateStatsDb(base.TestCase):
|
|||
self.listener_stats.active_connections,
|
||||
'bytes_out': self.listener_stats.bytes_out,
|
||||
'request_errors': self.listener_stats.request_errors}})
|
||||
|
||||
# Test with noop streamer
|
||||
self.event_client.cast.reset_mock()
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver=constants.NOOP_EVENT_STREAMER)
|
||||
|
||||
self.sm.update_stats(health, '192.0.2.1')
|
||||
|
||||
self.conf.config(group="health_manager",
|
||||
event_streamer_driver='queue_event_streamer')
|
||||
self.assertFalse(self.event_client.cast.called)
|
||||
|
||||
# Test with missing DB listener
|
||||
self.event_client.cast.reset_mock()
|
||||
self.sm.repo_listener.get.return_value = None
|
||||
|
||||
self.sm.update_stats(health, '192.0.2.1')
|
||||
|
||||
self.event_client.cast.assert_called_once_with(
|
||||
{}, 'update_info', container={
|
||||
'info_type': 'listener_stats',
|
||||
'info_id': self.listener_id,
|
||||
'info_payload': {
|
||||
'bytes_in': self.listener_stats.bytes_in,
|
||||
'total_connections':
|
||||
self.listener_stats.total_connections,
|
||||
'active_connections':
|
||||
self.listener_stats.active_connections,
|
||||
'bytes_out': self.listener_stats.bytes_out,
|
||||
'request_errors': self.listener_stats.request_errors}})
|
||||
|
||||
# Test with update failure
|
||||
self.event_client.cast.reset_mock()
|
||||
mock_session.side_effect = Exception
|
||||
self.sm.update_stats(health, '192.0.2.1')
|
||||
self.assertFalse(self.event_client.cast.called)
|
||||
|
|
Loading…
Reference in New Issue