1301 lines
55 KiB
Python
1301 lines
55 KiB
Python
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
from octavia_lib.common import constants as lib_consts
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
from oslo_utils import excutils
|
|
from sqlalchemy.orm import exc as db_exceptions
|
|
from stevedore import driver as stevedore_driver
|
|
from taskflow.listeners import logging as tf_logging
|
|
import tenacity
|
|
|
|
from octavia.amphorae.driver_exceptions import exceptions as driver_exc
|
|
from octavia.api.drivers import utils as provider_utils
|
|
from octavia.common import base_taskflow
|
|
from octavia.common import constants
|
|
from octavia.common import exceptions
|
|
from octavia.common import utils
|
|
from octavia.controller.worker.v2.flows import flow_utils
|
|
from octavia.controller.worker.v2 import taskflow_jobboard_driver as tsk_driver
|
|
from octavia.db import api as db_apis
|
|
from octavia.db import repositories as repo
|
|
|
|
CONF = cfg.CONF
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
# We do not need to log retry exception information. Warning "Could not connect
|
|
# to instance" will be logged as usual.
|
|
def retryMaskFilter(record):
|
|
if record.exc_info is not None and isinstance(
|
|
record.exc_info[1], (
|
|
driver_exc.AmpConnectionRetry,
|
|
exceptions.ComputeWaitTimeoutException)):
|
|
return False
|
|
return True
|
|
|
|
|
|
LOG.logger.addFilter(retryMaskFilter)
|
|
|
|
|
|
def _is_provisioning_status_pending_update(lb_obj):
|
|
return not lb_obj.provisioning_status == constants.PENDING_UPDATE
|
|
|
|
|
|
class ControllerWorker(object):
|
|
|
|
def __init__(self):
|
|
|
|
self._amphora_repo = repo.AmphoraRepository()
|
|
self._amphora_health_repo = repo.AmphoraHealthRepository()
|
|
self._health_mon_repo = repo.HealthMonitorRepository()
|
|
self._lb_repo = repo.LoadBalancerRepository()
|
|
self._listener_repo = repo.ListenerRepository()
|
|
self._member_repo = repo.MemberRepository()
|
|
self._pool_repo = repo.PoolRepository()
|
|
self._l7policy_repo = repo.L7PolicyRepository()
|
|
self._l7rule_repo = repo.L7RuleRepository()
|
|
self._flavor_repo = repo.FlavorRepository()
|
|
self._az_repo = repo.AvailabilityZoneRepository()
|
|
|
|
if CONF.task_flow.jobboard_enabled:
|
|
persistence = tsk_driver.MysqlPersistenceDriver()
|
|
|
|
self.jobboard_driver = stevedore_driver.DriverManager(
|
|
namespace='octavia.worker.jobboard_driver',
|
|
name=CONF.task_flow.jobboard_backend_driver,
|
|
invoke_args=(persistence,),
|
|
invoke_on_load=True).driver
|
|
else:
|
|
self.tf_engine = base_taskflow.BaseTaskFlowEngine()
|
|
|
|
@tenacity.retry(
|
|
retry=(
|
|
tenacity.retry_if_result(_is_provisioning_status_pending_update) |
|
|
tenacity.retry_if_exception_type()),
|
|
wait=tenacity.wait_incrementing(
|
|
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
|
|
CONF.haproxy_amphora.api_db_commit_retry_backoff,
|
|
CONF.haproxy_amphora.api_db_commit_retry_max),
|
|
stop=tenacity.stop_after_attempt(
|
|
CONF.haproxy_amphora.api_db_commit_retry_attempts))
|
|
def _get_db_obj_until_pending_update(self, repo, id):
|
|
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
return repo.get(session, id=id)
|
|
|
|
@property
|
|
def services_controller(self):
|
|
return base_taskflow.TaskFlowServiceController(self.jobboard_driver)
|
|
|
|
def run_flow(self, func, *args, **kwargs):
|
|
if CONF.task_flow.jobboard_enabled:
|
|
self.services_controller.run_poster(func, *args, **kwargs)
|
|
else:
|
|
store = kwargs.pop('store', None)
|
|
tf = self.tf_engine.taskflow_load(
|
|
func(*args, **kwargs), store=store)
|
|
with tf_logging.DynamicLoggingListener(tf, log=LOG):
|
|
tf.run()
|
|
|
|
def delete_amphora(self, amphora_id):
|
|
"""Deletes an existing Amphora.
|
|
|
|
:param amphora_id: ID of the amphora to delete
|
|
:returns: None
|
|
:raises AmphoraNotFound: The referenced Amphora was not found
|
|
"""
|
|
try:
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
amphora = self._amphora_repo.get(session,
|
|
id=amphora_id)
|
|
store = {constants.AMPHORA: amphora.to_dict()}
|
|
self.run_flow(
|
|
flow_utils.get_delete_amphora_flow,
|
|
store=store)
|
|
except Exception as e:
|
|
LOG.error('Failed to delete a amphora %s due to: %s',
|
|
amphora_id, str(e))
|
|
return
|
|
LOG.info('Finished deleting amphora %s.', amphora_id)
|
|
|
|
@tenacity.retry(
|
|
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
|
wait=tenacity.wait_incrementing(
|
|
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
|
|
CONF.haproxy_amphora.api_db_commit_retry_backoff,
|
|
CONF.haproxy_amphora.api_db_commit_retry_max),
|
|
stop=tenacity.stop_after_attempt(
|
|
CONF.haproxy_amphora.api_db_commit_retry_attempts))
|
|
def create_health_monitor(self, health_monitor):
|
|
"""Creates a health monitor.
|
|
|
|
:param health_monitor: Provider health monitor dict
|
|
:returns: None
|
|
:raises NoResultFound: Unable to find the object
|
|
"""
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_health_monitor = self._health_mon_repo.get(
|
|
session,
|
|
id=health_monitor[constants.HEALTHMONITOR_ID])
|
|
|
|
if not db_health_monitor:
|
|
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
|
'60 seconds.', 'healthmonitor',
|
|
health_monitor[constants.HEALTHMONITOR_ID])
|
|
raise db_exceptions.NoResultFound
|
|
|
|
pool = db_health_monitor.pool
|
|
pool.health_monitor = db_health_monitor
|
|
load_balancer = pool.load_balancer
|
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
load_balancer).to_dict(recurse=True)
|
|
listeners_dicts = provider_lb.get('listeners', [])
|
|
|
|
store = {constants.HEALTH_MON: health_monitor,
|
|
constants.POOL_ID: pool.id,
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: load_balancer.id,
|
|
constants.LOADBALANCER: provider_lb}
|
|
self.run_flow(
|
|
flow_utils.get_create_health_monitor_flow,
|
|
store=store)
|
|
|
|
def delete_health_monitor(self, health_monitor):
|
|
"""Deletes a health monitor.
|
|
|
|
:param health_monitor: Provider health monitor dict
|
|
:returns: None
|
|
:raises HMNotFound: The referenced health monitor was not found
|
|
"""
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_health_monitor = self._health_mon_repo.get(
|
|
session,
|
|
id=health_monitor[constants.HEALTHMONITOR_ID])
|
|
|
|
pool = db_health_monitor.pool
|
|
load_balancer = pool.load_balancer
|
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
load_balancer).to_dict(recurse=True)
|
|
listeners_dicts = provider_lb.get('listeners', [])
|
|
|
|
store = {constants.HEALTH_MON: health_monitor,
|
|
constants.POOL_ID: pool.id,
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: load_balancer.id,
|
|
constants.LOADBALANCER: provider_lb,
|
|
constants.PROJECT_ID: load_balancer.project_id}
|
|
self.run_flow(
|
|
flow_utils.get_delete_health_monitor_flow,
|
|
store=store)
|
|
|
|
def update_health_monitor(self, original_health_monitor,
|
|
health_monitor_updates):
|
|
"""Updates a health monitor.
|
|
|
|
:param original_health_monitor: Provider health monitor dict
|
|
:param health_monitor_updates: Dict containing updated health monitor
|
|
:returns: None
|
|
:raises HMNotFound: The referenced health monitor was not found
|
|
"""
|
|
try:
|
|
db_health_monitor = self._get_db_obj_until_pending_update(
|
|
self._health_mon_repo,
|
|
original_health_monitor[constants.HEALTHMONITOR_ID])
|
|
except tenacity.RetryError as e:
|
|
LOG.warning('Health monitor did not go into %s in 60 seconds. '
|
|
'This either due to an in-progress Octavia upgrade '
|
|
'or an overloaded and failing database. Assuming '
|
|
'an upgrade is in progress and continuing.',
|
|
constants.PENDING_UPDATE)
|
|
db_health_monitor = e.last_attempt.result()
|
|
|
|
pool = db_health_monitor.pool
|
|
|
|
load_balancer = pool.load_balancer
|
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
load_balancer).to_dict(recurse=True)
|
|
listeners_dicts = provider_lb.get('listeners', [])
|
|
|
|
store = {constants.HEALTH_MON: original_health_monitor,
|
|
constants.POOL_ID: pool.id,
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: load_balancer.id,
|
|
constants.LOADBALANCER: provider_lb,
|
|
constants.UPDATE_DICT: health_monitor_updates}
|
|
self.run_flow(
|
|
flow_utils.get_update_health_monitor_flow,
|
|
store=store)
|
|
|
|
@tenacity.retry(
|
|
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
|
wait=tenacity.wait_incrementing(
|
|
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
|
|
CONF.haproxy_amphora.api_db_commit_retry_backoff,
|
|
CONF.haproxy_amphora.api_db_commit_retry_max),
|
|
stop=tenacity.stop_after_attempt(
|
|
CONF.haproxy_amphora.api_db_commit_retry_attempts))
|
|
def create_listener(self, listener):
|
|
"""Creates a listener.
|
|
|
|
:param listener: A listener provider dictionary.
|
|
:returns: None
|
|
:raises NoResultFound: Unable to find the object
|
|
"""
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_listener = self._listener_repo.get(
|
|
session, id=listener[constants.LISTENER_ID])
|
|
if not db_listener:
|
|
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
|
'60 seconds.', 'listener',
|
|
listener[constants.LISTENER_ID])
|
|
raise db_exceptions.NoResultFound
|
|
|
|
load_balancer = db_listener.load_balancer
|
|
flavor_dict = {}
|
|
if load_balancer.flavor_id:
|
|
with session.begin():
|
|
flavor_dict = (
|
|
self._flavor_repo.get_flavor_metadata_dict(
|
|
session, load_balancer.flavor_id))
|
|
flavor_dict[constants.LOADBALANCER_TOPOLOGY] = load_balancer.topology
|
|
|
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
load_balancer).to_dict(recurse=True)
|
|
|
|
store = {constants.LISTENERS: provider_lb['listeners'],
|
|
constants.LOADBALANCER: provider_lb,
|
|
constants.LOADBALANCER_ID: load_balancer.id}
|
|
|
|
self.run_flow(
|
|
flow_utils.get_create_listener_flow,
|
|
flavor_dict=flavor_dict, store=store)
|
|
|
|
def delete_listener(self, listener):
|
|
"""Deletes a listener.
|
|
|
|
:param listener: A listener provider dictionary to delete
|
|
:returns: None
|
|
:raises ListenerNotFound: The referenced listener was not found
|
|
"""
|
|
try:
|
|
db_lb = self._get_db_obj_until_pending_update(
|
|
self._lb_repo, listener[constants.LOADBALANCER_ID])
|
|
except tenacity.RetryError as e:
|
|
LOG.warning('Loadbalancer did not go into %s in 60 seconds. '
|
|
'This either due to an in-progress Octavia upgrade '
|
|
'or an overloaded and failing database. Assuming '
|
|
'an upgrade is in progress and continuing.',
|
|
constants.PENDING_UPDATE)
|
|
db_lb = e.last_attempt.result()
|
|
|
|
flavor_dict = {}
|
|
if db_lb.flavor_id:
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
flavor_dict = (
|
|
self._flavor_repo.get_flavor_metadata_dict(
|
|
session, db_lb.flavor_id))
|
|
flavor_dict[constants.LOADBALANCER_TOPOLOGY] = db_lb.topology
|
|
|
|
store = {constants.LISTENER: listener,
|
|
constants.LOADBALANCER_ID:
|
|
listener[constants.LOADBALANCER_ID],
|
|
constants.PROJECT_ID: listener[constants.PROJECT_ID]}
|
|
self.run_flow(
|
|
flow_utils.get_delete_listener_flow, flavor_dict=flavor_dict,
|
|
store=store)
|
|
|
|
def update_listener(self, listener, listener_updates):
|
|
"""Updates a listener.
|
|
|
|
:param listener: A listener provider dictionary to update
|
|
:param listener_updates: Dict containing updated listener attributes
|
|
:returns: None
|
|
:raises ListenerNotFound: The referenced listener was not found
|
|
"""
|
|
try:
|
|
db_lb = self._get_db_obj_until_pending_update(
|
|
self._lb_repo, listener[constants.LOADBALANCER_ID])
|
|
except tenacity.RetryError as e:
|
|
LOG.warning('Loadbalancer did not go into %s in 60 seconds. '
|
|
'This either due to an in-progress Octavia upgrade '
|
|
'or an overloaded and failing database. Assuming '
|
|
'an upgrade is in progress and continuing.',
|
|
constants.PENDING_UPDATE)
|
|
db_lb = e.last_attempt.result()
|
|
|
|
session = db_apis.get_session()
|
|
flavor_dict = {}
|
|
if db_lb.flavor_id:
|
|
with session.begin():
|
|
flavor_dict = (
|
|
self._flavor_repo.get_flavor_metadata_dict(
|
|
session, db_lb.flavor_id))
|
|
flavor_dict[constants.LOADBALANCER_TOPOLOGY] = db_lb.topology
|
|
|
|
store = {constants.LISTENER: listener,
|
|
constants.UPDATE_DICT: listener_updates,
|
|
constants.LOADBALANCER_ID: db_lb.id,
|
|
constants.LISTENERS: [listener]}
|
|
self.run_flow(
|
|
flow_utils.get_update_listener_flow, flavor_dict=flavor_dict,
|
|
store=store)
|
|
|
|
@tenacity.retry(
|
|
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
|
wait=tenacity.wait_incrementing(
|
|
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
|
|
CONF.haproxy_amphora.api_db_commit_retry_backoff,
|
|
CONF.haproxy_amphora.api_db_commit_retry_max),
|
|
stop=tenacity.stop_after_attempt(
|
|
CONF.haproxy_amphora.api_db_commit_retry_attempts))
|
|
def create_load_balancer(self, loadbalancer, flavor=None,
|
|
availability_zone=None):
|
|
"""Creates a load balancer by allocating Amphorae.
|
|
|
|
First tries to allocate an existing Amphora in READY state.
|
|
If none are available it will attempt to build one specifically
|
|
for this load balancer.
|
|
|
|
:param loadbalancer: The dict of load balancer to create
|
|
:returns: None
|
|
:raises NoResultFound: Unable to find the object
|
|
"""
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
lb = self._lb_repo.get(session,
|
|
id=loadbalancer[constants.LOADBALANCER_ID])
|
|
if not lb:
|
|
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
|
'60 seconds.', 'load_balancer',
|
|
loadbalancer[constants.LOADBALANCER_ID])
|
|
raise db_exceptions.NoResultFound
|
|
|
|
store = {lib_consts.LOADBALANCER_ID:
|
|
loadbalancer[lib_consts.LOADBALANCER_ID],
|
|
constants.BUILD_TYPE_PRIORITY:
|
|
constants.LB_CREATE_NORMAL_PRIORITY,
|
|
lib_consts.FLAVOR: flavor,
|
|
lib_consts.AVAILABILITY_ZONE: availability_zone}
|
|
|
|
topology = lb.topology
|
|
if (not CONF.nova.enable_anti_affinity or
|
|
topology == constants.TOPOLOGY_SINGLE):
|
|
store[constants.SERVER_GROUP_ID] = None
|
|
|
|
listeners_dicts = (
|
|
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
|
lb.listeners)
|
|
)
|
|
|
|
store[constants.UPDATE_DICT] = {
|
|
constants.TOPOLOGY: topology
|
|
}
|
|
self.run_flow(
|
|
flow_utils.get_create_load_balancer_flow,
|
|
topology, listeners=listeners_dicts, flavor_dict=flavor,
|
|
store=store)
|
|
|
|
def delete_load_balancer(self, load_balancer, cascade=False):
|
|
"""Deletes a load balancer by de-allocating Amphorae.
|
|
|
|
:param load_balancer: Dict of the load balancer to delete
|
|
:returns: None
|
|
:raises LBNotFound: The referenced load balancer was not found
|
|
"""
|
|
loadbalancer_id = load_balancer[constants.LOADBALANCER_ID]
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_lb = self._lb_repo.get(session, id=loadbalancer_id)
|
|
store = {constants.LOADBALANCER: load_balancer,
|
|
constants.LOADBALANCER_ID: loadbalancer_id,
|
|
constants.SERVER_GROUP_ID: db_lb.server_group_id,
|
|
constants.PROJECT_ID: db_lb.project_id}
|
|
if cascade:
|
|
listeners = flow_utils.get_listeners_on_lb(db_lb)
|
|
pools = flow_utils.get_pools_on_lb(db_lb)
|
|
|
|
self.run_flow(
|
|
flow_utils.get_cascade_delete_load_balancer_flow,
|
|
load_balancer, listeners, pools, store=store)
|
|
else:
|
|
self.run_flow(
|
|
flow_utils.get_delete_load_balancer_flow,
|
|
load_balancer, store=store)
|
|
|
|
def update_load_balancer(self, original_load_balancer,
|
|
load_balancer_updates):
|
|
"""Updates a load balancer.
|
|
|
|
:param original_load_balancer: Dict of the load balancer to update
|
|
:param load_balancer_updates: Dict containing updated load balancer
|
|
:returns: None
|
|
:raises LBNotFound: The referenced load balancer was not found
|
|
"""
|
|
|
|
try:
|
|
self._get_db_obj_until_pending_update(
|
|
self._lb_repo,
|
|
original_load_balancer[constants.LOADBALANCER_ID])
|
|
except tenacity.RetryError:
|
|
LOG.warning('Load balancer did not go into %s in 60 seconds. '
|
|
'This either due to an in-progress Octavia upgrade '
|
|
'or an overloaded and failing database. Assuming '
|
|
'an upgrade is in progress and continuing.',
|
|
constants.PENDING_UPDATE)
|
|
|
|
store = {constants.LOADBALANCER: original_load_balancer,
|
|
constants.LOADBALANCER_ID:
|
|
original_load_balancer[constants.LOADBALANCER_ID],
|
|
constants.UPDATE_DICT: load_balancer_updates}
|
|
|
|
self.run_flow(
|
|
flow_utils.get_update_load_balancer_flow,
|
|
store=store)
|
|
|
|
@tenacity.retry(
|
|
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
|
wait=tenacity.wait_incrementing(
|
|
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
|
|
CONF.haproxy_amphora.api_db_commit_retry_backoff,
|
|
CONF.haproxy_amphora.api_db_commit_retry_max),
|
|
stop=tenacity.stop_after_attempt(
|
|
CONF.haproxy_amphora.api_db_commit_retry_attempts))
|
|
def create_member(self, member):
|
|
"""Creates a pool member.
|
|
|
|
:param member: A member provider dictionary to create
|
|
:returns: None
|
|
:raises NoSuitablePool: Unable to find the node pool
|
|
"""
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_member = self._member_repo.get(session,
|
|
id=member[constants.MEMBER_ID])
|
|
if not db_member:
|
|
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
|
'60 seconds.', 'l7member',
|
|
member[constants.MEMBER_ID])
|
|
raise db_exceptions.NoResultFound
|
|
|
|
pool = db_member.pool
|
|
load_balancer = pool.load_balancer
|
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
load_balancer).to_dict(recurse=True)
|
|
listeners_dicts = provider_lb.get('listeners', [])
|
|
|
|
store = {
|
|
constants.MEMBER: member,
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: load_balancer.id,
|
|
constants.LOADBALANCER: provider_lb,
|
|
constants.POOL_ID: pool.id}
|
|
if load_balancer.availability_zone:
|
|
with session.begin():
|
|
store[constants.AVAILABILITY_ZONE] = (
|
|
self._az_repo.get_availability_zone_metadata_dict(
|
|
session, load_balancer.availability_zone))
|
|
else:
|
|
store[constants.AVAILABILITY_ZONE] = {}
|
|
|
|
self.run_flow(
|
|
flow_utils.get_create_member_flow,
|
|
store=store)
|
|
|
|
def delete_member(self, member):
|
|
"""Deletes a pool member.
|
|
|
|
:param member: A member provider dictionary to delete
|
|
:returns: None
|
|
:raises MemberNotFound: The referenced member was not found
|
|
"""
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
pool = self._pool_repo.get(session,
|
|
id=member[constants.POOL_ID])
|
|
|
|
load_balancer = pool.load_balancer
|
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
load_balancer).to_dict(recurse=True)
|
|
listeners_dicts = provider_lb.get('listeners', [])
|
|
|
|
store = {
|
|
constants.MEMBER: member,
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: load_balancer.id,
|
|
constants.LOADBALANCER: provider_lb,
|
|
constants.POOL_ID: pool.id,
|
|
constants.PROJECT_ID: load_balancer.project_id}
|
|
if load_balancer.availability_zone:
|
|
with session.begin():
|
|
store[constants.AVAILABILITY_ZONE] = (
|
|
self._az_repo.get_availability_zone_metadata_dict(
|
|
session, load_balancer.availability_zone))
|
|
else:
|
|
store[constants.AVAILABILITY_ZONE] = {}
|
|
|
|
self.run_flow(
|
|
flow_utils.get_delete_member_flow,
|
|
store=store)
|
|
|
|
@tenacity.retry(
|
|
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
|
wait=tenacity.wait_incrementing(
|
|
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
|
|
CONF.haproxy_amphora.api_db_commit_retry_backoff,
|
|
CONF.haproxy_amphora.api_db_commit_retry_max),
|
|
stop=tenacity.stop_after_attempt(
|
|
CONF.haproxy_amphora.api_db_commit_retry_attempts))
|
|
def batch_update_members(self, old_members, new_members,
|
|
updated_members):
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_new_members = [
|
|
self._member_repo.get(
|
|
session, id=member[constants.MEMBER_ID])
|
|
for member in new_members]
|
|
# The API may not have committed all of the new member records yet.
|
|
# Make sure we retry looking them up.
|
|
if None in db_new_members or len(db_new_members) != len(new_members):
|
|
LOG.warning('Failed to fetch one of the new members from DB. '
|
|
'Retrying for up to 60 seconds.')
|
|
raise db_exceptions.NoResultFound
|
|
|
|
with session.begin():
|
|
updated_members = [
|
|
(provider_utils.db_member_to_provider_member(
|
|
self._member_repo.get(session,
|
|
id=m.get(constants.ID))).to_dict(),
|
|
m)
|
|
for m in updated_members]
|
|
provider_old_members = [
|
|
provider_utils.db_member_to_provider_member(
|
|
self._member_repo.get(session,
|
|
id=m.get(constants.ID))).to_dict()
|
|
for m in old_members]
|
|
if old_members:
|
|
pool = self._pool_repo.get(
|
|
session, id=old_members[0][constants.POOL_ID])
|
|
elif new_members:
|
|
pool = self._pool_repo.get(
|
|
session, id=new_members[0][constants.POOL_ID])
|
|
else:
|
|
pool = self._pool_repo.get(
|
|
session,
|
|
id=updated_members[0][0][constants.POOL_ID])
|
|
load_balancer = pool.load_balancer
|
|
|
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
load_balancer).to_dict(recurse=True)
|
|
listeners_dicts = provider_lb.get('listeners', [])
|
|
|
|
store = {
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: load_balancer.id,
|
|
constants.LOADBALANCER: provider_lb,
|
|
constants.POOL_ID: pool.id,
|
|
constants.PROJECT_ID: load_balancer.project_id}
|
|
if load_balancer.availability_zone:
|
|
with session.begin():
|
|
store[constants.AVAILABILITY_ZONE] = (
|
|
self._az_repo.get_availability_zone_metadata_dict(
|
|
session, load_balancer.availability_zone))
|
|
else:
|
|
store[constants.AVAILABILITY_ZONE] = {}
|
|
|
|
self.run_flow(
|
|
flow_utils.get_batch_update_members_flow,
|
|
provider_old_members, new_members, updated_members,
|
|
store=store)
|
|
|
|
def update_member(self, member, member_updates):
|
|
"""Updates a pool member.
|
|
|
|
:param member_id: A member provider dictionary to update
|
|
:param member_updates: Dict containing updated member attributes
|
|
:returns: None
|
|
:raises MemberNotFound: The referenced member was not found
|
|
"""
|
|
|
|
try:
|
|
db_member = self._get_db_obj_until_pending_update(
|
|
self._member_repo, member[constants.MEMBER_ID])
|
|
except tenacity.RetryError as e:
|
|
LOG.warning('Member did not go into %s in 60 seconds. '
|
|
'This either due to an in-progress Octavia upgrade '
|
|
'or an overloaded and failing database. Assuming '
|
|
'an upgrade is in progress and continuing.',
|
|
constants.PENDING_UPDATE)
|
|
db_member = e.last_attempt.result()
|
|
|
|
pool = db_member.pool
|
|
load_balancer = pool.load_balancer
|
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
load_balancer).to_dict(recurse=True)
|
|
listeners_dicts = provider_lb.get('listeners', [])
|
|
store = {
|
|
constants.MEMBER: member,
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: load_balancer.id,
|
|
constants.LOADBALANCER: provider_lb,
|
|
constants.POOL_ID: pool.id,
|
|
constants.UPDATE_DICT: member_updates}
|
|
if load_balancer.availability_zone:
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
store[constants.AVAILABILITY_ZONE] = (
|
|
self._az_repo.get_availability_zone_metadata_dict(
|
|
session, load_balancer.availability_zone))
|
|
else:
|
|
store[constants.AVAILABILITY_ZONE] = {}
|
|
|
|
self.run_flow(
|
|
flow_utils.get_update_member_flow,
|
|
store=store)
|
|
|
|
@tenacity.retry(
|
|
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
|
wait=tenacity.wait_incrementing(
|
|
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
|
|
CONF.haproxy_amphora.api_db_commit_retry_backoff,
|
|
CONF.haproxy_amphora.api_db_commit_retry_max),
|
|
stop=tenacity.stop_after_attempt(
|
|
CONF.haproxy_amphora.api_db_commit_retry_attempts))
|
|
def create_pool(self, pool):
|
|
"""Creates a node pool.
|
|
|
|
:param pool: Provider pool dict to create
|
|
:returns: None
|
|
:raises NoResultFound: Unable to find the object
|
|
"""
|
|
|
|
# TODO(ataraday) It seems we need to get db pool here anyway to get
|
|
# proper listeners
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_pool = self._pool_repo.get(session,
|
|
id=pool[constants.POOL_ID])
|
|
if not db_pool:
|
|
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
|
'60 seconds.', 'pool', pool[constants.POOL_ID])
|
|
raise db_exceptions.NoResultFound
|
|
|
|
load_balancer = db_pool.load_balancer
|
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
load_balancer).to_dict(recurse=True)
|
|
listeners_dicts = provider_lb.get('listeners', [])
|
|
|
|
store = {constants.POOL_ID: pool[constants.POOL_ID],
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: load_balancer.id,
|
|
constants.LOADBALANCER: provider_lb}
|
|
self.run_flow(
|
|
flow_utils.get_create_pool_flow,
|
|
store=store)
|
|
|
|
def delete_pool(self, pool):
|
|
"""Deletes a node pool.
|
|
|
|
:param pool: Provider pool dict to delete
|
|
:returns: None
|
|
:raises PoolNotFound: The referenced pool was not found
|
|
"""
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_pool = self._pool_repo.get(session,
|
|
id=pool[constants.POOL_ID])
|
|
|
|
load_balancer = db_pool.load_balancer
|
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
load_balancer).to_dict(recurse=True)
|
|
listeners_dicts = provider_lb.get('listeners', [])
|
|
|
|
store = {constants.POOL_ID: pool[constants.POOL_ID],
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER: provider_lb,
|
|
constants.LOADBALANCER_ID: load_balancer.id,
|
|
constants.PROJECT_ID: db_pool.project_id}
|
|
self.run_flow(
|
|
flow_utils.get_delete_pool_flow,
|
|
store=store)
|
|
|
|
def update_pool(self, origin_pool, pool_updates):
|
|
"""Updates a node pool.
|
|
|
|
:param origin_pool: Provider pool dict to update
|
|
:param pool_updates: Dict containing updated pool attributes
|
|
:returns: None
|
|
:raises PoolNotFound: The referenced pool was not found
|
|
"""
|
|
try:
|
|
db_pool = self._get_db_obj_until_pending_update(
|
|
self._pool_repo, origin_pool[constants.POOL_ID])
|
|
except tenacity.RetryError as e:
|
|
LOG.warning('Pool did not go into %s in 60 seconds. '
|
|
'This either due to an in-progress Octavia upgrade '
|
|
'or an overloaded and failing database. Assuming '
|
|
'an upgrade is in progress and continuing.',
|
|
constants.PENDING_UPDATE)
|
|
db_pool = e.last_attempt.result()
|
|
|
|
load_balancer = db_pool.load_balancer
|
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
load_balancer).to_dict(recurse=True)
|
|
listeners_dicts = provider_lb.get('listeners', [])
|
|
|
|
store = {constants.POOL_ID: db_pool.id,
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER: provider_lb,
|
|
constants.LOADBALANCER_ID: load_balancer.id,
|
|
constants.UPDATE_DICT: pool_updates}
|
|
self.run_flow(
|
|
flow_utils.get_update_pool_flow,
|
|
store=store)
|
|
|
|
@tenacity.retry(
|
|
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
|
wait=tenacity.wait_incrementing(
|
|
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
|
|
CONF.haproxy_amphora.api_db_commit_retry_backoff,
|
|
CONF.haproxy_amphora.api_db_commit_retry_max),
|
|
stop=tenacity.stop_after_attempt(
|
|
CONF.haproxy_amphora.api_db_commit_retry_attempts))
|
|
def create_l7policy(self, l7policy):
|
|
"""Creates an L7 Policy.
|
|
|
|
:param l7policy: Provider dict of the l7policy to create
|
|
:returns: None
|
|
:raises NoResultFound: Unable to find the object
|
|
"""
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_l7policy = self._l7policy_repo.get(
|
|
session, id=l7policy[constants.L7POLICY_ID])
|
|
if not db_l7policy:
|
|
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
|
'60 seconds.', 'l7policy',
|
|
l7policy[constants.L7POLICY_ID])
|
|
raise db_exceptions.NoResultFound
|
|
|
|
db_listener = db_l7policy.listener
|
|
|
|
listeners_dicts = (
|
|
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
|
[db_listener]))
|
|
|
|
store = {constants.L7POLICY: l7policy,
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: db_listener.load_balancer.id
|
|
}
|
|
self.run_flow(
|
|
flow_utils.get_create_l7policy_flow,
|
|
store=store)
|
|
|
|
def delete_l7policy(self, l7policy):
|
|
"""Deletes an L7 policy.
|
|
|
|
:param l7policy: Provider dict of the l7policy to delete
|
|
:returns: None
|
|
:raises L7PolicyNotFound: The referenced l7policy was not found
|
|
"""
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_listener = self._listener_repo.get(
|
|
session, id=l7policy[constants.LISTENER_ID])
|
|
listeners_dicts = (
|
|
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
|
[db_listener]))
|
|
|
|
store = {constants.L7POLICY: l7policy,
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: db_listener.load_balancer.id
|
|
}
|
|
self.run_flow(
|
|
flow_utils.get_delete_l7policy_flow,
|
|
store=store)
|
|
|
|
def update_l7policy(self, original_l7policy, l7policy_updates):
|
|
"""Updates an L7 policy.
|
|
|
|
:param l7policy: Provider dict of the l7policy to update
|
|
:param l7policy_updates: Dict containing updated l7policy attributes
|
|
:returns: None
|
|
:raises L7PolicyNotFound: The referenced l7policy was not found
|
|
"""
|
|
try:
|
|
db_l7policy = self._get_db_obj_until_pending_update(
|
|
self._l7policy_repo, original_l7policy[constants.L7POLICY_ID])
|
|
except tenacity.RetryError as e:
|
|
LOG.warning('L7 policy did not go into %s in 60 seconds. '
|
|
'This either due to an in-progress Octavia upgrade '
|
|
'or an overloaded and failing database. Assuming '
|
|
'an upgrade is in progress and continuing.',
|
|
constants.PENDING_UPDATE)
|
|
db_l7policy = e.last_attempt.result()
|
|
|
|
db_listener = db_l7policy.listener
|
|
|
|
listeners_dicts = (
|
|
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
|
[db_listener]))
|
|
|
|
store = {constants.L7POLICY: original_l7policy,
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: db_listener.load_balancer.id,
|
|
constants.UPDATE_DICT: l7policy_updates}
|
|
self.run_flow(
|
|
flow_utils.get_update_l7policy_flow,
|
|
store=store)
|
|
|
|
@tenacity.retry(
|
|
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
|
wait=tenacity.wait_incrementing(
|
|
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
|
|
CONF.haproxy_amphora.api_db_commit_retry_backoff,
|
|
CONF.haproxy_amphora.api_db_commit_retry_max),
|
|
stop=tenacity.stop_after_attempt(
|
|
CONF.haproxy_amphora.api_db_commit_retry_attempts))
|
|
def create_l7rule(self, l7rule):
|
|
"""Creates an L7 Rule.
|
|
|
|
:param l7rule: Provider dict l7rule
|
|
:returns: None
|
|
:raises NoResultFound: Unable to find the object
|
|
"""
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_l7rule = self._l7rule_repo.get(session,
|
|
id=l7rule[constants.L7RULE_ID])
|
|
if not db_l7rule:
|
|
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
|
'60 seconds.', 'l7rule',
|
|
l7rule[constants.L7RULE_ID])
|
|
raise db_exceptions.NoResultFound
|
|
|
|
db_l7policy = db_l7rule.l7policy
|
|
|
|
load_balancer = db_l7policy.listener.load_balancer
|
|
|
|
listeners_dicts = (
|
|
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
|
[db_l7policy.listener]))
|
|
l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy(
|
|
db_l7policy)
|
|
|
|
store = {constants.L7RULE: l7rule,
|
|
constants.L7POLICY: l7policy_dict.to_dict(),
|
|
constants.L7POLICY_ID: db_l7policy.id,
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.LOADBALANCER_ID: load_balancer.id
|
|
}
|
|
self.run_flow(
|
|
flow_utils.get_create_l7rule_flow,
|
|
store=store)
|
|
|
|
def delete_l7rule(self, l7rule):
|
|
"""Deletes an L7 rule.
|
|
|
|
:param l7rule: Provider dict of the l7rule to delete
|
|
:returns: None
|
|
:raises L7RuleNotFound: The referenced l7rule was not found
|
|
"""
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
db_l7policy = self._l7policy_repo.get(
|
|
session, id=l7rule[constants.L7POLICY_ID])
|
|
l7policy = provider_utils.db_l7policy_to_provider_l7policy(db_l7policy)
|
|
load_balancer = db_l7policy.listener.load_balancer
|
|
|
|
listeners_dicts = (
|
|
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
|
[db_l7policy.listener]))
|
|
|
|
store = {constants.L7RULE: l7rule,
|
|
constants.L7POLICY: l7policy.to_dict(),
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.L7POLICY_ID: db_l7policy.id,
|
|
constants.LOADBALANCER_ID: load_balancer.id
|
|
}
|
|
self.run_flow(
|
|
flow_utils.get_delete_l7rule_flow,
|
|
store=store)
|
|
|
|
def update_l7rule(self, original_l7rule, l7rule_updates):
|
|
"""Updates an L7 rule.
|
|
|
|
:param l7rule: Origin dict of the l7rule to update
|
|
:param l7rule_updates: Dict containing updated l7rule attributes
|
|
:returns: None
|
|
:raises L7RuleNotFound: The referenced l7rule was not found
|
|
"""
|
|
try:
|
|
db_l7rule = self._get_db_obj_until_pending_update(
|
|
self._l7rule_repo, original_l7rule[constants.L7RULE_ID])
|
|
except tenacity.RetryError as e:
|
|
LOG.warning('L7 rule did not go into %s in 60 seconds. '
|
|
'This either due to an in-progress Octavia upgrade '
|
|
'or an overloaded and failing database. Assuming '
|
|
'an upgrade is in progress and continuing.',
|
|
constants.PENDING_UPDATE)
|
|
db_l7rule = e.last_attempt.result()
|
|
db_l7policy = db_l7rule.l7policy
|
|
load_balancer = db_l7policy.listener.load_balancer
|
|
|
|
listeners_dicts = (
|
|
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
|
[db_l7policy.listener]))
|
|
l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy(
|
|
db_l7policy)
|
|
|
|
store = {constants.L7RULE: original_l7rule,
|
|
constants.L7POLICY: l7policy_dict.to_dict(),
|
|
constants.LISTENERS: listeners_dicts,
|
|
constants.L7POLICY_ID: db_l7policy.id,
|
|
constants.LOADBALANCER_ID: load_balancer.id,
|
|
constants.UPDATE_DICT: l7rule_updates}
|
|
self.run_flow(
|
|
flow_utils.get_update_l7rule_flow,
|
|
store=store)
|
|
|
|
def failover_amphora(self, amphora_id, reraise=False):
|
|
"""Perform failover operations for an amphora.
|
|
|
|
Note: This expects the load balancer to already be in
|
|
provisioning_status=PENDING_UPDATE state.
|
|
|
|
:param amphora_id: ID for amphora to failover
|
|
:param reraise: If enabled reraise any caught exception
|
|
:returns: None
|
|
:raises octavia.common.exceptions.NotFound: The referenced amphora was
|
|
not found
|
|
"""
|
|
amphora = None
|
|
try:
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
amphora = self._amphora_repo.get(session,
|
|
id=amphora_id)
|
|
if amphora is None:
|
|
LOG.error('Amphora failover for amphora %s failed because '
|
|
'there is no record of this amphora in the '
|
|
'database. Check that the [house_keeping] '
|
|
'amphora_expiry_age configuration setting is not '
|
|
'too short. Skipping failover.', amphora_id)
|
|
raise exceptions.NotFound(resource=constants.AMPHORA,
|
|
id=amphora_id)
|
|
|
|
if amphora.status == constants.DELETED:
|
|
LOG.warning('Amphora %s is marked DELETED in the database but '
|
|
'was submitted for failover. Deleting it from the '
|
|
'amphora health table to exclude it from health '
|
|
'checks and skipping the failover.', amphora.id)
|
|
with session.begin():
|
|
self._amphora_health_repo.delete(session,
|
|
amphora_id=amphora.id)
|
|
return
|
|
|
|
loadbalancer = None
|
|
if amphora.load_balancer_id:
|
|
with session.begin():
|
|
loadbalancer = self._lb_repo.get(
|
|
session, id=amphora.load_balancer_id)
|
|
lb_amp_count = None
|
|
if loadbalancer:
|
|
if loadbalancer.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
|
|
lb_amp_count = 2
|
|
elif loadbalancer.topology == constants.TOPOLOGY_SINGLE:
|
|
lb_amp_count = 1
|
|
|
|
az_metadata = {}
|
|
flavor_dict = {}
|
|
lb_id = None
|
|
vip_dict = {}
|
|
additional_vip_dicts = []
|
|
server_group_id = None
|
|
if loadbalancer:
|
|
lb_id = loadbalancer.id
|
|
# Even if the LB doesn't have a flavor, create one and
|
|
# pass through the topology.
|
|
flavor_dict = {}
|
|
if loadbalancer.flavor_id:
|
|
with session.begin():
|
|
flavor_dict = (
|
|
self._flavor_repo.get_flavor_metadata_dict(
|
|
session, loadbalancer.flavor_id))
|
|
flavor_dict[constants.LOADBALANCER_TOPOLOGY] = (
|
|
loadbalancer.topology)
|
|
if loadbalancer.availability_zone:
|
|
with session.begin():
|
|
az_metadata = (
|
|
self._az_repo.get_availability_zone_metadata_dict(
|
|
session,
|
|
loadbalancer.availability_zone))
|
|
vip_dict = loadbalancer.vip.to_dict()
|
|
additional_vip_dicts = [
|
|
av.to_dict()
|
|
for av in loadbalancer.additional_vips]
|
|
server_group_id = loadbalancer.server_group_id
|
|
provider_lb_dict = (provider_utils.
|
|
db_loadbalancer_to_provider_loadbalancer)(
|
|
loadbalancer).to_dict() if loadbalancer else loadbalancer
|
|
|
|
stored_params = {constants.AVAILABILITY_ZONE: az_metadata,
|
|
constants.BUILD_TYPE_PRIORITY:
|
|
constants.LB_CREATE_FAILOVER_PRIORITY,
|
|
constants.FLAVOR: flavor_dict,
|
|
constants.LOADBALANCER: provider_lb_dict,
|
|
constants.SERVER_GROUP_ID: server_group_id,
|
|
constants.LOADBALANCER_ID: lb_id,
|
|
constants.VIP: vip_dict,
|
|
constants.ADDITIONAL_VIPS: additional_vip_dicts}
|
|
|
|
self.run_flow(
|
|
flow_utils.get_failover_amphora_flow,
|
|
amphora.to_dict(), lb_amp_count, flavor_dict=flavor_dict,
|
|
store=stored_params)
|
|
|
|
LOG.info("Successfully completed the failover for an amphora: %s",
|
|
{"id": amphora_id,
|
|
"load_balancer_id": lb_id,
|
|
"lb_network_ip": amphora.lb_network_ip,
|
|
"compute_id": amphora.compute_id,
|
|
"role": amphora.role})
|
|
|
|
except Exception as e:
|
|
with excutils.save_and_reraise_exception(reraise=reraise):
|
|
LOG.exception("Amphora %s failover exception: %s",
|
|
amphora_id, str(e))
|
|
with session.begin():
|
|
self._amphora_repo.update(session,
|
|
amphora_id,
|
|
status=constants.ERROR)
|
|
if amphora and amphora.load_balancer_id:
|
|
self._lb_repo.update(
|
|
session, amphora.load_balancer_id,
|
|
provisioning_status=constants.ERROR)
|
|
|
|
@staticmethod
|
|
def _get_amphorae_for_failover(load_balancer):
|
|
"""Returns an ordered list of amphora to failover.
|
|
|
|
:param load_balancer: The load balancer being failed over.
|
|
:returns: An ordered list of amphora to failover,
|
|
first amp to failover is last in the list
|
|
:raises octavia.common.exceptions.InvalidTopology: LB has an unknown
|
|
topology.
|
|
"""
|
|
if load_balancer.topology == constants.TOPOLOGY_SINGLE:
|
|
# In SINGLE topology, amp failover order does not matter
|
|
return [a.to_dict() for a in load_balancer.amphorae
|
|
if a.status != constants.DELETED]
|
|
|
|
if load_balancer.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
|
|
# In Active/Standby we should preference the standby amp
|
|
# for failover first in case the Active is still able to pass
|
|
# traffic.
|
|
# Note: The active amp can switch at any time and in less than a
|
|
# second, so this is "best effort".
|
|
amphora_driver = utils.get_amphora_driver()
|
|
timeout_dict = {
|
|
constants.CONN_MAX_RETRIES:
|
|
CONF.haproxy_amphora.failover_connection_max_retries,
|
|
constants.CONN_RETRY_INTERVAL:
|
|
CONF.haproxy_amphora.failover_connection_retry_interval}
|
|
amps = []
|
|
selected_amp = None
|
|
for amp in load_balancer.amphorae:
|
|
if amp.status == constants.DELETED:
|
|
continue
|
|
if selected_amp is None:
|
|
try:
|
|
if amphora_driver.get_interface_from_ip(
|
|
amp, load_balancer.vip.ip_address,
|
|
timeout_dict):
|
|
# This is a potential ACTIVE, add it to the list
|
|
amps.append(amp.to_dict())
|
|
else:
|
|
# This one doesn't have the VIP IP, so start
|
|
# failovers here.
|
|
selected_amp = amp
|
|
LOG.debug("Selected amphora %s as the initial "
|
|
"failover amphora.", amp.id)
|
|
except Exception:
|
|
# This amphora is broken, so start failovers here.
|
|
selected_amp = amp
|
|
else:
|
|
# We have already found a STANDBY, so add the rest to the
|
|
# list without querying them.
|
|
amps.append(amp.to_dict())
|
|
# Put the selected amphora at the end of the list so it is
|
|
# first to failover.
|
|
if selected_amp:
|
|
amps.append(selected_amp.to_dict())
|
|
return amps
|
|
|
|
LOG.error('Unknown load balancer topology found: %s, aborting '
|
|
'failover.', load_balancer.topology)
|
|
raise exceptions.InvalidTopology(topology=load_balancer.topology)
|
|
|
|
def failover_loadbalancer(self, load_balancer_id):
|
|
"""Perform failover operations for a load balancer.
|
|
|
|
Note: This expects the load balancer to already be in
|
|
provisioning_status=PENDING_UPDATE state.
|
|
|
|
:param load_balancer_id: ID for load balancer to failover
|
|
:returns: None
|
|
:raises octavia.common.exceptions.NotFound: The load balancer was not
|
|
found.
|
|
"""
|
|
try:
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
lb = self._lb_repo.get(session,
|
|
id=load_balancer_id)
|
|
if lb is None:
|
|
raise exceptions.NotFound(resource=constants.LOADBALANCER,
|
|
id=load_balancer_id)
|
|
|
|
# Get the ordered list of amphorae to failover for this LB.
|
|
amps = self._get_amphorae_for_failover(lb)
|
|
|
|
if lb.topology == constants.TOPOLOGY_SINGLE:
|
|
if len(amps) != 1:
|
|
LOG.warning('%d amphorae found on load balancer %s where '
|
|
'one should exist. Repairing.', len(amps),
|
|
load_balancer_id)
|
|
elif lb.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
|
|
|
|
if len(amps) != 2:
|
|
LOG.warning('%d amphorae found on load balancer %s where '
|
|
'two should exist. Repairing.', len(amps),
|
|
load_balancer_id)
|
|
else:
|
|
LOG.error('Unknown load balancer topology found: %s, aborting '
|
|
'failover!', lb.topology)
|
|
raise exceptions.InvalidTopology(topology=lb.topology)
|
|
|
|
# We must provide a topology in the flavor definition
|
|
# here for the amphora to be created with the correct
|
|
# configuration.
|
|
flavor = {}
|
|
if lb.flavor_id:
|
|
with session.begin():
|
|
flavor = self._flavor_repo.get_flavor_metadata_dict(
|
|
session, lb.flavor_id)
|
|
flavor[constants.LOADBALANCER_TOPOLOGY] = lb.topology
|
|
|
|
if lb:
|
|
provider_lb_dict = (
|
|
provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
|
lb).to_dict())
|
|
else:
|
|
provider_lb_dict = lb
|
|
|
|
provider_lb_dict[constants.FLAVOR] = flavor
|
|
|
|
stored_params = {constants.LOADBALANCER: provider_lb_dict,
|
|
constants.BUILD_TYPE_PRIORITY:
|
|
constants.LB_CREATE_FAILOVER_PRIORITY,
|
|
constants.SERVER_GROUP_ID: lb.server_group_id,
|
|
constants.LOADBALANCER_ID: lb.id,
|
|
constants.FLAVOR: flavor}
|
|
|
|
if lb.availability_zone:
|
|
with session.begin():
|
|
stored_params[constants.AVAILABILITY_ZONE] = (
|
|
self._az_repo.get_availability_zone_metadata_dict(
|
|
session, lb.availability_zone))
|
|
else:
|
|
stored_params[constants.AVAILABILITY_ZONE] = {}
|
|
|
|
self.run_flow(
|
|
flow_utils.get_failover_LB_flow, amps, provider_lb_dict,
|
|
store=stored_params)
|
|
|
|
LOG.info('Failover of load balancer %s completed successfully.',
|
|
lb.id)
|
|
|
|
except Exception as e:
|
|
with excutils.save_and_reraise_exception(reraise=False):
|
|
LOG.exception("LB %(lbid)s failover exception: %(exc)s",
|
|
{'lbid': load_balancer_id, 'exc': str(e)})
|
|
with session.begin():
|
|
self._lb_repo.update(
|
|
session, load_balancer_id,
|
|
provisioning_status=constants.ERROR)
|
|
|
|
def amphora_cert_rotation(self, amphora_id):
|
|
"""Perform cert rotation for an amphora.
|
|
|
|
:param amphora_id: ID for amphora to rotate
|
|
:returns: None
|
|
:raises AmphoraNotFound: The referenced amphora was not found
|
|
"""
|
|
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
amp = self._amphora_repo.get(session,
|
|
id=amphora_id)
|
|
LOG.info("Start amphora cert rotation, amphora's id is: %s",
|
|
amphora_id)
|
|
|
|
store = {constants.AMPHORA: amp.to_dict(),
|
|
constants.AMPHORA_ID: amphora_id}
|
|
|
|
self.run_flow(
|
|
flow_utils.cert_rotate_amphora_flow,
|
|
store=store)
|
|
LOG.info("Finished amphora cert rotation, amphora's id was: %s",
|
|
amphora_id)
|
|
|
|
def update_amphora_agent_config(self, amphora_id):
|
|
"""Update the amphora agent configuration.
|
|
|
|
Note: This will update the amphora agent configuration file and
|
|
update the running configuration for mutatable configuration
|
|
items.
|
|
|
|
:param amphora_id: ID of the amphora to update.
|
|
:returns: None
|
|
"""
|
|
LOG.info("Start amphora agent configuration update, amphora's id "
|
|
"is: %s", amphora_id)
|
|
session = db_apis.get_session()
|
|
with session.begin():
|
|
amp = self._amphora_repo.get(session, id=amphora_id)
|
|
lb = self._amphora_repo.get_lb_for_amphora(session,
|
|
amphora_id)
|
|
flavor = {}
|
|
if lb.flavor_id:
|
|
flavor = self._flavor_repo.get_flavor_metadata_dict(
|
|
session, lb.flavor_id)
|
|
|
|
store = {constants.AMPHORA: amp.to_dict(),
|
|
constants.FLAVOR: flavor}
|
|
|
|
self.run_flow(
|
|
flow_utils.update_amphora_config_flow,
|
|
store=store)
|
|
LOG.info("Finished amphora agent configuration update, amphora's id "
|
|
"was: %s", amphora_id)
|