920 lines
38 KiB
Python
920 lines
38 KiB
Python
# Copyright (c) 2016 Mirantis, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import random
|
|
import six
|
|
import sys
|
|
import time
|
|
import traceback
|
|
|
|
import requests
|
|
|
|
from neutronclient.common import exceptions as n_exc
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
from oslo_utils import timeutils
|
|
|
|
from kuryr_kubernetes import clients
|
|
from kuryr_kubernetes import config
|
|
from kuryr_kubernetes.controller.drivers import base
|
|
from kuryr_kubernetes import exceptions as k_exc
|
|
from kuryr_kubernetes.objects import lbaas as obj_lbaas
|
|
from kuryr_kubernetes import utils
|
|
|
|
CONF = cfg.CONF
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
_ACTIVATION_TIMEOUT = CONF.neutron_defaults.lbaas_activation_timeout
|
|
_L7_POLICY_ACT_REDIRECT_TO_POOL = 'REDIRECT_TO_POOL'
|
|
# NOTE(yboaron):Prior to sending create request to Octavia, LBaaS driver
|
|
# verifies that LB is in a stable state by polling LB's provisioning_status
|
|
# using backoff timer.
|
|
# A similar method is used also for the delete flow.
|
|
# Unlike LB creation, rest of octavia operations are completed usually after
|
|
# few seconds. Next constants define the intervals values for 'fast' and
|
|
# 'slow' (will be used for LB creation) polling.
|
|
_LB_STS_POLL_FAST_INTERVAL = 1
|
|
_LB_STS_POLL_SLOW_INTERVAL = 3
|
|
|
|
|
|
class LBaaSv2Driver(base.LBaaSDriver):
|
|
"""LBaaSv2Driver implements LBaaSDriver for Neutron LBaaSv2 API."""
|
|
|
|
@property
|
|
def cascading_capable(self):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
return lbaas.cascading_capable
|
|
|
|
def get_service_loadbalancer_name(self, namespace, svc_name):
|
|
return "%s/%s" % (namespace, svc_name)
|
|
|
|
def get_loadbalancer_pool_name(self, loadbalancer, namespace, svc_name):
|
|
return "%s/%s/%s" % (loadbalancer.name, namespace, svc_name)
|
|
|
|
def ensure_loadbalancer(self, name, project_id, subnet_id, ip,
|
|
security_groups_ids=None, service_type=None,
|
|
provider=None):
|
|
request = obj_lbaas.LBaaSLoadBalancer(
|
|
name=name, project_id=project_id, subnet_id=subnet_id, ip=ip,
|
|
security_groups=security_groups_ids, provider=provider)
|
|
response = self._ensure(request, self._create_loadbalancer,
|
|
self._find_loadbalancer)
|
|
if not response:
|
|
# NOTE(ivc): load balancer was present before 'create', but got
|
|
# deleted externally between 'create' and 'find'
|
|
raise k_exc.ResourceNotReady(request)
|
|
|
|
return response
|
|
|
|
def release_loadbalancer(self, loadbalancer):
|
|
neutron = clients.get_neutron_client()
|
|
lbaas = clients.get_loadbalancer_client()
|
|
if lbaas.cascading_capable:
|
|
self._release(
|
|
loadbalancer,
|
|
loadbalancer,
|
|
lbaas.delete,
|
|
lbaas.lbaas_loadbalancer_path % loadbalancer.id,
|
|
params={'cascade': True})
|
|
|
|
else:
|
|
self._release(loadbalancer, loadbalancer,
|
|
lbaas.delete_loadbalancer, loadbalancer.id)
|
|
|
|
sg_id = self._find_listeners_sg(loadbalancer)
|
|
if sg_id:
|
|
# Note: reusing activation timeout as deletion timeout
|
|
self._wait_for_deletion(loadbalancer, _ACTIVATION_TIMEOUT)
|
|
try:
|
|
neutron.delete_security_group(sg_id)
|
|
except n_exc.NeutronClientException:
|
|
LOG.exception('Error when deleting loadbalancer security '
|
|
'group. Leaving it orphaned.')
|
|
except n_exc.NotFound:
|
|
LOG.debug('Security group %s already deleted', sg_id)
|
|
|
|
    def _create_lb_security_group_rule(self, loadbalancer, listener):
        """Open the listener's port on the loadbalancer security group.

        Creates the loadbalancer's own security group on first use (and
        attaches it to the VIP port), then adds an ingress rule for the
        listener's protocol/port.  A Conflict from Neutron means the
        rule already exists and is silently accepted.
        """
        neutron = clients.get_neutron_client()
        sg_id = self._find_listeners_sg(loadbalancer)
        # if an SG for the loadbalancer has not being created, create one
        if not sg_id:
            sg = neutron.create_security_group({
                'security_group': {
                    'name': loadbalancer.name,
                    'project_id': loadbalancer.project_id,
                },
            })
            sg_id = sg['security_group']['id']
            loadbalancer.security_groups.append(sg_id)
            vip_port = self._get_vip_port(loadbalancer)
            # replace the VIP port's SG list with just the new SG
            neutron.update_port(
                vip_port.get('id'),
                {'port': {
                    'security_groups': [sg_id]}})

        try:
            neutron.create_security_group_rule({
                'security_group_rule': {
                    'direction': 'ingress',
                    'port_range_min': listener.port,
                    'port_range_max': listener.port,
                    'protocol': listener.protocol,
                    'security_group_id': sg_id,
                    'description': listener.name,
                },
            })
        except n_exc.NeutronClientException as ex:
            # conflict: the rule is already present, nothing to do
            if ex.status_code != requests.codes.conflict:
                LOG.exception('Failed when creating security group rule '
                              'for listener %s.', listener.name)
