diff --git a/octavia/controller/housekeeping/house_keeping.py b/octavia/controller/housekeeping/house_keeping.py index 226fed402c..48bc475b65 100644 --- a/octavia/controller/housekeeping/house_keeping.py +++ b/octavia/controller/housekeeping/house_keeping.py @@ -72,18 +72,24 @@ class DatabaseCleanup(object): seconds=CONF.house_keeping.amphora_expiry_age) session = db_api.get_session() - amphora, _ = self.amp_repo.get_all(session, status=constants.DELETED) + expiring_amphora = self.amp_repo.get_all_deleted_expiring_amphora( + session, exp_age=exp_age) - for amp in amphora: - if self.amp_health_repo.check_amphora_expired(session, amp.id, - exp_age): - LOG.info('Attempting to delete Amphora id : %s', amp.id) + for amp in expiring_amphora: + # If we're here, we already think the amp is expiring according to + # the amphora table. Now check it is expired in the health table. + # In this way, we ensure that amps aren't deleted unless they are + # both expired AND no longer receiving zombie heartbeats. + if self.amp_health_repo.check_amphora_health_expired( + session, amp.id, exp_age): + LOG.debug('Attempting to purge db record for Amphora ID: %s', + amp.id) self.amp_repo.delete(session, id=amp.id) try: self.amp_health_repo.delete(session, amphora_id=amp.id) except sqlalchemy_exceptions.NoResultFound: pass # Best effort delete, this record might not exist - LOG.info('Deleted Amphora id : %s', amp.id) + LOG.info('Purged db record for Amphora ID: %s', amp.id) def cleanup_load_balancers(self): """Checks the DB for old load balancers and triggers their removal.""" diff --git a/octavia/controller/worker/controller_worker.py b/octavia/controller/worker/controller_worker.py index d019bb49f9..ca61a5aa7f 100644 --- a/octavia/controller/worker/controller_worker.py +++ b/octavia/controller/worker/controller_worker.py @@ -732,11 +732,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): if amp.status == constants.DELETED: LOG.warning('Amphora %s is marked DELETED in the database but ' - 'was submitted for failover. Marking it busy in the ' + 'was submitted for failover. Deleting it from the ' 'amphora health table to exclude it from health ' 'checks and skipping the failover.', amp.id) - self._amphora_health_repo.update(db_apis.get_session(), amp.id, - busy=True) + self._amphora_health_repo.delete(db_apis.get_session(), + amphora_id=amp.id) return if (CONF.house_keeping.spare_amphora_pool_size == 0) and ( @@ -755,8 +755,8 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): lb[0].server_group_id) failover_amphora_tf = self._taskflow_load( - self._amphora_flows.get_failover_flow(role=amp.role, - status=amp.status), + self._amphora_flows.get_failover_flow( + role=amp.role, load_balancer_id=amp.load_balancer_id), store=stored_params) with tf_logging.DynamicLoggingListener( diff --git a/octavia/controller/worker/flows/amphora_flows.py b/octavia/controller/worker/flows/amphora_flows.py index c35a54e316..e2b5bedf23 100644 --- a/octavia/controller/worker/flows/amphora_flows.py +++ b/octavia/controller/worker/flows/amphora_flows.py @@ -290,7 +290,7 @@ class AmphoraFlows(object): return delete_amphora_flow def get_failover_flow(self, role=constants.ROLE_STANDALONE, - status=constants.AMPHORA_READY): + load_balancer_id=None): """Creates a flow to failover a stale amphora :returns: The flow for amphora failover @@ -329,16 +329,16 @@ class AmphoraFlows(object): failover_amphora_flow.add(network_tasks.WaitForPortDetach( rebind={constants.AMPHORA: constants.FAILED_AMPHORA}, requires=constants.AMPHORA)) - failover_amphora_flow.add( - database_tasks.DisableAmphoraHealthMonitoring( - rebind={constants.AMPHORA: constants.FAILED_AMPHORA}, - requires=constants.AMPHORA)) failover_amphora_flow.add(database_tasks.MarkAmphoraDeletedInDB( rebind={constants.AMPHORA: constants.FAILED_AMPHORA}, requires=constants.AMPHORA)) # If this is an unallocated amp (spares pool), we're done - if status != constants.AMPHORA_ALLOCATED: + if not load_balancer_id: + failover_amphora_flow.add( + database_tasks.DisableAmphoraHealthMonitoring( + rebind={constants.AMPHORA: constants.FAILED_AMPHORA}, + requires=constants.AMPHORA)) return failover_amphora_flow # Save failed amphora details for later @@ -413,6 +413,10 @@ class AmphoraFlows(object): failover_amphora_flow.add(amphora_driver_tasks.ListenersStart( requires=(constants.LOADBALANCER, constants.LISTENERS))) + failover_amphora_flow.add( + database_tasks.DisableAmphoraHealthMonitoring( + rebind={constants.AMPHORA: constants.FAILED_AMPHORA}, + requires=constants.AMPHORA)) return failover_amphora_flow diff --git a/octavia/controller/worker/tasks/lifecycle_tasks.py b/octavia/controller/worker/tasks/lifecycle_tasks.py index 309ce1a6e9..d199e5dd26 100644 --- a/octavia/controller/worker/tasks/lifecycle_tasks.py +++ b/octavia/controller/worker/tasks/lifecycle_tasks.py @@ -33,7 +33,6 @@ class AmphoraIDToErrorOnRevertTask(BaseLifecycleTask): def revert(self, amphora_id, *args, **kwargs): self.task_utils.mark_amphora_status_error(amphora_id) - self.task_utils.unmark_amphora_health_busy(amphora_id) class AmphoraToErrorOnRevertTask(AmphoraIDToErrorOnRevertTask): diff --git a/octavia/db/repositories.py b/octavia/db/repositories.py index a7ca953261..24853902f9 100644 --- a/octavia/db/repositories.py +++ b/octavia/db/repositories.py @@ -954,6 +954,33 @@ class AmphoraRepository(BaseRepository): data_model_list = [model.to_data_model() for model in lb_list] return data_model_list + def get_all_deleted_expiring_amphora(self, session, exp_age=None): + + """Get all previously deleted amphora that are now expiring. + + :param session: A Sql Alchemy database session. + :param exp_age: A standard datetime delta which is used to see for how + long can an amphora live without updates before it is + considered expired (default: + CONF.house_keeping.amphora_expiry_age) + :returns: [octavia.common.data_model] + """ + if not exp_age: + exp_age = datetime.timedelta( + seconds=CONF.house_keeping.amphora_expiry_age) + + expiry_time = datetime.datetime.utcnow() - exp_age + + query = session.query(self.model_class).filter_by( + status=consts.DELETED).filter( + self.model_class.updated_at < expiry_time) + # Only make one trip to the database + query = query.options(joinedload('*')) + model_list = query.all() + + data_model_list = [model.to_data_model() for model in model_list] + return data_model_list + def get_spare_amphora_count(self, session): """Get the count of the spare amphora. @@ -1096,8 +1123,8 @@ class AmphoraHealthRepository(BaseRepository): model_kwargs['amphora_id'] = amphora_id self.create(session, **model_kwargs) - def check_amphora_expired(self, session, amphora_id, exp_age=None): - """check if a specific amphora is expired + def check_amphora_health_expired(self, session, amphora_id, exp_age=None): + """check if a specific amphora is expired in the amphora_health table :param session: A Sql Alchemy database session. :param amphora_id: id of an amphora object @@ -1111,16 +1138,28 @@ class AmphoraHealthRepository(BaseRepository): exp_age = datetime.timedelta( seconds=CONF.house_keeping.amphora_expiry_age) - timestamp = datetime.datetime.utcnow() - exp_age - amphora_health = self.get(session, amphora_id=amphora_id) - if amphora_health is not None: - return amphora_health.last_update < timestamp - else: - # Amphora was just destroyed. - return True + expiry_time = datetime.datetime.utcnow() - exp_age + + amphora_model = ( + session.query(models.AmphoraHealth) + .filter_by(amphora_id=amphora_id) + .filter(models.AmphoraHealth.last_update > expiry_time) + ).first() + # This will return a value if: + # * there is an entry in the table for this amphora_id + # AND + # * the entry was last updated more recently than our expiry_time + # Receiving any value means that the amp is unexpired. + + # In contrast, we receive no value if: + # * there is no entry for this amphora_id + # OR + # * the entry was last updated before our expiry_time + # In this case, the amphora is expired. + return amphora_model is None def get_stale_amphora(self, session): - """Retrieves a staled amphora from the health manager database. + """Retrieves a stale amphora from the health manager database. :param session: A Sql Alchemy database session. :returns: [octavia.common.data_model] diff --git a/octavia/tests/functional/db/test_repositories.py b/octavia/tests/functional/db/test_repositories.py index aa31612380..ddaab2c05b 100644 --- a/octavia/tests/functional/db/test_repositories.py +++ b/octavia/tests/functional/db/test_repositories.py @@ -2946,17 +2946,20 @@ class AmphoraRepositoryTest(BaseRepositoryTest): provisioning_status=constants.ACTIVE, operating_status=constants.ONLINE, enabled=True) - def create_amphora(self, amphora_id): - expiration = datetime.datetime.utcnow() - amphora = self.amphora_repo.create(self.session, id=amphora_id, - compute_id=self.FAKE_UUID_3, - status=constants.ACTIVE, - lb_network_ip=self.FAKE_IP, - vrrp_ip=self.FAKE_IP, - ha_ip=self.FAKE_IP, - role=constants.ROLE_MASTER, - cert_expiration=expiration, - cert_busy=False) + def create_amphora(self, amphora_id, **overrides): + settings = { + 'id': amphora_id, + 'compute_id': self.FAKE_UUID_3, + 'status': constants.ACTIVE, + 'lb_network_ip': self.FAKE_IP, + 'vrrp_ip': self.FAKE_IP, + 'ha_ip': self.FAKE_IP, + 'role': constants.ROLE_MASTER, + 'cert_expiration': datetime.datetime.utcnow(), + 'cert_busy': False + } + settings.update(overrides) + amphora = self.amphora_repo.create(self.session, **settings) return amphora def test_get(self): @@ -3037,6 +3040,20 @@ class AmphoraRepositoryTest(BaseRepositoryTest): self.assertIsNotNone(lb_list) self.assertIn(self.lb, lb_list) + def get_all_deleted_expiring_amphora(self): + exp_age = datetime.timedelta(seconds=self.FAKE_EXP_AGE) + updated_at = datetime.datetime.utcnow() - exp_age + amphora1 = self.create_amphora( + self.FAKE_UUID_1, updated_at=updated_at, status=constants.DELETED) + amphora2 = self.create_amphora( + self.FAKE_UUID_2, status=constants.DELETED) + + expiring_list = self.amphora_repo.get_all_deleted_expiring_amphora( + self.session, exp_age=exp_age) + expiring_ids = [amp.id for amp in expiring_list] + self.assertIn(amphora1.id, expiring_ids) + self.assertNotIn(amphora2.id, expiring_ids) + def test_get_spare_amphora_count(self): count = self.amphora_repo.get_spare_amphora_count(self.session) self.assertEqual(0, count) @@ -3131,7 +3148,7 @@ class AmphoraHealthRepositoryTest(BaseRepositoryTest): def test_check_amphora_expired_default_exp_age(self): """When exp_age defaults to CONF.house_keeping.amphora_expiry_age.""" self.create_amphora_health(self.amphora.id) - checkres = self.amphora_health_repo.check_amphora_expired( + checkres = self.amphora_health_repo.check_amphora_health_expired( self.session, self.amphora.id) # Default amphora_expiry_age value is 1 week so amphora shouldn't be # considered expired. @@ -3142,13 +3159,13 @@ class AmphoraHealthRepositoryTest(BaseRepositoryTest): exp_age = datetime.timedelta( seconds=self.FAKE_EXP_AGE) self.create_amphora_health(self.amphora.id) - checkres = self.amphora_health_repo.check_amphora_expired( + checkres = self.amphora_health_repo.check_amphora_health_expired( self.session, self.amphora.id, exp_age) self.assertTrue(checkres) def test_check_amphora_expired_with_no_age(self): """When the amphora_health entry is missing in the DB.""" - checkres = self.amphora_health_repo.check_amphora_expired( + checkres = self.amphora_health_repo.check_amphora_health_expired( self.session, self.amphora.id) self.assertTrue(checkres) diff --git a/octavia/tests/unit/controller/housekeeping/test_house_keeping.py b/octavia/tests/unit/controller/housekeeping/test_house_keeping.py index a2543bdf69..402e5037f1 100644 --- a/octavia/tests/unit/controller/housekeeping/test_house_keeping.py +++ b/octavia/tests/unit/controller/housekeeping/test_house_keeping.py @@ -12,8 +12,11 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime + import mock from oslo_config import cfg +from oslo_config import fixture as oslo_fixture from oslo_utils import uuidutils from octavia.common import constants @@ -49,13 +52,14 @@ class TestSpareCheck(base.TestCase): self.spare_amp.amp_repo = self.amp_repo self.spare_amp.cw = self.cw - self.CONF = cfg.CONF + self.CONF = self.useFixture(oslo_fixture.Config(cfg.CONF)) @mock.patch('octavia.db.api.get_session') def test_spare_check_diff_count(self, session): """When spare amphora count does not meet the requirement.""" session.return_value = session - self.CONF.house_keeping.spare_amphora_pool_size = self.FAKE_CNF_SPAR1 + self.CONF.config(group="house_keeping", + spare_amphora_pool_size=self.FAKE_CNF_SPAR1) self.amp_repo.get_spare_amphora_count.return_value = ( self.FAKE_CUR_SPAR1) self.spare_amp.spare_check() @@ -68,7 +72,8 @@ class TestSpareCheck(base.TestCase): def test_spare_check_no_diff_count(self, session): """When spare amphora count meets the requirement.""" session.return_value = session - self.CONF.house_keeping.spare_amphora_pool_size = self.FAKE_CNF_SPAR2 + self.CONF.config(group="house_keeping", + spare_amphora_pool_size=self.FAKE_CNF_SPAR2) self.amp_repo.get_spare_amphora_count.return_value = ( self.FAKE_CUR_SPAR2) self.spare_amp.spare_check() @@ -83,7 +88,7 @@ class TestDatabaseCleanup(base.TestCase): FAKE_IP = "10.0.0.1" FAKE_UUID_1 = uuidutils.generate_uuid() FAKE_UUID_2 = uuidutils.generate_uuid() - FAKE_EXP_AGE = 10 + FAKE_EXP_AGE = 60 def setUp(self): super(TestDatabaseCleanup, self).setUp() @@ -95,48 +100,84 @@ class TestDatabaseCleanup(base.TestCase): self.dbclean.amp_repo = self.amp_repo self.dbclean.amp_health_repo = self.amp_health_repo - self.CONF = cfg.CONF + self.CONF = self.useFixture(oslo_fixture.Config(cfg.CONF)) @mock.patch('octavia.db.api.get_session') def test_delete_old_amphorae_True(self, session): """When the deleted amphorae is expired.""" session.return_value = session - self.CONF.house_keeping.amphora_expiry_age = self.FAKE_EXP_AGE + self.CONF.config(group="house_keeping", + amphora_expiry_age=self.FAKE_EXP_AGE) + expired_time = datetime.datetime.utcnow() - datetime.timedelta( + seconds=self.FAKE_EXP_AGE + 1) amphora = self.amp.create(session, id=self.FAKE_UUID_1, compute_id=self.FAKE_UUID_2, status=constants.DELETED, lb_network_ip=self.FAKE_IP, vrrp_ip=self.FAKE_IP, - ha_ip=self.FAKE_IP) - self.amp_repo.get_all.return_value = ([amphora], None) - self.amp_health_repo.check_amphora_expired.return_value = True + ha_ip=self.FAKE_IP, + updated_at=expired_time) + self.amp_repo.get_all_deleted_expiring_amphora.return_value = [amphora] + self.amp_health_repo.check_amphora_health_expired.return_value = True self.dbclean.delete_old_amphorae() - self.assertTrue(self.amp_repo.get_all.called) - self.assertTrue(self.amp_health_repo.check_amphora_expired.called) + self.assertTrue(self.amp_repo.get_all_deleted_expiring_amphora.called) + self.assertTrue( + self.amp_health_repo.check_amphora_health_expired.called) self.assertTrue(self.amp_repo.delete.called) @mock.patch('octavia.db.api.get_session') def test_delete_old_amphorae_False(self, session): """When the deleted amphorae is not expired.""" session.return_value = session - self.CONF.house_keeping.amphora_expiry_age = self.FAKE_EXP_AGE + self.CONF.config(group="house_keeping", + amphora_expiry_age=self.FAKE_EXP_AGE) + self.amp.create(session, id=self.FAKE_UUID_1, + compute_id=self.FAKE_UUID_2, + status=constants.DELETED, + lb_network_ip=self.FAKE_IP, + vrrp_ip=self.FAKE_IP, + ha_ip=self.FAKE_IP, + updated_at=datetime.datetime.now()) + self.amp_repo.get_all_deleted_expiring_amphora.return_value = [] + self.dbclean.delete_old_amphorae() + self.assertTrue(self.amp_repo.get_all_deleted_expiring_amphora.called) + self.assertFalse( + self.amp_health_repo.check_amphora_health_expired.called) + self.assertFalse(self.amp_repo.delete.called) + + @mock.patch('octavia.db.api.get_session') + def test_delete_old_amphorae_Zombie(self, session): + """When the deleted amphorae is expired but is a zombie! + + This is when the amphora is expired in the amphora table, but in the + amphora_health table there are newer records, meaning the amp checked + in with the healthmanager *after* it was deleted (and craves brains). + """ + session.return_value = session + self.CONF.config(group="house_keeping", + amphora_expiry_age=self.FAKE_EXP_AGE) + expired_time = datetime.datetime.utcnow() - datetime.timedelta( + seconds=self.FAKE_EXP_AGE + 1) amphora = self.amp.create(session, id=self.FAKE_UUID_1, compute_id=self.FAKE_UUID_2, status=constants.DELETED, lb_network_ip=self.FAKE_IP, vrrp_ip=self.FAKE_IP, - ha_ip=self.FAKE_IP) - self.amp_repo.get_all.return_value = ([amphora], None) - self.amp_health_repo.check_amphora_expired.return_value = False + ha_ip=self.FAKE_IP, + updated_at=expired_time) + self.amp_repo.get_all_deleted_expiring_amphora.return_value = [amphora] + self.amp_health_repo.check_amphora_health_expired.return_value = False self.dbclean.delete_old_amphorae() - self.assertTrue(self.amp_repo.get_all.called) - self.assertTrue(self.amp_health_repo.check_amphora_expired.called) + self.assertTrue(self.amp_repo.get_all_deleted_expiring_amphora.called) + self.assertTrue( + self.amp_health_repo.check_amphora_health_expired.called) self.assertFalse(self.amp_repo.delete.called) @mock.patch('octavia.db.api.get_session') def test_delete_old_load_balancer(self, session): """Check delete of load balancers in DELETED provisioning status.""" - self.CONF.house_keeping.load_balancer_expiry_age = self.FAKE_EXP_AGE + self.CONF.config(group="house_keeping", + load_balancer_expiry_age=self.FAKE_EXP_AGE) session.return_value = session load_balancer = self.lb.create(session, id=self.FAKE_UUID_1, provisioning_status=constants.DELETED, diff --git a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py index 11a31b730d..e700c8c98a 100644 --- a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py @@ -237,7 +237,7 @@ class TestAmphoraFlows(base.TestCase): def test_get_failover_flow_allocated(self, mock_get_net_driver): amp_flow = self.AmpFlow.get_failover_flow( - status=constants.AMPHORA_ALLOCATED) + load_balancer_id='mylb') self.assertIsInstance(amp_flow, flow.Flow) @@ -257,7 +257,7 @@ class TestAmphoraFlows(base.TestCase): self.assertEqual(11, len(amp_flow.provides)) amp_flow = self.AmpFlow.get_failover_flow( - role=constants.ROLE_MASTER, status=constants.AMPHORA_ALLOCATED) + role=constants.ROLE_MASTER, load_balancer_id='mylb') self.assertIsInstance(amp_flow, flow.Flow) @@ -277,7 +277,7 @@ class TestAmphoraFlows(base.TestCase): self.assertEqual(11, len(amp_flow.provides)) amp_flow = self.AmpFlow.get_failover_flow( - role=constants.ROLE_BACKUP, status=constants.AMPHORA_ALLOCATED) + role=constants.ROLE_BACKUP, load_balancer_id='mylb') self.assertIsInstance(amp_flow, flow.Flow) @@ -297,7 +297,7 @@ class TestAmphoraFlows(base.TestCase): self.assertEqual(11, len(amp_flow.provides)) amp_flow = self.AmpFlow.get_failover_flow( - role='BOGUSROLE', status=constants.AMPHORA_ALLOCATED) + role='BOGUSROLE', load_balancer_id='mylb') self.assertIsInstance(amp_flow, flow.Flow) @@ -319,7 +319,7 @@ class TestAmphoraFlows(base.TestCase): def test_get_failover_flow_spare(self, mock_get_net_driver): amp_flow = self.AmpFlow.get_failover_flow( - status=constants.AMPHORA_READY) + load_balancer_id=None) self.assertIsInstance(amp_flow, flow.Flow) diff --git a/octavia/tests/unit/controller/worker/tasks/test_lifecycle_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_lifecycle_tasks.py index 84a49e2ee2..1a3e319bd2 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_lifecycle_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_lifecycle_tasks.py @@ -72,7 +72,7 @@ class TestLifecycleTasks(base.TestCase): amp_id_to_error_on_revert.revert(self.AMPHORA_ID) mock_amp_status_error.assert_called_once_with(self.AMPHORA_ID) - mock_amp_health_busy.assert_called_once_with(self.AMPHORA_ID) + self.assertFalse(mock_amp_health_busy.called) @mock.patch('octavia.controller.worker.task_utils.TaskUtils.' 'unmark_amphora_health_busy') @@ -92,7 +92,7 @@ class TestLifecycleTasks(base.TestCase): amp_to_error_on_revert.revert(self.AMPHORA) mock_amp_status_error.assert_called_once_with(self.AMPHORA_ID) - mock_amp_health_busy.assert_called_once_with(self.AMPHORA_ID) + self.assertFalse(mock_amp_health_busy.called) @mock.patch('octavia.controller.worker.task_utils.TaskUtils.' 'mark_health_mon_prov_status_error') diff --git a/octavia/tests/unit/controller/worker/test_controller_worker.py b/octavia/tests/unit/controller/worker/test_controller_worker.py index 4d039f5230..fdb7e68eab 100644 --- a/octavia/tests/unit/controller/worker/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/test_controller_worker.py @@ -1155,9 +1155,9 @@ class TestControllerWorker(base.TestCase): mock_update.assert_called_with('TEST', LB_ID, provisioning_status=constants.ACTIVE) - @mock.patch('octavia.db.repositories.AmphoraHealthRepository.update') + @mock.patch('octavia.db.repositories.AmphoraHealthRepository.delete') def test_failover_deleted_amphora(self, - mock_update, + mock_delete, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -1178,7 +1178,7 @@ class TestControllerWorker(base.TestCase): cw = controller_worker.ControllerWorker() cw._perform_amphora_failover(mock_amphora, 10) - mock_update.assert_called_with('TEST', AMP_ID, busy=True) + mock_delete.assert_called_with('TEST', amphora_id=AMP_ID) mock_taskflow_load.assert_not_called() @mock.patch('octavia.controller.worker.' diff --git a/tools/create_flow_docs.py b/tools/create_flow_docs.py index b553befd8a..e710cf74db 100755 --- a/tools/create_flow_docs.py +++ b/tools/create_flow_docs.py @@ -55,7 +55,7 @@ def generate(flow_list, output_directory): current_tuple[2] == 'get_failover_flow'): current_engine = engines.load( get_flow_method(role=constants.ROLE_STANDALONE, - status=constants.AMPHORA_ALLOCATED)) + load_balancer_id=None)) elif (current_tuple[1] == 'LoadBalancerFlows' and current_tuple[2] == 'get_create_load_balancer_flow'): current_engine = engines.load(