Merge "Use retry for AmphoraComputeConnectivityWait" into stable/train
This commit is contained in:
commit
d69a6f776a
|
@ -121,3 +121,9 @@ class NodeProvisioningError(ProvisioningErrors):
|
|||
class AmpDriverNotImplementedError(AmphoraDriverError):
|
||||
|
||||
message = _('Amphora does not implement this feature.')
|
||||
|
||||
|
||||
class AmpConnectionRetry(AmphoraDriverError):
|
||||
|
||||
message = _('Could not connect to amphora, exception caught: '
|
||||
'%(exception)s')
|
||||
|
|
|
@ -82,11 +82,13 @@ class AmphoraLoadBalancerDriver(object):
|
|||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_info(self, amphora):
|
||||
def get_info(self, amphora, raise_retry_exception=False):
|
||||
"""Returns information about the amphora.
|
||||
|
||||
:param amphora: amphora object, need to use its id property
|
||||
:type amphora: octavia.db.models.Amphora
|
||||
:param raise_retry_exception: Flag if outside task should be retried
|
||||
:type raise_retry_exception: boolean, False by default
|
||||
:returns: return a value list (amphora.id, status flag--'info')
|
||||
|
||||
At this moment, we just build the basic structure for testing, will
|
||||
|
|
|
@ -91,7 +91,8 @@ class HaproxyAmphoraLoadBalancerDriver(
|
|||
|
||||
return haproxy_version_string.split('.')[:2]
|
||||
|
||||
def _populate_amphora_api_version(self, amphora):
|
||||
def _populate_amphora_api_version(self, amphora,
|
||||
raise_retry_exception=False):
|
||||
"""Populate the amphora object with the api_version
|
||||
|
||||
This will query the amphora for version discovery and populate
|
||||
|
@ -102,7 +103,8 @@ class HaproxyAmphoraLoadBalancerDriver(
|
|||
if not getattr(amphora, 'api_version', None):
|
||||
try:
|
||||
amphora.api_version = self.clients['base'].get_api_version(
|
||||
amphora)['api_version']
|
||||
amphora,
|
||||
raise_retry_exception=raise_retry_exception)['api_version']
|
||||
except exc.NotFound:
|
||||
# Amphora is too old for version discovery, default to 0.5
|
||||
amphora.api_version = '0.5'
|
||||
|
@ -364,9 +366,11 @@ class HaproxyAmphoraLoadBalancerDriver(
|
|||
self.clients[amphora.api_version].delete_listener(
|
||||
amphora, listener.load_balancer.id)
|
||||
|
||||
def get_info(self, amphora):
|
||||
self._populate_amphora_api_version(amphora)
|
||||
return self.clients[amphora.api_version].get_info(amphora)
|
||||
def get_info(self, amphora, raise_retry_exception=False):
|
||||
self._populate_amphora_api_version(
|
||||
amphora, raise_retry_exception=raise_retry_exception)
|
||||
return self.clients[amphora.api_version].get_info(
|
||||
amphora, raise_retry_exception=raise_retry_exception)
|
||||
|
||||
def get_diagnostics(self, amphora):
|
||||
pass
|
||||
|
@ -624,7 +628,7 @@ class AmphoraAPIClientBase(object):
|
|||
port=CONF.haproxy_amphora.bind_port)
|
||||
|
||||
def request(self, method, amp, path='/', timeout_dict=None,
|
||||
retry_404=True, **kwargs):
|
||||
retry_404=True, raise_retry_exception=False, **kwargs):
|
||||
cfg_ha_amp = CONF.haproxy_amphora
|
||||
if timeout_dict is None:
|
||||
timeout_dict = {}
|
||||
|
@ -689,7 +693,13 @@ class AmphoraAPIClientBase(object):
|
|||
exception = e
|
||||
LOG.warning("Could not connect to instance. Retrying.")
|
||||
time.sleep(conn_retry_interval)
|
||||
|
||||
if raise_retry_exception:
|
||||
# For taskflow persistence, the cause attribute should
|
||||
# be serializable to JSON. Pass None, as the cause exception
|
||||
# is described in the exception message.
|
||||
six.raise_from(
|
||||
driver_except.AmpConnectionRetry(exception=str(e)),
|
||||
None)
|
||||
LOG.error("Connection retries (currently set to %(max_retries)s) "
|
||||
"exhausted. The amphora is unavailable. Reason: "
|
||||
"%(exception)s",
|
||||
|
@ -697,9 +707,10 @@ class AmphoraAPIClientBase(object):
|
|||
'exception': exception})
|
||||
raise driver_except.TimeOutException()
|
||||
|
||||
def get_api_version(self, amp):
|
||||
def get_api_version(self, amp, raise_retry_exception=False):
|
||||
amp.api_version = None
|
||||
r = self.get(amp, retry_404=False)
|
||||
r = self.get(amp, retry_404=False,
|
||||
raise_retry_exception=raise_retry_exception)
|
||||
# Handle 404 specially as we don't want to log an ERROR on 404
|
||||
exc.check_exception(r, (404,))
|
||||
if r.status_code == 404:
|
||||
|
@ -766,8 +777,8 @@ class AmphoraAPIClient0_5(AmphoraAPIClientBase):
|
|||
amp, 'listeners/{listener_id}'.format(listener_id=listener_id))
|
||||
return exc.check_exception(r, (404,))
|
||||
|
||||
def get_info(self, amp):
|
||||
r = self.get(amp, "info")
|
||||
def get_info(self, amp, raise_retry_exception=False):
|
||||
r = self.get(amp, "info", raise_retry_exception=raise_retry_exception)
|
||||
if exc.check_exception(r):
|
||||
return r.json()
|
||||
return None
|
||||
|
@ -896,8 +907,8 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase):
|
|||
amp, 'listeners/{object_id}'.format(object_id=object_id))
|
||||
return exc.check_exception(r, (404,))
|
||||
|
||||
def get_info(self, amp):
|
||||
r = self.get(amp, "info")
|
||||
def get_info(self, amp, raise_retry_exception=False):
|
||||
r = self.get(amp, "info", raise_retry_exception=raise_retry_exception)
|
||||
if exc.check_exception(r):
|
||||
return r.json()
|
||||
return None
|
||||
|
|
|
@ -73,7 +73,7 @@ class NoopManager(object):
|
|||
listener.load_balancer.vip.ip_address)] = (
|
||||
listener, listener.load_balancer.vip, 'delete')
|
||||
|
||||
def get_info(self, amphora):
|
||||
def get_info(self, amphora, raise_retry_exception=False):
|
||||
LOG.debug("Amphora %s no-op, info amphora %s",
|
||||
self.__class__.__name__, amphora.id)
|
||||
self.amphoraconfig[amphora.id] = (amphora.id, 'get_info')
|
||||
|
@ -138,9 +138,10 @@ class NoopAmphoraLoadBalancerDriver(
|
|||
|
||||
self.driver.delete(listener)
|
||||
|
||||
def get_info(self, amphora):
|
||||
def get_info(self, amphora, raise_retry_exception=False):
|
||||
|
||||
self.driver.get_info(amphora)
|
||||
self.driver.get_info(amphora,
|
||||
raise_retry_exception=raise_retry_exception)
|
||||
|
||||
def get_diagnostics(self, amphora):
|
||||
|
||||
|
|
|
@ -383,6 +383,7 @@ VRRP_GROUP = 'vrrp_group'
|
|||
# Taskflow flow and task names
|
||||
CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow'
|
||||
CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'
|
||||
CREATE_AMPHORA_RETRY_SUBFLOW = 'octavia-create-amphora-retry-subflow'
|
||||
CREATE_AMPHORA_FOR_LB_FLOW = 'octavia-create-amp-for-lb-flow'
|
||||
CREATE_HEALTH_MONITOR_FLOW = 'octavia-create-health-monitor-flow'
|
||||
CREATE_LISTENER_FLOW = 'octavia-create-listener_flow'
|
||||
|
|
|
@ -15,9 +15,9 @@
|
|||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from stevedore import driver as stevedore_driver
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker.v2 import controller_worker
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
@ -34,11 +34,7 @@ class Endpoints(object):
|
|||
version='2.0')
|
||||
|
||||
def __init__(self):
|
||||
self.worker = stevedore_driver.DriverManager(
|
||||
namespace='octavia.plugins',
|
||||
name=CONF.octavia_plugins,
|
||||
invoke_on_load=True
|
||||
).driver
|
||||
self.worker = controller_worker.ControllerWorker()
|
||||
|
||||
def create_load_balancer(self, context, load_balancer_id,
|
||||
flavor=None):
|
||||
|
|
|
@ -70,9 +70,14 @@ class AmphoraFlows(object):
|
|||
create_amphora_flow.add(database_tasks.UpdateAmphoraInfo(
|
||||
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
|
||||
provides=constants.AMPHORA))
|
||||
create_amphora_flow.add(
|
||||
retry_subflow = linear_flow.Flow(
|
||||
constants.CREATE_AMPHORA_RETRY_SUBFLOW,
|
||||
retry=amphora_driver_tasks.AmpRetry())
|
||||
retry_subflow.add(
|
||||
amphora_driver_tasks.AmphoraComputeConnectivityWait(
|
||||
requires=constants.AMPHORA))
|
||||
requires=constants.AMPHORA,
|
||||
inject={'raise_retry_exception': True}))
|
||||
create_amphora_flow.add(retry_subflow)
|
||||
create_amphora_flow.add(database_tasks.ReloadAmphora(
|
||||
requires=constants.AMPHORA_ID,
|
||||
provides=constants.AMPHORA))
|
||||
|
@ -194,10 +199,15 @@ class AmphoraFlows(object):
|
|||
name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO,
|
||||
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
|
||||
provides=constants.AMPHORA))
|
||||
create_amp_for_lb_subflow.add(
|
||||
retry_task = sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT
|
||||
retry_subflow = linear_flow.Flow(
|
||||
constants.CREATE_AMPHORA_RETRY_SUBFLOW,
|
||||
retry=amphora_driver_tasks.AmpRetry())
|
||||
retry_subflow.add(
|
||||
amphora_driver_tasks.AmphoraComputeConnectivityWait(
|
||||
name=sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT,
|
||||
requires=constants.AMPHORA))
|
||||
name=retry_task, requires=constants.AMPHORA,
|
||||
inject={'raise_retry_exception': True}))
|
||||
create_amp_for_lb_subflow.add(retry_subflow)
|
||||
create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize(
|
||||
name=sf_name + '-' + constants.AMPHORA_FINALIZE,
|
||||
requires=constants.AMPHORA))
|
||||
|
|
|
@ -18,6 +18,7 @@ from oslo_config import cfg
|
|||
from oslo_log import log as logging
|
||||
import six
|
||||
from stevedore import driver as stevedore_driver
|
||||
from taskflow import retry
|
||||
from taskflow import task
|
||||
from taskflow.types import failure
|
||||
|
||||
|
@ -49,6 +50,26 @@ class BaseAmphoraTask(task.Task):
|
|||
self.task_utils = task_utilities.TaskUtils()
|
||||
|
||||
|
||||
class AmpRetry(retry.Times):
|
||||
|
||||
def on_failure(self, history, *args, **kwargs):
|
||||
last_errors = history[-1][1]
|
||||
max_retry_attempt = CONF.haproxy_amphora.connection_max_retries
|
||||
for task_name, ex_info in last_errors.items():
|
||||
if len(history) <= max_retry_attempt:
|
||||
# When taskflow persistence is enabled and flow/task state is
|
||||
# saved in the backend, a restored flow/task (worker restart,
|
||||
# etc.) yields ex_info as None - we need to RETRY the task to
|
||||
# check its real state.
|
||||
if ex_info is None or ex_info._exc_info is None:
|
||||
return retry.RETRY
|
||||
excp = ex_info._exc_info[1]
|
||||
if isinstance(excp, driver_except.AmpConnectionRetry):
|
||||
return retry.RETRY
|
||||
|
||||
return retry.REVERT_ALL
|
||||
|
||||
|
||||
class AmpListenersUpdate(BaseAmphoraTask):
|
||||
"""Task to update the listeners on one amphora."""
|
||||
|
||||
|
@ -323,10 +344,11 @@ class AmphoraVRRPStart(BaseAmphoraTask):
|
|||
class AmphoraComputeConnectivityWait(BaseAmphoraTask):
|
||||
"""Task to wait for the compute instance to be up."""
|
||||
|
||||
def execute(self, amphora):
|
||||
def execute(self, amphora, raise_retry_exception=False):
|
||||
"""Execute get_info routine for an amphora until it responds."""
|
||||
try:
|
||||
amp_info = self.amphora_driver.get_info(amphora)
|
||||
amp_info = self.amphora_driver.get_info(
|
||||
amphora, raise_retry_exception=raise_retry_exception)
|
||||
LOG.debug('Successfuly connected to amphora %s: %s',
|
||||
amphora.id, amp_info)
|
||||
except driver_except.TimeOutException:
|
||||
|
|
|
@ -18,7 +18,6 @@ from oslo_config import fixture as oslo_fixture
|
|||
from oslo_utils import uuidutils
|
||||
|
||||
from octavia.controller.queue.v2 import endpoints
|
||||
from octavia.controller.worker.v2 import controller_worker
|
||||
from octavia.tests.unit import base
|
||||
|
||||
|
||||
|
@ -30,10 +29,9 @@ class TestEndpoints(base.TestCase):
|
|||
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
conf.config(octavia_plugins='hot_plug_plugin')
|
||||
|
||||
mock_class = mock.create_autospec(controller_worker.ControllerWorker)
|
||||
self.worker_patcher = mock.patch('octavia.controller.queue.v2.'
|
||||
'endpoints.stevedore_driver')
|
||||
self.worker_patcher.start().ControllerWorker = mock_class
|
||||
self.worker_patcher = mock.patch('octavia.controller.worker.v2.'
|
||||
'controller_worker.ControllerWorker')
|
||||
self.worker_patcher.start()
|
||||
|
||||
self.ep = endpoints.Endpoints()
|
||||
self.context = {}
|
||||
|
|
|
@ -580,8 +580,10 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||
mock_amphora_repo_update):
|
||||
amp_compute_conn_wait_obj = (
|
||||
amphora_driver_tasks.AmphoraComputeConnectivityWait())
|
||||
amp_compute_conn_wait_obj.execute(_amphora_mock)
|
||||
mock_driver.get_info.assert_called_once_with(_amphora_mock)
|
||||
amp_compute_conn_wait_obj.execute(_amphora_mock,
|
||||
raise_retry_exception=True)
|
||||
mock_driver.get_info.assert_called_once_with(
|
||||
_amphora_mock, raise_retry_exception=True)
|
||||
|
||||
mock_driver.get_info.side_effect = driver_except.TimeOutException()
|
||||
self.assertRaises(driver_except.TimeOutException,
|
||||
|
|
Loading…
Reference in New Issue