|
|
|
|
    def _apply_members_security_groups(self, loadbalancer, port, target_port,
                                       protocol, sg_rule_name, new_sgs=None):
        """Reconcile loadbalancer SG rules with the members' Network Policy.

        Copies matching ingress rules (same protocol, remote_ip_prefix
        based) from the member security groups onto the loadbalancer SG,
        deletes LB rules that no longer match any pod rule, and restores
        a wide-open default rule when the pods carry the default SG
        (i.e. no Network Policy applies to the service).
        """
        LOG.debug("Applying members security groups.")
        neutron = clients.get_neutron_client()
        if CONF.octavia_defaults.sg_mode == 'create':
            if new_sgs:
                # sg_rule_name is '<lb_name>:<protocol>:<port>'
                lb_name = sg_rule_name.split(":")[0]
                lb_sg = self._find_listeners_sg(loadbalancer, lb_name=lb_name)
            else:
                lb_sg = self._find_listeners_sg(loadbalancer)
        else:
            lb_sg = self._get_vip_port(loadbalancer).get('security_groups')[0]

        lbaas_sg_rules = neutron.list_security_group_rules(
            security_group_id=lb_sg)
        all_pod_rules = []
        add_default_rules = False

        if new_sgs:
            sgs = new_sgs
        else:
            sgs = loadbalancer.security_groups

        # Check if Network Policy allows listener on the pods
        for sg in sgs:
            if sg != lb_sg:
                if sg in config.CONF.neutron_defaults.pod_security_groups:
                    # If default sg is set, this means there is no NP
                    # associated to the service, thus falling back to the
                    # default listener rules
                    add_default_rules = True
                    break
                rules = neutron.list_security_group_rules(
                    security_group_id=sg)
                for rule in rules['security_group_rules']:
                    # copying ingress rules with same protocol onto the
                    # loadbalancer sg rules
                    # NOTE(ltomasbo): NP security groups only have
                    # remote_ip_prefix, not remote_group_id, therefore only
                    # applying the ones with remote_ip_prefix
                    if (rule['protocol'] == protocol.lower() and
                            rule['direction'] == 'ingress' and
                            rule['remote_ip_prefix']):
                        # If listener port not in allowed range, skip
                        min_port = rule.get('port_range_min')
                        max_port = rule.get('port_range_max')
                        if (min_port and target_port not in range(min_port,
                                                                  max_port+1)):
                            continue
                        all_pod_rules.append(rule)
                        try:
                            LOG.debug("Creating LBaaS sg rule for sg: %r",
                                      lb_sg)
                            neutron.create_security_group_rule({
                                'security_group_rule': {
                                    'direction': 'ingress',
                                    'port_range_min': port,
                                    'port_range_max': port,
                                    'protocol': protocol,
                                    'remote_ip_prefix': rule[
                                        'remote_ip_prefix'],
                                    'security_group_id': lb_sg,
                                    'description': sg_rule_name,
                                },
                            })
                        except n_exc.NeutronClientException as ex:
                            # conflict: rule already exists, fine
                            if ex.status_code != requests.codes.conflict:
                                LOG.exception('Failed when creating security '
                                              'group rule for listener %s.',
                                              sg_rule_name)

        # Delete LBaaS sg rules that do not match NP
        for rule in lbaas_sg_rules['security_group_rules']:
            if (rule.get('protocol') != protocol.lower() or
                    rule.get('port_range_min') != port or
                    rule.get('direction') != 'ingress' or
                    not rule.get('remote_ip_prefix')):
                # rule does not belong to this listener; only drop the
                # wide-open default rule once NP rules are in place
                if all_pod_rules and self._is_default_rule(rule):
                    LOG.debug("Removing default LBaaS sg rule for sg: %r",
                              lb_sg)
                    neutron.delete_security_group_rule(rule['id'])
                continue
            self._delete_rule_if_no_match(rule, all_pod_rules)

        if add_default_rules:
            try:
                LOG.debug("Restoring default LBaaS sg rule for sg: %r", lb_sg)
                neutron.create_security_group_rule({
                    'security_group_rule': {
                        'direction': 'ingress',
                        'port_range_min': port,
                        'port_range_max': port,
                        'protocol': protocol,
                        'security_group_id': lb_sg,
                        'description': sg_rule_name,
                    },
                })
            except n_exc.NeutronClientException as ex:
                # conflict: default rule already exists, fine
                if ex.status_code != requests.codes.conflict:
                    LOG.exception('Failed when creating security '
                                  'group rule for listener %s.',
                                  sg_rule_name)
