diff --git a/etc/octavia.conf b/etc/octavia.conf index 88e17452d1..03feff76e8 100644 --- a/etc/octavia.conf +++ b/etc/octavia.conf @@ -198,6 +198,12 @@ # # rest_request_conn_timeout = 10 # rest_request_read_timeout = 60 +# +# These "active" timeouts are used once the amphora should already +# be fully up and active. These values are lower than the other values to +# facilitate "fail fast" scenarios like failovers +# active_connection_max_retries = 15 +# active_connection_rety_interval = 2 [controller_worker] # workers = 1 diff --git a/octavia/amphorae/drivers/driver_base.py b/octavia/amphorae/drivers/driver_base.py index e7229ac2d6..9014069822 100644 --- a/octavia/amphorae/drivers/driver_base.py +++ b/octavia/amphorae/drivers/driver_base.py @@ -21,6 +21,25 @@ import six @six.add_metaclass(abc.ABCMeta) class AmphoraLoadBalancerDriver(object): + @abc.abstractmethod + def update_amphora_listeners(self, listeners, amphora_id, timeout_dict): + """Update the amphora with a new configuration. + + :param listeners: List of listeners to update. + :type listener: list + :param amphora_id: The ID of the amphora to update + :type amphora_id: string + :param timeout_dict: Dictionary of timeout values for calls to the + amphora. May contain: req_conn_timeout, + req_read_timeout, conn_max_retries, + conn_retry_interval + :returns: None + + Builds a new configuration, pushes it to the amphora, and reloads + the listener on one amphora. + """ + pass + @abc.abstractmethod def update(self, listener, vip): """Update the amphora with a new configuration. @@ -30,7 +49,7 @@ class AmphoraLoadBalancerDriver(object): :type listener: object :param vip: vip object, need to use its ip_address property :type vip: object - :returns: return a value list (listener, vip, status flag--update) + :returns: None At this moment, we just build the basic structure for testing, will add more function along with the development. diff --git a/octavia/amphorae/drivers/haproxy/rest_api_driver.py b/octavia/amphorae/drivers/haproxy/rest_api_driver.py index ff9e08cfdf..ef141b556f 100644 --- a/octavia/amphorae/drivers/haproxy/rest_api_driver.py +++ b/octavia/amphorae/drivers/haproxy/rest_api_driver.py @@ -60,6 +60,45 @@ class HaproxyAmphoraLoadBalancerDriver( haproxy_template=CONF.haproxy_amphora.haproxy_template, connection_logging=CONF.haproxy_amphora.connection_logging) + def update_amphora_listeners(self, listeners, amphora_index, + amphorae, timeout_dict=None): + """Update the amphora with a new configuration. + + :param listeners: List of listeners to update. + :type listener: list + :param amphora_id: The ID of the amphora to update + :type amphora_id: string + :param timeout_dict: Dictionary of timeout values for calls to the + amphora. May contain: req_conn_timeout, + req_read_timeout, conn_max_retries, + conn_retry_interval + :returns: None + + Updates the configuration of the listeners on a single amphora. + """ + # if the amphora does not yet have listeners, no need to update them. 
+ if not listeners: + LOG.debug('No listeners found to update.') + return + amp = amphorae[amphora_index] + if amp is None or amp.status == consts.DELETED: + return + # TODO(johnsom) remove when we don't have a process per listener + for listener in listeners: + LOG.debug("%s updating listener %s on amphora %s", + self.__class__.__name__, listener.id, amp.id) + certs = self._process_tls_certificates(listener) + # Generate HaProxy configuration from listener object + config = self.jinja.build_config( + host_amphora=amp, + listener=listener, + tls_cert=certs['tls_cert'], + user_group=CONF.haproxy_amphora.user_group) + self.client.upload_config(amp, listener.id, config, + timeout_dict=timeout_dict) + self.client.reload_listener(amp, listener.id, + timeout_dict=timeout_dict) + def update(self, listener, vip): LOG.debug("Amphora %s haproxy, updating listener %s, vip %s", self.__class__.__name__, listener.protocol_port, @@ -85,25 +124,29 @@ class HaproxyAmphoraLoadBalancerDriver( self.__class__.__name__, amp.id) self.client.update_cert_for_rotation(amp, pem) - def _apply(self, func, listener=None, *args): - for amp in listener.load_balancer.amphorae: - if amp.status != consts.DELETED: - func(amp, listener.id, *args) + def _apply(self, func, listener=None, amphora=None, *args): + if amphora is None: + for amp in listener.load_balancer.amphorae: + if amp.status != consts.DELETED: + func(amp, listener.id, *args) + else: + if amphora.status != consts.DELETED: + func(amphora, listener.id, *args) def stop(self, listener, vip): self._apply(self.client.stop_listener, listener) - def start(self, listener, vip): - self._apply(self.client.start_listener, listener) + def start(self, listener, vip, amphora=None): + self._apply(self.client.start_listener, listener, amphora) def delete(self, listener, vip): self._apply(self.client.delete_listener, listener) def get_info(self, amphora): - self.driver.get_info(amphora.lb_network_ip) + return self.client.get_info(amphora) def get_diagnostics(self, amphora): - self.driver.get_diagnostics(amphora.lb_network_ip) + pass def finalize_amphora(self, amphora): pass @@ -186,7 +229,7 @@ class HaproxyAmphoraLoadBalancerDriver( pem = cert_parser.build_pem(cert) md5 = hashlib.md5(pem).hexdigest() # nosec name = '{id}.pem'.format(id=cert.id) - self._apply(self._upload_cert, listener, pem, md5, name) + self._apply(self._upload_cert, listener, None, pem, md5, name) return {'tls_cert': tls_cert, 'sni_certs': sni_certs} @@ -251,17 +294,27 @@ class AmphoraAPIClient(object): port=CONF.haproxy_amphora.bind_port, version=API_VERSION) - def request(self, method, amp, path='/', **kwargs): + def request(self, method, amp, path='/', timeout_dict=None, **kwargs): + cfg_ha_amp = CONF.haproxy_amphora + if timeout_dict is None: + timeout_dict = {} + req_conn_timeout = timeout_dict.get( + consts.REQ_CONN_TIMEOUT, cfg_ha_amp.rest_request_conn_timeout) + req_read_timeout = timeout_dict.get( + consts.REQ_READ_TIMEOUT, cfg_ha_amp.rest_request_read_timeout) + conn_max_retries = timeout_dict.get( + consts.CONN_MAX_RETRIES, cfg_ha_amp.connection_max_retries) + conn_retry_interval = timeout_dict.get( + consts.CONN_RETRY_INTERVAL, cfg_ha_amp.connection_retry_interval) + LOG.debug("request url %s", path) _request = getattr(self.session, method.lower()) _url = self._base_url(amp.lb_network_ip) + path LOG.debug("request url %s", _url) - timeout_tuple = (CONF.haproxy_amphora.rest_request_conn_timeout, - CONF.haproxy_amphora.rest_request_read_timeout) reqargs = { 'verify': CONF.haproxy_amphora.server_ca, 
'url': _url, - 'timeout': timeout_tuple, } + 'timeout': (req_conn_timeout, req_read_timeout), } reqargs.update(kwargs) headers = reqargs.setdefault('headers', {}) @@ -269,7 +322,7 @@ class AmphoraAPIClient(object): self.ssl_adapter.uuid = amp.id exception = None # Keep retrying - for a in six.moves.xrange(CONF.haproxy_amphora.connection_max_retries): + for a in six.moves.xrange(conn_max_retries): try: with warnings.catch_warnings(): warnings.filterwarnings( @@ -303,20 +356,20 @@ class AmphoraAPIClient(object): except (requests.ConnectionError, requests.Timeout) as e: exception = e LOG.warning("Could not connect to instance. Retrying.") - time.sleep(CONF.haproxy_amphora.connection_retry_interval) + time.sleep(conn_retry_interval) LOG.error("Connection retries (currently set to %(max_retries)s) " "exhausted. The amphora is unavailable. Reason: " "%(exception)s", - {'max_retries': CONF.haproxy_amphora.connection_max_retries, + {'max_retries': conn_max_retries, 'exception': exception}) raise driver_except.TimeOutException() - def upload_config(self, amp, listener_id, config): + def upload_config(self, amp, listener_id, config, timeout_dict=None): r = self.put( amp, 'listeners/{amphora_id}/{listener_id}/haproxy'.format( - amphora_id=amp.id, listener_id=listener_id), + amphora_id=amp.id, listener_id=listener_id), timeout_dict, data=config) return exc.check_exception(r) @@ -328,9 +381,9 @@ class AmphoraAPIClient(object): return r.json() return None - def _action(self, action, amp, listener_id): + def _action(self, action, amp, listener_id, timeout_dict=None): r = self.put(amp, 'listeners/{listener_id}/{action}'.format( - listener_id=listener_id, action=action)) + listener_id=listener_id, action=action), timeout_dict=timeout_dict) return exc.check_exception(r) def upload_cert_pem(self, amp, listener_id, pem_filename, pem_file): @@ -403,8 +456,9 @@ class AmphoraAPIClient(object): r = self.put(amp, 'vrrp/{action}'.format(action=action)) return exc.check_exception(r) - def get_interface(self, amp, ip_addr): - r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr)) + def get_interface(self, amp, ip_addr, timeout_dict=None): + r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr), + timeout_dict=timeout_dict) if exc.check_exception(r): return r.json() return None diff --git a/octavia/amphorae/drivers/keepalived/vrrp_rest_driver.py b/octavia/amphorae/drivers/keepalived/vrrp_rest_driver.py index a7f6ed4129..a337b9173f 100644 --- a/octavia/amphorae/drivers/keepalived/vrrp_rest_driver.py +++ b/octavia/amphorae/drivers/keepalived/vrrp_rest_driver.py @@ -90,5 +90,6 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin): self.client.reload_vrrp(amp) - def get_vrrp_interface(self, amphora): - return self.client.get_interface(amphora, amphora.vrrp_ip)['interface'] + def get_vrrp_interface(self, amphora, timeout_dict=None): + return self.client.get_interface( + amphora, amphora.vrrp_ip, timeout_dict=timeout_dict)['interface'] diff --git a/octavia/amphorae/drivers/noop_driver/driver.py b/octavia/amphorae/drivers/noop_driver/driver.py index ea5dd3802a..8c641ae774 100644 --- a/octavia/amphorae/drivers/noop_driver/driver.py +++ b/octavia/amphorae/drivers/noop_driver/driver.py @@ -37,6 +37,14 @@ class NoopManager(object): super(NoopManager, self).__init__() self.amphoraconfig = {} + def update_amphora_listeners(self, listeners, amphora_id, timeout_dict): + for listener in listeners: + LOG.debug("Amphora noop driver update_amphora_listeners, " + "listener %s, amphora %s, timeouts %s", 
listener.id, + amphora_id, timeout_dict) + self.amphoraconfig[(listener.id, amphora_id)] = ( + listener, amphora_id, timeout_dict, "update_amp") + def update(self, listener, vip): LOG.debug("Amphora %s no-op, update listener %s, vip %s", self.__class__.__name__, listener.protocol_port, @@ -106,6 +114,11 @@ class NoopAmphoraLoadBalancerDriver( super(NoopAmphoraLoadBalancerDriver, self).__init__() self.driver = NoopManager() + def update_amphora_listeners(self, listeners, amphora_id, timeout_dict): + + self.driver.update_amphora_listeners(listeners, amphora_id, + timeout_dict) + def update(self, listener, vip): self.driver.update(listener, vip) diff --git a/octavia/common/config.py b/octavia/common/config.py index 28ba348a03..6a03c14790 100644 --- a/octavia/common/config.py +++ b/octavia/common/config.py @@ -240,6 +240,13 @@ haproxy_amphora_opts = [ default=5, help=_('Retry timeout between connection attempts in ' 'seconds.')), + cfg.IntOpt('active_connection_max_retries', + default=15, + help=_('Retry threshold for connecting to active amphorae.')), + cfg.IntOpt('active_connection_rety_interval', + default=2, + help=_('Retry timeout between connection attempts in ' + 'seconds for active amphora.')), cfg.IntOpt('build_rate_limit', default=-1, help=_('Number of amphorae that could be built per controller' diff --git a/octavia/common/constants.py b/octavia/common/constants.py index 7983a13904..1036346a72 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -195,6 +195,7 @@ FAILED_AMPHORA = 'failed_amphora' FAILOVER_AMPHORA = 'failover_amphora' AMPHORAE = 'amphorae' AMPHORA_ID = 'amphora_id' +AMPHORA_INDEX = 'amphora_index' FAILOVER_AMPHORA_ID = 'failover_amphora_id' DELTA = 'delta' DELTAS = 'deltas' @@ -240,6 +241,11 @@ MEMBER_UPDATES = 'member_updates' HEALTH_MONITOR_UPDATES = 'health_monitor_updates' L7POLICY_UPDATES = 'l7policy_updates' L7RULE_UPDATES = 'l7rule_updates' +TIMEOUT_DICT = 'timeout_dict' +REQ_CONN_TIMEOUT = 'req_conn_timeout' +REQ_READ_TIMEOUT = 'req_read_timeout' +CONN_MAX_RETRIES = 'conn_max_retries' +CONN_RETRY_INTERVAL = 'conn_retry_interval' CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow' CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow' @@ -273,6 +279,7 @@ UPDATE_MEMBER_FLOW = 'octavia-update-member-flow' UPDATE_POOL_FLOW = 'octavia-update-pool-flow' UPDATE_L7POLICY_FLOW = 'octavia-update-l7policy-flow' UPDATE_L7RULE_FLOW = 'octavia-update-l7rule-flow' +UPDATE_AMPS_SUBFLOW = 'octavia-update-amps-subflow' POST_MAP_AMP_TO_LB_SUBFLOW = 'octavia-post-map-amp-to-lb-subflow' CREATE_AMP_FOR_LB_SUBFLOW = 'octavia-create-amp-for-lb-subflow' @@ -306,6 +313,8 @@ AMP_VRRP_STOP = 'octavia-amphora-vrrp-stop' AMP_UPDATE_VRRP_INTF = 'octavia-amphora-update-vrrp-intf' CREATE_VRRP_GROUP_FOR_LB = 'octavia-create-vrrp-group-for-lb' CREATE_VRRP_SECURITY_RULES = 'octavia-create-vrrp-security-rules' +AMP_COMPUTE_CONNECTIVITY_WAIT = 'octavia-amp-compute-connectivity-wait' +AMP_LISTENER_UPDATE = 'octavia-amp-listeners-update' GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask' diff --git a/octavia/controller/worker/controller_worker.py b/octavia/controller/worker/controller_worker.py index d5775116e4..531576a948 100644 --- a/octavia/controller/worker/controller_worker.py +++ b/octavia/controller/worker/controller_worker.py @@ -747,15 +747,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): # if we run with anti-affinity we need to set the server group # as well - if CONF.nova.enable_anti_affinity: - lb = self._amphora_repo.get_lb_for_amphora( - 
db_apis.get_session(), amp.id) - if lb: - stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id + lb = self._amphora_repo.get_lb_for_amphora( + db_apis.get_session(), amp.id) + if CONF.nova.enable_anti_affinity and lb: + stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id failover_amphora_tf = self._taskflow_load( self._amphora_flows.get_failover_flow( - role=amp.role, load_balancer_id=amp.load_balancer_id), + role=amp.role, load_balancer=lb), store=stored_params) with tf_logging.DynamicLoggingListener( diff --git a/octavia/controller/worker/flows/amphora_flows.py b/octavia/controller/worker/flows/amphora_flows.py index e2b5bedf23..6a3d9056ed 100644 --- a/octavia/controller/worker/flows/amphora_flows.py +++ b/octavia/controller/worker/flows/amphora_flows.py @@ -16,6 +16,7 @@ from oslo_config import cfg from taskflow.patterns import graph_flow from taskflow.patterns import linear_flow +from taskflow.patterns import unordered_flow from octavia.common import constants from octavia.controller.worker.tasks import amphora_driver_tasks @@ -62,12 +63,15 @@ class AmphoraFlows(object): provides=constants.COMPUTE_ID)) create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB( requires=(constants.AMPHORA_ID, constants.COMPUTE_ID))) - create_amphora_flow.add(compute_tasks.ComputeWait( + create_amphora_flow.add(compute_tasks.ComputeActiveWait( requires=(constants.COMPUTE_ID, constants.AMPHORA_ID), provides=constants.COMPUTE_OBJ)) create_amphora_flow.add(database_tasks.UpdateAmphoraInfo( requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), provides=constants.AMPHORA)) + create_amphora_flow.add( + amphora_driver_tasks.AmphoraComputeConnectivityWait( + requires=constants.AMPHORA)) create_amphora_flow.add(database_tasks.ReloadAmphora( requires=constants.AMPHORA_ID, provides=constants.AMPHORA)) @@ -172,7 +176,7 @@ class AmphoraFlows(object): create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraBootingInDB( name=sf_name + '-' + constants.MARK_AMPHORA_BOOTING_INDB, requires=(constants.AMPHORA_ID, constants.COMPUTE_ID))) - create_amp_for_lb_subflow.add(compute_tasks.ComputeWait( + create_amp_for_lb_subflow.add(compute_tasks.ComputeActiveWait( name=sf_name + '-' + constants.COMPUTE_WAIT, requires=(constants.COMPUTE_ID, constants.AMPHORA_ID), provides=constants.COMPUTE_OBJ)) @@ -180,6 +184,10 @@ class AmphoraFlows(object): name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO, requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), provides=constants.AMPHORA)) + create_amp_for_lb_subflow.add( + amphora_driver_tasks.AmphoraComputeConnectivityWait( + name=sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT, + requires=constants.AMPHORA)) create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize( name=sf_name + '-' + constants.AMPHORA_FINALIZE, requires=constants.AMPHORA)) @@ -290,7 +298,7 @@ class AmphoraFlows(object): return delete_amphora_flow def get_failover_flow(self, role=constants.ROLE_STANDALONE, - load_balancer_id=None): + load_balancer=None): """Creates a flow to failover a stale amphora :returns: The flow for amphora failover @@ -334,7 +342,7 @@ class AmphoraFlows(object): requires=constants.AMPHORA)) # If this is an unallocated amp (spares pool), we're done - if not load_balancer_id: + if not load_balancer: failover_amphora_flow.add( database_tasks.DisableAmphoraHealthMonitoring( rebind={constants.AMPHORA: constants.FAILED_AMPHORA}, @@ -373,9 +381,38 @@ class AmphoraFlows(object): provides=constants.AMPHORAE_NETWORK_CONFIG)) 
failover_amphora_flow.add(database_tasks.GetListenersFromLoadbalancer( requires=constants.LOADBALANCER, provides=constants.LISTENERS)) + failover_amphora_flow.add(database_tasks.GetAmphoraeFromLoadbalancer( + requires=constants.LOADBALANCER, provides=constants.AMPHORAE)) - failover_amphora_flow.add(amphora_driver_tasks.ListenersUpdate( - requires=(constants.LOADBALANCER, constants.LISTENERS))) + # Listeners update needs to be run on all amphora to update + # their peer configurations. So parallelize this with an + # unordered subflow. + update_amps_subflow = unordered_flow.Flow( + constants.UPDATE_AMPS_SUBFLOW) + + timeout_dict = { + constants.CONN_MAX_RETRIES: + CONF.haproxy_amphora.active_connection_max_retries, + constants.CONN_RETRY_INTERVAL: + CONF.haproxy_amphora.active_connection_rety_interval} + + # Setup parallel flows for each amp. We don't know the new amp + # details at flow creation time, so setup a subflow for each + # amp on the LB, they let the task index into a list of amps + # to find the amphora it should work on. + amp_index = 0 + for amp in load_balancer.amphorae: + if amp.status == constants.DELETED: + continue + update_amps_subflow.add( + amphora_driver_tasks.AmpListenersUpdate( + name=constants.AMP_LISTENER_UPDATE + '-' + str(amp_index), + requires=(constants.LISTENERS, constants.AMPHORAE), + inject={constants.AMPHORA_INDEX: amp_index, + constants.TIMEOUT_DICT: timeout_dict})) + amp_index += 1 + + failover_amphora_flow.add(update_amps_subflow) # Plug the VIP ports into the new amphora failover_amphora_flow.add(network_tasks.PlugVIPPort( @@ -385,13 +422,22 @@ class AmphoraFlows(object): constants.AMPHORAE_NETWORK_CONFIG))) # Plug the member networks into the new amphora - failover_amphora_flow.add(network_tasks.CalculateDelta( - requires=constants.LOADBALANCER, provides=constants.DELTAS)) - failover_amphora_flow.add(network_tasks.HandleNetworkDeltas( - requires=constants.DELTAS, provides=constants.ADDED_PORTS)) + failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta( + requires=(constants.LOADBALANCER, constants.AMPHORA), + provides=constants.DELTA)) + + failover_amphora_flow.add(network_tasks.HandleNetworkDelta( + requires=(constants.AMPHORA, constants.DELTA), + provides=constants.ADDED_PORTS)) + failover_amphora_flow.add(amphora_driver_tasks.AmphoraePostNetworkPlug( requires=(constants.LOADBALANCER, constants.ADDED_PORTS))) + failover_amphora_flow.add(database_tasks.ReloadLoadBalancer( + name='octavia-failover-LB-reload-2', + requires=constants.LOADBALANCER_ID, + provides=constants.LOADBALANCER)) + # Handle the amphora role and VRRP if necessary if role == constants.ROLE_MASTER: failover_amphora_flow.add(database_tasks.MarkAmphoraMasterInDB( @@ -412,7 +458,8 @@ class AmphoraFlows(object): requires=constants.AMPHORA)) failover_amphora_flow.add(amphora_driver_tasks.ListenersStart( - requires=(constants.LOADBALANCER, constants.LISTENERS))) + requires=(constants.LOADBALANCER, constants.LISTENERS, + constants.AMPHORA))) failover_amphora_flow.add( database_tasks.DisableAmphoraHealthMonitoring( rebind={constants.AMPHORA: constants.FAILED_AMPHORA}, diff --git a/octavia/controller/worker/tasks/amphora_driver_tasks.py b/octavia/controller/worker/tasks/amphora_driver_tasks.py index 097ca2eaa0..66122554c6 100644 --- a/octavia/controller/worker/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/tasks/amphora_driver_tasks.py @@ -20,6 +20,7 @@ from stevedore import driver as stevedore_driver from taskflow import task from taskflow.types import failure +from 
octavia.amphorae.driver_exceptions import exceptions as driver_except from octavia.common import constants from octavia.controller.worker import task_utils as task_utilities from octavia.db import api as db_apis @@ -45,6 +46,25 @@ class BaseAmphoraTask(task.Task): self.task_utils = task_utilities.TaskUtils() +class AmpListenersUpdate(BaseAmphoraTask): + """Task to update the listeners on one amphora.""" + + def execute(self, listeners, amphora_index, amphorae, timeout_dict=()): + # Note, we don't want this to cause a revert as it may be used + # in a failover flow with both amps failing. Skip it and let + # health manager fix it. + try: + self.amphora_driver.update_amphora_listeners( + listeners, amphora_index, amphorae, timeout_dict) + except Exception as e: + amphora_id = amphorae[amphora_index].id + LOG.error('Failed to update listeners on amphora %s. Skipping ' + 'this amphora as it is failing to update due to: %s', + amphora_id, str(e)) + self.amphora_repo.update(db_apis.get_session(), amphora_id, + status=constants.ERROR) + + class ListenersUpdate(BaseAmphoraTask): """Task to update amphora with all specified listeners' configurations.""" @@ -104,10 +124,10 @@ class ListenerStart(BaseAmphoraTask): class ListenersStart(BaseAmphoraTask): """Task to start all listeners on the vip.""" - def execute(self, loadbalancer, listeners): + def execute(self, loadbalancer, listeners, amphora=None): """Execute listener start routines for listeners on an amphora.""" for listener in listeners: - self.amphora_driver.start(listener, loadbalancer.vip) + self.amphora_driver.start(listener, loadbalancer.vip, amphora) LOG.debug("Started the listeners on the vip") def revert(self, listeners, *args, **kwargs): @@ -261,11 +281,27 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask): def execute(self, loadbalancer): """Execute post_vip_routine.""" amps = [] + timeout_dict = { + constants.CONN_MAX_RETRIES: + CONF.haproxy_amphora.active_connection_max_retries, + constants.CONN_RETRY_INTERVAL: + CONF.haproxy_amphora.active_connection_rety_interval} for amp in six.moves.filter( lambda amp: amp.status == constants.AMPHORA_ALLOCATED, loadbalancer.amphorae): - # Currently this is supported only with REST Driver - interface = self.amphora_driver.get_vrrp_interface(amp) + + try: + interface = self.amphora_driver.get_vrrp_interface( + amp, timeout_dict=timeout_dict) + except Exception as e: + # This can occur when an active/standby LB has no listener + LOG.error('Failed to get amphora VRRP interface on amphora ' + '%s. Skipping this amphora as it is failing due to: ' + '%s', amp.id, str(e)) + self.amphora_repo.update(db_apis.get_session(), amp.id, + status=constants.ERROR) + continue + self.amphora_repo.update(db_apis.get_session(), amp.id, vrrp_interface=interface) amps.append(self.amphora_repo.get(db_apis.get_session(), @@ -317,3 +353,22 @@ class AmphoraVRRPStart(BaseAmphoraTask): self.amphora_driver.start_vrrp_service(loadbalancer) LOG.debug("Started VRRP of loadbalancer %s amphorae", loadbalancer.id) + + +class AmphoraComputeConnectivityWait(BaseAmphoraTask): + """"Task to wait for the compute instance to be up.""" + + def execute(self, amphora): + """Execute get_info routine for an amphora until it responds.""" + try: + amp_info = self.amphora_driver.get_info(amphora) + LOG.debug('Successfuly connected to amphora %s: %s', + amphora.id, amp_info) + except driver_except.TimeOutException: + LOG.error("Amphora compute instance failed to become reachable. 
" + "This either means the compute driver failed to fully " + "boot the instance inside the timeout interval or the " + "instance is not reachable via the lb-mgmt-net.") + self.amphora_repo.update(db_apis.get_session(), amphora.id, + status=constants.ERROR) + raise diff --git a/octavia/controller/worker/tasks/compute_tasks.py b/octavia/controller/worker/tasks/compute_tasks.py index cd0fb3c6de..92f88dd669 100644 --- a/octavia/controller/worker/tasks/compute_tasks.py +++ b/octavia/controller/worker/tasks/compute_tasks.py @@ -175,7 +175,7 @@ class ComputeDelete(BaseComputeTask): raise -class ComputeWait(BaseComputeTask): +class ComputeActiveWait(BaseComputeTask): """Wait for the compute driver to mark the amphora active.""" def execute(self, compute_id, amphora_id): diff --git a/octavia/controller/worker/tasks/database_tasks.py b/octavia/controller/worker/tasks/database_tasks.py index e2a45be386..353407447e 100644 --- a/octavia/controller/worker/tasks/database_tasks.py +++ b/octavia/controller/worker/tasks/database_tasks.py @@ -1525,6 +1525,25 @@ class GetAmphoraDetails(BaseDatabaseTask): vrrp_priority=amphora.vrrp_priority) +class GetAmphoraeFromLoadbalancer(BaseDatabaseTask): + """Task to pull the listeners from a loadbalancer.""" + + def execute(self, loadbalancer): + """Pull the amphorae from a loadbalancer. + + :param loadbalancer: Load balancer which listeners are required + :returns: A list of Listener objects + """ + amphorae = [] + for amp in loadbalancer.amphorae: + a = self.amphora_repo.get(db_apis.get_session(), id=amp.id, + show_deleted=False) + if a is None: + continue + amphorae.append(a) + return amphorae + + class GetListenersFromLoadbalancer(BaseDatabaseTask): """Task to pull the listeners from a loadbalancer.""" @@ -1537,6 +1556,7 @@ class GetListenersFromLoadbalancer(BaseDatabaseTask): listeners = [] for listener in loadbalancer.listeners: l = self.listener_repo.get(db_apis.get_session(), id=listener.id) + l.load_balancer = loadbalancer listeners.append(l) return listeners diff --git a/octavia/controller/worker/tasks/network_tasks.py b/octavia/controller/worker/tasks/network_tasks.py index 697fb5cd33..e9b017723d 100644 --- a/octavia/controller/worker/tasks/network_tasks.py +++ b/octavia/controller/worker/tasks/network_tasks.py @@ -215,6 +215,55 @@ class GetMemberPorts(BaseNetworkTask): return member_ports +class HandleNetworkDelta(BaseNetworkTask): + """Task to plug and unplug networks + + Plug or unplug networks based on delta + """ + + def execute(self, amphora, delta): + """Handle network plugging based off deltas.""" + added_ports = {} + added_ports[amphora.id] = [] + for nic in delta.add_nics: + interface = self.network_driver.plug_network(delta.compute_id, + nic.network_id) + port = self.network_driver.get_port(interface.port_id) + port.network = self.network_driver.get_network(port.network_id) + for fixed_ip in port.fixed_ips: + fixed_ip.subnet = self.network_driver.get_subnet( + fixed_ip.subnet_id) + added_ports[amphora.id].append(port) + for nic in delta.delete_nics: + try: + self.network_driver.unplug_network(delta.compute_id, + nic.network_id) + except base.NetworkNotFound: + LOG.debug("Network %d not found ", nic.network_id) + except Exception: + LOG.exception("Unable to unplug network") + return added_ports + + def revert(self, result, amphora, delta, *args, **kwargs): + """Handle a network plug or unplug failures.""" + + if isinstance(result, failure.Failure): + return + + if not delta: + return + + LOG.warning("Unable to plug networks for amp id %s", + 
delta.amphora_id) + + for nic in delta.add_nics: + try: + self.network_driver.unplug_network(delta.compute_id, + nic.network_id) + except Exception: + pass + + class HandleNetworkDeltas(BaseNetworkTask): """Task to plug and unplug networks diff --git a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py index 1b32f143bf..35992a38e2 100644 --- a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py +++ b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py @@ -24,6 +24,7 @@ import six from octavia.amphorae.driver_exceptions import exceptions as driver_except from octavia.amphorae.drivers.haproxy import exceptions as exc from octavia.amphorae.drivers.haproxy import rest_api_driver as driver +from octavia.common import constants from octavia.db import models from octavia.network import data_models as network_models from octavia.tests.unit import base @@ -47,6 +48,9 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase): def setUp(self): super(TestHaproxyAmphoraLoadBalancerDriverTest, self).setUp() + conf = oslo_fixture.Config(cfg.CONF) + conf.config(group="haproxy_amphora", user_group="everyone") + DEST1 = '198.51.100.0/24' DEST2 = '203.0.113.0/24' NEXTHOP = '192.0.2.1' @@ -84,6 +88,50 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase): 'mtu': FAKE_MTU, 'host_routes': host_routes_data} + self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1, + constants.REQ_READ_TIMEOUT: 2, + constants.CONN_MAX_RETRIES: 3, + constants.CONN_RETRY_INTERVAL: 4} + + @mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data') + def test_update_amphora_listeners(self, mock_load_cert): + mock_amphora = mock.MagicMock() + mock_amphora.id = uuidutils.generate_uuid() + mock_listener = mock.MagicMock() + mock_listener.id = uuidutils.generate_uuid() + mock_load_cert.return_value = {'tls_cert': None, 'sni_certs': []} + self.driver.jinja.build_config.return_value = 'the_config' + + self.driver.update_amphora_listeners(None, 1, [], + self.timeout_dict) + mock_load_cert.assert_not_called() + self.driver.jinja.build_config.assert_not_called() + self.driver.client.upload_config.assert_not_called() + self.driver.client.reload_listener.assert_not_called() + + self.driver.update_amphora_listeners([mock_listener], 0, + [mock_amphora], self.timeout_dict) + self.driver.jinja.build_config.assert_called_once_with( + host_amphora=mock_amphora, listener=mock_listener, + tls_cert=None, user_group="everyone") + self.driver.client.upload_config.assert_called_once_with( + mock_amphora, mock_listener.id, 'the_config', + timeout_dict=self.timeout_dict) + self.driver.client.reload_listener(mock_amphora, mock_listener.id, + timeout_dict=self.timeout_dict) + + mock_load_cert.reset_mock() + self.driver.jinja.build_config.reset_mock() + self.driver.client.upload_config.reset_mock() + self.driver.client.reload_listener.reset_mock() + mock_amphora.status = constants.DELETED + self.driver.update_amphora_listeners([mock_listener], 0, + [mock_amphora], self.timeout_dict) + mock_load_cert.assert_not_called() + self.driver.jinja.build_config.assert_not_called() + self.driver.client.upload_config.assert_not_called() + self.driver.client.reload_listener.assert_not_called() + @mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data') @mock.patch('octavia.common.tls_utils.cert_parser.get_host_names') def test_update(self, mock_cert, mock_load_crt): @@ -157,11 +205,29 @@ class 
TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase): self.amp, self.sl.id) def test_start(self): + amp1 = mock.MagicMock() + amp2 = mock.MagicMock() + amp2.status = constants.DELETED + listener = mock.MagicMock() + listener.id = uuidutils.generate_uuid() + listener.load_balancer.amphorae = [amp1, amp2] # Execute driver method - self.driver.start(self.sl, self.sv) + self.driver.start(listener, self.sv) + self.driver.client.start_listener.assert_called_once_with( + amp1, listener.id) + + def test_start_with_amphora(self): + # Execute driver method + amp = mock.MagicMock() + self.driver.start(self.sl, self.sv, self.amp) self.driver.client.start_listener.assert_called_once_with( self.amp, self.sl.id) + self.driver.client.start_listener.reset_mock() + amp.status = constants.DELETED + self.driver.start(self.sl, self.sv, amp) + self.driver.client.start_listener.assert_not_called() + def test_delete(self): # Execute driver method self.driver.delete(self.sl, self.sv) @@ -169,13 +235,19 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase): self.amp, self.sl.id) def test_get_info(self): - pass + self.driver.client.get_info.return_value = 'FAKE_INFO' + result = self.driver.get_info(self.amp) + self.assertEqual('FAKE_INFO', result) def test_get_diagnostics(self): - pass + # TODO(johnsom) Implement once this exists on the amphora agent. + result = self.driver.get_diagnostics(self.amp) + self.assertIsNone(result) def test_finalize_amphora(self): - pass + # TODO(johnsom) Implement once this exists on the amphora agent. + result = self.driver.finalize_amphora(self.amp) + self.assertIsNone(result) def test_post_vip_plug(self): amphorae_network_config = mock.MagicMock() @@ -249,7 +321,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase): def test_get_vrrp_interface(self): self.driver.get_vrrp_interface(self.amp) self.driver.client.get_interface.assert_called_once_with( - self.amp, self.amp.vrrp_ip) + self.amp, self.amp.vrrp_ip, timeout_dict=None) class TestAmphoraAPIClientTest(base.TestCase): @@ -270,6 +342,10 @@ class TestAmphoraAPIClientTest(base.TestCase): 'vrrp_ip': self.amp.vrrp_ip} patcher = mock.patch('time.sleep').start() self.addCleanup(patcher.stop) + self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1, + constants.REQ_READ_TIMEOUT: 2, + constants.CONN_MAX_RETRIES: 3, + constants.CONN_RETRY_INTERVAL: 4} def test_base_url(self): url = self.driver._base_url(FAKE_IP) @@ -283,8 +359,8 @@ class TestAmphoraAPIClientTest(base.TestCase): @mock.patch('octavia.amphorae.drivers.haproxy.rest_api_driver.time.sleep') def test_request(self, mock_sleep, mock_get): self.assertRaises(driver_except.TimeOutException, - self.driver.request, - 'get', self.amp, 'unavailableURL') + self.driver.request, 'get', self.amp, + 'unavailableURL', self.timeout_dict) @requests_mock.mock() def test_get_info(self, m): diff --git a/octavia/tests/unit/amphorae/drivers/test_noop_amphoraloadbalancer_driver.py b/octavia/tests/unit/amphorae/drivers/test_noop_amphoraloadbalancer_driver.py index 35914b73b3..8abacbb203 100644 --- a/octavia/tests/unit/amphorae/drivers/test_noop_amphoraloadbalancer_driver.py +++ b/octavia/tests/unit/amphorae/drivers/test_noop_amphoraloadbalancer_driver.py @@ -15,6 +15,7 @@ from oslo_utils import uuidutils from octavia.amphorae.drivers.noop_driver import driver +from octavia.common import constants from octavia.common import data_models from octavia.network import data_models as network_models from octavia.tests.unit import base @@ -44,6 +45,7 @@ class 
TestNoopAmphoraLoadBalancerDriver(base.TestCase): super(TestNoopAmphoraLoadBalancerDriver, self).setUp() self.driver = driver.NoopAmphoraLoadBalancerDriver() self.listener = data_models.Listener() + self.listener.id = uuidutils.generate_uuid() self.listener.protocol_port = 80 self.vip = data_models.Vip() self.vip.ip_address = "10.0.0.1" @@ -61,6 +63,19 @@ class TestNoopAmphoraLoadBalancerDriver(base.TestCase): vip_subnet=network_models.Subnet(id=self.FAKE_UUID_1)) } self.pem_file = 'test_pem_file' + self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1, + constants.REQ_READ_TIMEOUT: 2, + constants.CONN_MAX_RETRIES: 3, + constants.CONN_RETRY_INTERVAL: 4} + + def test_update_amphora_listeners(self): + self.driver.update_amphora_listeners([self.listener], self.amphora.id, + self.timeout_dict) + self.assertEqual((self.listener, self.amphora.id, self.timeout_dict, + 'update_amp'), + self.driver.driver.amphoraconfig[( + self.listener.id, + self.amphora.id)]) def test_update(self): self.driver.update(self.listener, self.vip) diff --git a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py index e700c8c98a..5d5a293468 100644 --- a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py @@ -19,6 +19,7 @@ from oslo_config import fixture as oslo_fixture from taskflow.patterns import linear_flow as flow from octavia.common import constants +from octavia.common import data_models from octavia.controller.worker.flows import amphora_flows import octavia.tests.unit.base as base @@ -38,6 +39,11 @@ class TestAmphoraFlows(base.TestCase): amphora_driver='amphora_haproxy_rest_driver') self.conf.config(group="nova", enable_anti_affinity=False) self.AmpFlow = amphora_flows.AmphoraFlows() + self.amp1 = data_models.Amphora(id=1) + self.amp2 = data_models.Amphora(id=2) + self.amp3 = data_models.Amphora(id=3, status=constants.DELETED) + self.lb = data_models.LoadBalancer( + id=4, amphorae=[self.amp1, self.amp2, self.amp3]) def test_get_create_amphora_flow(self, mock_get_net_driver): @@ -237,7 +243,7 @@ class TestAmphoraFlows(base.TestCase): def test_get_failover_flow_allocated(self, mock_get_net_driver): amp_flow = self.AmpFlow.get_failover_flow( - load_balancer_id='mylb') + load_balancer=self.lb) self.assertIsInstance(amp_flow, flow.Flow) @@ -254,10 +260,10 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.LOADBALANCER, amp_flow.provides) self.assertEqual(3, len(amp_flow.requires)) - self.assertEqual(11, len(amp_flow.provides)) + self.assertEqual(12, len(amp_flow.provides)) amp_flow = self.AmpFlow.get_failover_flow( - role=constants.ROLE_MASTER, load_balancer_id='mylb') + role=constants.ROLE_MASTER, load_balancer=self.lb) self.assertIsInstance(amp_flow, flow.Flow) @@ -274,10 +280,10 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.LOADBALANCER, amp_flow.provides) self.assertEqual(3, len(amp_flow.requires)) - self.assertEqual(11, len(amp_flow.provides)) + self.assertEqual(12, len(amp_flow.provides)) amp_flow = self.AmpFlow.get_failover_flow( - role=constants.ROLE_BACKUP, load_balancer_id='mylb') + role=constants.ROLE_BACKUP, load_balancer=self.lb) self.assertIsInstance(amp_flow, flow.Flow) @@ -294,10 +300,10 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.LOADBALANCER, amp_flow.provides) self.assertEqual(3, len(amp_flow.requires)) - self.assertEqual(11, len(amp_flow.provides)) + self.assertEqual(12, 
len(amp_flow.provides)) amp_flow = self.AmpFlow.get_failover_flow( - role='BOGUSROLE', load_balancer_id='mylb') + role='BOGUSROLE', load_balancer=self.lb) self.assertIsInstance(amp_flow, flow.Flow) @@ -314,12 +320,11 @@ class TestAmphoraFlows(base.TestCase): self.assertIn(constants.LOADBALANCER, amp_flow.provides) self.assertEqual(3, len(amp_flow.requires)) - self.assertEqual(11, len(amp_flow.provides)) + self.assertEqual(12, len(amp_flow.provides)) def test_get_failover_flow_spare(self, mock_get_net_driver): - amp_flow = self.AmpFlow.get_failover_flow( - load_balancer_id=None) + amp_flow = self.AmpFlow.get_failover_flow() self.assertIsInstance(amp_flow, flow.Flow) diff --git a/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py index a80ec911bd..6e30a740b7 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py @@ -14,9 +14,12 @@ # import mock +from oslo_config import cfg +from oslo_config import fixture as oslo_fixture from oslo_utils import uuidutils from taskflow.types import failure +from octavia.amphorae.driver_exceptions import exceptions as driver_except from octavia.common import constants from octavia.common import data_models from octavia.controller.worker.tasks import amphora_driver_tasks @@ -28,6 +31,8 @@ AMP_ID = uuidutils.generate_uuid() COMPUTE_ID = uuidutils.generate_uuid() LISTENER_ID = uuidutils.generate_uuid() LB_ID = uuidutils.generate_uuid() +CONN_MAX_RETRIES = 10 +CONN_RETRY_INTERVAL = 6 _amphora_mock = mock.MagicMock() _amphora_mock.id = AMP_ID @@ -61,8 +66,42 @@ class TestAmphoraDriverTasks(base.TestCase): _LB_mock.amphorae = [_amphora_mock] _LB_mock.id = LB_ID + conf = oslo_fixture.Config(cfg.CONF) + conf.config(group="haproxy_amphora", + active_connection_max_retries=CONN_MAX_RETRIES) + conf.config(group="haproxy_amphora", + active_connection_rety_interval=CONN_RETRY_INTERVAL) super(TestAmphoraDriverTasks, self).setUp() + def test_amp_listener_update(self, + mock_driver, + mock_generate_uuid, + mock_log, + mock_get_session, + mock_listener_repo_get, + mock_listener_repo_update, + mock_amphora_repo_update): + + timeout_dict = {constants.REQ_CONN_TIMEOUT: 1, + constants.REQ_READ_TIMEOUT: 2, + constants.CONN_MAX_RETRIES: 3, + constants.CONN_RETRY_INTERVAL: 4} + + amp_list_update_obj = amphora_driver_tasks.AmpListenersUpdate() + amp_list_update_obj.execute([_listener_mock], 0, + [_amphora_mock], timeout_dict) + + mock_driver.update_amphora_listeners.assert_called_once_with( + [_listener_mock], 0, [_amphora_mock], timeout_dict) + + mock_driver.update_amphora_listeners.side_effect = Exception('boom') + + amp_list_update_obj.execute([_listener_mock], 0, + [_amphora_mock], timeout_dict) + + mock_amphora_repo_update.assert_called_once_with( + _session_mock, AMP_ID, status=constants.ERROR) + def test_listener_update(self, mock_driver, mock_generate_uuid, @@ -480,10 +519,15 @@ class TestAmphoraDriverTasks(base.TestCase): mock_listener_repo_update, mock_amphora_repo_update): _LB_mock.amphorae = _amphorae_mock + + timeout_dict = {constants.CONN_MAX_RETRIES: CONN_MAX_RETRIES, + constants.CONN_RETRY_INTERVAL: CONN_RETRY_INTERVAL} + amphora_update_vrrp_interface_obj = ( amphora_driver_tasks.AmphoraUpdateVRRPInterface()) amphora_update_vrrp_interface_obj.execute(_LB_mock) - mock_driver.get_vrrp_interface.assert_called_once_with(_amphora_mock) + 
mock_driver.get_vrrp_interface.assert_called_once_with( + _amphora_mock, timeout_dict=timeout_dict) # Test revert mock_driver.reset_mock() @@ -550,3 +594,22 @@ class TestAmphoraDriverTasks(base.TestCase): amphora_driver_tasks.AmphoraVRRPStart()) amphora_vrrp_start_obj.execute(_LB_mock) mock_driver.start_vrrp_service.assert_called_once_with(_LB_mock) + + def test_amphora_compute_connectivity_wait(self, + mock_driver, + mock_generate_uuid, + mock_log, + mock_get_session, + mock_listener_repo_get, + mock_listener_repo_update, + mock_amphora_repo_update): + amp_compute_conn_wait_obj = ( + amphora_driver_tasks.AmphoraComputeConnectivityWait()) + amp_compute_conn_wait_obj.execute(_amphora_mock) + mock_driver.get_info.assert_called_once_with(_amphora_mock) + + mock_driver.get_info.side_effect = driver_except.TimeOutException() + self.assertRaises(driver_except.TimeOutException, + amp_compute_conn_wait_obj.execute, _amphora_mock) + mock_amphora_repo_update.assert_called_once_with( + _session_mock, AMP_ID, status=constants.ERROR) diff --git a/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py index b08b3a5223..e9bf6bf0bc 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py @@ -339,7 +339,7 @@ class TestComputeTasks(base.TestCase): mock_driver.get_amphora.return_value = _amphora_mock, None - computewait = compute_tasks.ComputeWait() + computewait = compute_tasks.ComputeActiveWait() computewait.execute(COMPUTE_ID, AMPHORA_ID) mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID) @@ -366,7 +366,7 @@ class TestComputeTasks(base.TestCase): mock_driver.get_amphora.return_value = _amphora_mock, None - computewait = compute_tasks.ComputeWait() + computewait = compute_tasks.ComputeActiveWait() computewait.execute(COMPUTE_ID, AMPHORA_ID) mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID) @@ -391,7 +391,7 @@ class TestComputeTasks(base.TestCase): mock_driver.get_amphora.return_value = _amphora_mock, None - computewait = compute_tasks.ComputeWait() + computewait = compute_tasks.ComputeActiveWait() computewait.execute(COMPUTE_ID, AMPHORA_ID) mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID) diff --git a/octavia/tests/unit/controller/worker/tasks/test_database_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_database_tasks.py index 308183c090..0a2296f3a6 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_database_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_database_tasks.py @@ -1766,6 +1766,59 @@ class TestDatabaseTasks(base.TestCase): repo.AmphoraRepository.update.assert_called_once_with( 'TEST', AMP_ID, role=None, vrrp_priority=None) + @mock.patch('octavia.db.repositories.AmphoraRepository.get') + def test_get_amphorae_from_loadbalancer(self, + mock_amphora_get, + mock_generate_uuid, + mock_LOG, + mock_get_session, + mock_loadbalancer_repo_update, + mock_listener_repo_update, + mock_amphora_repo_update, + mock_amphora_repo_delete): + amp1 = mock.MagicMock() + amp1.id = uuidutils.generate_uuid() + amp2 = mock.MagicMock() + amp2.id = uuidutils.generate_uuid() + lb = mock.MagicMock() + lb.amphorae = [amp1, amp2] + + mock_amphora_get.side_effect = [_amphora_mock, None] + + get_amps_from_lb_obj = database_tasks.GetAmphoraeFromLoadbalancer() + result = get_amps_from_lb_obj.execute(lb) + self.assertEqual([_amphora_mock], result) + + 
@mock.patch('octavia.db.repositories.ListenerRepository.get') + def test_get_listeners_from_loadbalancer(self, + mock_listener_get, + mock_generate_uuid, + mock_LOG, + mock_get_session, + mock_loadbalancer_repo_update, + mock_listener_repo_update, + mock_amphora_repo_update, + mock_amphora_repo_delete): + mock_listener_get.return_value = _listener_mock + _loadbalancer_mock.listeners = [_listener_mock] + get_list_from_lb_obj = database_tasks.GetListenersFromLoadbalancer() + result = get_list_from_lb_obj.execute(_loadbalancer_mock) + mock_listener_get.assert_called_once_with('TEST', id=_listener_mock.id) + self.assertEqual([_listener_mock], result) + + def test_get_vip_from_loadbalancer(self, + mock_generate_uuid, + mock_LOG, + mock_get_session, + mock_loadbalancer_repo_update, + mock_listener_repo_update, + mock_amphora_repo_update, + mock_amphora_repo_delete): + _loadbalancer_mock.vip = _vip_mock + get_vip_from_lb_obj = database_tasks.GetVipFromLoadbalancer() + result = get_vip_from_lb_obj.execute(_loadbalancer_mock) + self.assertEqual(_vip_mock, result) + @mock.patch('octavia.db.repositories.VRRPGroupRepository.create') def test_create_vrrp_group_for_lb(self, mock_vrrp_group_create, diff --git a/octavia/tests/unit/controller/worker/tasks/test_network_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_network_tasks.py index 55e30723cf..53c58ec816 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_network_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_network_tasks.py @@ -17,6 +17,7 @@ import mock from oslo_config import cfg from oslo_config import fixture as oslo_fixture from oslo_utils import uuidutils +from taskflow.types import failure from octavia.common import constants from octavia.common import data_models as o_data_models @@ -296,6 +297,69 @@ class TestNetworkTasks(base.TestCase): self.assertEqual([port_mock], ports) def test_handle_network_delta(self, mock_get_net_driver): + mock_net_driver = mock.MagicMock() + mock_get_net_driver.return_value = mock_net_driver + + nic1 = mock.MagicMock() + nic1.network_id = uuidutils.generate_uuid() + nic2 = mock.MagicMock() + nic2.network_id = uuidutils.generate_uuid() + interface1 = mock.MagicMock() + interface1.port_id = uuidutils.generate_uuid() + port1 = mock.MagicMock() + port1.network_id = uuidutils.generate_uuid() + fixed_ip = mock.MagicMock() + fixed_ip.subnet_id = uuidutils.generate_uuid() + port1.fixed_ips = [fixed_ip] + subnet = mock.MagicMock() + network = mock.MagicMock() + + delta = data_models.Delta(amphora_id=self.amphora_mock.id, + compute_id=self.amphora_mock.compute_id, + add_nics=[nic1], + delete_nics=[nic2, nic2, nic2]) + + mock_net_driver.plug_network.return_value = interface1 + mock_net_driver.get_port.return_value = port1 + mock_net_driver.get_network.return_value = network + mock_net_driver.get_subnet.return_value = subnet + + mock_net_driver.unplug_network.side_effect = [ + None, net_base.NetworkNotFound, Exception] + + handle_net_delta_obj = network_tasks.HandleNetworkDelta() + result = handle_net_delta_obj.execute(self.amphora_mock, delta) + + mock_net_driver.plug_network.assert_called_once_with( + self.amphora_mock.compute_id, nic1.network_id) + mock_net_driver.get_port.assert_called_once_with(interface1.port_id) + mock_net_driver.get_network.assert_called_once_with(port1.network_id) + mock_net_driver.get_subnet.assert_called_once_with(fixed_ip.subnet_id) + + self.assertEqual({self.amphora_mock.id: [port1]}, result) + + mock_net_driver.unplug_network.assert_called_with( + 
self.amphora_mock.compute_id, nic2.network_id) + + # Revert + delta2 = data_models.Delta(amphora_id=self.amphora_mock.id, + compute_id=self.amphora_mock.compute_id, + add_nics=[nic1, nic1], + delete_nics=[nic2, nic2, nic2]) + + mock_net_driver.unplug_network.reset_mock() + handle_net_delta_obj.revert( + failure.Failure.from_exception(Exception('boom')), None, None) + mock_net_driver.unplug_network.assert_not_called() + + mock_net_driver.unplug_network.reset_mock() + handle_net_delta_obj.revert(None, None, None) + mock_net_driver.unplug_network.assert_not_called() + + mock_net_driver.unplug_network.reset_mock() + handle_net_delta_obj.revert(None, None, delta2) + + def test_handle_network_deltas(self, mock_get_net_driver): mock_driver = mock.MagicMock() mock_get_net_driver.return_value = mock_driver diff --git a/octavia/tests/unit/controller/worker/test_controller_worker.py b/octavia/tests/unit/controller/worker/test_controller_worker.py index 87813c2792..11bd415019 100644 --- a/octavia/tests/unit/controller/worker/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/test_controller_worker.py @@ -56,10 +56,20 @@ _l7rule_mock = mock.MagicMock() _create_map_flow_mock = mock.MagicMock() _amphora_mock.load_balancer_id = LB_ID _amphora_mock.id = AMP_ID +_db_session = mock.MagicMock() CONF = cfg.CONF +class TestException(Exception): + + def __init__(self, value): + self.value = value + + def __str__(self): + return repr(self.value) + + @mock.patch('octavia.db.repositories.AmphoraRepository.get', return_value=_amphora_mock) @mock.patch('octavia.db.repositories.HealthMonitorRepository.get', @@ -79,7 +89,7 @@ CONF = cfg.CONF @mock.patch('octavia.common.base_taskflow.BaseTaskFlowEngine._taskflow_load', return_value=_flow_mock) @mock.patch('taskflow.listeners.logging.DynamicLoggingListener') -@mock.patch('octavia.db.api.get_session', return_value='TEST') +@mock.patch('octavia.db.api.get_session', return_value=_db_session) class TestControllerWorker(base.TestCase): def setUp(self): @@ -162,7 +172,7 @@ class TestControllerWorker(base.TestCase): cw.delete_amphora(AMP_ID) mock_amp_repo_get.assert_called_once_with( - 'TEST', + _db_session, id=AMP_ID) (base_taskflow.BaseTaskFlowEngine._taskflow_load. @@ -583,7 +593,7 @@ class TestControllerWorker(base.TestCase): cw.delete_load_balancer(LB_ID, cascade=False) mock_lb_repo_get.assert_called_once_with( - 'TEST', + _db_session, id=LB_ID) (base_taskflow.BaseTaskFlowEngine._taskflow_load. @@ -621,7 +631,7 @@ class TestControllerWorker(base.TestCase): cw.delete_load_balancer(LB_ID, cascade=True) mock_lb_repo_get.assert_called_once_with( - 'TEST', + _db_session, id=LB_ID) (base_taskflow.BaseTaskFlowEngine._taskflow_load. @@ -663,7 +673,7 @@ class TestControllerWorker(base.TestCase): cw.update_load_balancer(LB_ID, change) mock_lb_repo_get.assert_called_once_with( - 'TEST', + _db_session, id=LB_ID) (base_taskflow.BaseTaskFlowEngine._taskflow_load. @@ -1152,9 +1162,81 @@ class TestControllerWorker(base.TestCase): })) _flow_mock.run.assert_called_once_with() - mock_update.assert_called_with('TEST', LB_ID, + mock_update.assert_called_with(_db_session, LB_ID, provisioning_status=constants.ACTIVE) + @mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.' 
+ '_perform_amphora_failover') + def test_failover_amp_missing_amp(self, + mock_perform_amp_failover, + mock_api_get_session, + mock_dyn_log_listener, + mock_taskflow_load, + mock_pool_repo_get, + mock_member_repo_get, + mock_l7rule_repo_get, + mock_l7policy_repo_get, + mock_listener_repo_get, + mock_lb_repo_get, + mock_health_mon_repo_get, + mock_amp_repo_get): + + mock_amp_repo_get.return_value = None + + cw = controller_worker.ControllerWorker() + cw.failover_amphora(AMP_ID) + + mock_perform_amp_failover.assert_not_called() + + @mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.' + '_perform_amphora_failover') + def test_failover_amp_flow_exception(self, + mock_perform_amp_failover, + mock_api_get_session, + mock_dyn_log_listener, + mock_taskflow_load, + mock_pool_repo_get, + mock_member_repo_get, + mock_l7rule_repo_get, + mock_l7policy_repo_get, + mock_listener_repo_get, + mock_lb_repo_get, + mock_health_mon_repo_get, + mock_amp_repo_get): + + mock_perform_amp_failover.side_effect = TestException('boom') + cw = controller_worker.ControllerWorker() + self.assertRaises(TestException, cw.failover_amphora, AMP_ID) + + @mock.patch('octavia.controller.worker.controller_worker.ControllerWorker.' + '_perform_amphora_failover') + @mock.patch('octavia.db.repositories.LoadBalancerRepository.update') + def test_failover_amp_no_lb(self, + mock_lb_update, + mock_perform_amp_failover, + mock_api_get_session, + mock_dyn_log_listener, + mock_taskflow_load, + mock_pool_repo_get, + mock_member_repo_get, + mock_l7rule_repo_get, + mock_l7policy_repo_get, + mock_listener_repo_get, + mock_lb_repo_get, + mock_health_mon_repo_get, + mock_amp_repo_get): + + amphora = mock.MagicMock() + amphora.load_balancer_id = None + mock_amp_repo_get.return_value = amphora + + cw = controller_worker.ControllerWorker() + cw.failover_amphora(AMP_ID) + + mock_lb_update.assert_not_called() + mock_perform_amp_failover.assert_called_once_with( + amphora, constants.LB_CREATE_FAILOVER_PRIORITY) + @mock.patch('octavia.db.repositories.AmphoraHealthRepository.delete') def test_failover_deleted_amphora(self, mock_delete, @@ -1178,7 +1260,7 @@ class TestControllerWorker(base.TestCase): cw = controller_worker.ControllerWorker() cw._perform_amphora_failover(mock_amphora, 10) - mock_delete.assert_called_with('TEST', amphora_id=AMP_ID) + mock_delete.assert_called_with(_db_session, amphora_id=AMP_ID) mock_taskflow_load.assert_not_called() @mock.patch('octavia.controller.worker.' @@ -1207,7 +1289,7 @@ class TestControllerWorker(base.TestCase): cw.failover_loadbalancer('123') mock_perform.assert_called_with( _amphora_mock2, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY) - mock_update.assert_called_with('TEST', '123', + mock_update.assert_called_with(_db_session, '123', provisioning_status=constants.ACTIVE) mock_perform.reset @@ -1219,13 +1301,13 @@ class TestControllerWorker(base.TestCase): # is the last one mock_perform.assert_called_with( _amphora_mock, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY) - mock_update.assert_called_with('TEST', '123', + mock_update.assert_called_with(_db_session, '123', provisioning_status=constants.ACTIVE) mock_perform.reset mock_perform.side_effect = OverflowError() self.assertRaises(OverflowError, cw.failover_loadbalancer, 123) - mock_update.assert_called_with('TEST', 123, + mock_update.assert_called_with(_db_session, 123, provisioning_status=constants.ERROR) @mock.patch('octavia.controller.worker.flows.' 
@@ -1270,7 +1352,7 @@ class TestControllerWorker(base.TestCase): })) _flow_mock.run.assert_called_once_with() - mock_update.assert_called_with('TEST', LB_ID, + mock_update.assert_called_with(_db_session, LB_ID, provisioning_status=constants.ACTIVE) @mock.patch('octavia.controller.worker.flows.' diff --git a/releasenotes/notes/fix-multi-amp-down-failover-952618fb8d3d8ae6.yaml b/releasenotes/notes/fix-multi-amp-down-failover-952618fb8d3d8ae6.yaml new file mode 100644 index 0000000000..95d492a984 --- /dev/null +++ b/releasenotes/notes/fix-multi-amp-down-failover-952618fb8d3d8ae6.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Fixes an issue where if more than one amphora fails at the same time, + failover might not fully complete, leaving the load balancer in ERROR. diff --git a/tools/create_flow_docs.py b/tools/create_flow_docs.py index e710cf74db..b2d8e16ab3 100755 --- a/tools/create_flow_docs.py +++ b/tools/create_flow_docs.py @@ -53,9 +53,12 @@ def generate(flow_list, output_directory): get_flow_method = getattr(current_instance, current_tuple[2]) if (current_tuple[1] == 'AmphoraFlows' and current_tuple[2] == 'get_failover_flow'): + amp1 = dmh.generate_amphora() + amp2 = dmh.generate_amphora() + lb = dmh.generate_load_balancer(amphorae=[amp1, amp2]) current_engine = engines.load( get_flow_method(role=constants.ROLE_STANDALONE, - load_balancer_id=None)) + load_balancer=lb)) elif (current_tuple[1] == 'LoadBalancerFlows' and current_tuple[2] == 'get_create_load_balancer_flow'): current_engine = engines.load(
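
Note on the core mechanism of this change: AmphoraAPIClient.request() now accepts an optional timeout_dict whose entries (keyed by consts.REQ_CONN_TIMEOUT, REQ_READ_TIMEOUT, CONN_MAX_RETRIES, CONN_RETRY_INTERVAL) override the [haproxy_amphora] defaults, so failover paths can pass the shorter "active" retry settings and fail fast, raising driver_exceptions.TimeOutException once the retries are exhausted. The snippet below is a minimal standalone sketch of that override-and-retry pattern only; it is not part of the patch, and the DEFAULTS dict, resolve_timeouts(), request_with_retries() and send() names are illustrative stand-ins for the real oslo.config options and REST call.

# Illustrative sketch (not part of this patch): how a per-call timeout_dict
# overrides the configured defaults, in the spirit of AmphoraAPIClient.request().
import time

# Stand-ins for CONF.haproxy_amphora.* defaults (hypothetical values).
DEFAULTS = {
    'req_conn_timeout': 10,
    'req_read_timeout': 60,
    'conn_max_retries': 300,
    'conn_retry_interval': 5,
}


def resolve_timeouts(timeout_dict=None):
    """Merge per-call overrides with the configured defaults."""
    timeout_dict = timeout_dict or {}
    return {key: timeout_dict.get(key, default)
            for key, default in DEFAULTS.items()}


def request_with_retries(send, timeout_dict=None):
    """Retry 'send' using either the defaults or the caller's fail-fast values."""
    timeouts = resolve_timeouts(timeout_dict)
    last_exc = None
    for _ in range(timeouts['conn_max_retries']):
        try:
            # The real driver passes (conn_timeout, read_timeout) to requests.
            return send((timeouts['req_conn_timeout'],
                         timeouts['req_read_timeout']))
        except ConnectionError as exc:  # requests.ConnectionError in Octavia
            last_exc = exc
            time.sleep(timeouts['conn_retry_interval'])
    raise TimeoutError('amphora unreachable: %s' % last_exc)


if __name__ == '__main__':
    # Failover-style call: far fewer, faster retries than the defaults above.
    active = {'conn_max_retries': 2, 'conn_retry_interval': 0}

    def send(timeout):
        raise ConnectionError('no route to amphora (timeout=%s)' % (timeout,))

    try:
        request_with_retries(send, timeout_dict=active)
    except TimeoutError as exc:
        print(exc)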