diff --git a/octavia/controller/healthmanager/health_drivers/update_db.py b/octavia/controller/healthmanager/health_drivers/update_db.py index dda4da64d4..8fd79edde6 100644 --- a/octavia/controller/healthmanager/health_drivers/update_db.py +++ b/octavia/controller/healthmanager/health_drivers/update_db.py @@ -14,6 +14,7 @@ import datetime import time +import timeit from oslo_config import cfg from oslo_log import log as logging @@ -46,43 +47,50 @@ class UpdateHealthDb(update_base.HealthUpdateBase): self.loadbalancer_repo = repo.LoadBalancerRepository() self.member_repo = repo.MemberRepository() self.pool_repo = repo.PoolRepository() - self.sync_prv_status = CONF.health_manager.sync_provisioning_status def emit(self, info_type, info_id, info_obj): cnt = update_serializer.InfoContainer(info_type, info_id, info_obj) self.event_streamer.emit(cnt) def _update_status_and_emit_event(self, session, repo, entity_type, - entity_id, new_op_status, old_op_status, - current_prov_status): + entity_id, new_op_status, old_op_status): message = {} if old_op_status.lower() != new_op_status.lower(): LOG.debug("%s %s status has changed from %s to " - "%s. Updating db and sending event.", + "%s, updating db.", entity_type, entity_id, old_op_status, new_op_status) repo.update(session, entity_id, operating_status=new_op_status) + # Map the status for neutron-lbaas if new_op_status == constants.DRAINING: new_op_status = constants.ONLINE message.update({constants.OPERATING_STATUS: new_op_status}) - if self.sync_prv_status: - LOG.debug("%s %s provisioning_status %s. " - "Sending event.", - entity_type, entity_id, current_prov_status) - message.update( - {constants.PROVISIONING_STATUS: current_prov_status}) - if message: - self.emit(entity_type, entity_id, message) + if (CONF.health_manager.event_streamer_driver != + constants.NOOP_EVENT_STREAMER): + if CONF.health_manager.sync_provisioning_status: + current_prov_status = repo.get( + session, id=entity_id).provisioning_status + LOG.debug("%s %s provisioning_status %s. " + "Sending event.", + entity_type, entity_id, current_prov_status) + message.update( + {constants.PROVISIONING_STATUS: current_prov_status}) + if message: + self.emit(entity_type, entity_id, message) def update_health(self, health, srcaddr): # The executor will eat any exceptions from the update_health code # so we need to wrap it and log the unhandled exception + start_time = timeit.default_timer() try: self._update_health(health, srcaddr) - except Exception: - LOG.exception('update_health encountered an unknown error ' - 'processing health message for amphora {0} with IP ' - '{1}'.format(health['id'], srcaddr)) + except Exception as e: + LOG.exception('Health update for amphora %(amp)s encountered ' + 'error %(err)s. Skipping health update.', + {'amp': health['id'], 'err': str(e)}) + # TODO(johnsom) We need to set a warning threshold here + LOG.debug('Health Update finished in: {0} seconds'.format( + timeit.default_timer() - start_time)) def _update_health(self, health, srcaddr): """This function is to update db info based on amphora status @@ -110,13 +118,13 @@ class UpdateHealthDb(update_base.HealthUpdateBase): session = db_api.get_session() # We need to see if all of the listeners are reporting in - db_lb = self.amphora_repo.get_lb_for_amphora(session, health['id']) + db_lb = self.amphora_repo.get_lb_for_health_update(session, + health['id']) ignore_listener_count = False - listeners = health['listeners'] if db_lb: - expected_listener_count = len(db_lb.listeners) - if 'PENDING' in db_lb.provisioning_status: + expected_listener_count = len(db_lb.get('listeners', {})) + if 'PENDING' in db_lb['provisioning_status']: ignore_listener_count = True else: # If this is not a spare amp, log and skip it. @@ -151,6 +159,8 @@ class UpdateHealthDb(update_base.HealthUpdateBase): health['id'], srcaddr), e) expected_listener_count = 0 + listeners = health['listeners'] + # Do not update amphora health if the reporting listener count # does not match the expected listener count if len(listeners) == expected_listener_count or ignore_listener_count: @@ -160,6 +170,9 @@ class UpdateHealthDb(update_base.HealthUpdateBase): # if we're running too far behind, warn and bail proc_delay = time.time() - health['recv_time'] hb_interval = CONF.health_manager.heartbeat_interval + # TODO(johnsom) We need to set a warning threshold here, and + # escalate to critical when it reaches the + # heartbeat_interval if proc_delay >= hb_interval: LOG.warning('Amphora %(id)s health message was processed too ' 'slowly: %(delay)ss! The system may be overloaded ' @@ -189,16 +202,17 @@ class UpdateHealthDb(update_base.HealthUpdateBase): return processed_pools = [] + potential_offline_pools = {} # We got a heartbeat so lb is healthy until proven otherwise - if db_lb.enabled is False: + if db_lb['enabled'] is False: lb_status = constants.OFFLINE else: lb_status = constants.ONLINE - for db_listener in db_lb.listeners: + for listener_id in db_lb.get('listeners', {}): + db_op_status = db_lb['listeners'][listener_id]['operating_status'] listener_status = None - listener_id = db_listener.id listener = None if listener_id not in listeners: @@ -221,13 +235,11 @@ class UpdateHealthDb(update_base.HealthUpdateBase): 'status': listener.get('status')}) try: - if listener_status is not None: + if (listener_status is not None and + listener_status != db_op_status): self._update_status_and_emit_event( session, self.listener_repo, constants.LISTENER, - listener_id, listener_status, - db_listener.operating_status, - db_listener.provisioning_status - ) + listener_id, listener_status, db_op_status) except sqlalchemy.orm.exc.NoResultFound: LOG.error("Listener %s is not in DB", listener_id) @@ -236,36 +248,52 @@ class UpdateHealthDb(update_base.HealthUpdateBase): pools = listener['pools'] - # Process pools bound to listeners - for db_pool in db_listener.pools: + for db_pool_id in db_lb.get('pools', {}): + # If we saw this pool already on another listener + # skip it. + if db_pool_id in processed_pools: + continue + db_pool_dict = db_lb['pools'][db_pool_id] lb_status = self._process_pool_status( - session, db_pool, pools, lb_status, processed_pools) + session, db_pool_id, db_pool_dict, pools, + lb_status, processed_pools, potential_offline_pools) - # Process pools bound to the load balancer - for db_pool in db_lb.pools: - # Don't re-process pools shared with listeners - if db_pool.id in processed_pools: + for pool_id in potential_offline_pools: + # Skip if we eventually found a status for this pool + if pool_id in processed_pools: continue - lb_status = self._process_pool_status( - session, db_pool, [], lb_status, processed_pools) + try: + # If the database doesn't already show the pool offline, update + if potential_offline_pools[pool_id] != constants.OFFLINE: + self._update_status_and_emit_event( + session, self.pool_repo, constants.POOL, + pool_id, constants.OFFLINE, + potential_offline_pools[pool_id]) + except sqlalchemy.orm.exc.NoResultFound: + LOG.error("Pool %s is not in DB", pool_id) # Update the load balancer status last try: - self._update_status_and_emit_event( - session, self.loadbalancer_repo, - constants.LOADBALANCER, db_lb.id, lb_status, - db_lb.operating_status, db_lb.provisioning_status - ) + if lb_status != db_lb['operating_status']: + self._update_status_and_emit_event( + session, self.loadbalancer_repo, + constants.LOADBALANCER, db_lb['id'], lb_status, + db_lb[constants.OPERATING_STATUS]) except sqlalchemy.orm.exc.NoResultFound: LOG.error("Load balancer %s is not in DB", db_lb.id) - def _process_pool_status(self, session, db_pool, pools, lb_status, - processed_pools): + def _process_pool_status( + self, session, pool_id, db_pool_dict, pools, lb_status, + processed_pools, potential_offline_pools): pool_status = None - pool_id = db_pool.id if pool_id not in pools: - pool_status = constants.OFFLINE + # If we don't have a status update for this pool_id + # add it to the list of potential offline pools and continue. + # We will check the potential offline pool list after we + # finish processing the status updates from all of the listeners. + potential_offline_pools[pool_id] = db_pool_dict['operating_status'] + return lb_status else: pool = pools[pool_id] @@ -287,10 +315,10 @@ class UpdateHealthDb(update_base.HealthUpdateBase): # Deal with the members that are reporting from # the Amphora members = pool['members'] - for db_member in db_pool.members: + for member_id in db_pool_dict.get('members', {}): member_status = None - member_db_status = db_member.operating_status - member_id = db_member.id + member_db_status = ( + db_pool_dict['members'][member_id]['operating_status']) if member_id not in members: if member_db_status != constants.NO_MONITOR: @@ -323,25 +351,21 @@ class UpdateHealthDb(update_base.HealthUpdateBase): 'status': status}) try: - if member_status is not None: + if (member_status is not None and + member_status != member_db_status): self._update_status_and_emit_event( - session, self.member_repo, - constants.MEMBER, - member_id, member_status, - db_member.operating_status, - db_member.provisioning_status - ) + session, self.member_repo, constants.MEMBER, + member_id, member_status, member_db_status) except sqlalchemy.orm.exc.NoResultFound: LOG.error("Member %s is not able to update " "in DB", member_id) try: - if pool_status is not None: + if (pool_status is not None and + pool_status != db_pool_dict['operating_status']): self._update_status_and_emit_event( session, self.pool_repo, constants.POOL, - pool_id, pool_status, db_pool.operating_status, - db_pool.provisioning_status - ) + pool_id, pool_status, db_pool_dict['operating_status']) except sqlalchemy.orm.exc.NoResultFound: LOG.error("Pool %s is not in DB", pool_id) diff --git a/octavia/db/repositories.py b/octavia/db/repositories.py index c939bfccfb..097d1bc4b4 100644 --- a/octavia/db/repositories.py +++ b/octavia/db/repositories.py @@ -988,6 +988,78 @@ class AmphoraRepository(BaseRepository): return amp.to_data_model() + def get_lb_for_health_update(self, session, amphora_id): + """This method is for the health manager status update process. + + This is a time sensitive query that occurs often. + It is an explicit query as the ORM produces a poorly + optimized query. + + Use extreme caution making any changes to this query + as it can impact the scalability of the health manager. + All changes should be analyzed using SQL "EXPLAIN" to + make sure only indexes are being used. + Changes should also be evaluated using the stressHM tool. + + Note: The returned object is flat and not a graph representation + of the load balancer as it is not needed. This is on + purpose to optimize the processing time. This is not in + the normal data model objects. + + :param session: A Sql Alchemy database session. + :param amphora_id: The amphora ID to lookup the load balancer for. + :returns: A dictionary containing the required load balancer details. + """ + rows = session.execute( + "SELECT load_balancer.id, load_balancer.enabled, " + "load_balancer.provisioning_status AS lb_prov_status, " + "load_balancer.operating_status AS lb_op_status, " + "listener.id AS list_id, " + "listener.operating_status AS list_op_status, " + "pool.id AS pool_id, " + "pool.operating_status AS pool_op_status, " + "member.id AS member_id, " + "member.operating_status AS mem_op_status from " + "amphora JOIN load_balancer ON " + "amphora.load_balancer_id = load_balancer.id LEFT JOIN " + "listener ON load_balancer.id = listener.load_balancer_id " + "LEFT JOIN pool ON load_balancer.id = pool.load_balancer_id " + "LEFT JOIN member ON pool.id = member.pool_id WHERE " + "amphora.id = :amp_id AND amphora.status != :deleted AND " + "load_balancer.provisioning_status != :deleted;", + {'amp_id': amphora_id, 'deleted': consts.DELETED}).fetchall() + + lb = {} + listeners = {} + pools = {} + for row in rows: + if not lb: + lb['id'] = row['id'] + lb['enabled'] = row['enabled'] == 1 + lb['provisioning_status'] = row['lb_prov_status'] + lb['operating_status'] = row['lb_op_status'] + if row['list_id'] and row['list_id'] not in listeners: + listener = {'operating_status': row['list_op_status']} + listeners[row['list_id']] = listener + if row['pool_id']: + if row['pool_id'] in pools and row['member_id']: + member = {'operating_status': row['mem_op_status']} + pools[row['pool_id']]['members'][row['member_id']] = member + else: + pool = {'operating_status': row['pool_op_status'], + 'members': {}} + if row['member_id']: + member = {'operating_status': row['mem_op_status']} + pool['members'][row['member_id']] = member + pools[row['pool_id']] = pool + + if listeners: + lb['listeners'] = listeners + if pools: + lb['pools'] = pools + + return lb + class AmphoraBuildReqRepository(BaseRepository): model_class = models.AmphoraBuildRequest diff --git a/octavia/tests/functional/db/test_repositories.py b/octavia/tests/functional/db/test_repositories.py index d8344a951c..19a7cc3822 100644 --- a/octavia/tests/functional/db/test_repositories.py +++ b/octavia/tests/functional/db/test_repositories.py @@ -40,6 +40,9 @@ class BaseRepositoryTest(base.OctaviaDBTestBase): FAKE_UUID_2 = uuidutils.generate_uuid() FAKE_UUID_3 = uuidutils.generate_uuid() FAKE_UUID_4 = uuidutils.generate_uuid() + FAKE_UUID_5 = uuidutils.generate_uuid() + FAKE_UUID_6 = uuidutils.generate_uuid() + FAKE_UUID_7 = uuidutils.generate_uuid() FAKE_EXP_AGE = 10 def setUp(self): @@ -3076,6 +3079,79 @@ class AmphoraRepositoryTest(BaseRepositoryTest): self.assertEqual(cert_expired_amphora.cert_expiration, expiration) self.assertEqual(cert_expired_amphora.id, amphora2.id) + def test_get_lb_for_health_update(self): + amphora1 = self.create_amphora(self.FAKE_UUID_1) + amphora2 = self.create_amphora(self.FAKE_UUID_3) + self.amphora_repo.associate(self.session, self.lb.id, amphora1.id) + self.amphora_repo.associate(self.session, self.lb.id, amphora2.id) + + lb_ref = {'enabled': True, 'id': self.lb.id, + 'operating_status': constants.ONLINE, + 'provisioning_status': constants.ACTIVE} + + # Test with just a load balancer + lb = self.amphora_repo.get_lb_for_health_update(self.session, + self.FAKE_UUID_1) + self.assertEqual(lb_ref, lb) + + pool = self.pool_repo.create( + self.session, id=self.FAKE_UUID_4, project_id=self.FAKE_UUID_2, + name="pool_test", description="pool_description", + protocol=constants.PROTOCOL_HTTP, load_balancer_id=self.lb.id, + lb_algorithm=constants.LB_ALGORITHM_ROUND_ROBIN, + provisioning_status=constants.ACTIVE, + operating_status=constants.ONLINE, enabled=True) + + pool_ref = {pool.id: {'members': {}, + 'operating_status': constants.ONLINE}} + lb_ref['pools'] = pool_ref + + # Test with an LB and a pool + lb = self.amphora_repo.get_lb_for_health_update(self.session, + self.FAKE_UUID_1) + self.assertEqual(lb_ref, lb) + + listener = self.listener_repo.create( + self.session, id=self.FAKE_UUID_5, project_id=self.FAKE_UUID_2, + name="listener_name", description="listener_description", + protocol=constants.PROTOCOL_HTTP, protocol_port=80, + connection_limit=1, operating_status=constants.ONLINE, + load_balancer_id=self.lb.id, provisioning_status=constants.ACTIVE, + enabled=True, peer_port=1025, default_pool_id=pool.id) + + listener_ref = {listener.id: {'operating_status': constants.ONLINE}} + lb_ref['listeners'] = listener_ref + + # Test with an LB, pool, and listener (no members) + lb = self.amphora_repo.get_lb_for_health_update(self.session, + self.FAKE_UUID_1) + self.assertEqual(lb_ref, lb) + + member1 = self.member_repo.create(self.session, id=self.FAKE_UUID_6, + project_id=self.FAKE_UUID_2, + pool_id=pool.id, + ip_address="192.0.2.1", + protocol_port=80, enabled=True, + provisioning_status=constants.ACTIVE, + operating_status=constants.ONLINE) + + member2 = self.member_repo.create(self.session, id=self.FAKE_UUID_7, + project_id=self.FAKE_UUID_2, + pool_id=pool.id, + ip_address="192.0.2.21", + protocol_port=80, enabled=True, + provisioning_status=constants.ACTIVE, + operating_status=constants.OFFLINE) + + member_ref = {member1.id: {'operating_status': constants.ONLINE}, + member2.id: {'operating_status': constants.OFFLINE}} + lb_ref['pools'][pool.id]['members'] = member_ref + + # Test with an LB, pool, listener, and members + lb = self.amphora_repo.get_lb_for_health_update(self.session, + self.FAKE_UUID_1) + self.assertEqual(lb_ref, lb) + class AmphoraHealthRepositoryTest(BaseRepositoryTest): def setUp(self): diff --git a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py index 4d8bcbeed9..656eabf889 100644 --- a/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py +++ b/octavia/tests/unit/controller/healthmanager/health_drivers/test_update_db.py @@ -44,9 +44,9 @@ class TestUpdateHealthDb(base.TestCase): def setUp(self): super(TestUpdateHealthDb, self).setUp() - conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) - conf.config(group="health_manager", - event_streamer_driver='queue_event_streamer') + self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) + self.conf.config(group="health_manager", + event_streamer_driver='queue_event_streamer') session_patch = mock.patch('octavia.db.api.get_session') self.addCleanup(session_patch.stop) @@ -65,8 +65,6 @@ class TestUpdateHealthDb(base.TestCase): self.pool_repo = mock.MagicMock() self.hm.amphora_repo = self.amphora_repo - fake_lb = mock.MagicMock() - self.hm.amphora_repo.get_lb_for_amphora.return_value = fake_lb self.hm.amphora_health_repo = self.amphora_health_repo self.hm.listener_repo = self.listener_repo self.hm.listener_repo.count.return_value = 1 @@ -104,6 +102,7 @@ class TestUpdateHealthDb(base.TestCase): mock_lb.pools = [mock_pool1] if mock_listener1: mock_listener1.pools = [mock_pool1] + mock_listener1.default_pool = mock_pool1 for i in range(members): mock_member_x = mock.Mock() mock_member_x.id = 'member-id-%s' % (i + 1) @@ -116,13 +115,45 @@ class TestUpdateHealthDb(base.TestCase): return mock_lb, mock_listener1, mock_pool1, mock_members + def _make_fake_lb_health_dict(self, listener=True, pool=True, + health_monitor=True, members=1, + lb_prov_status=constants.ACTIVE): + + lb_ref = {'enabled': True, 'id': self.FAKE_UUID_1, + constants.OPERATING_STATUS: 'bogus', + constants.PROVISIONING_STATUS: lb_prov_status} + + if pool: + members_dict = {} + if health_monitor: + member_operating_status = 'NOTHING_MATCHABLE' + else: + member_operating_status = constants.NO_MONITOR + + for i in range(members): + member_id = 'member-id-%s' % (i + 1) + members_dict[member_id] = { + constants.OPERATING_STATUS: member_operating_status} + + pool_ref = {'pool-id-1': {'members': members_dict, + constants.OPERATING_STATUS: 'bogus'}} + lb_ref['pools'] = pool_ref + + if listener: + listener_ref = {'listener-id-1': { + constants.OPERATING_STATUS: 'bogus'}} + lb_ref['listeners'] = listener_ref + + return lb_ref + def test_update_health_event_stream(self): health = { "id": self.FAKE_UUID_1, "listeners": { "listener-id-1": {"status": constants.OPEN, "pools": { "pool-id-1": {"status": constants.UP, - "members": {"member-id-1": constants.UP} + "members": {"member-id-1": constants.UP, + "member-id-2": constants.UP} } } } @@ -130,9 +161,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.event_client.cast.assert_any_call( @@ -156,12 +186,11 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=False, pool=False)) - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False) + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertTrue(self.amphora_health_repo.replace.called) @@ -173,13 +202,12 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=True, pool=False, - lb_prov_status=constants.PENDING_UPDATE)) - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict( + listener=True, pool=False, lb_prov_status=constants.PENDING_UPDATE) + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertTrue(self.amphora_health_repo.replace.called) @@ -191,12 +219,11 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=True, pool=False)) - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(listener=True, pool=False) + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) self.assertTrue(self.loadbalancer_repo.update.called) self.assertFalse(self.amphora_health_repo.replace.called) @@ -208,12 +235,12 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() - hb_interval - 1 # extra -1 for buffer } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=False, pool=False)) - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False) + + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) # Receive time is stale, so we shouldn't see this called self.assertFalse(self.loadbalancer_repo.update.called) @@ -234,10 +261,9 @@ class TestUpdateHealthDb(base.TestCase): self.session_mock.commit.side_effect = TestException('boom') - mock_lb, mock_listener1, mock_pool1, mock_member1 = ( - self._make_mock_lb_tree()) + lb_ref = self._make_fake_lb_health_dict() - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -258,9 +284,9 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref + self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -285,10 +311,9 @@ class TestUpdateHealthDb(base.TestCase): operating_status=constants.ONLINE) # If the listener count is wrong, make sure we don't update - mock_listener2 = mock.Mock() - mock_listener2.id = 'listener-id-2' - mock_listener2.pools = [mock_pool1] - mock_lb.listeners = [mock_listener1, mock_listener2] + lb_ref['listeners']['listener-id-2'] = { + constants.OPERATING_STATUS: 'bogus'} + self.amphora_health_repo.replace.reset_mock() self.hm.update_health(health, '192.0.2.1') @@ -304,10 +329,9 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) + lb_ref = self._make_fake_lb_health_dict() - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -319,7 +343,7 @@ class TestUpdateHealthDb(base.TestCase): self.session_mock, listener_id, operating_status=constants.ONLINE) self.pool_repo.update.assert_any_call( - self.session_mock, mock_pool1.id, + self.session_mock, 'pool-id-1', operating_status=constants.OFFLINE ) @@ -339,22 +363,16 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) + lb_ref = self._make_fake_lb_health_dict() - mock_member2 = mock.Mock() - mock_member2.id = 'member-id-2' - mock_pool2 = mock.Mock() - mock_pool2.id = "pool-id-2" - mock_pool2.members = [mock_member2] - mock_listener2 = mock.Mock() - mock_listener2.id = 'listener-id-2' - mock_listener2.pools = [mock_pool2] + lb_ref['pools']['pool-id-2'] = { + constants.OPERATING_STATUS: 'bogus', + 'members': {'member-id-2': {constants.OPERATING_STATUS: 'bogus'}}} - mock_lb.listeners.append(mock_listener2) - mock_lb.pools.append(mock_pool2) + lb_ref['listeners']['listener-id-2'] = { + constants.OPERATING_STATUS: 'bogus'} - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -368,11 +386,11 @@ class TestUpdateHealthDb(base.TestCase): # Call count should be exactly 2, as each pool should be processed once self.assertEqual(2, self.pool_repo.update.call_count) self.pool_repo.update.assert_has_calls([ - mock.call(self.session_mock, mock_pool1.id, + mock.call(self.session_mock, 'pool-id-1', operating_status=constants.ERROR), - mock.call(self.session_mock, mock_pool2.id, + mock.call(self.session_mock, 'pool-id-2', operating_status=constants.ONLINE) - ]) + ], any_order=True) def test_update_lb_and_list_pool_health_online(self): @@ -389,9 +407,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -431,9 +448,12 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + + lb_ref['pools']['pool-id-2'] = { + constants.OPERATING_STATUS: constants.OFFLINE} + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -462,9 +482,8 @@ class TestUpdateHealthDb(base.TestCase): "members": {"member-id-1": constants.DRAIN}}}}}, "recv_time": time.time()} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -502,9 +521,8 @@ class TestUpdateHealthDb(base.TestCase): "members": {"member-id-1": constants.MAINT}}}}}, "recv_time": time.time()} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -542,9 +560,8 @@ class TestUpdateHealthDb(base.TestCase): "members": {"member-id-1": "blah"}}}}}, "recv_time": time.time()} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -578,9 +595,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -621,9 +637,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(health_monitor=False)) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(health_monitor=False) + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -659,11 +674,11 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(health_monitor=False)) - mock_members[0].admin_state_up = False - mock_members[0].operating_status = constants.NO_MONITOR - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(health_monitor=False) + member1 = lb_ref['pools']['pool-id-1']['members']['member-id-1'] + member1[constants.OPERATING_STATUS] = constants.NO_MONITOR + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -683,7 +698,7 @@ class TestUpdateHealthDb(base.TestCase): operating_status=constants.ONLINE) self.member_repo.update.assert_any_call( - self.session_mock, mock_members[0].id, + self.session_mock, 'member-id-1', operating_status=constants.OFFLINE) def test_update_health_member_no_check(self): @@ -702,9 +717,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -743,11 +757,12 @@ class TestUpdateHealthDb(base.TestCase): "member-id-1": constants.UP}}}}}, "recv_time": time.time()} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(members=2)) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(members=2) + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref + self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) # test listener, member for listener_id, listener in six.iteritems( @@ -770,7 +785,7 @@ class TestUpdateHealthDb(base.TestCase): self.session_mock, member_id, operating_status=constants.ONLINE) self.member_repo.update.assert_any_call( - self.session_mock, mock_members[1].id, + self.session_mock, 'member-id-2', operating_status=constants.OFFLINE) def test_update_health_list_full_member_down(self): @@ -788,9 +803,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -815,10 +829,9 @@ class TestUpdateHealthDb(base.TestCase): self.session_mock, member_id, operating_status=constants.ERROR) - mock_listener2 = mock.Mock() - mock_listener2.id = 'listener-id-2' - mock_listener2.pools = [mock_pool1] - mock_lb.listeners.append(mock_listener2) + lb_ref['listeners']['listener-id-2'] = { + constants.OPERATING_STATUS: 'bogus'} + self.amphora_health_repo.replace.reset_mock() self.hm.update_health(health, '192.0.2.1') @@ -839,9 +852,8 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -914,25 +926,24 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=False, pool=False)) + lb_ref = self._make_fake_lb_health_dict() # Build our own custom listeners/pools/members for i in [1, 2, 3, 4, 5]: - mock_member = mock.Mock() - mock_member.id = 'member-id-%s' % i - mock_pool = mock.Mock() - mock_pool.id = 'pool-id-%s' % i - mock_pool.members = [mock_member] - if i == 3: - mock_member = mock.Mock() - mock_member.id = 'member-id-31' - mock_pool.members.append(mock_member) - mock_listener = mock.Mock() - mock_listener.id = 'listener-id-%s' % i - mock_listener.pools = [mock_pool] - mock_lb.listeners.append(mock_listener) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref['listeners']['listener-id-%s' % i] = { + constants.OPERATING_STATUS: 'bogus'} + + if i == 3: + members_dict = {'member-id-3': { + constants.OPERATING_STATUS: 'bogus'}, 'member-id-31': { + constants.OPERATING_STATUS: 'bogus'}} + else: + members_dict = {'member-id-%s' % i: { + constants.OPERATING_STATUS: 'bogus'}} + lb_ref['pools']['pool-id-%s' % i] = { + 'members': members_dict, constants.OPERATING_STATUS: 'bogus'} + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') @@ -977,13 +988,15 @@ class TestUpdateHealthDb(base.TestCase): self.hm.member_repo.update.side_effect = ( [sqlalchemy.orm.exc.NoResultFound]) self.hm.pool_repo.update.side_effect = ( - [sqlalchemy.orm.exc.NoResultFound]) + sqlalchemy.orm.exc.NoResultFound) self.hm.loadbalancer_repo.update.side_effect = ( [sqlalchemy.orm.exc.NoResultFound]) - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict() + + lb_ref['pools']['pool-id-2'] = {constants.OPERATING_STATUS: 'bogus'} + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.assertTrue(self.amphora_health_repo.replace.called) @@ -1013,7 +1026,7 @@ class TestUpdateHealthDb(base.TestCase): def test_update_health_zombie(self, mock_driver): health = {"id": self.FAKE_UUID_1, "listeners": {}} - self.amphora_repo.get_lb_for_amphora.return_value = None + self.amphora_repo.get_lb_for_health_update.return_value = None amp_mock = mock.MagicMock() self.amphora_repo.get.return_value = amp_mock self.hm.update_health(health, '192.0.2.1') @@ -1037,15 +1050,18 @@ class TestUpdateHealthDb(base.TestCase): "recv_time": time.time() } - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree()) + lb_ref = self._make_fake_lb_health_dict() # Start everything ONLINE - mock_members[0].operating_status = constants.ONLINE - mock_pool1.operating_status = constants.ONLINE - mock_listener1.operating_status = constants.ONLINE - mock_lb.operating_status = constants.ONLINE - self.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref[constants.OPERATING_STATUS] = constants.ONLINE + listener1 = lb_ref['listeners']['listener-id-1'] + listener1[constants.OPERATING_STATUS] = constants.ONLINE + pool1 = lb_ref['pools']['pool-id-1'] + pool1[constants.OPERATING_STATUS] = constants.ONLINE + member1 = pool1['members']['member-id-1'] + member1[constants.OPERATING_STATUS] = constants.ONLINE + + self.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') self.event_client.cast.assert_not_called() @@ -1060,45 +1076,108 @@ class TestUpdateHealthDb(base.TestCase): "listeners": {}, "recv_time": time.time()} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=False, pool=False)) - mock_lb.enabled = False - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False) + lb_ref['enabled'] = False + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) self.assertTrue(self.loadbalancer_repo.update.called) self.loadbalancer_repo.update.assert_called_with( - self.mock_session(), mock_lb.id, + self.mock_session(), self.FAKE_UUID_1, operating_status='OFFLINE') def test_update_health_lb_admin_up(self): health = { "id": self.FAKE_UUID_1, "listeners": {}, - "recv_time": time.time()} + "recv_time": time.time(), + "ver": 1} - mock_lb, mock_listener1, mock_pool1, mock_members = ( - self._make_mock_lb_tree(listener=False, pool=False)) - mock_lb.enabled = True - self.hm.amphora_repo.get_lb_for_amphora.return_value = mock_lb + lb_ref = self._make_fake_lb_health_dict(listener=False, pool=False) + self.hm.amphora_repo.get_lb_for_health_update.return_value = lb_ref self.hm.update_health(health, '192.0.2.1') - self.assertTrue(self.amphora_repo.get_lb_for_amphora.called) + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) self.assertTrue(self.loadbalancer_repo.update.called) self.loadbalancer_repo.update.assert_called_with( - self.mock_session(), mock_lb.id, + self.mock_session(), self.FAKE_UUID_1, operating_status='ONLINE') + def test_update_health_no_db_lb(self): + health = { + "id": self.FAKE_UUID_1, + "listeners": {}, + "recv_time": time.time() + } + self.hm.amphora_repo.get_lb_for_health_update.return_value = {} + + self.hm.update_health(health, '192.0.2.1') + + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) + self.assertFalse(self.amphora_health_repo.replace.called) + + # Test missing amp in addition to missing lb DB record + self.amphora_repo.get_lb_for_health_update.reset_mock() + self.amphora_health_repo.replace.reset_mock() + + mock_amphora = mock.MagicMock() + mock_amphora.load_balancer_id = None + self.amphora_repo.get.return_value = mock_amphora + + self.hm.update_health(health, '192.0.2.1') + + self.assertTrue(self.amphora_repo.get_lb_for_health_update.called) + self.assertTrue(self.amphora_repo.get.called) + self.assertTrue(self.amphora_health_repo.replace.called) + + def test_update_status_and_emit_event(self): + + # Test update with the same operating status + self.conf.config(group="health_manager", + event_streamer_driver=constants.NOOP_EVENT_STREAMER) + self.hm._update_status_and_emit_event( + 'fake_session', self.loadbalancer_repo, constants.LOADBALANCER, + 1, 'ONLINE', 'ONLINE') + self.assertFalse(self.loadbalancer_repo.update.called) + self.assertFalse(self.event_client.cast.called) + + self.conf.config(group="health_manager", + event_streamer_driver='queue_event_streamer', + sync_provisioning_status=True) + + self.loadbalancer_repo.update.reset_mock() + self.event_client.reset_mock() + + # Test stream with provisioning sync + self.hm._update_status_and_emit_event( + 'fake_session', self.loadbalancer_repo, constants.LOADBALANCER, + 1, 'ONLINE', 'OFFLINE') + self.assertTrue(self.loadbalancer_repo.update.called) + self.assertTrue(self.event_client.cast.called) + + self.conf.config(group="health_manager", + sync_provisioning_status=False) + + self.loadbalancer_repo.update.reset_mock() + self.event_client.reset_mock() + + # Test stream with no provisioning sync + self.hm._update_status_and_emit_event( + 'fake_session', self.loadbalancer_repo, constants.LOADBALANCER, + 1, 'ONLINE', 'ONLINE') + self.assertFalse(self.loadbalancer_repo.update.called) + self.assertFalse(self.event_client.cast.called) + class TestUpdateStatsDb(base.TestCase): def setUp(self): super(TestUpdateStatsDb, self).setUp() - conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) - conf.config(group="health_manager", - event_streamer_driver='queue_event_streamer') + self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) + self.conf.config(group="health_manager", + event_streamer_driver='queue_event_streamer') self.sm = update_db.UpdateStatsDb() self.event_client = mock.MagicMock() @@ -1141,7 +1220,7 @@ class TestUpdateStatsDb(base.TestCase): self.loadbalancer_repo.get.return_value = self.loadbalancer @mock.patch('octavia.db.api.get_session') - def test_update_stats(self, session): + def test_update_stats(self, mock_session): health = { "id": self.amphora_id, @@ -1165,7 +1244,7 @@ class TestUpdateStatsDb(base.TestCase): } } - session.return_value = 'blah' + mock_session.return_value = 'blah' self.sm.update_stats(health, '192.0.2.1') @@ -1202,3 +1281,39 @@ class TestUpdateStatsDb(base.TestCase): self.listener_stats.active_connections, 'bytes_out': self.listener_stats.bytes_out, 'request_errors': self.listener_stats.request_errors}}) + + # Test with noop streamer + self.event_client.cast.reset_mock() + self.conf.config(group="health_manager", + event_streamer_driver=constants.NOOP_EVENT_STREAMER) + + self.sm.update_stats(health, '192.0.2.1') + + self.conf.config(group="health_manager", + event_streamer_driver='queue_event_streamer') + self.assertFalse(self.event_client.cast.called) + + # Test with missing DB listener + self.event_client.cast.reset_mock() + self.sm.repo_listener.get.return_value = None + + self.sm.update_stats(health, '192.0.2.1') + + self.event_client.cast.assert_called_once_with( + {}, 'update_info', container={ + 'info_type': 'listener_stats', + 'info_id': self.listener_id, + 'info_payload': { + 'bytes_in': self.listener_stats.bytes_in, + 'total_connections': + self.listener_stats.total_connections, + 'active_connections': + self.listener_stats.active_connections, + 'bytes_out': self.listener_stats.bytes_out, + 'request_errors': self.listener_stats.request_errors}}) + + # Test with update failure + self.event_client.cast.reset_mock() + mock_session.side_effect = Exception + self.sm.update_stats(health, '192.0.2.1') + self.assertFalse(self.event_client.cast.called)