|
|
|
|
def _delete_rule_if_no_match(self, rule, all_pod_rules):
|
|
for pod_rule in all_pod_rules:
|
|
if pod_rule['remote_ip_prefix'] == rule['remote_ip_prefix']:
|
|
return
|
|
neutron = clients.get_neutron_client()
|
|
LOG.debug("Deleting sg rule: %r", rule['id'])
|
|
neutron.delete_security_group_rule(rule['id'])
|
|
|
|
def _is_default_rule(self, rule):
|
|
if (rule.get('direction') == 'ingress' and
|
|
not rule.get('remote_ip_prefix')):
|
|
return True
|
|
return False
|
|
|
|
    def _remove_default_octavia_rules(self, sg_id, listener):
        """Poll for Octavia's default listener SG rule and delete it.

        Octavia adds its own ingress rule for the listener
        asynchronously; keep polling (backoff timer) until a rule with
        neither remote_group_id nor remote_ip_prefix shows up, then
        remove it and return.
        """
        neutron = clients.get_neutron_client()
        for remaining in self._provisioning_timer(
                _ACTIVATION_TIMEOUT, _LB_STS_POLL_SLOW_INTERVAL):
            listener_rules = neutron.list_security_group_rules(
                security_group_id=sg_id,
                protocol=listener.protocol,
                port_range_min=listener.port,
                port_range_max=listener.port,
                direction='ingress')
            for rule in listener_rules['security_group_rules']:
                if not (rule.get('remote_group_id') or
                        rule.get('remote_ip_prefix')):
                    # remove default sg rules
                    neutron.delete_security_group_rule(rule['id'])
                    return
|
|
|
|
    def _extend_lb_security_group_rules(self, loadbalancer, listener):
        """Grant the listener's port to member SGs and service subnets.

        Used for namespace isolation: creates/locates the loadbalancer
        security group, strips Octavia's default open rules, then allows
        traffic from each member SG, the service subnet and (when
        configured) the worker-nodes subnet.  Conflicts from Neutron are
        accepted as 'rule already exists'.
        """
        neutron = clients.get_neutron_client()

        if CONF.octavia_defaults.sg_mode == 'create':
            sg_id = self._find_listeners_sg(loadbalancer)
            # if an SG for the loadbalancer has not being created, create one
            if not sg_id:
                sg = neutron.create_security_group({
                    'security_group': {
                        'name': loadbalancer.name,
                        'project_id': loadbalancer.project_id,
                    },
                })
                sg_id = sg['security_group']['id']
                loadbalancer.security_groups.append(sg_id)
                vip_port = self._get_vip_port(loadbalancer)
                neutron.update_port(
                    vip_port.get('id'),
                    {'port': {
                        'security_groups': loadbalancer.security_groups}})
        else:
            sg_id = self._get_vip_port(loadbalancer).get('security_groups')[0]
            # wait until octavia adds default sg rules
            self._remove_default_octavia_rules(sg_id, listener)

        # allow each member security group to reach the listener port
        for sg in loadbalancer.security_groups:
            if sg != sg_id:
                try:
                    neutron.create_security_group_rule({
                        'security_group_rule': {
                            'direction': 'ingress',
                            'port_range_min': listener.port,
                            'port_range_max': listener.port,
                            'protocol': listener.protocol,
                            'security_group_id': sg_id,
                            'remote_group_id': sg,
                            'description': listener.name,
                        },
                    })
                except n_exc.NeutronClientException as ex:
                    # conflict: rule already exists, fine
                    if ex.status_code != requests.codes.conflict:
                        LOG.exception('Failed when creating security group '
                                      'rule for listener %s.', listener.name)

        # ensure routes have access to the services
        service_subnet_cidr = utils.get_subnet_cidr(loadbalancer.subnet_id)
        try:
            # add access from service subnet
            neutron.create_security_group_rule({
                'security_group_rule': {
                    'direction': 'ingress',
                    'port_range_min': listener.port,
                    'port_range_max': listener.port,
                    'protocol': listener.protocol,
                    'security_group_id': sg_id,
                    'remote_ip_prefix': service_subnet_cidr,
                    'description': listener.name,
                },
            })

            # add access from worker node VM subnet for non-native route
            # support
            worker_subnet_id = CONF.pod_vif_nested.worker_nodes_subnet
            if worker_subnet_id:
                worker_subnet_cidr = utils.get_subnet_cidr(worker_subnet_id)
                neutron.create_security_group_rule({
                    'security_group_rule': {
                        'direction': 'ingress',
                        'port_range_min': listener.port,
                        'port_range_max': listener.port,
                        'protocol': listener.protocol,
                        'security_group_id': sg_id,
                        'remote_ip_prefix': worker_subnet_cidr,
                        'description': listener.name,
                    },
                })
        except n_exc.NeutronClientException as ex:
            # conflict: rules already exist, fine
            if ex.status_code != requests.codes.conflict:
                LOG.exception('Failed when creating security group rule '
                              'to enable routes for listener %s.',
                              listener.name)
