Merge "Allow amphorav2 to run without jobboard"

This commit is contained in:
Zuul 2020-08-25 22:47:04 +00:00 committed by Gerrit Code Review
commit 2d97ebcd24
15 changed files with 206 additions and 145 deletions

View File

@ -291,7 +291,7 @@ function octavia_configure {
iniset $OCTAVIA_CONF api_settings api_handler queue_producer
iniset $OCTAVIA_CONF database connection "mysql+pymysql://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:3306/octavia"
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} == True ]]; then
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD} == True ]]; then
iniset $OCTAVIA_CONF task_flow persistence_connection "mysql+pymysql://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:3306/octavia_persistence"
fi
# Configure keystone auth_token for all users
@ -368,7 +368,7 @@ function octavia_configure {
recreate_database_mysql octavia
octavia-db-manage upgrade head
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} == True ]]; then
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD} == True ]]; then
recreate_database_mysql octavia_persistence
octavia-db-manage upgrade_persistence
fi
@ -379,7 +379,7 @@ function octavia_configure {
fi
# amphorav2 required redis installation
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} == True ]]; then
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD} == True ]]; then
install_redis
fi
@ -671,7 +671,7 @@ function octavia_cleanup {
sudo rm -rf $NOVA_STATE_PATH $NOVA_AUTH_CACHE_DIR
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} == True ]]; then
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD} == True ]]; then
uninstall_redis
fi

View File

@ -33,7 +33,7 @@ OCTAVIA_PORT=${OCTAVIA_PORT:-"9876"}
OCTAVIA_HA_PORT=${OCTAVIA_HA_PORT:-"9875"}
OCTAVIA_HM_LISTEN_PORT=${OCTAVIA_HM_LISTEN_PORT:-"5555"}
OCTAVIA_ENABLE_AMPHORAV2_PROVIDER=${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER:-False}
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD=${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD:-False}
OCTAVIA_MGMT_SUBNET=${OCTAVIA_MGMT_SUBNET:-"192.168.0.0/24"}
OCTAVIA_MGMT_SUBNET_START=${OCTAVIA_MGMT_SUBNET_START:-"192.168.0.2"}

View File

@ -3,8 +3,19 @@
Additional configuration steps to configure amphorav2 provider
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If you would like to use amphorav2 provider for load-balancer service the
following additional steps are required.
The amphorav2 provider driver improves control plane resiliency. Should a
control plane host go down during a load balancer provisioning operation, an
alternate controller can resume the in-process provisioning and complete the
request. This solves the issue with resources stuck in PENDING_* states by
writing info about task states in persistent backend and monitoring job claims
via jobboard.
If you would like to use amphorav2 provider with jobboard-based controller
for load-balancer service the following additional steps are required.
This provider driver can also run without jobboard and its dependencies (extra
database, Redis/Zookeeper). This is the default setting while jobboard remains
an experimental feature.
Prerequisites
@ -66,6 +77,7 @@ Additional configuration to octavia components
.. code-block:: ini
[task_flow]
jobboard_enabled = True
jobboard_backend_driver = 'redis_taskflow_driver'
jobboard_backend_hosts = KEYVALUE_HOST_IPS
jobboard_backend_port = KEYVALUE_PORT

View File

@ -355,6 +355,9 @@
# Database connection url with db name (string value)
#persistence_connection = sqlite://
# If True, enables TaskFlow jobboard.
# jobboard_enabled = False
# Jobboard backend driver that will monitor job state. (string value)
# Possible values:
# - redis_taskflow_driver: Driver that will use Redis to store job states.

View File

