Merge "Use retry for AmphoraComputeConnectivityWait" into stable/train

This commit is contained in:
Zuul 2020-07-18 02:44:06 +00:00 committed by Gerrit Code Review
commit d69a6f776a
10 changed files with 86 additions and 37 deletions

View File

@ -121,3 +121,9 @@ class NodeProvisioningError(ProvisioningErrors):
class AmpDriverNotImplementedError(AmphoraDriverError):
message = _('Amphora does not implement this feature.')
class AmpConnectionRetry(AmphoraDriverError):
message = _('Could not connect to amphora, exception caught: '
'%(exception)s')

View File

@ -82,11 +82,13 @@ class AmphoraLoadBalancerDriver(object):
"""
@abc.abstractmethod
def get_info(self, amphora):
def get_info(self, amphora, raise_retry_exception=False):
"""Returns information about the amphora.
:param amphora: amphora object, need to use its id property
:type amphora: octavia.db.models.Amphora
:param raise_retry_exception: Flag if outside task should be retried
:type boolean: False by default
:returns: return a value list (amphora.id, status flag--'info')
At this moment, we just build the basic structure for testing, will

View File

@ -91,7 +91,8 @@ class HaproxyAmphoraLoadBalancerDriver(
return haproxy_version_string.split('.')[:2]
def _populate_amphora_api_version(self, amphora):
def _populate_amphora_api_version(self, amphora,
raise_retry_exception=False):
"""Populate the amphora object with the api_version
This will query the amphora for version discovery and populate
@ -102,7 +103,8 @@ class HaproxyAmphoraLoadBalancerDriver(
if not getattr(amphora, 'api_version', None):
try:
amphora.api_version = self.clients['base'].get_api_version(
amphora)['api_version']
amphora,
raise_retry_exception=raise_retry_exception)['api_version']
except exc.NotFound:
# Amphora is too old for version discovery, default to 0.5
amphora.api_version = '0.5'
@ -364,9 +366,11 @@ class HaproxyAmphoraLoadBalancerDriver(
self.clients[amphora.api_version].delete_listener(
amphora, listener.load_balancer.id)
def get_info(self, amphora):
self._populate_amphora_api_version(amphora)
return self.clients[amphora.api_version].get_info(amphora)
def get_info(self, amphora, raise_retry_exception=False):
self._populate_amphora_api_version(
amphora, raise_retry_exception=raise_retry_exception)
return self.clients[amphora.api_version].get_info(
amphora, raise_retry_exception=raise_retry_exception)
def get_diagnostics(self, amphora):
pass
@ -624,7 +628,7 @@ class AmphoraAPIClientBase(object):
port=CONF.haproxy_amphora.bind_port)
def request(self, method, amp, path='/', timeout_dict=None,
retry_404=True, **kwargs):
retry_404=True, raise_retry_exception=False, **kwargs):
cfg_ha_amp = CONF.haproxy_amphora
if timeout_dict is None:
timeout_dict = {}
@ -689,7 +693,13 @@ class AmphoraAPIClientBase(object):
exception = e
LOG.warning("Could not connect to instance. Retrying.")
time.sleep(conn_retry_interval)
if raise_retry_exception:
# For taskflow persistence cause attribute should
# be serializable to JSON. Pass None, as cause exception
# is described in the expection message.
six.raise_from(
driver_except.AmpConnectionRetry(exception=str(e)),
None)
LOG.error("Connection retries (currently set to %(max_retries)s) "
"exhausted. The amphora is unavailable. Reason: "
"%(exception)s",
@ -697,9 +707,10 @@ class AmphoraAPIClientBase(object):
'exception': exception})
raise driver_except.TimeOutException()
def get_api_version(self, amp):
def get_api_version(self, amp, raise_retry_exception=False):
amp.api_version = None
r = self.get(amp, retry_404=False)
r = self.get(amp, retry_404=False,
raise_retry_exception=raise_retry_exception)
# Handle 404 special as we don't want to log an ERROR on 404
exc.check_exception(r, (404,))
if r.status_code == 404:
@ -766,8 +777,8 @@ class AmphoraAPIClient0_5(AmphoraAPIClientBase):
amp, 'listeners/{listener_id}'.format(listener_id=listener_id))
return exc.check_exception(r, (404,))
def get_info(self, amp):
r = self.get(amp, "info")
def get_info(self, amp, raise_retry_exception=False):
r = self.get(amp, "info", raise_retry_exception=raise_retry_exception)
if exc.check_exception(r):
return r.json()
return None
@ -896,8 +907,8 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase):
amp, 'listeners/{object_id}'.format(object_id=object_id))
return exc.check_exception(r, (404,))
def get_info(self, amp):
r = self.get(amp, "info")
def get_info(self, amp, raise_retry_exception=False):
r = self.get(amp, "info", raise_retry_exception=raise_retry_exception)
if exc.check_exception(r):
return r.json()
return None

View File

@ -73,7 +73,7 @@ class NoopManager(object):
listener.load_balancer.vip.ip_address)] = (
listener, listener.load_balancer.vip, 'delete')
def get_info(self, amphora):
def get_info(self, amphora, raise_retry_exception=False):
LOG.debug("Amphora %s no-op, info amphora %s",
self.__class__.__name__, amphora.id)
self.amphoraconfig[amphora.id] = (amphora.id, 'get_info')
@ -138,9 +138,10 @@ class NoopAmphoraLoadBalancerDriver(
self.driver.delete(listener)
def get_info(self, amphora):
def get_info(self, amphora, raise_retry_exception=False):
self.driver.get_info(amphora)
self.driver.get_info(amphora,
raise_retry_exception=raise_retry_exception)
def get_diagnostics(self, amphora):

View File

@ -383,6 +383,7 @@ VRRP_GROUP = 'vrrp_group'
# Taskflow flow and task names
CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow'
CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'
CREATE_AMPHORA_RETRY_SUBFLOW = 'octavia-create-amphora-retry-subflow'
CREATE_AMPHORA_FOR_LB_FLOW = 'octavia-create-amp-for-lb-flow'
CREATE_HEALTH_MONITOR_FLOW = 'octavia-create-health-monitor-flow'
CREATE_LISTENER_FLOW = 'octavia-create-listener_flow'

View File

@ -15,9 +15,9 @@
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from stevedore import driver as stevedore_driver
from octavia.common import constants
from octavia.controller.worker.v2 import controller_worker
CONF = cfg.CONF
@ -34,11 +34,7 @@ class Endpoints(object):
version='2.0')
def __init__(self):
self.worker = stevedore_driver.DriverManager(
namespace='octavia.plugins',
name=CONF.octavia_plugins,
invoke_on_load=True
).driver
self.worker = controller_worker.ControllerWorker()
def create_load_balancer(self, context, load_balancer_id,
flavor=None):

View File

@ -70,9 +70,14 @@ class AmphoraFlows(object):
create_amphora_flow.add(database_tasks.UpdateAmphoraInfo(
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amphora_flow.add(
retry_subflow = linear_flow.Flow(
constants.CREATE_AMPHORA_RETRY_SUBFLOW,
retry=amphora_driver_tasks.AmpRetry())
retry_subflow.add(
amphora_driver_tasks.AmphoraComputeConnectivityWait(
requires=constants.AMPHORA))
requires=constants.AMPHORA,
inject={'raise_retry_exception': True}))
create_amphora_flow.add(retry_subflow)
create_amphora_flow.add(database_tasks.ReloadAmphora(
requires=constants.AMPHORA_ID,
provides=constants.AMPHORA))
@ -194,10 +199,15 @@ class AmphoraFlows(object):
name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO,
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amp_for_lb_subflow.add(
retry_task = sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT
retry_subflow = linear_flow.Flow(
constants.CREATE_AMPHORA_RETRY_SUBFLOW,
retry=amphora_driver_tasks.AmpRetry())
retry_subflow.add(
amphora_driver_tasks.AmphoraComputeConnectivityWait(
name=sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT,
requires=constants.AMPHORA))
name=retry_task, requires=constants.AMPHORA,
inject={'raise_retry_exception': True}))
create_amp_for_lb_subflow.add(retry_subflow)
create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize(
name=sf_name + '-' + constants.AMPHORA_FINALIZE,
requires=constants.AMPHORA))

View File

@ -18,6 +18,7 @@ from oslo_config import cfg
from oslo_log import log as logging
import six
from stevedore import driver as stevedore_driver
from taskflow import retry
from taskflow import task
from taskflow.types import failure
@ -49,6 +50,26 @@ class BaseAmphoraTask(task.Task):
self.task_utils = task_utilities.TaskUtils()
class AmpRetry(retry.Times):
def on_failure(self, history, *args, **kwargs):
last_errors = history[-1][1]
max_retry_attempt = CONF.haproxy_amphora.connection_max_retries
for task_name, ex_info in last_errors.items():
if len(history) <= max_retry_attempt:
# When taskflow persistance is enabled and flow/task state is
# saved in the backend. If flow(task) is restored(restart of
# worker,etc) we are getting ex_info as None - we need to RETRY
# task to check its real state.
if ex_info is None or ex_info._exc_info is None:
return retry.RETRY
excp = ex_info._exc_info[1]
if isinstance(excp, driver_except.AmpConnectionRetry):
return retry.RETRY
return retry.REVERT_ALL
class AmpListenersUpdate(BaseAmphoraTask):
"""Task to update the listeners on one amphora."""
@ -323,10 +344,11 @@ class AmphoraVRRPStart(BaseAmphoraTask):
class AmphoraComputeConnectivityWait(BaseAmphoraTask):
"""Task to wait for the compute instance to be up."""
def execute(self, amphora):
def execute(self, amphora, raise_retry_exception=False):
"""Execute get_info routine for an amphora until it responds."""
try:
amp_info = self.amphora_driver.get_info(amphora)
amp_info = self.amphora_driver.get_info(
amphora, raise_retry_exception=raise_retry_exception)
LOG.debug('Successfuly connected to amphora %s: %s',
amphora.id, amp_info)
except driver_except.TimeOutException:

View File

@ -18,7 +18,6 @@ from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.controller.queue.v2 import endpoints
from octavia.controller.worker.v2 import controller_worker
from octavia.tests.unit import base
@ -30,10 +29,9 @@ class TestEndpoints(base.TestCase):
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
conf.config(octavia_plugins='hot_plug_plugin')
mock_class = mock.create_autospec(controller_worker.ControllerWorker)
self.worker_patcher = mock.patch('octavia.controller.queue.v2.'
'endpoints.stevedore_driver')
self.worker_patcher.start().ControllerWorker = mock_class
self.worker_patcher = mock.patch('octavia.controller.worker.v2.'
'controller_worker.ControllerWorker')
self.worker_patcher.start()
self.ep = endpoints.Endpoints()
self.context = {}

View File

@ -580,8 +580,10 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_amphora_repo_update):
amp_compute_conn_wait_obj = (
amphora_driver_tasks.AmphoraComputeConnectivityWait())
amp_compute_conn_wait_obj.execute(_amphora_mock)
mock_driver.get_info.assert_called_once_with(_amphora_mock)
amp_compute_conn_wait_obj.execute(_amphora_mock,
raise_retry_exception=True)
mock_driver.get_info.assert_called_once_with(
_amphora_mock, raise_retry_exception=True)
mock_driver.get_info.side_effect = driver_except.TimeOutException()
self.assertRaises(driver_except.TimeOutException,