|
|
|
|
def _ensure_security_group_rules(self, loadbalancer, listener,
|
|
service_type):
|
|
namespace_isolation = (
|
|
'namespace' in CONF.kubernetes.enabled_handlers and
|
|
CONF.kubernetes.service_security_groups_driver == 'namespace')
|
|
create_sg = CONF.octavia_defaults.sg_mode == 'create'
|
|
|
|
if namespace_isolation and service_type == 'ClusterIP':
|
|
self._extend_lb_security_group_rules(loadbalancer, listener)
|
|
elif create_sg:
|
|
self._create_lb_security_group_rule(loadbalancer, listener)
|
|
|
|
def ensure_listener(self, loadbalancer, protocol, port,
|
|
service_type='ClusterIP'):
|
|
name = "%s:%s:%s" % (loadbalancer.name, protocol, port)
|
|
listener = obj_lbaas.LBaaSListener(name=name,
|
|
project_id=loadbalancer.project_id,
|
|
loadbalancer_id=loadbalancer.id,
|
|
protocol=protocol,
|
|
port=port)
|
|
try:
|
|
result = self._ensure_provisioned(
|
|
loadbalancer, listener, self._create_listener,
|
|
self._find_listener, _LB_STS_POLL_SLOW_INTERVAL)
|
|
except n_exc.BadRequest:
|
|
LOG.info("Listener creation failed, most probably because "
|
|
"protocol %(prot)s is not supported", {'prot': protocol})
|
|
return None
|
|
|
|
self._ensure_security_group_rules(loadbalancer, result, service_type)
|
|
|
|
return result
|
|
|
|
def release_listener(self, loadbalancer, listener):
|
|
neutron = clients.get_neutron_client()
|
|
lbaas = clients.get_loadbalancer_client()
|
|
self._release(loadbalancer, listener,
|
|
lbaas.delete_listener,
|
|
listener.id)
|
|
|
|
if CONF.octavia_defaults.sg_mode == 'create':
|
|
sg_id = self._find_listeners_sg(loadbalancer)
|
|
else:
|
|
sg_id = self._get_vip_port(loadbalancer).get('security_groups')[0]
|
|
if sg_id:
|
|
rules = neutron.list_security_group_rules(
|
|
security_group_id=sg_id, description=listener.name)
|
|
rules = rules['security_group_rules']
|
|
if len(rules):
|
|
neutron.delete_security_group_rule(rules[0]['id'])
|
|
else:
|
|
LOG.warning('Cannot find SG rule for %s (%s) listener.',
|
|
listener.id, listener.name)
|
|
|
|
def ensure_pool(self, loadbalancer, listener):
|
|
pool = obj_lbaas.LBaaSPool(name=listener.name,
|
|
project_id=loadbalancer.project_id,
|
|
loadbalancer_id=loadbalancer.id,
|
|
listener_id=listener.id,
|
|
protocol=listener.protocol)
|
|
return self._ensure_provisioned(loadbalancer, pool,
|
|
self._create_pool,
|
|
self._find_pool)
|
|
|
|
def ensure_pool_attached_to_lb(self, loadbalancer, namespace,
|
|
svc_name, protocol):
|
|
name = self.get_loadbalancer_pool_name(loadbalancer,
|
|
namespace, svc_name)
|
|
pool = obj_lbaas.LBaaSPool(name=name,
|
|
project_id=loadbalancer.project_id,
|
|
loadbalancer_id=loadbalancer.id,
|
|
listener_id=None,
|
|
protocol=protocol)
|
|
return self._ensure_provisioned(loadbalancer, pool,
|
|
self._create_pool,
|
|
self._find_pool_by_name)
|
|
|
|
def release_pool(self, loadbalancer, pool):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
self._release(loadbalancer, pool,
|
|
lbaas.delete_lbaas_pool,
|
|
pool.id)
|
|
|
|
def ensure_member(self, loadbalancer, pool,
|
|
subnet_id, ip, port, target_ref_namespace,
|
|
target_ref_name, listener_port=None):
|
|
name = ("%s/%s" % (target_ref_namespace, target_ref_name))
|
|
name += ":%s" % port
|
|
member = obj_lbaas.LBaaSMember(name=name,
|
|
project_id=loadbalancer.project_id,
|
|
pool_id=pool.id,
|
|
subnet_id=subnet_id,
|
|
ip=ip,
|
|
port=port)
|
|
result = self._ensure_provisioned(loadbalancer, member,
|
|
self._create_member,
|
|
self._find_member)
|
|
|
|
network_policy = (
|
|
'policy' in CONF.kubernetes.enabled_handlers and
|
|
CONF.kubernetes.service_security_groups_driver == 'policy')
|
|
if network_policy and listener_port:
|
|
protocol = pool.protocol
|
|
sg_rule_name = pool.name
|
|
self._apply_members_security_groups(loadbalancer, listener_port,
|
|
port, protocol, sg_rule_name)
|
|
return result
|
|
|
|
def release_member(self, loadbalancer, member):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
self._release(loadbalancer, member,
|
|
lbaas.delete_lbaas_member,
|
|
member.id, member.pool_id)
|
|
|
|
def _get_vip_port(self, loadbalancer):
|
|
neutron = clients.get_neutron_client()
|
|
try:
|
|
fixed_ips = ['subnet_id=%s' % str(loadbalancer.subnet_id),
|
|
'ip_address=%s' % str(loadbalancer.ip)]
|
|
ports = neutron.list_ports(fixed_ips=fixed_ips)
|
|
except n_exc.NeutronClientException:
|
|
LOG.error("Port with fixed ips %s not found!", fixed_ips)
|
|
raise
|
|
|
|
if ports['ports']:
|
|
return ports['ports'][0]
|
|
|
|
return None
|
|
|
|
    def _create_loadbalancer(self, loadbalancer):
        """Create the LB in LBaaS; fill in id, port_id and provider.

        Returns None when the backend selected a different provider
        than the one requested.  NOTE(review): in that case the balancer
        that was just created is left behind — confirm this is intended.
        """
        lbaas = clients.get_loadbalancer_client()

        request = {'loadbalancer': {
            'name': loadbalancer.name,
            'project_id': loadbalancer.project_id,
            'vip_address': str(loadbalancer.ip),
            'vip_subnet_id': loadbalancer.subnet_id}}

        if loadbalancer.provider is not None:
            request['loadbalancer']['provider'] = loadbalancer.provider

        response = lbaas.create_loadbalancer(request)
        loadbalancer.id = response['loadbalancer']['id']
        loadbalancer.port_id = self._get_vip_port(loadbalancer).get("id")
        # reject the result if LBaaS ignored the requested provider
        if (loadbalancer.provider is not None and
                loadbalancer.provider != response['loadbalancer']['provider']):
            LOG.error("Request provider(%s) != Response provider(%s)",
                      loadbalancer.provider,
                      response['loadbalancer']['provider'])
            return None
        loadbalancer.provider = response['loadbalancer']['provider']
        return loadbalancer
