Fix multi-listener LB with missing certificate

This patch allows listeners on a load balancer to continue to
operate should one listener fail to access secret content in
barbican. Previously if one listener failed to access barbican
content, all of the listeners would be impacted.
This patch also cleans up some unused code and unnecessary comments.

Change-Id: I300839fe7cf88763e1e0b8c484029662beb64f0a
Story: 2006676
Task: 36951
This commit is contained in:
Michael Johnson 2019-10-30 11:46:41 -07:00
parent 85f7abbbaf
commit df5db0b000
10 changed files with 174 additions and 204 deletions

View File

@ -37,6 +37,9 @@ import octavia.common.jinja.haproxy.split_listeners.jinja_cfg as jinja_split
from octavia.common.jinja.lvs import jinja_cfg as jinja_udp_cfg
from octavia.common.tls_utils import cert_parser
from octavia.common import utils
from octavia.db import api as db_apis
from octavia.db import repositories as repo
LOG = logging.getLogger(__name__)
API_VERSION = consts.API_VERSION
@ -145,6 +148,11 @@ class HaproxyAmphoraLoadBalancerDriver(
'process mode.', amphora.id, loadbalancer.id)
has_tcp = False
certs = {}
client_ca_filename = None
crl_filename = None
pool_tls_certs = dict()
listeners_to_update = []
for listener in loadbalancer.listeners:
LOG.debug("%s updating listener %s on amphora %s",
self.__class__.__name__, listener.id, amphora.id)
@ -162,42 +170,63 @@ class HaproxyAmphoraLoadBalancerDriver(
else:
obj_id = loadbalancer.id
self._process_tls_certificates(listener, amphora, obj_id)
try:
certs.update({
listener.tls_certificate_id:
self._process_tls_certificates(
listener, amphora, obj_id)['tls_cert']})
client_ca_filename = self._process_secret(
listener, listener.client_ca_tls_certificate_id,
amphora, obj_id)
crl_filename = self._process_secret(
listener, listener.client_crl_container_id,
amphora, obj_id)
pool_tls_certs = self._process_listener_pool_certs(
listener, amphora, obj_id)
client_ca_filename = self._process_secret(
listener, listener.client_ca_tls_certificate_id,
amphora, obj_id)
crl_filename = self._process_secret(
listener, listener.client_crl_container_id,
amphora, obj_id)
pool_tls_certs = self._process_listener_pool_certs(
listener, amphora, obj_id)
if split_config:
config = self.jinja_split.build_config(
host_amphora=amphora, listener=listener,
haproxy_versions=haproxy_versions,
client_ca_filename=client_ca_filename,
client_crl=crl_filename,
pool_tls_certs=pool_tls_certs)
self.clients[amphora.api_version].upload_config(
amphora, listener.id, config,
timeout_dict=timeout_dict)
self.clients[amphora.api_version].reload_listener(
amphora, listener.id, timeout_dict=timeout_dict)
if split_config:
config = self.jinja_split.build_config(
host_amphora=amphora, listener=listener,
haproxy_versions=haproxy_versions,
client_ca_filename=client_ca_filename,
client_crl=crl_filename,
pool_tls_certs=pool_tls_certs)
self.clients[amphora.api_version].upload_config(
amphora, listener.id, config,
timeout_dict=timeout_dict)
self.clients[amphora.api_version].reload_listener(
amphora, listener.id, timeout_dict=timeout_dict)
else:
listeners_to_update.append(listener)
except Exception as e:
LOG.error('Unable to update listener {0} due to "{1}". '
'Skipping this listener.'.format(
listener.id, str(e)))
listener_repo = repo.ListenerRepository()
listener_repo.update(db_apis.get_session(), listener.id,
provisioning_status=consts.ERROR,
operating_status=consts.ERROR)
if has_tcp and not split_config:
# Generate HaProxy configuration from listener object
config = self.jinja_combo.build_config(
host_amphora=amphora, listeners=loadbalancer.listeners,
haproxy_versions=haproxy_versions,
client_ca_filename=client_ca_filename,
client_crl=crl_filename,
pool_tls_certs=pool_tls_certs)
self.clients[amphora.api_version].upload_config(
amphora, loadbalancer.id, config, timeout_dict=timeout_dict)
self.clients[amphora.api_version].reload_listener(
amphora, loadbalancer.id, timeout_dict=timeout_dict)
if listeners_to_update:
# Generate HaProxy configuration from listener object
config = self.jinja_combo.build_config(
host_amphora=amphora, listeners=listeners_to_update,
haproxy_versions=haproxy_versions,
client_ca_filename=client_ca_filename,
client_crl=crl_filename,
pool_tls_certs=pool_tls_certs)
self.clients[amphora.api_version].upload_config(
amphora, loadbalancer.id, config,
timeout_dict=timeout_dict)
self.clients[amphora.api_version].reload_listener(
amphora, loadbalancer.id, timeout_dict=timeout_dict)
else:
# If we aren't updating any listeners, make sure there are
# no listeners hanging around. For example if this update
# was called from a listener delete.
self.clients[amphora.api_version].delete_listener(
amphora, loadbalancer.id)
def _udp_update(self, listener, vip):
LOG.debug("Amphora %s keepalivedlvs, updating "

View File

@ -258,20 +258,26 @@ class UpdateHealthDb(update_base.HealthUpdateBase):
potential_offline_pools = {}
# We got a heartbeat so lb is healthy until proven otherwise
if db_lb['enabled'] is False:
if db_lb[constants.ENABLED] is False:
lb_status = constants.OFFLINE
else:
lb_status = constants.ONLINE
health_msg_version = health.get('ver', 0)
for listener_id in db_lb.get('listeners', {}):
db_op_status = db_lb['listeners'][listener_id]['operating_status']
for listener_id in db_lb.get(constants.LISTENERS, {}):
db_listener = db_lb[constants.LISTENERS][listener_id]
db_op_status = db_listener[constants.OPERATING_STATUS]
listener_status = None
listener = None
if listener_id not in listeners:
listener_status = constants.OFFLINE
if (db_listener[constants.ENABLED] and
db_lb[constants.PROVISIONING_STATUS] ==
constants.ACTIVE):
listener_status = constants.ERROR
else:
listener_status = constants.OFFLINE
else:
listener = listeners[listener_id]

View File

@ -1176,14 +1176,14 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
"""
LOG.debug("Mark ACTIVE in DB for load balancer id: %s "
"and listener ids: %s", loadbalancer.id,
"and updating status for listener ids: %s", loadbalancer.id,
', '.join([l.id for l in listeners]))
self.loadbalancer_repo.update(db_apis.get_session(),
loadbalancer.id,
provisioning_status=constants.ACTIVE)
for listener in listeners:
self.listener_repo.update(db_apis.get_session(), listener.id,
provisioning_status=constants.ACTIVE)
self.listener_repo.prov_status_active_if_not_error(
db_apis.get_session(), listener.id)
def revert(self, loadbalancer, listeners, *args, **kwargs):
"""Mark the load balancer and listeners as broken.
@ -1202,35 +1202,6 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
self.task_utils.mark_listener_prov_status_error(listener.id)
class MarkListenerActiveInDB(BaseDatabaseTask):
"""Mark the listener active in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, listener):
"""Mark the listener as active in DB
:param listener: The listener to be marked active
:returns: None
"""
LOG.debug("Mark ACTIVE in DB for listener id: %s ", listener.id)
self.listener_repo.update(db_apis.get_session(), listener.id,
provisioning_status=constants.ACTIVE)
def revert(self, listener, *args, **kwargs):
"""Mark the listener ERROR since the delete couldn't happen
:param listener: The listener that couldn't be updated
:returns: None
"""
LOG.warning("Reverting mark listener active in DB "
"for listener id %s", listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
class MarkListenerDeletedInDB(BaseDatabaseTask):
"""Mark the listener deleted in the DB.

View File

@ -1182,8 +1182,8 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
loadbalancer.id,
provisioning_status=constants.ACTIVE)
for listener in listeners:
self.listener_repo.update(db_apis.get_session(), listener.id,
provisioning_status=constants.ACTIVE)
self.listener_repo.prov_status_active_if_not_error(
db_apis.get_session(), listener.id)
def revert(self, loadbalancer, listeners, *args, **kwargs):
"""Mark the load balancer and listeners as broken.
@ -1202,35 +1202,6 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
self.task_utils.mark_listener_prov_status_error(listener.id)
class MarkListenerActiveInDB(BaseDatabaseTask):
"""Mark the listener active in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, listener):
"""Mark the listener as active in DB
:param listener: The listener to be marked active
:returns: None
"""
LOG.debug("Mark ACTIVE in DB for listener id: %s ", listener.id)
self.listener_repo.update(db_apis.get_session(), listener.id,
provisioning_status=constants.ACTIVE)
def revert(self, listener, *args, **kwargs):
"""Mark the listener ERROR since the delete couldn't happen
:param listener: The listener that couldn't be updated
:returns: None
"""
LOG.warning("Reverting mark listener active in DB "
"for listener id %s", listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
class MarkListenerDeletedInDB(BaseDatabaseTask):
"""Mark the listener deleted in the DB.

View File

@ -1105,6 +1105,16 @@ class ListenerRepository(BaseRepository):
session.add(model)
return model.to_data_model()
def prov_status_active_if_not_error(self, session, listener_id):
"""Update provisioning_status to ACTIVE if not already in ERROR."""
with session.begin(subtransactions=True):
(session.query(self.model_class).filter_by(id=listener_id).
# Don't mark ERROR or already ACTIVE as ACTIVE
filter(~self.model_class.provisioning_status.in_(
[consts.ERROR, consts.ACTIVE])).
update({self.model_class.provisioning_status: consts.ACTIVE},
synchronize_session='fetch'))
class ListenerStatisticsRepository(BaseRepository):
model_class = models.ListenerStatistics

View File

@ -2234,15 +2234,16 @@ class TestListenerRepositoryTest(BaseRepositoryTest):
operating_status=constants.ONLINE, enabled=True,
server_group_id=self.FAKE_UUID_1)
def create_listener(self, listener_id, port, default_pool_id=None):
def create_listener(self, listener_id, port, default_pool_id=None,
provisioning_status=constants.ACTIVE):
listener = self.listener_repo.create(
self.session, id=listener_id, project_id=self.FAKE_UUID_2,
name="listener_name", description="listener_description",
protocol=constants.PROTOCOL_HTTP, protocol_port=port,
connection_limit=1, load_balancer_id=self.load_balancer.id,
default_pool_id=default_pool_id, operating_status=constants.ONLINE,
provisioning_status=constants.ACTIVE, enabled=True, peer_port=1025,
tags=['test_tag'])
provisioning_status=provisioning_status, enabled=True,
peer_port=1025, tags=['test_tag'])
return listener
def create_amphora(self, amphora_id, loadbalancer_id):
@ -2469,6 +2470,40 @@ class TestListenerRepositoryTest(BaseRepositoryTest):
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertIsNone(new_listener.default_pool)
def test_prov_status_active_if_not_error_active(self):
listener = self.create_listener(self.FAKE_UUID_1, 80,
provisioning_status=constants.ACTIVE)
self.listener_repo.prov_status_active_if_not_error(self.session,
listener.id)
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertEqual(constants.ACTIVE, new_listener.provisioning_status)
def test_prov_status_active_if_not_error_error(self):
listener = self.create_listener(self.FAKE_UUID_1, 80,
provisioning_status=constants.ERROR)
self.listener_repo.prov_status_active_if_not_error(self.session,
listener.id)
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertEqual(constants.ERROR, new_listener.provisioning_status)
def test_prov_status_active_if_not_error_pending_update(self):
listener = self.create_listener(
self.FAKE_UUID_1, 80, provisioning_status=constants.PENDING_UPDATE)
self.listener_repo.prov_status_active_if_not_error(self.session,
listener.id)
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertEqual(constants.ACTIVE, new_listener.provisioning_status)
def test_prov_status_active_if_not_error_bogus_listener(self):
listener = self.create_listener(
self.FAKE_UUID_1, 80, provisioning_status=constants.PENDING_UPDATE)
# Should not raise an exception nor change any status
self.listener_repo.prov_status_active_if_not_error(self.session,
'bogus_id')
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertEqual(constants.PENDING_UPDATE,
new_listener.provisioning_status)
class ListenerStatisticsRepositoryTest(BaseRepositoryTest):

View File

@ -123,16 +123,6 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
mock_amphora = mock.MagicMock()
mock_amphora.id = 'mock_amphora_id'
mock_amphora.api_version = API_VERSION
# mock_listener = mock.MagicMock()
# mock_listener.id = 'mock_listener_id'
# mock_listener.protocol = constants.PROTOCOL_HTTP
# mock_listener.connection_limit = constants.DEFAULT_CONNECTION_LIMIT
# mock_listener.tls_certificate_id = None
# mock_loadbalancer = mock.MagicMock()
# mock_loadbalancer.id = 'mock_lb_id'
# mock_loadbalancer.project_id = 'mock_lb_project'
# mock_loadbalancer.listeners = [mock_listener]
# mock_listener.load_balancer = mock_loadbalancer
mock_secret.return_value = 'filename.pem'
mock_load_cert.return_value = {
'tls_cert': self.sl.default_tls_container, 'sni_certs': [],
@ -168,6 +158,26 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.driver.clients[API_VERSION].upload_config.assert_not_called()
self.driver.clients[API_VERSION].reload_listener.assert_not_called()
@mock.patch('octavia.db.api.get_session')
@mock.patch('octavia.db.repositories.ListenerRepository.update')
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
def test_update_amphora_listeners_bad_cert(
self, mock_load_cert, mock_list_update, mock_get_session):
mock_amphora = mock.MagicMock()
mock_amphora.id = 'mock_amphora_id'
mock_amphora.api_version = API_VERSION
mock_get_session.return_value = 'fake_session'
mock_load_cert.side_effect = [Exception]
self.driver.update_amphora_listeners(self.lb,
mock_amphora, self.timeout_dict)
mock_list_update.assert_called_once_with(
'fake_session', self.lb.listeners[0].id,
provisioning_status=constants.ERROR,
operating_status=constants.ERROR)
self.driver.jinja_split.build_config.assert_not_called()
self.driver.clients[API_VERSION].delete_listener.assert_not_called()
@mock.patch('octavia.amphorae.drivers.haproxy.rest_api_driver.'
'HaproxyAmphoraLoadBalancerDriver._process_secret')
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')

View File

@ -123,16 +123,6 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
mock_amphora = mock.MagicMock()
mock_amphora.id = 'mock_amphora_id'
mock_amphora.api_version = API_VERSION
# mock_listener = mock.MagicMock()
# mock_listener.id = 'mock_listener_id'
# mock_listener.protocol = constants.PROTOCOL_HTTP
# mock_listener.connection_limit = constants.DEFAULT_CONNECTION_LIMIT
# mock_listener.tls_certificate_id = None
# mock_loadbalancer = mock.MagicMock()
# mock_loadbalancer.id = 'mock_lb_id'
# mock_loadbalancer.project_id = 'mock_lb_project'
# mock_loadbalancer.listeners = [mock_listener]
# mock_listener.load_balancer = mock_loadbalancer
mock_secret.return_value = 'filename.pem'
mock_load_cert.return_value = {
'tls_cert': self.sl.default_tls_container, 'sni_certs': [],
@ -168,6 +158,27 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.driver.clients[API_VERSION].upload_config.assert_not_called()
self.driver.clients[API_VERSION].reload_listener.assert_not_called()
@mock.patch('octavia.db.api.get_session')
@mock.patch('octavia.db.repositories.ListenerRepository.update')
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
def test_update_amphora_listeners_bad_cert(
self, mock_load_cert, mock_list_update, mock_get_session):
mock_amphora = mock.MagicMock()
mock_amphora.id = 'mock_amphora_id'
mock_amphora.api_version = API_VERSION
mock_get_session.return_value = 'fake_session'
mock_load_cert.side_effect = [Exception]
self.driver.update_amphora_listeners(self.lb,
mock_amphora, self.timeout_dict)
mock_list_update.assert_called_once_with(
'fake_session', self.lb.listeners[0].id,
provisioning_status=constants.ERROR,
operating_status=constants.ERROR)
self.driver.jinja_combo.build_config.assert_not_called()
(self.driver.clients[API_VERSION].delete_listener.
assert_called_once_with)(mock_amphora, self.lb.id)
@mock.patch('octavia.amphorae.drivers.haproxy.rest_api_driver.'
'HaproxyAmphoraLoadBalancerDriver._process_secret')
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')

View File

@ -882,42 +882,6 @@ class TestDatabaseTasks(base.TestCase):
'TEST',
id=AMP_ID)
def test_mark_listener_active_in_db(self,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_listener_active = database_tasks.MarkListenerActiveInDB()
mark_listener_active.execute(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
LISTENER_ID,
provisioning_status=constants.ACTIVE)
# Test the revert
mock_listener_repo_update.reset_mock()
mark_listener_active.revert(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
id=LISTENER_ID,
provisioning_status=constants.ERROR)
# Test the revert
mock_listener_repo_update.reset_mock()
mock_listener_repo_update.side_effect = Exception('fail')
mark_listener_active.revert(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
id=LISTENER_ID,
provisioning_status=constants.ERROR)
def test_mark_listener_deleted_in_db(self,
mock_generate_uuid,
mock_LOG,
@ -991,7 +955,10 @@ class TestDatabaseTasks(base.TestCase):
id=LISTENER_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.ListenerRepository.'
'prov_status_active_if_not_error')
def test_mark_lb_and_listeners_active_in_db(self,
mock_list_not_error,
mock_generate_uuid,
mock_LOG,
mock_get_session,
@ -1005,10 +972,7 @@ class TestDatabaseTasks(base.TestCase):
mark_lb_and_listeners_active.execute(self.loadbalancer_mock,
[self.listener_mock])
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
LISTENER_ID,
provisioning_status=constants.ACTIVE)
mock_list_not_error.assert_called_once_with('TEST', LISTENER_ID)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
LB_ID,

View File

@ -882,42 +882,6 @@ class TestDatabaseTasks(base.TestCase):
'TEST',
id=AMP_ID)
def test_mark_listener_active_in_db(self,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_listener_active = database_tasks.MarkListenerActiveInDB()
mark_listener_active.execute(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
LISTENER_ID,
provisioning_status=constants.ACTIVE)
# Test the revert
mock_listener_repo_update.reset_mock()
mark_listener_active.revert(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
id=LISTENER_ID,
provisioning_status=constants.ERROR)
# Test the revert
mock_listener_repo_update.reset_mock()
mock_listener_repo_update.side_effect = Exception('fail')
mark_listener_active.revert(self.listener_mock)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
id=LISTENER_ID,
provisioning_status=constants.ERROR)
def test_mark_listener_deleted_in_db(self,
mock_generate_uuid,
mock_LOG,
@ -991,7 +955,10 @@ class TestDatabaseTasks(base.TestCase):
id=LISTENER_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.ListenerRepository.'
'prov_status_active_if_not_error')
def test_mark_lb_and_listeners_active_in_db(self,
mock_list_not_error,
mock_generate_uuid,
mock_LOG,
mock_get_session,
@ -999,16 +966,12 @@ class TestDatabaseTasks(base.TestCase):
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_lb_and_listeners_active = (database_tasks.
MarkLBAndListenersActiveInDB())
mark_lb_and_listeners_active.execute(self.loadbalancer_mock,
[self.listener_mock])
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
LISTENER_ID,
provisioning_status=constants.ACTIVE)
mock_list_not_error.assert_called_once_with('TEST', LISTENER_ID)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
LB_ID,