Add Network Policy support to services
This patch adds support for Network Policy on services. It applies pods' security groups onto the services in front of them. It makes the next assumptions: - All the pods pointed by one svc have the same labels, thus the same sgs being enforced - Only copies the SG rules that have the same protocol and direction as the listener being created - Adds a default rule to NP to enable traffic from services subnet CIDR Partially Implements: blueprint k8s-network-policies Change-Id: Ibd4b51ff40b69af26ab7e7b81d18e63abddf775b
This commit is contained in:
parent
71a8ebd1f0
commit
b200d368cd
|
@ -31,6 +31,7 @@ from kuryr_kubernetes import constants as const
|
|||
from kuryr_kubernetes.controller.drivers import base
|
||||
from kuryr_kubernetes import exceptions as k_exc
|
||||
from kuryr_kubernetes.objects import lbaas as obj_lbaas
|
||||
from kuryr_kubernetes import utils
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -199,6 +200,53 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
|||
LOG.exception('Failed when creating security group rule '
|
||||
'for listener %s.', listener.name)
|
||||
|
||||
def _apply_members_security_groups(self, loadbalancer, port, target_port,
|
||||
protocol, sg_rule_name):
|
||||
neutron = clients.get_neutron_client()
|
||||
if CONF.octavia_defaults.sg_mode == 'create':
|
||||
sg_id = self._find_listeners_sg(loadbalancer)
|
||||
else:
|
||||
sg_id = self._get_vip_port(loadbalancer).get('security_groups')[0]
|
||||
|
||||
# Check if Network Policy allows listener on the pods
|
||||
for sg in loadbalancer.security_groups:
|
||||
if sg != sg_id:
|
||||
rules = neutron.list_security_group_rules(
|
||||
security_group_id=sg)
|
||||
for rule in rules['security_group_rules']:
|
||||
# copying ingress rules with same protocol onto the
|
||||
# loadbalancer sg rules
|
||||
# NOTE(ltomasbo): NP security groups only have
|
||||
# remote_ip_prefix, not remote_group_id, therefore only
|
||||
# applying the ones with remote_ip_prefix
|
||||
if (rule['protocol'] == protocol.lower() and
|
||||
rule['direction'] == 'ingress' and
|
||||
rule['remote_ip_prefix']):
|
||||
# If listener port not in allowed range, skip
|
||||
min_port = rule.get('port_range_min')
|
||||
max_port = rule.get('port_range_max')
|
||||
if (min_port and target_port not in range(min_port,
|
||||
max_port+1)):
|
||||
continue
|
||||
try:
|
||||
neutron.create_security_group_rule({
|
||||
'security_group_rule': {
|
||||
'direction': 'ingress',
|
||||
'port_range_min': port,
|
||||
'port_range_max': port,
|
||||
'protocol': protocol,
|
||||
'remote_ip_prefix': rule[
|
||||
'remote_ip_prefix'],
|
||||
'security_group_id': sg_id,
|
||||
'description': sg_rule_name,
|
||||
},
|
||||
})
|
||||
except n_exc.NeutronClientException as ex:
|
||||
if ex.status_code != requests.codes.conflict:
|
||||
LOG.exception('Failed when creating security '
|
||||
'group rule for listener %s.',
|
||||
sg_rule_name)
|
||||
|
||||
def _extend_lb_security_group_rules(self, loadbalancer, listener):
|
||||
neutron = clients.get_neutron_client()
|
||||
|
||||
|
@ -242,7 +290,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
|||
'rule for listener %s.', listener.name)
|
||||
|
||||
# ensure routes have access to the services
|
||||
service_subnet_cidr = self._get_subnet_cidr(loadbalancer.subnet_id)
|
||||
service_subnet_cidr = utils.get_subnet_cidr(loadbalancer.subnet_id)
|
||||
try:
|
||||
# add access from service subnet
|
||||
neutron.create_security_group_rule({
|
||||
|
@ -261,7 +309,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
|||
# support
|
||||
worker_subnet_id = CONF.pod_vif_nested.worker_nodes_subnet
|
||||
if worker_subnet_id:
|
||||
worker_subnet_cidr = self._get_subnet_cidr(worker_subnet_id)
|
||||
worker_subnet_cidr = utils.get_subnet_cidr(worker_subnet_id)
|
||||
neutron.create_security_group_rule({
|
||||
'security_group_rule': {
|
||||
'direction': 'ingress',
|
||||
|
@ -321,7 +369,10 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
|||
lbaas.delete_listener,
|
||||
listener.id)
|
||||
|
||||
sg_id = self._find_listeners_sg(loadbalancer)
|
||||
if CONF.octavia_defaults.sg_mode == 'create':
|
||||
sg_id = self._find_listeners_sg(loadbalancer)
|
||||
else:
|
||||
sg_id = self._get_vip_port(loadbalancer).get('security_groups')[0]
|
||||
if sg_id:
|
||||
rules = neutron.list_security_group_rules(
|
||||
security_group_id=sg_id, description=listener.name)
|
||||
|
@ -363,7 +414,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
|||
|
||||
def ensure_member(self, loadbalancer, pool,
|
||||
subnet_id, ip, port, target_ref_namespace,
|
||||
target_ref_name):
|
||||
target_ref_name, listener_port=None):
|
||||
name = ("%s/%s" % (target_ref_namespace, target_ref_name))
|
||||
name += ":%s" % port
|
||||
member = obj_lbaas.LBaaSMember(name=name,
|
||||
|
@ -372,9 +423,19 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
|||
subnet_id=subnet_id,
|
||||
ip=ip,
|
||||
port=port)
|
||||
return self._ensure_provisioned(loadbalancer, member,
|
||||
self._create_member,
|
||||
self._find_member)
|
||||
result = self._ensure_provisioned(loadbalancer, member,
|
||||
self._create_member,
|
||||
self._find_member)
|
||||
|
||||
network_policy = (
|
||||
'policy' in CONF.kubernetes.enabled_handlers and
|
||||
CONF.kubernetes.service_security_groups_driver == 'policy')
|
||||
if network_policy and listener_port:
|
||||
protocol = pool.protocol
|
||||
sg_rule_name = pool.name
|
||||
self._apply_members_security_groups(loadbalancer, listener_port,
|
||||
port, protocol, sg_rule_name)
|
||||
return result
|
||||
|
||||
def release_member(self, loadbalancer, member):
|
||||
lbaas = clients.get_loadbalancer_client()
|
||||
|
@ -397,15 +458,6 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
|||
|
||||
return None
|
||||
|
||||
def _get_subnet_cidr(self, subnet_id):
|
||||
neutron = clients.get_neutron_client()
|
||||
try:
|
||||
subnet_obj = neutron.show_subnet(subnet_id)
|
||||
except n_exc.NeutronClientException:
|
||||
LOG.exception("Subnet %s CIDR not found!", subnet_id)
|
||||
raise
|
||||
return subnet_obj.get('subnet')['cidr']
|
||||
|
||||
def _create_loadbalancer(self, loadbalancer):
|
||||
lbaas = clients.get_loadbalancer_client()
|
||||
|
||||
|
|
|
@ -17,10 +17,12 @@ from oslo_log import log as logging
|
|||
from neutronclient.common import exceptions as n_exc
|
||||
|
||||
from kuryr_kubernetes import clients
|
||||
from kuryr_kubernetes import config
|
||||
from kuryr_kubernetes import constants
|
||||
from kuryr_kubernetes.controller.drivers import base
|
||||
from kuryr_kubernetes.controller.drivers import utils
|
||||
from kuryr_kubernetes.controller.drivers import utils as driver_utils
|
||||
from kuryr_kubernetes import exceptions
|
||||
from kuryr_kubernetes import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -93,14 +95,14 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
current_sg_rules]
|
||||
for sg_rule in sg_rules_to_delete:
|
||||
try:
|
||||
utils.delete_security_group_rule(sgr_ids[sg_rule])
|
||||
driver_utils.delete_security_group_rule(sgr_ids[sg_rule])
|
||||
except n_exc.NotFound:
|
||||
LOG.debug('Trying to delete non existing sg_rule %s', sg_rule)
|
||||
# Create new rules that weren't already on the security group
|
||||
sg_rules_to_add = [rule for rule in current_sg_rules if rule not in
|
||||
existing_sg_rules]
|
||||
for sg_rule in sg_rules_to_add:
|
||||
sgr_id = utils.create_security_group_rule(sg_rule)
|
||||
sgr_id = driver_utils.create_security_group_rule(sg_rule)
|
||||
if sg_rule['security_group_rule'].get('direction') == 'ingress':
|
||||
for i_rule in i_rules:
|
||||
if sg_rule == i_rule:
|
||||
|
@ -111,8 +113,8 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
e_rule["security_group_rule"]["id"] = sgr_id
|
||||
# Annotate kuryrnetpolicy CRD with current policy and ruleset
|
||||
pod_selector = policy['spec'].get('podSelector')
|
||||
utils.patch_kuryr_crd(crd, i_rules, e_rules, pod_selector,
|
||||
np_spec=policy['spec'])
|
||||
driver_utils.patch_kuryr_crd(crd, i_rules, e_rules, pod_selector,
|
||||
np_spec=policy['spec'])
|
||||
|
||||
if existing_pod_selector != pod_selector:
|
||||
return existing_pod_selector
|
||||
|
@ -142,13 +144,26 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
sg_id = sg['security_group']['id']
|
||||
i_rules, e_rules = self.parse_network_policy_rules(policy, sg_id)
|
||||
for i_rule in i_rules:
|
||||
sgr_id = utils.create_security_group_rule(i_rule)
|
||||
sgr_id = driver_utils.create_security_group_rule(i_rule)
|
||||
i_rule['security_group_rule']['id'] = sgr_id
|
||||
|
||||
for e_rule in e_rules:
|
||||
sgr_id = utils.create_security_group_rule(e_rule)
|
||||
sgr_id = driver_utils.create_security_group_rule(e_rule)
|
||||
e_rule['security_group_rule']['id'] = sgr_id
|
||||
|
||||
# NOTE(ltomasbo): Add extra SG rule to allow traffic from services
|
||||
# subnet
|
||||
svc_cidr = utils.get_subnet_cidr(
|
||||
config.CONF.neutron_defaults.service_subnet)
|
||||
svc_rule = {
|
||||
u'security_group_rule': {
|
||||
u'ethertype': 'IPv4',
|
||||
u'security_group_id': sg_id,
|
||||
u'direction': 'ingress',
|
||||
u'description': 'Kuryr-Kubernetes NetPolicy SG rule',
|
||||
u'remote_ip_prefix': svc_cidr
|
||||
}}
|
||||
driver_utils.create_security_group_rule(svc_rule)
|
||||
except (n_exc.NeutronClientException, exceptions.ResourceNotReady):
|
||||
LOG.exception("Error creating security group for network policy "
|
||||
" %s", policy['metadata']['name'])
|
||||
|
@ -179,12 +194,13 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
ips = []
|
||||
matching_pods = []
|
||||
if namespace_selector:
|
||||
matching_namespaces = utils.get_namespaces(namespace_selector)
|
||||
matching_namespaces = driver_utils.get_namespaces(
|
||||
namespace_selector)
|
||||
for ns in matching_namespaces.get('items'):
|
||||
matching_pods = utils.get_pods(pod_selector,
|
||||
ns['metadata']['name'])
|
||||
matching_pods = driver_utils.get_pods(pod_selector,
|
||||
ns['metadata']['name'])
|
||||
else:
|
||||
matching_pods = utils.get_pods(pod_selector, namespace)
|
||||
matching_pods = driver_utils.get_pods(pod_selector, namespace)
|
||||
for pod in matching_pods.get('items'):
|
||||
if pod['status']['podIP']:
|
||||
ips.append(pod['status']['podIP'])
|
||||
|
@ -214,7 +230,8 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
ns_cidr = self._get_namespace_subnet_cidr(ns)
|
||||
cidrs.append(ns_cidr)
|
||||
else:
|
||||
matching_namespaces = utils.get_namespaces(namespace_selector)
|
||||
matching_namespaces = driver_utils.get_namespaces(
|
||||
namespace_selector)
|
||||
for ns in matching_namespaces.get('items'):
|
||||
# NOTE(ltomasbo): This requires the namespace handler to be
|
||||
# also enabled
|
||||
|
@ -280,7 +297,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
if rule_list[0] == {}:
|
||||
LOG.debug('Applying default all open policy from %s',
|
||||
policy['metadata']['selfLink'])
|
||||
rule = utils.create_security_group_rule_body(
|
||||
rule = driver_utils.create_security_group_rule_body(
|
||||
sg_id, direction, port_range_min=1, port_range_max=65535)
|
||||
sg_rule_body_list.append(rule)
|
||||
|
||||
|
@ -294,31 +311,33 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
for port in rule_block['ports']:
|
||||
if allowed_cidrs or allow_all or selectors:
|
||||
for cidr in allowed_cidrs:
|
||||
rule = utils.create_security_group_rule_body(
|
||||
sg_id, direction, port.get('port'),
|
||||
protocol=port.get('protocol'),
|
||||
cidr=cidr)
|
||||
rule = (
|
||||
driver_utils.create_security_group_rule_body(
|
||||
sg_id, direction, port.get('port'),
|
||||
protocol=port.get('protocol'),
|
||||
cidr=cidr))
|
||||
sg_rule_body_list.append(rule)
|
||||
if allow_all:
|
||||
rule = utils.create_security_group_rule_body(
|
||||
sg_id, direction, port.get('port'),
|
||||
protocol=port.get('protocol'))
|
||||
rule = (
|
||||
driver_utils.create_security_group_rule_body(
|
||||
sg_id, direction, port.get('port'),
|
||||
protocol=port.get('protocol')))
|
||||
sg_rule_body_list.append(rule)
|
||||
else:
|
||||
rule = utils.create_security_group_rule_body(
|
||||
rule = driver_utils.create_security_group_rule_body(
|
||||
sg_id, direction, port.get('port'),
|
||||
protocol=port.get('protocol'))
|
||||
sg_rule_body_list.append(rule)
|
||||
elif allowed_cidrs or allow_all or selectors:
|
||||
for cidr in allowed_cidrs:
|
||||
rule = utils.create_security_group_rule_body(
|
||||
rule = driver_utils.create_security_group_rule_body(
|
||||
sg_id, direction,
|
||||
port_range_min=1,
|
||||
port_range_max=65535,
|
||||
cidr=cidr)
|
||||
sg_rule_body_list.append(rule)
|
||||
if allow_all:
|
||||
rule = utils.create_security_group_rule_body(
|
||||
rule = driver_utils.create_security_group_rule_body(
|
||||
sg_id, direction,
|
||||
port_range_min=1,
|
||||
port_range_max=65535)
|
||||
|
@ -456,7 +475,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
pod_selector = policy['spec'].get('podSelector')
|
||||
if pod_selector:
|
||||
policy_namespace = policy['metadata']['namespace']
|
||||
pods = utils.get_pods(pod_selector, policy_namespace)
|
||||
pods = driver_utils.get_pods(pod_selector, policy_namespace)
|
||||
return pods.get('items')
|
||||
else:
|
||||
# NOTE(ltomasbo): It affects all the pods on the namespace
|
||||
|
|
|
@ -192,36 +192,40 @@ def _parse_rules(direction, crd, pod):
|
|||
return matched, crd_rules
|
||||
|
||||
|
||||
def _get_pod_sgs(pod, project_id):
|
||||
sg_list = []
|
||||
|
||||
pod_labels = pod['metadata'].get('labels')
|
||||
pod_namespace = pod['metadata']['namespace']
|
||||
|
||||
knp_crds = _get_kuryrnetpolicy_crds(namespace=pod_namespace)
|
||||
for crd in knp_crds.get('items'):
|
||||
pod_selector = crd['spec'].get('podSelector')
|
||||
if pod_selector:
|
||||
if _match_selector(pod_selector, pod_labels):
|
||||
LOG.debug("Appending %s",
|
||||
str(crd['spec']['securityGroupId']))
|
||||
sg_list.append(str(crd['spec']['securityGroupId']))
|
||||
else:
|
||||
LOG.debug("Appending %s", str(crd['spec']['securityGroupId']))
|
||||
sg_list.append(str(crd['spec']['securityGroupId']))
|
||||
|
||||
# NOTE(maysams) Pods that are not selected by any Networkpolicy
|
||||
# are fully accessible. Thus, the default security group is associated.
|
||||
if not sg_list:
|
||||
sg_list = config.CONF.neutron_defaults.pod_security_groups
|
||||
if not sg_list:
|
||||
raise cfg.RequiredOptError('pod_security_groups',
|
||||
cfg.OptGroup('neutron_defaults'))
|
||||
|
||||
return sg_list[:]
|
||||
|
||||
|
||||
class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
|
||||
"""Provides security groups for pods based on network policies"""
|
||||
|
||||
def get_security_groups(self, pod, project_id):
|
||||
sg_list = []
|
||||
|
||||
pod_labels = pod['metadata'].get('labels')
|
||||
pod_namespace = pod['metadata']['namespace']
|
||||
|
||||
knp_crds = _get_kuryrnetpolicy_crds(namespace=pod_namespace)
|
||||
for crd in knp_crds.get('items'):
|
||||
pod_selector = crd['spec'].get('podSelector')
|
||||
if pod_selector:
|
||||
if _match_selector(pod_selector, pod_labels):
|
||||
LOG.debug("Appending %s",
|
||||
str(crd['spec']['securityGroupId']))
|
||||
sg_list.append(str(crd['spec']['securityGroupId']))
|
||||
else:
|
||||
LOG.debug("Appending %s", str(crd['spec']['securityGroupId']))
|
||||
sg_list.append(str(crd['spec']['securityGroupId']))
|
||||
|
||||
# NOTE(maysams) Pods that are not selected by any Networkpolicy
|
||||
# are fully accessible. Thus, the default security group is associated.
|
||||
if not sg_list:
|
||||
sg_list = config.CONF.neutron_defaults.pod_security_groups
|
||||
if not sg_list:
|
||||
raise cfg.RequiredOptError('pod_security_groups',
|
||||
cfg.OptGroup('neutron_defaults'))
|
||||
|
||||
return sg_list[:]
|
||||
return _get_pod_sgs(pod, project_id)
|
||||
|
||||
def create_sg_rules(self, pod):
|
||||
LOG.debug("Creating sg rule for pod: %s", pod['metadata']['name'])
|
||||
|
@ -297,36 +301,17 @@ class NetworkPolicyServiceSecurityGroupsDriver(
|
|||
def get_security_groups(self, service, project_id):
|
||||
sg_list = []
|
||||
svc_namespace = service['metadata']['namespace']
|
||||
svc_labels = service['metadata'].get('labels')
|
||||
LOG.debug("Using labels %s", svc_labels)
|
||||
|
||||
knp_crds = _get_kuryrnetpolicy_crds(namespace=svc_namespace)
|
||||
for crd in knp_crds.get('items'):
|
||||
pod_selector = crd['spec'].get('podSelector')
|
||||
if pod_selector:
|
||||
crd_labels = pod_selector.get('matchLabels', None)
|
||||
crd_expressions = pod_selector.get('matchExpressions', None)
|
||||
|
||||
match_exp = match_lb = True
|
||||
if crd_expressions:
|
||||
match_exp = _match_expressions(crd_expressions,
|
||||
svc_labels)
|
||||
if crd_labels and svc_labels:
|
||||
match_lb = _match_labels(crd_labels, svc_labels)
|
||||
if match_exp and match_lb:
|
||||
LOG.debug("Appending %s",
|
||||
str(crd['spec']['securityGroupId']))
|
||||
sg_list.append(str(crd['spec']['securityGroupId']))
|
||||
else:
|
||||
LOG.debug("Appending %s", str(crd['spec']['securityGroupId']))
|
||||
sg_list.append(str(crd['spec']['securityGroupId']))
|
||||
|
||||
# NOTE(maysams) Pods that are not selected by any Networkpolicy
|
||||
# are fully accessible. Thus, the default security group is associated.
|
||||
if not sg_list:
|
||||
sg_list = config.CONF.neutron_defaults.pod_security_groups
|
||||
if not sg_list:
|
||||
raise cfg.RequiredOptError('pod_security_groups',
|
||||
cfg.OptGroup('neutron_defaults'))
|
||||
svc_selector = service['spec'].get('selector')
|
||||
|
||||
# skip is no selector
|
||||
if svc_selector:
|
||||
# get affected pods by svc selector
|
||||
pods = driver_utils.get_pods({'selector': svc_selector},
|
||||
svc_namespace).get('items')
|
||||
# NOTE(ltomasbo): We assume all the pods pointed by a service
|
||||
# have the same labels, and the same policy will be applied to
|
||||
# all of them. Hence only considering the security groups applied
|
||||
# to the first one.
|
||||
if pods:
|
||||
return _get_pod_sgs(pods[0], project_id)
|
||||
return sg_list[:]
|
||||
|
|
|
@ -102,21 +102,26 @@ def get_pods(selector, namespace=None):
|
|||
|
||||
"""
|
||||
kubernetes = clients.get_kubernetes_client()
|
||||
labels = selector.get('matchLabels', None)
|
||||
if labels:
|
||||
# Removing pod-template-hash as pods will not have it and
|
||||
# otherwise there will be no match
|
||||
labels.pop('pod-template-hash', None)
|
||||
labels = replace_encoded_characters(labels)
|
||||
|
||||
exps = selector.get('matchExpressions', None)
|
||||
if exps:
|
||||
exps = ', '.join(format_expression(exp) for exp in exps)
|
||||
svc_selector = selector.get('selector')
|
||||
if svc_selector:
|
||||
labels = replace_encoded_characters(svc_selector)
|
||||
else:
|
||||
labels = selector.get('matchLabels', None)
|
||||
if labels:
|
||||
expressions = parse.quote("," + exps)
|
||||
labels += expressions
|
||||
else:
|
||||
labels = parse.quote(exps)
|
||||
# Removing pod-template-hash as pods will not have it and
|
||||
# otherwise there will be no match
|
||||
labels.pop('pod-template-hash', None)
|
||||
labels = replace_encoded_characters(labels)
|
||||
|
||||
exps = selector.get('matchExpressions', None)
|
||||
if exps:
|
||||
exps = ', '.join(format_expression(exp) for exp in exps)
|
||||
if labels:
|
||||
expressions = parse.quote("," + exps)
|
||||
labels += expressions
|
||||
else:
|
||||
labels = parse.quote(exps)
|
||||
|
||||
if namespace:
|
||||
pods = kubernetes.get(
|
||||
|
|
|
@ -364,7 +364,8 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
p.port]
|
||||
except KeyError:
|
||||
continue
|
||||
current_targets = {(str(m.ip), m.port) for m in lbaas_state.members}
|
||||
current_targets = {(str(m.ip), m.port, m.pool_id)
|
||||
for m in lbaas_state.members}
|
||||
|
||||
for subset in endpoints.get('subsets', []):
|
||||
subset_ports = subset.get('ports', [])
|
||||
|
@ -380,14 +381,14 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
continue
|
||||
for subset_port in subset_ports:
|
||||
target_port = subset_port['port']
|
||||
if (target_ip, target_port) in current_targets:
|
||||
continue
|
||||
port_name = subset_port.get('name')
|
||||
try:
|
||||
pool = pool_by_tgt_name[port_name]
|
||||
except KeyError:
|
||||
LOG.debug("No pool found for port: %r", port_name)
|
||||
continue
|
||||
if (target_ip, target_port, pool.id) in current_targets:
|
||||
continue
|
||||
# TODO(apuimedo): Do not pass subnet_id at all when in
|
||||
# L3 mode once old neutron-lbaasv2 is not supported, as
|
||||
# octavia does not require it
|
||||
|
@ -400,6 +401,15 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
# from VIP to pods happens in layer 3 mode, i.e.,
|
||||
# routed.
|
||||
member_subnet_id = lbaas_state.loadbalancer.subnet_id
|
||||
first_member_of_the_pool = True
|
||||
for member in lbaas_state.members:
|
||||
if pool.id == member.pool_id:
|
||||
first_member_of_the_pool = False
|
||||
break
|
||||
if first_member_of_the_pool:
|
||||
listener_port = lsnr_by_id[pool.listener_id].port
|
||||
else:
|
||||
listener_port = None
|
||||
member = self._drv_lbaas.ensure_member(
|
||||
loadbalancer=lbaas_state.loadbalancer,
|
||||
pool=pool,
|
||||
|
@ -407,7 +417,8 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
ip=target_ip,
|
||||
port=target_port,
|
||||
target_ref_namespace=target_ref['namespace'],
|
||||
target_ref_name=target_ref['name'])
|
||||
target_ref_name=target_ref['name'],
|
||||
listener_port=listener_port)
|
||||
lbaas_state.members.append(member)
|
||||
changed = True
|
||||
|
||||
|
|
|
@ -159,6 +159,8 @@ class TestLBaaSv2Driver(test_base.TestCase):
|
|||
'security_group_rules': []}
|
||||
cls = d_lbaasv2.LBaaSv2Driver
|
||||
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
|
||||
m_driver._get_vip_port.return_value = {
|
||||
'security_groups': [mock.sentinel.sg_id]}
|
||||
loadbalancer = mock.Mock()
|
||||
listener = mock.Mock()
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ from kuryr_kubernetes.controller.drivers import network_policy
|
|||
from kuryr_kubernetes import exceptions
|
||||
from kuryr_kubernetes.tests import base as test_base
|
||||
from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix
|
||||
from kuryr_kubernetes import utils
|
||||
|
||||
from neutronclient.common import exceptions as n_exc
|
||||
|
||||
|
@ -185,11 +186,15 @@ class TestNetworkPolicyDriver(test_base.TestCase):
|
|||
'_add_kuryrnetpolicy_crd')
|
||||
@mock.patch.object(network_policy.NetworkPolicyDriver,
|
||||
'parse_network_policy_rules')
|
||||
def test_create_security_group_rules_from_network_policy(self, m_parse,
|
||||
@mock.patch.object(utils, 'get_subnet_cidr')
|
||||
def test_create_security_group_rules_from_network_policy(self, m_utils,
|
||||
m_parse,
|
||||
m_add_crd,
|
||||
m_get_crd):
|
||||
self._driver.neutron.create_security_group.return_value = {
|
||||
'security_group': {'id': mock.sentinel.id}}
|
||||
m_utils.get_subnet_cidr.return_value = {
|
||||
'subnet': {'cidr': mock.sentinel.cidr}}
|
||||
m_parse.return_value = (self._i_rules, self._e_rules)
|
||||
self._driver.neutron.create_security_group_rule.return_value = {
|
||||
'security_group_rule': {'id': mock.sentinel.id}}
|
||||
|
@ -204,10 +209,13 @@ class TestNetworkPolicyDriver(test_base.TestCase):
|
|||
'_add_kuryrnetpolicy_crd')
|
||||
@mock.patch.object(network_policy.NetworkPolicyDriver,
|
||||
'parse_network_policy_rules')
|
||||
def test_create_security_group_rules_with_k8s_exc(self, m_parse,
|
||||
@mock.patch.object(utils, 'get_subnet_cidr')
|
||||
def test_create_security_group_rules_with_k8s_exc(self, m_utils, m_parse,
|
||||
m_add_crd, m_get_crd):
|
||||
self._driver.neutron.create_security_group.return_value = {
|
||||
'security_group': {'id': mock.sentinel.id}}
|
||||
m_utils.get_subnet_cidr.return_value = {
|
||||
'subnet': {'cidr': mock.sentinel.cidr}}
|
||||
m_parse.return_value = (self._i_rules, self._e_rules)
|
||||
m_get_crd.side_effect = exceptions.K8sClientException
|
||||
self._driver.neutron.create_security_group_rule.return_value = {
|
||||
|
@ -224,10 +232,13 @@ class TestNetworkPolicyDriver(test_base.TestCase):
|
|||
'_add_kuryrnetpolicy_crd')
|
||||
@mock.patch.object(network_policy.NetworkPolicyDriver,
|
||||
'parse_network_policy_rules')
|
||||
def test_create_security_group_rules_error_add_crd(self, m_parse,
|
||||
@mock.patch.object(utils, 'get_subnet_cidr')
|
||||
def test_create_security_group_rules_error_add_crd(self, m_utils, m_parse,
|
||||
m_add_crd, m_get_crd):
|
||||
self._driver.neutron.create_security_group.return_value = {
|
||||
'security_group': {'id': mock.sentinel.id}}
|
||||
m_utils.get_subnet_cidr.return_value = {
|
||||
'subnet': {'cidr': mock.sentinel.cidr}}
|
||||
m_parse.return_value = (self._i_rules, self._e_rules)
|
||||
m_add_crd.side_effect = exceptions.K8sClientException
|
||||
self._driver.neutron.create_security_group_rule.return_value = {
|
||||
|
|
|
@ -382,7 +382,7 @@ class FakeLBaaSDriver(drv_base.LBaaSDriver):
|
|||
id=str(uuid.uuid4()))
|
||||
|
||||
def ensure_member(self, loadbalancer, pool, subnet_id, ip, port,
|
||||
target_ref_namespace, target_ref_name
|
||||
target_ref_namespace, target_ref_name, listener_port=None
|
||||
):
|
||||
name = "%s:%s:%s" % (loadbalancer.name, ip, port)
|
||||
return obj_lbaas.LBaaSMember(name=name,
|
||||
|
|
|
@ -16,6 +16,7 @@ import time
|
|||
|
||||
import requests
|
||||
|
||||
from neutronclient.common import exceptions as n_exc
|
||||
from os_vif import objects
|
||||
from oslo_cache import core as cache
|
||||
from oslo_config import cfg
|
||||
|
@ -161,6 +162,17 @@ def get_subnet(subnet_id):
|
|||
return network
|
||||
|
||||
|
||||
@MEMOIZE
|
||||
def get_subnet_cidr(subnet_id):
|
||||
neutron = clients.get_neutron_client()
|
||||
try:
|
||||
subnet_obj = neutron.show_subnet(subnet_id)
|
||||
except n_exc.NeutronClientException:
|
||||
LOG.exception("Subnet %s CIDR not found!", subnet_id)
|
||||
raise
|
||||
return subnet_obj.get('subnet')['cidr']
|
||||
|
||||
|
||||
def extract_pod_annotation(annotation):
|
||||
obj = objects.base.VersionedObject.obj_from_primitive(annotation)
|
||||
# FIXME(dulek): This is code to maintain compatibility with Queens. We can
|
||||
|
|
Loading…
Reference in New Issue