|
|
|
|
def _find_loadbalancer(self, loadbalancer):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
response = lbaas.list_loadbalancers(
|
|
name=loadbalancer.name,
|
|
project_id=loadbalancer.project_id,
|
|
vip_address=str(loadbalancer.ip),
|
|
vip_subnet_id=loadbalancer.subnet_id)
|
|
|
|
try:
|
|
loadbalancer.id = response['loadbalancers'][0]['id']
|
|
loadbalancer.port_id = self._get_vip_port(loadbalancer).get("id")
|
|
loadbalancer.provider = response['loadbalancers'][0]['provider']
|
|
except (KeyError, IndexError):
|
|
return None
|
|
|
|
return loadbalancer
|
|
|
|
def _create_listener(self, listener):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
response = lbaas.create_listener({'listener': {
|
|
'name': listener.name,
|
|
'project_id': listener.project_id,
|
|
'loadbalancer_id': listener.loadbalancer_id,
|
|
'protocol': listener.protocol,
|
|
'protocol_port': listener.port}})
|
|
listener.id = response['listener']['id']
|
|
return listener
|
|
|
|
def _find_listener(self, listener):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
response = lbaas.list_listeners(
|
|
name=listener.name,
|
|
project_id=listener.project_id,
|
|
loadbalancer_id=listener.loadbalancer_id,
|
|
protocol=listener.protocol,
|
|
protocol_port=listener.port)
|
|
|
|
try:
|
|
listener.id = response['listeners'][0]['id']
|
|
except (KeyError, IndexError):
|
|
return None
|
|
|
|
return listener
|
|
|
|
    def _create_pool(self, pool):
        """Create the pool; clean up half-created pools on state races.

        On StateInvalidClient the half-created pool is removed and the
        original exception is re-raised (six.reraise keeps the original
        traceback, Python 2 compatible) so callers can retry.
        """
        # TODO(ivc): make lb_algorithm configurable
        lb_algorithm = 'ROUND_ROBIN'
        lbaas = clients.get_loadbalancer_client()
        try:
            response = lbaas.create_lbaas_pool({'pool': {
                'name': pool.name,
                'project_id': pool.project_id,
                'listener_id': pool.listener_id,
                'loadbalancer_id': pool.loadbalancer_id,
                'protocol': pool.protocol,
                'lb_algorithm': lb_algorithm}})
            pool.id = response['pool']['id']
            return pool
        except n_exc.StateInvalidClient:
            # keep the original exception info across the cleanup call
            (type_, value, tb) = sys.exc_info()
            try:
                self._cleanup_bogus_pool(lbaas, pool, lb_algorithm)
            except Exception:
                # cleanup itself failed: log the original create traceback
                # and propagate the cleanup error instead
                LOG.error('Pool creation traceback: %s',
                          traceback.format_exception(type_, value, tb))
                raise
            else:
                six.reraise(type_, value, tb)
