From af901ccaa5835fcfcbb846c5f261c3728404db4e Mon Sep 17 00:00:00 2001 From: Michael Johnson Date: Wed, 5 Sep 2018 22:21:13 -0700 Subject: [PATCH] Fix health manager performance regression When running stress tests against the Octavia Health Manager it was observed that the scalability and performance of the health manager has degraded. It was observed that the ORM layer was forming poorly optimized queries, putting excessive load on the database engine and unnecessary code paths were executing for each heartbeat message. This patch optimizes the health manager processing of amphora-agent heartbeat messages by optimizing the database requests, pool processing, and event streamer code paths. Story: 2001896 Task: 14381 (cherry picked from commit f13a2e654659055728f9d15f558606d9c57037bf) Change-Id: If2f81129ca94882b42b04ddf5652ff03e8a48edf --- .../healthmanager/health_drivers/update_db.py | 144 +++--- octavia/db/repositories.py | 72 +++ .../tests/functional/db/test_repositories.py | 76 ++++ .../health_drivers/test_update_db.py | 409 +++++++++++------- 4 files changed, 494 insertions(+), 207 deletions(-) diff --git a/octavia/controller/healthmanager/health_drivers/update_db.py b/octavia/controller/healthmanager/health_drivers/update_db.py index dda4da64d4..8fd79edde6 100644 --- a/octavia/controller/healthmanager/health_drivers/update_db.py +++ b/octavia/controller/healthmanager/health_drivers/update_db.py @@ -14,6 +14,7 @@ import datetime import time +import timeit from oslo_config import cfg from oslo_log import log as logging @@ -46,43 +47,50 @@ class UpdateHealthDb(update_base.HealthUpdateBase): self.loadbalancer_repo = repo.LoadBalancerRepository() self.member_repo = repo.MemberRepository() self.pool_repo = repo.PoolRepository() - self.sync_prv_status = CONF.health_manager.sync_provisioning_status def emit(self, info_type, info_id, info_obj): cnt = update_serializer.InfoContainer(info_type, info_id, info_obj) self.event_streamer.emit(cnt) def _update_status_and_emit_event(self, session, repo, entity_type, - entity_id, new_op_status, old_op_status, - current_prov_status): + entity_id, new_op_status, old_op_status): message = {} if old_op_status.lower() != new_op_status.lower(): LOG.debug("%s %s status has changed from %s to " - "%s. Updating db and sending event.", + "%s, updating db.", entity_type, entity_id, old_op_status, new_op_status) repo.update(session, entity_id, operating_status=new_op_status) + # Map the status for neutron-lbaas if new_op_status == constants.DRAINING: new_op_status = constants.ONLINE message.update({constants.OPERATING_STATUS: new_op_status}) - if self.sync_prv_status: - LOG.debug("%s %s provisioning_status %s. " - "Sending event.", - entity_type, entity_id, current_prov_status) - message.update( - {constants.PROVISIONING_STATUS: current_prov_status}) - if message: - self.emit(entity_type, entity_id, message) + if (CONF.health_manager.event_streamer_driver != + constants.NOOP_EVENT_STREAMER): + if CONF.health_manager.sync_provisioning_status: + current_prov_status = repo.get( + session, id=entity_id).provisioning_status + LOG.debug("%s %s provisioning_status %s. " + "Sending event.", + entity_type, entity_id, current_prov_status) + message.update( + {constants.PROVISIONING_STATUS: current_prov_status}) + if message: + self.emit(entity_type, entity_id, message) def update_health(self, health, srcaddr): # The executor will eat any exceptions from the update_health code # so we need to wrap it and log the unhandled exception + start_time = timeit.default_timer() try: self._update_health(health, srcaddr) - except Exception: - LOG.exception('update_health encountered an unknown error ' - 'processing health message for amphora {0} with IP ' - '{1}'.format(health['id'], srcaddr)) + except Exception as e: + LOG.exception('Health update for amphora %(amp)s encountered ' + 'error %(err)s. Skipping health update.', + {'amp': health['id'], 'err': str(e)}) + # TODO(johnsom) We need to set a warning threshold here + LOG.debug('Health Update finished in: {0} seconds'.format( + timeit.default_timer() - start_time)) def _update_health(self, health, srcaddr): """This function is to update db info based on amphora status @@ -110,13 +118,13 @@ class UpdateHealthDb(update_base.HealthUpdateBase): session = db_api.get_session() # We need to see if all of the listeners are reporting in - db_lb = self.amphora_repo.get_lb_for_amphora(session, health['id']) + db_lb = self.amphora_repo.get_lb_for_health_update(session, + health['id']) ignore_listener_count = False - listeners = health['listeners'] if db_lb: - expected_listener_count = len(db_lb.listeners) - if 'PENDING' in db_lb.provisioning_status: + expected_listener_count = len(db_lb.get('listeners', {})) + if 'PENDING' in db_lb['provisioning_status']: ignore_listener_count = True else: # If this is not a spare amp, log and skip it. @@ -151,6 +159,8 @@ class UpdateHealthDb(update_base.HealthUpdateBase): health['id'], srcaddr), e) expected_listener_count = 0 + listeners = health['listeners'] + # Do not update amphora health if the reporting listener count # does not match the expected listener count if len(listeners) == expected_listener_count or ignore_listener_count: @@ -160,6 +170,9 @@ class UpdateHealthDb(update_base.HealthUpdateBase): # if we're running too far behind, warn and bail proc_delay = time.time() - health['recv_time'] hb_interval = CONF.health_manager.heartbeat_interval + # TODO(johnsom) We need to set a warning threshold here, and + # escalate to critical when it reaches the + # heartbeat_interval if proc_delay >= hb_interval: LOG.warning('Amphora %(id)s health message was processed too ' 'slowly: %(delay)ss! The system may be overloaded ' @@ -189,16 +202,17 @@ class UpdateHealthDb(update_base.HealthUpdateBase): return processed_pools = [] + potential_offline_pools = {} # We got a heartbeat so lb is healthy until proven otherwise - if db_lb.enabled is False: + if db_lb['enabled'] is False: lb_status = constants.OFFLINE else: lb_status = constants.ONLINE - for db_listener in db_lb.listeners: + for listener_id in db_lb.get('listeners', {}): + db_op_status = db_lb['listeners'][listener_id]['operating_status'] listener_status = None - listener_id = db_listener.id listener = None if listener_id not in listeners: @@ -221,13 +235,11 @@ class UpdateHealthDb(update_base.HealthUpdateBase): 'status': listener.get('status')}) try: - if listener_status is not None: + if (listener_status is not None and + listener_status != db_op_status): self._update_status_and_emit_event( session, self.listener_repo, constants.LISTENER, - listener_id, listener_status, - db_listener.operating_status, - db_listener.provisioning_status - ) + listener_id, listener_status, db_op_status) except sqlalchemy.orm.exc.NoResultFound: LOG.error("Listener %s is not in DB", listener_id) @@ -236,36 +248,52 @@ class UpdateHealthDb(update_base.HealthUpdateBase): pools = listener['pools'] - # Process pools bound to listeners - for db_pool in db_listener.pools: + for db_pool_id in db_lb.get('pools', {}): + # If we saw this pool already on another listener + # skip it. + if db_pool_id in processed_pools: + continue + db_pool_dict = db_lb['pools'][db_pool_id] lb_status = self._process_pool_status( - session, db_pool, pools, lb_status, processed_pools) + session, db_pool_id, db_pool_dict, pools, + lb_status, processed_pools, potential_offline_pools) - # Process pools bound to the load balancer - for db_pool in db_lb.pools: - # Don't re-process pools shared with listeners - if db_pool.id in processed_pools: + for pool_id in potential_offline_pools: + # Skip if we eventually found a status for this pool + if pool_id in processed_pools: continue - lb_status = self._process_pool_status( - session, db_pool, [], lb_status, processed_pools) + try: + # If the database doesn't already show the pool offline, update + if potential_offline_pools[pool_id] != constants.OFFLINE: + self._update_status_and_emit_event( + session, self.pool_repo, constants.POOL, + pool_id, constants.OFFLINE, + potential_offline_pools[pool_id]) + except sqlalchemy.orm.exc.NoResultFound: + LOG.error("Pool %s is not in DB", pool_id) # Update the load balancer status last try: - self._update_status_and_emit_event( - session, self.loadbalancer_repo, - constants.LOADBALANCER, db_lb.id, lb_status, - db_lb.operating_status, db_lb.provisioning_status - ) + if lb_status != db_lb['operating_status']: + self._update_status_and_emit_event( + session, self.loadbalancer_repo, + constants.LOADBALANCER, db_lb['id'], lb_status, + db_lb[constants.OPERATING_STATUS]) except sqlalchemy.orm.exc.NoResultFound: LOG.error("Load balancer %s is not in DB", db_lb.id) - def _process_pool_status(self, session, db_pool, pools, lb_status, - processed_pools): + def _process_pool_status( + self, session, pool_id, db_pool_dict, pools, lb_status, + processed_pools, potential_offline_pools): pool_status = None - pool_id = db_pool.id if pool_id not in pools: - pool_status = constants.OFFLINE + # If we don't have a status update for this pool_id + # add it to the list of potential offline pools and continue. + # We will check the potential offline pool list after we + # finish processing the status updates from all of the listeners. + potential_offline_pools[pool_id] = db_pool_dict['operating_status'] + return lb_status else: pool = pools[pool_id] @@ -287,10 +315,10 @@ class UpdateHealthDb(update_base.HealthUpdateBase): # Deal with the members that are reporting from # the Amphora members = pool['members'] - for db_member in db_pool.members: + for member_id in db_pool_dict.get('members', {}): member_status = None - member_db_status = db_member.operating_status - member_id = db_member.id + member_db_status = ( + db_pool_dict['members'][member_id]['operating_status']) if member_id not in members: if member_db_status != constants.NO_MONITOR: @@ -323,25 +351,21 @@ class UpdateHealthDb(update_base.HealthUpdateBase): 'status': status}) try: - if member_status is not None: + if (member_status is not None and + member_status != member_db_status): self._update_status_and_emit_event( - session, self.member_repo, - constants.MEMBER, - member_id, member_status, - db_member.operating_status, - db_member.provisioning_status - ) + session, self.member_repo, constants.MEMBER, + member_id, member_status, member_db_status) except sqlalchemy.orm.exc.NoResultFound: LOG.error("Member %s is not able to update " "in DB", member_id) try: - if pool_status is not None: + if (pool_status is not None and + pool_status != db_pool_dict['operating_status']): self._update_status_and_emit_event( session, self.pool_repo, constants.POOL, - pool_id, pool_status, db_pool.operating_status, - db_pool.provisioning_status - ) + pool_id, pool_status, db_pool_dict['operating_status']) except sqlalchemy.orm.exc.NoResultFound: LOG.error("Pool %s is not in DB", pool_id) diff --git a/octavia/db/repositories.py b/octavia/db/repositories.py index c939bfccfb..097d1bc4b4 100644 --- a/octavia/db/repositories.py +++ b/octavia/db/repositories.py @@ -988,6 +988,78 @@ class AmphoraRepository(BaseRepository): return amp.to_data_model() + def get_lb_for_health_update(self, session, amphora_id): + """This method is for the health manager status update process. + + This is a time sensitive query that occurs often. + It is an explicit query as the ORM produces a poorly + optimized query. + + Use extreme caution making any changes to this query + as it can impact the scalability of the health manager. + All changes should be analyzed using SQL "EXPLAIN" to + make sure only indexes are being used. + Changes should also be evaluated using the stressHM tool. + + Note: The returned object is flat and not a graph representation + of the load balancer as it is not needed. This is on + purpose to optimize the processing time. This is not in + the normal data model objects. + + :param session: A Sql Alchemy database session. + :param amphora_id: The amphora ID to lookup the load balancer for. + :returns: A dictionary containing the required load balancer details. + """ + rows = session.execute( + "SELECT load_balancer.id, load_balancer.enabled, " + "load_balancer.provisioning_status AS lb_prov_status, " + "load_balancer.operating_status AS lb_op_status, " + "listener.id AS list_id, " + "listener.operating_status AS list_op_status, " + "pool.id AS pool_id, " + "pool.operating_status AS pool_op_status, " + "member.id AS member_id, " + "member.operating_status AS mem_op_status from " + "amphora JOIN load_balancer ON " + "amphora.load_balancer_id = load_balancer.id LEFT JOIN " + "listener ON load_balancer.id = listener.load_balancer_id " + "LEFT JOIN pool ON load_balancer.id = pool.load_balancer_id " + "LEFT JOIN member ON pool.id = member.pool_id WHERE " + "amphora.id = :amp_id AND amphora.status != :deleted AND " + "load_balancer.provisioning_status != :deleted;", + {'amp_id': amphora_id, 'deleted': consts.DELETED}).fetchall() + + lb = {} + listeners = {} + pools = {} + for row in rows: + if not lb: + lb['id'] = row['id'] + lb['enabled'] = row['enabled'] == 1 + lb['provisioning_status'] = row['lb_prov_status'] + lb['operating_status'] = row['lb_op_status'] + if row['list_id'] and row['list_id'] not in listeners: + listener = {'operating_status': row['list_op_status']} + listeners[row['list_id']] = listener + if row['pool_id']: + if row['pool_id'] in pools and row['member_id']: + member = {'operating_status': row['mem_op_status']} + pools[row['pool_id']]['members'][row['member_id']] = member + else: + pool = {'operating_status': row['pool_op_status'], + 'members': {}} + if row['member_id']: + member = {'operating_status': row['mem_op_status']} + pool['members'][row['member_id']] = member + pools[row['pool_id']] = pool + + if listeners: + lb['listeners'] = listeners + if pools: + lb['pools'] = pools + + return lb + class AmphoraBuildReqRepository(BaseRepository): model_class = models.AmphoraBuildRequest diff --git a/octavia/tests/functional/db/test_repositories.py b/octavia/tests/functional/db/test_repositories.py index d8344a951c..19a7cc3822 100644 --- a/octavia/tests/functional/db/test_repositories.py +++ b/octavia/tests/functional/db/test_repositories.py @@ -40,6 +40,9 @@ class BaseRepositoryTest(base.OctaviaDBTestBase): FAKE_UUID_2 = uuidutils.generate_uuid() FAKE_UUID_3 = uuidutils.generate_uuid() FAKE_UUID_4 = uuidutils.generate_uuid() + FAKE_UUID_5 = uuidutils.generate_uuid() + FAKE_UUID_6 = uuidutils.generate_uuid() + FAKE_UUID_7 = uuidutils.generate_uuid() FAKE_EXP_AGE = 10 def setUp(self): @@ -3076,6 +3079,79 @@ class AmphoraRepositoryTest(BaseRepositoryTest): self.assertEqual(cert_expired_amphora.cert_expiration, expiration) self.assertEqual(cert_expired_amphora.id, amphora2.id) + def test_get_lb_for_health_update(self): + amphora1 = self.create_amphora(self.FAKE_UUID_1) + amphora2 = self.create_amphora(self.FAKE_UUID_3) + self.amphora_repo.associate(self.session, self.lb.id, amphora1.id) + self.amphora_repo.associate(self.session, self.lb.id, amphora2.id) + + lb_ref = {'enabled': True, 'id': self.lb.id, + 'operating_status': constants.ONLINE, + 'provisioning_status': constants.ACTIVE} + + # Test with just a load balancer + lb = self.amphora_repo.get_lb_for_health_update(self.session, + self.FAKE_UUID_1) + self.assertEqual(lb_ref, lb) + + pool = self.pool_repo.create( + self.session, id=self.FAKE_UUID_4, project_id=self.FAKE_UUID_2, + name="pool_test", description="pool_description", + protocol=constants.PROTOCOL_HTTP, load_balancer_id=self.lb.id, + lb_algorithm=constants.LB_ALGORITHM_ROUND_ROBIN, + provisioning_status=constants.ACTIVE, + operating_status=constants.ONLINE, enabled=True) + + pool_ref = {pool.id: {'members': {}, + 'operating_status': constants.ONLINE}} + lb_ref['pools'] = pool_ref + + # Test with an LB and a pool + lb = self.amphora_repo.get_lb_for_health_update(self.session, + self.FAKE_UUID_1) + self.assertEqual(lb_ref, lb) + + listener = self.listener_repo.create( + self.session, id=self.FAKE_UUID_5, project_id=self.FAKE_UUID_2, + name="listener_name", description="listener_description", + protocol=constants.PROTOCOL_HTTP, protocol_port=80, + connection_limit=1, operating_status=constants.ONLINE, + load_balancer_id=self.lb.id, provisioning_status=constants.ACTIVE, + enabled=True, peer_port=1025, default_pool_id=pool.id) + + listener_ref = {listener.id: {'operating_status': constants.ONLINE}} + lb_ref['listeners'] = listener_ref + + # Test with an LB, pool, and listener (no members) + lb = self.amphora_repo.get_lb_for_health_update(self.session, + self.FAKE_UUID_1) + self.assertEqual(lb_ref, lb) + + member1 = self.member_repo.create(self.session, id=self.FAKE_UUID_6, + project_id=self.FAKE_UUID_2, + pool_id=pool.id, + ip_address="192.0.2.1", + protocol_port=80, enabled=True, + provisioning_status=constants.ACTIVE, + operating_status=constants.ONLINE) + + member2 = self.member_repo.create(self.session, id=self.FAKE_UUID_7, + project_id=self.FAKE_UUID_2, + pool_id=pool.id, + ip_address="192.0.2.21", + protocol_port=80, enabled=True, + provisioning_status=constants.ACTIVE, + operating_status=constants.OFFLINE) + + member_ref = {member1.id: {'operating_status': constants.ONLINE}, + member2.id: {'operating_status': constants.OFFLINE}} + lb_ref['pools'][pool.id]['members'] = member_ref + + # Test with an LB, pool, listener, and members + lb = self.amphora_repo.get_lb_for_health_update(self.session, + self.FAKE_UUID_1) + self.assertEqual(lb_ref, lb) + class AmphoraHealthRepositoryTest(BaseRepositoryTest): def setUp(self): diff --git a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py index 4d8bcbeed9..656eabf889 100644 --- a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py +++ b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py @@ -44,9 +44,9 @@ class TestUpdateHealthDb(base.TestCase): def setUp(self): super(TestUpdateHealthDb, self).setUp() - conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) - conf.config(group="health_manager", - event_streamer_driver='queue_event_streamer') + self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) + self.conf.config(group="health_manager", + event_streamer_driver='queue_event_streamer') session_patch = mock.patch('octavia.db.api.get_session') self.addCleanup(session_patch.stop) @@ -65,8 +65,6 @@ class TestUpdateHealthDb(base.TestCase): self.pool_repo = mock.MagicMock() self.hm.amphora_repo = self.amphora_repo - fake_lb = mock.MagicMock() - self.hm.amphora_repo.get_lb_for_amphora.return_value = fake_lb self.hm.amphora_health_repo = self.amphora_health_repo self.hm.listener_repo = self.listener_repo self.hm.listener_repo.count.return_value = 1 @@ -104,6 +102,7 @@ class TestUpdateHealthDb(base.TestCase): mock_lb.pools = [mock_pool1] if mock_listener1: mock_listener1.pools = [mock_pool1] + mock_listener1.default_pool = mock_pool1 for i in range(members): mock_member_x = mock.Mock() mock_member_x.id = 'member-id-%s' % (i + 1) @@ -116,13 +115,45 @@ class TestUpdateHealthDb(base.TestCase): return mock_lb, mock_listener1, mock_pool1, mock_members + def _make_fake_lb_health_dict(self, listener=True, pool=True, + health_monitor=True, members=1, + lb_prov_status=constants.ACTIVE): + + lb_ref = {'enabled': True, 'id': self.FAKE_UUID_1, + constants.OPERATING_STATUS: 'bogus', + constants.PROVISIONING_STATUS: lb_prov_status} + + if pool: + members_dict = {} + if health_monitor: + member_operating_status = 'NOTHING_MATCHABLE' + else: + member_operating_status = constants.NO_MONITOR + + for i in range(members): + member_id = 'member-id-%s' % (i + 1) + members_dict[member_id] = { + constants.OPERATING_STATUS: member_operating_status} + + pool_ref = {'pool-id-1': {'members': members_dict, + constants.OPERATING_STATUS: 'bogus'}} + lb_ref['pools'] = pool_ref + + if listener: + listener_ref = {'listener-id-1': { + constants.OPERATING_STATUS: 'bogus'}} + lb_ref['listeners'] = listener_ref + + return lb_ref + def test_update_health_event_stream(self): health = { "id": self.FAKE_UUID_1, "listeners": { "listener-id-1": {"status": constants.OPEN, "pools": { "pool-id-1": {"status": constants.UP, - "members": {"member-id-1": constants.UP} + "members": {"member-id-1": constants.UP, + "member-id-2": constants.UP} } } } @@ -130,9 +161,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.event_client.cast.assert_any_call( @@ -156,12 +186,11 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=False, pool=False)) - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False) + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertTrue(self.amphora_health_repo.replace.called) @@ -173,13 +202,12 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=True, pool=False, - lb_prov_status=constants.PENDING_UPDATE)) - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict( + listener=True, pool=False, lb_prov_status=constants.PENDING_UPDATE) + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertTrue(self.amphora_health_repo.replace.called) @@ -191,12 +219,11 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=True, pool=False)) - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(listener=True, pool=False) + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertFalse(self.amphora_health_repo.replace.called) @@ -208,12 +235,12 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() - hb_interval - 1 # extra -1 for buffer } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=False, pool=False)) - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False) + + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) # Receive time is stale, so we shouldn't see this called self.assertFalse(self.loadbalancer_repo.update.called) @@ -234,10 +261,9 @@ class TestUpdateHealthDb(base.TestCase): self.session_mock.commit.side_effect = TestException('boom') - mock_lb, mock_listener1, mock_pool1, mock_member1 = ( - self._make_mock_lb_tree()) + lb_ref = self._make_fake_lb_health_dict() - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -258,9 +284,9 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -285,10 +311,9 @@ class TestUpdateHealthDb(base.TestCase): operating_status=constants.ONLINE) # If the listener count is wrong, make sure we don't update - mock_listener2 = mock.Mock() - mock_listener2.id = 'listener-id-2' - mock_listener2.pools = [mock_pool1] - mock_lb.listeners = [mock_listener1, mock_listener2] + lb_ref['listeners']['listener-id-2'] = { + constants.OPERATING_STATUS: 'bogus'} + self.amphora_health_repo.replace.reset_mock() self.hm.update_health(health, '192.0.2.1') @@ -304,10 +329,9 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) + lb_ref = self._make_fake_lb_health_dict() - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -319,7 +343,7 @@ class TestUpdateHealthDb(base.TestCase): self.session_mock, listener_id, operating_status=constants.ONLINE) self.pool_repo.update.assert_any_call( - self.session_mock, mock_pool1.id, + self.session_mock, 'pool-id-1', operating_status=constants.OFFLINE ) @@ -339,22 +363,16 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) + lb_ref = self._make_fake_lb_health_dict() - mock_member2 = mock.Mock() - mock_member2.id = 'member-id-2' - mock_pool2 = mock.Mock() - mock_pool2.id = "pool-id-2" - mock_pool2.members = [mock_member2] - mock_listener2 = mock.Mock() - mock_listener2.id = 'listener-id-2' - mock_listener2.pools = [mock_pool2] + lb_ref['pools']['pool-id-2'] = { + constants.OPERATING_STATUS: 'bogus', + 'members': {'member-id-2': {constants.OPERATING_STATUS: 'bogus'}}} - mock_lb.listeners.append(mock_listener2) - mock_lb.pools.append(mock_pool2) + lb_ref['listeners']['listener-id-2'] = { + constants.OPERATING_STATUS: 'bogus'} - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -368,11 +386,11 @@ class TestUpdateHealthDb(base.TestCase): # Call count should be exactly 2, as each pool should be processed once self.assertEqual(2, self.pool_repo.update.call_count) self.pool_repo.update.assert_has_calls([ - mock.call(self.session_mock, mock_pool1.id, + mock.call(self.session_mock, 'pool-id-1', operating_status=constants.ERROR), - mock.call(self.session_mock, mock_pool2.id, + mock.call(self.session_mock, 'pool-id-2', operating_status=constants.ONLINE) - ]) + ], any_order=True) def test_update_lb_and_list_pool_health_online(self): @@ -389,9 +407,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -431,9 +448,12 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + + lb_ref['pools']['pool-id-2'] = { + constants.OPERATING_STATUS: constants.OFFLINE} + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -462,9 +482,8 @@ class TestUpdateHealthDb(base.TestCase): "members": {"member-id-1": constants.DRAIN}}}}}, "recv_time": time.time()} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -502,9 +521,8 @@ class TestUpdateHealthDb(base.TestCase): "members": {"member-id-1": constants.MAINT}}}}}, "recv_time": time.time()} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -542,9 +560,8 @@ class TestUpdateHealthDb(base.TestCase): "members": {"member-id-1": "blah"}}}}}, "recv_time": time.time()} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -578,9 +595,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -621,9 +637,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(health_monitor=False)) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(health_monitor=False) + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -659,11 +674,11 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(health_monitor=False)) - mock_members[0].admin_state_up = False - mock_members[0].operating_status = constants.NO_MONITOR - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(health_monitor=False) + member1 = lb_ref['pools']['pool-id-1']['members']['member-id-1'] + member1[constants.OPERATING_STATUS] = constants.NO_MONITOR + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -683,7 +698,7 @@ class TestUpdateHealthDb(base.TestCase): operating_status=constants.ONLINE) self.member_repo.update.assert_any_call( - self.session_mock, mock_members[0].id, + self.session_mock, 'member-id-1', operating_status=constants.OFFLINE) def test_update_health_member_no_check(self): @@ -702,9 +717,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -743,11 +757,12 @@ class TestUpdateHealthDb(base.TestCase): "member-id-1": constants.UP}}}}}, "recv_time": time.time()} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(members=2)) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(members=2) + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref + self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) # test listener, member for listener_id, listener in six.iteritems( @@ -770,7 +785,7 @@ class TestUpdateHealthDb(base.TestCase): self.session_mock, member_id, operating_status=constants.ONLINE) self.member_repo.update.assert_any_call( - self.session_mock, mock_members[1].id, + self.session_mock, 'member-id-2', operating_status=constants.OFFLINE) def test_update_health_list_full_member_down(self): @@ -788,9 +803,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -815,10 +829,9 @@ class TestUpdateHealthDb(base.TestCase): self.session_mock, member_id, operating_status=constants.ERROR) - mock_listener2 = mock.Mock() - mock_listener2.id = 'listener-id-2' - mock_listener2.pools = [mock_pool1] - mock_lb.listeners.append(mock_listener2) + lb_ref['listeners']['listener-id-2'] = { + constants.OPERATING_STATUS: 'bogus'} + self.amphora_health_repo.replace.reset_mock() self.hm.update_health(health, '192.0.2.1') @@ -839,9 +852,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -914,25 +926,24 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=False, pool=False)) + lb_ref = self._make_fake_lb_health_dict() # Build our own custom listeners/pools/members for i in [1, 2, 3, 4, 5]: - mock_member = mock.Mock() - mock_member.id = 'member-id-%s' % i - mock_pool = mock.Mock() - mock_pool.id = 'pool-id-%s' % i - mock_pool.members = [mock_member] - if i == 3: - mock_member = mock.Mock() - mock_member.id = 'member-id-31' - mock_pool.members.append(mock_member) - mock_listener = mock.Mock() - mock_listener.id = 'listener-id-%s' % i - mock_listener.pools = [mock_pool] - mock_lb.listeners.append(mock_listener) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref['listeners']['listener-id-%s' % i] = { + constants.OPERATING_STATUS: 'bogus'} + + if i == 3: + members_dict = {'member-id-3': { + constants.OPERATING_STATUS: 'bogus'}, 'member-id-31': { + constants.OPERATING_STATUS: 'bogus'}} + else: + members_dict = {'member-id-%s' % i: { + constants.OPERATING_STATUS: 'bogus'}} + lb_ref['pools']['pool-id-%s' % i] = { + 'members': members_dict, constants.OPERATING_STATUS: 'bogus'} + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') @@ -977,13 +988,15 @@ class TestUpdateHealthDb(base.TestCase): self.hm.member_repo.update.side_effect = ( [sqlalchemy.orm.exc.NoResultFound]) self.hm.pool_repo.update.side_effect = ( - [sqlalchemy.orm.exc.NoResultFound]) + sqlalchemy.orm.exc.NoResultFound) self.hm.loadbalancer_repo.update.side_effect = ( [sqlalchemy.orm.exc.NoResultFound]) - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + + lb_ref['pools']['pool-id-2'] = {constants.OPERATING_STATUS: 'bogus'} + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -1013,7 +1026,7 @@ class TestUpdateHealthDb(base.TestCase): def test_update_health_zombie(self, mock_driver): health = {"id": self.FAKE_UUID_1, "listeners": {}} - self.amphora_repo.get_lb_for_amphora.return_value = None + self.amphora_repo.get_lb_for_health_update.return_value = None amp_mock = mock.MagicMock() self.amphora_repo.get.return_value = amp_mock self.hm.update_health(health, '192.0.2.1') @@ -1037,15 +1050,18 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) + lb_ref = self._make_fake_lb_health_dict() # Start everything ONLINE - mock_members[0].operating_status = constants.ONLINE - mock_pool1.operating_status = constants.ONLINE - mock_listener1.operating_status = constants.ONLINE - mock_lb.operating_status = constants.ONLINE - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref[constants.OPERATING_STATUS] = constants.ONLINE + listener1 = lb_ref['listeners']['listener-id-1'] + listener1[constants.OPERATING_STATUS] = constants.ONLINE + pool1 = lb_ref['pools']['pool-id-1'] + pool1[constants.OPERATING_STATUS] = constants.ONLINE + member1 = pool1['members']['member-id-1'] + member1[constants.OPERATING_STATUS] = constants.ONLINE + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.event_client.cast.assert_not_called() @@ -1060,45 +1076,108 @@ class TestUpdateHealthDb(base.TestCase): "listeners": {}, "recv_time": time.time()} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=False, pool=False)) - mock_lb.enabled = False - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False) + lb_ref['enabled'] = False + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) self.assertTrue(self.loadbalancer_repo.update.called) self.loadbalancer_repo.update.assert_called_with( - self.mock_session(), mock_lb.id, + self.mock_session(), self.FAKE_UUID_1, operating_status='OFFLINE') def test_update_health_lb_admin_up(self): health = { "id": self.FAKE_UUID_1, "listeners": {}, - "recv_time": time.time()} + "recv_time": time.time(), + "ver": 1} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=False, pool=False)) - mock_lb.enabled = True - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False) + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) self.assertTrue(self.loadbalancer_repo.update.called) self.loadbalancer_repo.update.assert_called_with( - self.mock_session(), mock_lb.id, + self.mock_session(), self.FAKE_UUID_1, operating_status='ONLINE') + def test_update_health_no_db_lb(self): + health = { + "id": self.FAKE_UUID_1, + "listeners": {}, + "recv_time": time.time() + } + self.hm.amphora_repo.get_lb_for_health_update.return_value = {} + + self.hm.update_health(health, '192.0.2.1') + + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) + self.assertFalse(self.amphora_health_repo.replace.called) + + # Test missing amp in addition to missing lb DB record + self.amphora_repo.get_lb_for_health_update.reset_mock() + self.amphora_health_repo.replace.reset_mock() + + mock_amphora = mock.MagicMock() + mock_amphora.load_balancer_id = None + self.amphora_repo.get.return_value = mock_amphora + + self.hm.update_health(health, '192.0.2.1') + + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) + self.assertTrue(self.amphora_repo.get.called) + self.assertTrue(self.amphora_health_repo.replace.called) + + def test_update_status_and_emit_event(self): + + # Test update with the same operating status + self.conf.config(group="health_manager", + event_streamer_driver=constants.NOOP_EVENT_STREAMER) + self.hm._update_status_and_emit_event( + 'fake_session', self.loadbalancer_repo, constants.LOADBALANCER, + 1, 'ONLINE', 'ONLINE') + self.assertFalse(self.loadbalancer_repo.update.called) + self.assertFalse(self.event_client.cast.called) + + self.conf.config(group="health_manager", + event_streamer_driver='queue_event_streamer', + sync_provisioning_status=True) + + self.loadbalancer_repo.update.reset_mock() + self.event_client.reset_mock() + + # Test stream with provisioning sync + self.hm._update_status_and_emit_event( + 'fake_session', self.loadbalancer_repo, constants.LOADBALANCER, + 1, 'ONLINE', 'OFFLINE') + self.assertTrue(self.loadbalancer_repo.update.called) + self.assertTrue(self.event_client.cast.called) + + self.conf.config(group="health_manager", + sync_provisioning_status=False) + + self.loadbalancer_repo.update.reset_mock() + self.event_client.reset_mock() + + # Test stream with no provisioning sync + self.hm._update_status_and_emit_event( + 'fake_session', self.loadbalancer_repo, constants.LOADBALANCER, + 1, 'ONLINE', 'ONLINE') + self.assertFalse(self.loadbalancer_repo.update.called) + self.assertFalse(self.event_client.cast.called) + class TestUpdateStatsDb(base.TestCase): def setUp(self): super(TestUpdateStatsDb, self).setUp() - conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) - conf.config(group="health_manager", - event_streamer_driver='queue_event_streamer') + self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) + self.conf.config(group="health_manager", + event_streamer_driver='queue_event_streamer') self.sm = update_db.UpdateStatsDb() self.event_client = mock.MagicMock() @@ -1141,7 +1220,7 @@ class TestUpdateStatsDb(base.TestCase): self.loadbalancer_repo.get.return_value = self.loadbalancer @mock.patch('octavia.db.api.get_session') - def test_update_stats(self, session): + def test_update_stats(self, mock_session): health = { "id": self.amphora_id, @@ -1165,7 +1244,7 @@ class TestUpdateStatsDb(base.TestCase): } } - session.return_value = 'blah' + mock_session.return_value = 'blah' self.sm.update_stats(health, '192.0.2.1') @@ -1202,3 +1281,39 @@ class TestUpdateStatsDb(base.TestCase): self.listener_stats.active_connections, 'bytes_out': self.listener_stats.bytes_out, 'request_errors': self.listener_stats.request_errors}}) + + # Test with noop streamer + self.event_client.cast.reset_mock() + self.conf.config(group="health_manager", + event_streamer_driver=constants.NOOP_EVENT_STREAMER) + + self.sm.update_stats(health, '192.0.2.1') + + self.conf.config(group="health_manager", + event_streamer_driver='queue_event_streamer') + self.assertFalse(self.event_client.cast.called) + + # Test with missing DB listener + self.event_client.cast.reset_mock() + self.sm.repo_listener.get.return_value = None + + self.sm.update_stats(health, '192.0.2.1') + + self.event_client.cast.assert_called_once_with( + {}, 'update_info', container={ + 'info_type': 'listener_stats', + 'info_id': self.listener_id, + 'info_payload': { + 'bytes_in': self.listener_stats.bytes_in, + 'total_connections': + self.listener_stats.total_connections, + 'active_connections': + self.listener_stats.active_connections, + 'bytes_out': self.listener_stats.bytes_out, + 'request_errors': self.listener_stats.request_errors}}) + + # Test with update failure + self.event_client.cast.reset_mock() + mock_session.side_effect = Exception + self.sm.update_stats(health, '192.0.2.1') + self.assertFalse(self.event_client.cast.called)