@ -60,7 +60,7 @@ class BaseTaskFlowEngine(object):
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=CONF.task_flow.max_workers)
def _taskflow_load(self, flow, **kwargs):
def taskflow_load(self, flow, **kwargs):
eng = engines.load(
flow,
engine=CONF.task_flow.engine,

View File

@ -514,6 +514,8 @@ task_flow_opts = [
default='sqlite://',
help='Persistence database, which will be used to store tasks '
'states. Database connection url with db name'),
cfg.BoolOpt('jobboard_enabled', default=False,
help=_('If True, enables TaskFlow jobboard.')),
cfg.StrOpt('jobboard_backend_driver',
default='redis_taskflow_driver',
choices=['redis_taskflow_driver', 'zookeeper_taskflow_driver'],

View File

@ -51,10 +51,11 @@ class ConsumerService(cotyledon.Service):
)
self.message_listener.start()
if constants.AMPHORAV2 in CONF.api_settings.enabled_provider_drivers:
for e in self.endpoints:
e.worker.services_controller.run_conductor(
'octavia-task-flow-conductor-%s' %
uuidutils.generate_uuid())
if CONF.task_flow.jobboard_enabled:
for e in self.endpoints:
e.worker.services_controller.run_conductor(
'octavia-task-flow-conductor-%s' %
uuidutils.generate_uuid())
def terminate(self):
if self.message_listener:

View File

@ -102,7 +102,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), availability_zone))
create_amp_tf = self._taskflow_load(
create_amp_tf = self.taskflow_load(
self._amphora_flows.get_create_amphora_flow(),
store=store)
with tf_logging.DynamicLoggingListener(create_amp_tf, log=LOG):
@ -139,7 +139,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
pool.health_monitor = health_mon
load_balancer = pool.load_balancer
create_hm_tf = self._taskflow_load(
create_hm_tf = self.taskflow_load(
self._health_monitor_flows.get_create_health_monitor_flow(),
store={constants.HEALTH_MON: health_mon,
constants.POOL: pool,
@ -163,7 +163,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners = pool.listeners
load_balancer = pool.load_balancer
delete_hm_tf = self._taskflow_load(
delete_hm_tf = self.taskflow_load(
self._health_monitor_flows.get_delete_health_monitor_flow(),
store={constants.HEALTH_MON: health_mon,
constants.POOL: pool,
@ -198,7 +198,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
pool.health_monitor = health_mon
load_balancer = pool.load_balancer
update_hm_tf = self._taskflow_load(
update_hm_tf = self.taskflow_load(
self._health_monitor_flows.get_update_health_monitor_flow(),
store={constants.HEALTH_MON: health_mon,
constants.POOL: pool,
@ -234,12 +234,12 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
load_balancer = listener.load_balancer
listeners = load_balancer.listeners
create_listener_tf = self._taskflow_load(self._listener_flows.
get_create_listener_flow(),
store={constants.LOADBALANCER:
load_balancer,
constants.LISTENERS:
listeners})
create_listener_tf = self.taskflow_load(self._listener_flows.
get_create_listener_flow(),
store={constants.LOADBALANCER:
load_balancer,
constants.LISTENERS:
listeners})
with tf_logging.DynamicLoggingListener(create_listener_tf,
log=LOG):
create_listener_tf.run()
@ -255,7 +255,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
id=listener_id)
load_balancer = listener.load_balancer
delete_listener_tf = self._taskflow_load(
delete_listener_tf = self.taskflow_load(
self._listener_flows.get_delete_listener_flow(),
store={constants.LOADBALANCER: load_balancer,
constants.LISTENER: listener})
@ -285,16 +285,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
load_balancer = listener.load_balancer
update_listener_tf = self._taskflow_load(self._listener_flows.
get_update_listener_flow(),
store={constants.LISTENER:
listener,
constants.LOADBALANCER:
load_balancer,
constants.UPDATE_DICT:
listener_updates,
constants.LISTENERS:
[listener]})
update_listener_tf = self.taskflow_load(self._listener_flows.
get_update_listener_flow(),
store={constants.LISTENER:
listener,
constants.LOADBALANCER:
load_balancer,
constants.UPDATE_DICT:
listener_updates,
constants.LISTENERS:
[listener]})
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
update_listener_tf.run()
@ -345,7 +345,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
create_lb_flow = self._lb_flows.get_create_load_balancer_flow(
topology=topology, listeners=lb.listeners)
create_lb_tf = self._taskflow_load(create_lb_flow, store=store)
create_lb_tf = self.taskflow_load(create_lb_flow, store=store)
with tf_logging.DynamicLoggingListener(create_lb_tf, log=LOG):
create_lb_tf.run()
@ -366,7 +366,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
(flow, store) = self._lb_flows.get_delete_load_balancer_flow(lb)
store.update({constants.LOADBALANCER: lb,
constants.SERVER_GROUP_ID: lb.server_group_id})
delete_lb_tf = self._taskflow_load(flow, store=store)
delete_lb_tf = self.taskflow_load(flow, store=store)
with tf_logging.DynamicLoggingListener(delete_lb_tf,
log=LOG):
@ -396,7 +396,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
db_apis.get_session(),
load_balancer_id=load_balancer_id)
update_lb_tf = self._taskflow_load(
update_lb_tf = self.taskflow_load(
self._lb_flows.get_update_load_balancer_flow(),
store={constants.LOADBALANCER: lb,
constants.LISTENERS: listeners,
@ -444,7 +444,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else:
store[constants.AVAILABILITY_ZONE] = {}
create_member_tf = self._taskflow_load(
create_member_tf = self.taskflow_load(
self._member_flows.get_create_member_flow(),
store=store)
with tf_logging.DynamicLoggingListener(create_member_tf,
@ -476,7 +476,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else:
store[constants.AVAILABILITY_ZONE] = {}
delete_member_tf = self._taskflow_load(
delete_member_tf = self.taskflow_load(
self._member_flows.get_delete_member_flow(),
store=store
)
@ -527,7 +527,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else:
store[constants.AVAILABILITY_ZONE] = {}
batch_update_members_tf = self._taskflow_load(
batch_update_members_tf = self.taskflow_load(
self._member_flows.get_batch_update_members_flow(
old_members, new_members, updated_members),
store=store)
@ -571,7 +571,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else:
store[constants.AVAILABILITY_ZONE] = {}
update_member_tf = self._taskflow_load(
update_member_tf = self.taskflow_load(
self._member_flows.get_update_member_flow(),
store=store)
with tf_logging.DynamicLoggingListener(update_member_tf,
@ -603,13 +603,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners = pool.listeners
load_balancer = pool.load_balancer
create_pool_tf = self._taskflow_load(self._pool_flows.
get_create_pool_flow(),
store={constants.POOL: pool,
constants.LISTENERS:
listeners,
constants.LOADBALANCER:
load_balancer})
create_pool_tf = self.taskflow_load(self._pool_flows.
get_create_pool_flow(),
store={constants.POOL: pool,
constants.LISTENERS:
listeners,
constants.LOADBALANCER:
load_balancer})
with tf_logging.DynamicLoggingListener(create_pool_tf,
log=LOG):
create_pool_tf.run()
@ -627,7 +627,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
load_balancer = pool.load_balancer
listeners = pool.listeners
delete_pool_tf = self._taskflow_load(
delete_pool_tf = self.taskflow_load(
self._pool_flows.get_delete_pool_flow(),
store={constants.POOL: pool, constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer})
@ -658,15 +658,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners = pool.listeners
load_balancer = pool.load_balancer
update_pool_tf = self._taskflow_load(self._pool_flows.
get_update_pool_flow(),
store={constants.POOL: pool,
constants.LISTENERS:
listeners,
constants.LOADBALANCER:
load_balancer,
constants.UPDATE_DICT:
pool_updates})
update_pool_tf = self.taskflow_load(self._pool_flows.
get_update_pool_flow(),
store={constants.POOL: pool,
constants.LISTENERS:
listeners,
constants.LOADBALANCER:
load_balancer,
constants.UPDATE_DICT:
pool_updates})
with tf_logging.DynamicLoggingListener(update_pool_tf,
log=LOG):
update_pool_tf.run()
@ -696,7 +696,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
create_l7policy_tf = self._taskflow_load(
create_l7policy_tf = self.taskflow_load(
self._l7policy_flows.get_create_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
@ -718,7 +718,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
load_balancer = l7policy.listener.load_balancer
listeners = [l7policy.listener]
delete_l7policy_tf = self._taskflow_load(
delete_l7policy_tf = self.taskflow_load(
self._l7policy_flows.get_delete_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
@ -750,7 +750,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
update_l7policy_tf = self._taskflow_load(
update_l7policy_tf = self.taskflow_load(
self._l7policy_flows.get_update_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
@ -786,7 +786,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
create_l7rule_tf = self._taskflow_load(
create_l7rule_tf = self.taskflow_load(
self._l7rule_flows.get_create_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.L7POLICY: l7policy,
@ -809,7 +809,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
load_balancer = l7policy.listener.load_balancer
listeners = [l7policy.listener]
delete_l7rule_tf = self._taskflow_load(
delete_l7rule_tf = self.taskflow_load(
self._l7rule_flows.get_delete_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.L7POLICY: l7policy,
@ -843,7 +843,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
update_l7rule_tf = self._taskflow_load(
update_l7rule_tf = self.taskflow_load(
self._l7rule_flows.get_update_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.L7POLICY: l7policy,
@ -933,8 +933,8 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
constants.LOADBALANCER_ID: lb_id,
constants.VIP: vip}
failover_amphora_tf = self._taskflow_load(amp_failover_flow,
store=stored_params)
failover_amphora_tf = self.taskflow_load(amp_failover_flow,
store=stored_params)
with tf_logging.DynamicLoggingListener(failover_amphora_tf,
log=LOG):
@ -1084,8 +1084,8 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else:
stored_params[constants.AVAILABILITY_ZONE] = {}
failover_lb_tf = self._taskflow_load(lb_failover_flow,
store=stored_params)
failover_lb_tf = self.taskflow_load(lb_failover_flow,
store=stored_params)
with tf_logging.DynamicLoggingListener(failover_lb_tf, log=LOG):
failover_lb_tf.run()
@ -1112,7 +1112,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
id=amphora_id)
LOG.info("Start amphora cert rotation, amphora's id is: %s", amp.id)
certrotation_amphora_tf = self._taskflow_load(
certrotation_amphora_tf = self.taskflow_load(
self._amphora_flows.cert_rotate_amphora_flow(),
store={constants.AMPHORA: amp,
constants.AMPHORA_ID: amp.id})
@ -1141,7 +1141,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
flavor = self._flavor_repo.get_flavor_metadata_dict(
db_apis.get_session(), lb.flavor_id)
update_amphora_tf = self._taskflow_load(
update_amphora_tf = self.taskflow_load(
self._amphora_flows.update_amphora_config_flow(),
store={constants.AMPHORA: amp,
constants.FLAVOR: flavor})

View File

@ -18,8 +18,10 @@ from oslo_log import log as logging
from oslo_utils import excutils
from sqlalchemy.orm import exc as db_exceptions
from stevedore import driver as stevedore_driver
from taskflow.listeners import logging as tf_logging
import tenacity
from octavia.amphorae.driver_exceptions import exceptions
from octavia.api.drivers import utils as provider_utils
from octavia.common import base_taskflow
from octavia.common import constants
@ -37,6 +39,18 @@ RETRY_BACKOFF = 1
RETRY_MAX = 5
# We do not need to log retry exception information. Warning "Could not connect
# to instance" will be logged as usual.
def retryMaskFilter(record):
if record.exc_info is not None and isinstance(
record.exc_info[1], exceptions.AmpConnectionRetry):
return False
return True
LOG.logger.addFilter(retryMaskFilter)
def _is_provisioning_status_pending_update(lb_obj):
return not lb_obj.provisioning_status == constants.PENDING_UPDATE
@ -57,13 +71,16 @@ class ControllerWorker(object):
self._flavor_repo = repo.FlavorRepository()
self._az_repo = repo.AvailabilityZoneRepository()
persistence = tsk_driver.MysqlPersistenceDriver()
if CONF.task_flow.jobboard_enabled:
persistence = tsk_driver.MysqlPersistenceDriver()
self.jobboard_driver = stevedore_driver.DriverManager(
namespace='octavia.worker.jobboard_driver',
name=CONF.task_flow.jobboard_backend_driver,
invoke_args=(persistence,),
invoke_on_load=True).driver
self.jobboard_driver = stevedore_driver.DriverManager(
namespace='octavia.worker.jobboard_driver',
name=CONF.task_flow.jobboard_backend_driver,
invoke_args=(persistence,),
invoke_on_load=True).driver
else:
self.tf_engine = base_taskflow.BaseTaskFlowEngine()
@tenacity.retry(
retry=(
@ -80,6 +97,15 @@ class ControllerWorker(object):
def services_controller(self):
return base_taskflow.TaskFlowServiceController(self.jobboard_driver)
def run_flow(self, func, *args, **kwargs):
if CONF.task_flow.jobboard_enabled:
self.services_controller.run_poster(func, *args, **kwargs)
else:
tf = self.tf_engine.taskflow_load(
func(*args), **kwargs)
with tf_logging.DynamicLoggingListener(tf, log=LOG):
tf.run()
def create_amphora(self, availability_zone=None):
"""Creates an Amphora.
@ -96,11 +122,9 @@ class ControllerWorker(object):
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), availability_zone))
job_id = self.services_controller.run_poster(
self.run_flow(
flow_utils.get_create_amphora_flow,
store=store, wait=True)
return job_id
except Exception as e:
LOG.error('Failed to create an amphora due to: {}'.format(str(e)))
@ -114,7 +138,7 @@ class ControllerWorker(object):
amphora = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
store = {constants.AMPHORA: amphora.to_dict()}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_delete_amphora_flow,
store=store)
@ -149,7 +173,7 @@ class ControllerWorker(object):
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_create_health_monitor_flow,
store=store)
@ -179,7 +203,7 @@ class ControllerWorker(object):
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb,
constants.PROJECT_ID: load_balancer.project_id}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_delete_health_monitor_flow,
store=store)
@ -220,7 +244,7 @@ class ControllerWorker(object):
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb,
constants.UPDATE_DICT: health_monitor_updates}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_update_health_monitor_flow,
store=store)
@ -257,7 +281,7 @@ class ControllerWorker(object):
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_create_listener_flow,
store=store)
@ -276,7 +300,7 @@ class ControllerWorker(object):
constants.LOADBALANCER_ID:
listener[constants.LOADBALANCER_ID],
constants.PROJECT_ID: lb.project_id}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_delete_listener_flow,
store=store)
@ -294,7 +318,7 @@ class ControllerWorker(object):
constants.UPDATE_DICT: listener_updates,
constants.LOADBALANCER_ID: db_lb.id,
constants.LISTENERS: [listener]}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_update_listener_flow,
store=store)
@ -341,7 +365,7 @@ class ControllerWorker(object):
store[constants.UPDATE_DICT] = {
constants.TOPOLOGY: topology
}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_create_load_balancer_flow,
topology, listeners=listeners_dicts,
store=store)
@ -361,11 +385,11 @@ class ControllerWorker(object):
if cascade:
store.update(flow_utils.get_delete_pools_store(db_lb))
store.update(flow_utils.get_delete_listeners_store(db_lb))
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_cascade_delete_load_balancer_flow,
load_balancer, store=store)
else:
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_delete_load_balancer_flow,
load_balancer, store=store)
@ -383,7 +407,7 @@ class ControllerWorker(object):
original_load_balancer[constants.LOADBALANCER_ID],
constants.UPDATE_DICT: load_balancer_updates}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_update_load_balancer_flow,
store=store)
@ -417,7 +441,7 @@ class ControllerWorker(object):
else:
store[constants.AVAILABILITY_ZONE] = {}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_create_member_flow,
store=store)
@ -453,7 +477,7 @@ class ControllerWorker(object):
else:
store[constants.AVAILABILITY_ZONE] = {}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_delete_member_flow,
store=store)
@ -501,7 +525,7 @@ class ControllerWorker(object):
else:
store[constants.AVAILABILITY_ZONE] = {}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_batch_update_members_flow,
provider_old_members, new_members, updated_members,
store=store)
@ -539,7 +563,7 @@ class ControllerWorker(object):
else:
store[constants.AVAILABILITY_ZONE] = {}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_update_member_flow,
store=store)
@ -577,7 +601,7 @@ class ControllerWorker(object):
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_create_pool_flow,
store=store)
@ -604,7 +628,7 @@ class ControllerWorker(object):
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id,
constants.PROJECT_ID: db_pool.project_id}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_delete_pool_flow,
store=store)
@ -640,7 +664,7 @@ class ControllerWorker(object):
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id,
constants.UPDATE_DICT: pool_updates}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_update_pool_flow,
store=store)
@ -662,7 +686,7 @@ class ControllerWorker(object):
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: db_listener.load_balancer.id
}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_create_l7policy_flow,
store=store)
@ -683,7 +707,7 @@ class ControllerWorker(object):
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: db_listener.load_balancer.id
}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_delete_l7policy_flow,
store=store)
@ -706,7 +730,7 @@ class ControllerWorker(object):
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: db_listener.load_balancer.id,
constants.UPDATE_DICT: l7policy_updates}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_update_l7policy_flow,
store=store)
@ -734,7 +758,7 @@ class ControllerWorker(object):
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id
}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_create_l7rule_flow,
store=store)
@ -760,7 +784,7 @@ class ControllerWorker(object):
constants.L7POLICY_ID: db_l7policy.id,
constants.LOADBALANCER_ID: load_balancer.id
}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_delete_l7rule_flow,
store=store)
@ -788,7 +812,7 @@ class ControllerWorker(object):
constants.L7POLICY_ID: db_l7policy.id,
constants.LOADBALANCER_ID: load_balancer.id,
constants.UPDATE_DICT: l7rule_updates}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_update_l7rule_flow,
store=store)
@ -855,7 +879,7 @@ class ControllerWorker(object):
else:
stored_params[constants.AVAILABILITY_ZONE] = {}
self.services_controller.run_poster(
self.run_flow(
flow_utils.get_failover_flow,
role=amp.role, load_balancer=provider_lb,
store=stored_params, wait=True)
@ -958,7 +982,7 @@ class ControllerWorker(object):
store = {constants.AMPHORA: amp.to_dict(),
constants.AMPHORA_ID: amphora_id}
self.services_controller.run_poster(
self.run_flow(
flow_utils.cert_rotate_amphora_flow,
store=store)
@ -985,6 +1009,6 @@ class ControllerWorker(object):
store = {constants.AMPHORA: amp.to_dict(),
constants.FLAVOR: flavor}
self.services_controller.run_poster(
self.run_flow(
flow_utils.update_amphora_config_flow,
store=store)

View File

@ -54,9 +54,9 @@ class TestBaseTaskFlowEngine(base.TestCase):
concurrent.futures.ThreadPoolExecutor.assert_called_once_with(
max_workers=MAX_WORKERS)
# Test _taskflow_load
# Test taskflow_load
base_taskflow_engine._taskflow_load('TEST')
base_taskflow_engine.taskflow_load('TEST')
tf_engines.load.assert_called_once_with(
'TEST',

View File

@ -91,7 +91,7 @@ class TestException(Exception):
return_value=_member_mock)
@mock.patch('octavia.db.repositories.PoolRepository.get',
return_value=_pool_mock)
@mock.patch('octavia.common.base_taskflow.BaseTaskFlowEngine._taskflow_load',
@mock.patch('octavia.common.base_taskflow.BaseTaskFlowEngine.taskflow_load',
return_value=_flow_mock)
@mock.patch('taskflow.listeners.logging.DynamicLoggingListener')
@mock.patch('octavia.db.api.get_session', return_value=_db_session)
@ -142,7 +142,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
amp = cw.create_amphora()
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
'TEST',
store={constants.BUILD_TYPE_PRIORITY:
@ -184,7 +184,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
amp = cw.create_amphora(availability_zone=az)
mock_get_az_metadata.assert_called_once_with(_db_session, az)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
'TEST',
store={constants.BUILD_TYPE_PRIORITY:
@ -223,7 +223,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.create_health_monitor(_health_mon_mock)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.HEALTH_MON:
_health_mon_mock,
@ -260,7 +260,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.delete_health_monitor(HM_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.HEALTH_MON:
_health_mon_mock,
@ -298,7 +298,7 @@ class TestControllerWorker(base.TestCase):
cw.update_health_monitor(_health_mon_mock.id,
HEALTH_UPDATE_DICT)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.HEALTH_MON:
_health_mon_mock,
@ -336,7 +336,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.create_listener(LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.LOADBALANCER:
_load_balancer_mock,
@ -368,7 +368,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.delete_listener(LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
_flow_mock, store={constants.LISTENER: _listener_mock,
constants.LOADBALANCER: _load_balancer_mock}))
@ -398,7 +398,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.update_listener(LB_ID, LISTENER_UPDATE_DICT)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.LISTENER: _listener_mock,
constants.LOADBALANCER:
@ -658,7 +658,7 @@ class TestControllerWorker(base.TestCase):
_db_session,
id=LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.LOADBALANCER:
_load_balancer_mock,
@ -696,7 +696,7 @@ class TestControllerWorker(base.TestCase):
_db_session,
id=LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.LOADBALANCER:
_load_balancer_mock,
@ -739,7 +739,7 @@ class TestControllerWorker(base.TestCase):
_db_session,
id=LB_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.UPDATE_DICT: change,
constants.LOADBALANCER:
@ -776,7 +776,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.create_member(MEMBER_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
_flow_mock,
store={constants.MEMBER: _member_mock,
@ -813,7 +813,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.delete_member(MEMBER_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
_flow_mock, store={constants.MEMBER: _member_mock,
constants.LISTENERS:
@ -852,7 +852,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.update_member(MEMBER_ID, MEMBER_UPDATE_DICT)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.MEMBER: _member_mock,
constants.LISTENERS:
@ -894,7 +894,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.batch_update_members([9], [11], [MEMBER_UPDATE_DICT])
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={
constants.LISTENERS: [_listener_mock],
@ -929,7 +929,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.create_pool(POOL_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.POOL: _pool_mock,
constants.LISTENERS:
@ -962,7 +962,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.delete_pool(POOL_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.POOL: _pool_mock,
constants.LISTENERS:
@ -995,7 +995,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.update_pool(POOL_ID, POOL_UPDATE_DICT)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.POOL: _pool_mock,
constants.LISTENERS:
@ -1030,7 +1030,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.create_l7policy(L7POLICY_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
constants.LISTENERS:
@ -1063,7 +1063,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.delete_l7policy(L7POLICY_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
constants.LISTENERS:
@ -1096,7 +1096,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.update_l7policy(L7POLICY_ID, L7POLICY_UPDATE_DICT)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
constants.LISTENERS:
@ -1131,7 +1131,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.create_l7rule(L7RULE_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7RULE: _l7rule_mock,
constants.L7POLICY: _l7policy_mock,
@ -1165,7 +1165,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.delete_l7rule(L7RULE_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7RULE: _l7rule_mock,
constants.L7POLICY: _l7policy_mock,
@ -1199,7 +1199,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.update_l7rule(L7RULE_ID, L7RULE_UPDATE_DICT)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7RULE: _l7rule_mock,
constants.L7POLICY: _l7policy_mock,
@ -1242,7 +1242,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
_flow_mock,
store={constants.FLAVOR: {'loadbalancer_topology':
@ -1297,7 +1297,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
_flow_mock,
store={constants.FLAVOR: {'loadbalancer_topology':
@ -1352,7 +1352,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
_flow_mock,
store={constants.FLAVOR: {'loadbalancer_topology':
@ -1407,7 +1407,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
_flow_mock,
store={constants.FLAVOR: {'loadbalancer_topology':
@ -1465,7 +1465,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
_flow_mock,
store={constants.FLAVOR: {'loadbalancer_topology':
@ -1567,7 +1567,7 @@ class TestControllerWorker(base.TestCase):
cw.failover_amphora(AMP_ID)
mock_get_failover_amp_flow.assert_called_once_with(mock_amphora, None)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(FAKE_FLOW, store=expected_stored_params))
_flow_mock.run.assert_called_once_with()
@ -1604,7 +1604,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
_flow_mock,
store={constants.LOADBALANCER: None,
@ -2065,7 +2065,7 @@ class TestControllerWorker(base.TestCase):
cw = controller_worker.ControllerWorker()
cw.failover_amphora(AMP_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
_flow_mock,
store={constants.LOADBALANCER_ID: _load_balancer_mock.id,
@ -2101,7 +2101,7 @@ class TestControllerWorker(base.TestCase):
_flow_mock.reset_mock()
cw = controller_worker.ControllerWorker()
cw.amphora_cert_rotation(AMP_ID)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.AMPHORA: _amphora_mock,
constants.AMPHORA_ID:
@ -2140,7 +2140,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get.assert_called_once_with(_db_session, id=AMP_ID)
mock_get_lb_for_amp.assert_called_once_with(_db_session, AMP_ID)
mock_flavor_meta.assert_called_once_with(_db_session, 'vanilla')
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.AMPHORA: _amphora_mock,
constants.FLAVOR: {'test': 'dict'}}))
@ -2151,13 +2151,13 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get.reset_mock()
mock_get_lb_for_amp.reset_mock()
mock_flavor_meta.reset_mock()
base_taskflow.BaseTaskFlowEngine._taskflow_load.reset_mock()
base_taskflow.BaseTaskFlowEngine.taskflow_load.reset_mock()
mock_lb.flavor_id = None
cw.update_amphora_agent_config(AMP_ID)
mock_amp_repo_get.assert_called_once_with(_db_session, id=AMP_ID)
mock_get_lb_for_amp.assert_called_once_with(_db_session, AMP_ID)
mock_flavor_meta.assert_not_called()
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.AMPHORA: _amphora_mock,
constants.FLAVOR: {}}))

View File

@ -116,6 +116,7 @@ class TestControllerWorker(base.TestCase):
def setUp(self):
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
self.conf.config(group="task_flow", jobboard_enabled=True)
_db_pool_mock.listeners = [_listener_mock]
_db_pool_mock.load_balancer = _db_load_balancer_mock

View File

@ -0,0 +1,8 @@
---
features:
- |
Added a new configuration setting (``[task_flow]/jobboard_enabled``) to
enable/disable jobboard functionality in the amphorav2 provider. When
disabled, the amphorav2 provider behaves similarly to the amphora v1
provider and does not require extra dependencies. The default setting is
jobboard disabled while jobboard remains an experimental feature.

View File

@ -78,7 +78,7 @@
parent: octavia-v2-dsvm-scenario
vars:
devstack_localrc:
OCTAVIA_ENABLE_AMPHORAV2_PROVIDER: True
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD: True
devstack_local_conf:
post-config:
$OCTAVIA_CONF:
@ -93,6 +93,13 @@
enabled_provider_drivers: amphorav2:The v2 amphora driver.
provider: amphorav2
- job:
name: octavia-v2-dsvm-scenario-amphora-v2-no-jobboard
parent: octavia-v2-dsvm-scenario-amphora-v2
vars:
devstack_localrc:
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD: False
- project-template:
name: octavia-tox-tips
check:

View File

@ -58,6 +58,9 @@
- octavia-v2-dsvm-scenario-amphora-v2:
irrelevant-files: *irrelevant-files
voting: false
- octavia-v2-dsvm-scenario-amphora-v2-no-jobboard:
irrelevant-files: *irrelevant-files
voting: false
- octavia-v2-dsvm-scenario-centos-8:
irrelevant-files: *irrelevant-files
voting: false