|
|
|
|
def _cleanup_bogus_pool(self, lbaas, pool, lb_algorithm):
|
|
# REVISIT(ivc): LBaaSv2 creates pool object despite raising an
|
|
# exception. The created pool is not bound to listener, but
|
|
# it is bound to loadbalancer and will cause an error on
|
|
# 'release_loadbalancer'.
|
|
pools = lbaas.list_lbaas_pools(
|
|
name=pool.name, project_id=pool.project_id,
|
|
loadbalancer_id=pool.loadbalancer_id,
|
|
protocol=pool.protocol, lb_algorithm=lb_algorithm)
|
|
bogus_pool_ids = [p['id'] for p in pools.get('pools')
|
|
if not p['listeners'] and pool.name == p['name']]
|
|
for pool_id in bogus_pool_ids:
|
|
try:
|
|
LOG.debug("Removing bogus pool %(id)s %(pool)s", {
|
|
'id': pool_id, 'pool': pool})
|
|
lbaas.delete_lbaas_pool(pool_id)
|
|
except (n_exc.NotFound, n_exc.StateInvalidClient):
|
|
pass
|
|
|
|
def _find_pool(self, pool, by_listener=True):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
response = lbaas.list_lbaas_pools(
|
|
name=pool.name,
|
|
project_id=pool.project_id,
|
|
loadbalancer_id=pool.loadbalancer_id,
|
|
protocol=pool.protocol)
|
|
|
|
try:
|
|
if by_listener:
|
|
pools = [p for p in response['pools']
|
|
if pool.listener_id
|
|
in {l['id'] for l in p['listeners']}]
|
|
else:
|
|
pools = [p for p in response['pools']
|
|
if pool.name == p['name']]
|
|
|
|
pool.id = pools[0]['id']
|
|
except (KeyError, IndexError):
|
|
return None
|
|
return pool
|
|
|
|
    def _find_pool_by_name(self, pool):
        """Find a pool by name only, ignoring its listener binding."""
        return self._find_pool(pool, by_listener=False)
|
|
|
|
def _create_member(self, member):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
response = lbaas.create_lbaas_member(member.pool_id, {'member': {
|
|
'name': member.name,
|
|
'project_id': member.project_id,
|
|
'subnet_id': member.subnet_id,
|
|
'address': str(member.ip),
|
|
'protocol_port': member.port}})
|
|
member.id = response['member']['id']
|
|
return member
|
|
|
|
def _find_member(self, member):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
response = lbaas.list_lbaas_members(
|
|
member.pool_id,
|
|
name=member.name,
|
|
project_id=member.project_id,
|
|
subnet_id=member.subnet_id,
|
|
address=member.ip,
|
|
protocol_port=member.port)
|
|
|
|
try:
|
|
member.id = response['members'][0]['id']
|
|
except (KeyError, IndexError):
|
|
return None
|
|
|
|
return member
|
|
|
|
def _ensure(self, obj, create, find):
|
|
# TODO(yboaron): change the create/find order.
|
|
try:
|
|
result = create(obj)
|
|
LOG.debug("Created %(obj)s", {'obj': result})
|
|
except (n_exc.Conflict, n_exc.InternalServerError):
|
|
result = find(obj)
|
|
if result:
|
|
LOG.debug("Found %(obj)s", {'obj': result})
|
|
return result
|
|
|
|
def _ensure_provisioned(self, loadbalancer, obj, create, find,
|
|
interval=_LB_STS_POLL_FAST_INTERVAL):
|
|
for remaining in self._provisioning_timer(_ACTIVATION_TIMEOUT,
|
|
interval):
|
|
self._wait_for_provisioning(loadbalancer, remaining, interval)
|
|
try:
|
|
result = self._ensure(obj, create, find)
|
|
if result:
|
|
return result
|
|
except n_exc.StateInvalidClient:
|
|
continue
|
|
|
|
raise k_exc.ResourceNotReady(obj)
|
|
|
|
    def _release(self, loadbalancer, obj, delete, *args, **kwargs):
        """Invoke 'delete' until it succeeds or the timeout expires.

        Conflict/StateInvalidClient mean the LB is busy: wait for it to
        become ACTIVE and retry.  NotFound counts as success — the outer
        try also covers a NotFound raised while polling the LB itself.
        Raises ResourceNotReady on timeout.
        """
        for remaining in self._provisioning_timer(_ACTIVATION_TIMEOUT):
            try:
                try:
                    delete(*args, **kwargs)
                    return
                except (n_exc.Conflict, n_exc.StateInvalidClient):
                    self._wait_for_provisioning(loadbalancer, remaining)
            except n_exc.NotFound:
                return

        raise k_exc.ResourceNotReady(obj)
|
|
|
|
def _wait_for_provisioning(self, loadbalancer, timeout,
|
|
interval=_LB_STS_POLL_FAST_INTERVAL):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
|
|
for remaining in self._provisioning_timer(timeout, interval):
|
|
response = lbaas.show_loadbalancer(loadbalancer.id)
|
|
status = response['loadbalancer']['provisioning_status']
|
|
if status == 'ACTIVE':
|
|
LOG.debug("Provisioning complete for %(lb)s", {
|
|
'lb': loadbalancer})
|
|
return
|
|
else:
|
|
LOG.debug("Provisioning status %(status)s for %(lb)s, "
|
|
"%(rem).3gs remaining until timeout",
|
|
{'status': status, 'lb': loadbalancer,
|
|
'rem': remaining})
|
|
|
|
raise k_exc.ResourceNotReady(loadbalancer)
|
|
|
|
    def _wait_for_deletion(self, loadbalancer, timeout,
                           interval=_LB_STS_POLL_FAST_INTERVAL):
        """Poll until showing the LB raises NotFound (i.e. it is gone).

        NOTE(review): unlike _wait_for_provisioning this returns
        silently when the timeout expires while the LB still exists —
        confirm that best-effort behaviour is intended.
        """
        lbaas = clients.get_loadbalancer_client()

        for remaining in self._provisioning_timer(timeout, interval):
            try:
                lbaas.show_loadbalancer(loadbalancer.id)
            except n_exc.NotFound:
                return
|
|
|
|
def _provisioning_timer(self, timeout,
|
|
interval=_LB_STS_POLL_FAST_INTERVAL):
|
|
# REVISIT(ivc): consider integrating with Retry
|
|
max_interval = 15
|
|
with timeutils.StopWatch(duration=timeout) as timer:
|
|
while not timer.expired():
|
|
yield timer.leftover()
|
|
interval = interval * 2 * random.gauss(0.8, 0.05)
|
|
interval = min(interval, max_interval)
|
|
interval = min(interval, timer.leftover())
|
|
if interval:
|
|
time.sleep(interval)
|
|
|
|
    def _find_listeners_sg(self, loadbalancer, lb_name=None):
        """Return the id of the SG created for the LB listeners, or None.

        With lb_name given, the first SG matching that name is returned.
        NOTE(review): that path assumes at least one match exists — an
        empty result would raise IndexError; confirm callers guarantee
        this.
        """
        neutron = clients.get_neutron_client()
        if lb_name:
            sgs = neutron.list_security_groups(
                name=lb_name, project_id=loadbalancer.project_id)
            # NOTE(ltomasbo): lb_name parameter is only passed when sg_mode
            # is 'create' and in that case there is only one sg associated
            # to the loadbalancer
            return sgs['security_groups'][0]['id']
        try:
            sgs = neutron.list_security_groups(
                name=loadbalancer.name, project_id=loadbalancer.project_id)
            for sg in sgs['security_groups']:
                sg_id = sg['id']
                if sg_id in loadbalancer.security_groups:
                    return sg_id
        except n_exc.NeutronClientException:
            LOG.exception('Cannot list security groups for loadbalancer %s.',
                          loadbalancer.name)

        return None
|
|
|
|
def get_lb_by_uuid(self, lb_uuid):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
response = lbaas.show_loadbalancer(lb_uuid)
|
|
try:
|
|
return obj_lbaas.LBaaSLoadBalancer(
|
|
id=response['loadbalancer']['id'],
|
|
port_id=response['loadbalancer']['vip_port_id'],
|
|
name=response['loadbalancer']['name'],
|
|
project_id=response['loadbalancer']['project_id'],
|
|
subnet_id=response['loadbalancer']['vip_subnet_id'],
|
|
ip=response['loadbalancer']['vip_address'],
|
|
security_groups=None,
|
|
provider=response['loadbalancer']['provider'])
|
|
except (KeyError, IndexError):
|
|
LOG.debug("Couldn't find loadbalancer with uuid=%s", lb_uuid)
|
|
return None
|
|
|
|
def get_pool_by_name(self, pool_name, project_id):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
|
|
# NOTE(yboaron): pool_name should be constructed using
|
|
# get_loadbalancer_pool_name function, which means that pool's name
|
|
# is unique
|
|
|
|
pools_list = lbaas.list_lbaas_pools(
|
|
project_id=project_id)
|
|
for entry in pools_list['pools']:
|
|
if not entry:
|
|
continue
|
|
if entry['name'] == pool_name:
|
|
listener_id = (entry['listeners'][0]['id'] if
|
|
entry['listeners'] else None)
|
|
return obj_lbaas.LBaaSPool(
|
|
name=entry['name'], project_id=entry['project_id'],
|
|
loadbalancer_id=entry['loadbalancers'][0]['id'],
|
|
listener_id=listener_id,
|
|
protocol=entry['protocol'], id=entry['id'])
|
|
return None
|
|
|
|
def ensure_l7_policy(self, namespace, route_name,
|
|
loadbalancer, pool,
|
|
listener_id):
|
|
name = namespace + route_name
|
|
l7_policy = obj_lbaas.LBaaSL7Policy(name=name,
|
|
project_id=pool.project_id,
|
|
listener_id=listener_id,
|
|
redirect_pool_id=pool.id)
|
|
|
|
return self._ensure_provisioned(
|
|
loadbalancer, l7_policy, self._create_l7_policy,
|
|
self._find_l7_policy)
|
|
|
|
def release_l7_policy(self, loadbalancer, l7_policy):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
self._release(
|
|
loadbalancer, l7_policy, lbaas.delete_lbaas_l7policy,
|
|
l7_policy.id)
|
|
|
|
def _create_l7_policy(self, l7_policy):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
response = lbaas.create_lbaas_l7policy({'l7policy': {
|
|
'action': _L7_POLICY_ACT_REDIRECT_TO_POOL,
|
|
'listener_id': l7_policy.listener_id,
|
|
'name': l7_policy.name,
|
|
'project_id': l7_policy.project_id,
|
|
'redirect_pool_id': l7_policy.redirect_pool_id}})
|
|
l7_policy.id = response['l7policy']['id']
|
|
return l7_policy
|
|
|
|
def _find_l7_policy(self, l7_policy):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
response = lbaas.list_lbaas_l7policies(
|
|
name=l7_policy.name,
|
|
project_id=l7_policy.project_id,
|
|
redirect_pool_id=l7_policy.redirect_pool_id,
|
|
listener_id=l7_policy.listener_id)
|
|
try:
|
|
l7_policy.id = response['l7policies'][0]['id']
|
|
except (KeyError, IndexError):
|
|
return None
|
|
return l7_policy
|
|
|
|
def ensure_l7_rule(self, loadbalancer, l7_policy, compare_type,
|
|
type, value):
|
|
|
|
l7_rule = obj_lbaas.LBaaSL7Rule(
|
|
compare_type=compare_type, l7policy_id=l7_policy.id,
|
|
type=type, value=value)
|
|
return self._ensure_provisioned(
|
|
loadbalancer, l7_rule, self._create_l7_rule,
|
|
self._find_l7_rule)
|
|
|
|
def _create_l7_rule(self, l7_rule):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
response = lbaas.create_lbaas_l7rule(
|
|
l7_rule.l7policy_id,
|
|
{'rule': {'compare_type': l7_rule.compare_type,
|
|
'type': l7_rule.type,
|
|
'value': l7_rule.value}})
|
|
l7_rule.id = response['rule']['id']
|
|
return l7_rule
|
|
|
|
def _find_l7_rule(self, l7_rule):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
response = lbaas.list_lbaas_l7rules(
|
|
l7_rule.l7policy_id,
|
|
type=l7_rule.type,
|
|
value=l7_rule.value,
|
|
compare_type=l7_rule.compare_type)
|
|
try:
|
|
l7_rule.id = response['rules'][0]['id']
|
|
except (KeyError, IndexError):
|
|
return None
|
|
return l7_rule
|
|
|
|
def release_l7_rule(self, loadbalancer, l7_rule):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
self._release(
|
|
loadbalancer, l7_rule, lbaas.delete_lbaas_l7rule,
|
|
l7_rule.id, l7_rule.l7policy_id)
|
|
|
|
    def update_l7_rule(self, l7_rule, new_value):
        """Change an existing L7 rule's match value.

        Logs and re-raises any NeutronClientException from the update.
        """
        lbaas = clients.get_loadbalancer_client()
        try:
            lbaas.update_lbaas_l7rule(
                l7_rule.id, l7_rule.l7policy_id,
                {'rule': {'value': new_value}})

        except n_exc.NeutronClientException:
            LOG.exception("Failed to update l7_rule- id=%s ", l7_rule.id)
            raise
|
|
|
|
def is_pool_used_by_other_l7policies(self, l7policy, pool):
|
|
lbaas = clients.get_loadbalancer_client()
|
|
l7policy_list = lbaas.list_lbaas_l7policies(
|
|
project_id=l7policy.project_id)
|
|
for entry in l7policy_list['l7policies']:
|
|
if not entry:
|
|
continue
|
|
if (entry['redirect_pool_id'] == pool.id and
|
|
entry['id'] != l7policy.id):
|
|
return True
|
|
return False
|
|
|
|
    def update_lbaas_sg(self, service, sgs):
        """Re-apply member SG rules for every port of a k8s Service.

        Reads the service's namespace/name/ports from the k8s object,
        fetches its stored LBaaS spec, and delegates per-port rule
        reconciliation to _apply_members_security_groups with the new
        security-group list.
        """
        LOG.debug('Setting SG for LBaaS VIP port')

        svc_namespace = service['metadata']['namespace']
        svc_name = service['metadata']['name']
        svc_ports = service['spec']['ports']

        lbaas_name = "%s/%s" % (svc_namespace, svc_name)
        lbaas = utils.get_lbaas_spec(service)
        if not lbaas:
            # no LBaaS spec annotated on the service yet; nothing to do
            return

        for port in svc_ports:
            port_protocol = port['protocol']
            lbaas_port = port['port']
            target_port = port['targetPort']
            sg_rule_name = "%s:%s:%s" % (lbaas_name, port_protocol, lbaas_port)

            self._apply_members_security_groups(lbaas, lbaas_port,
                                                target_port, port_protocol,
                                                sg_rule_name, sgs)
|