Add support for podSelector
This include support for both types, when pod selector is used alone or together with a namespace selector. TODO in follow up patch sets: - React to new pods/namespaces created with labels - React to pod/namespaces relabeling/deletion Partially Implements: blueprint k8s-network-policies Change-Id: Ie29b9da64fcd5df7b9a0e9af7b4835208f76da66
This commit is contained in:
parent
fe583c3e6d
commit
30369502bb
|
@ -12,9 +12,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from six.moves.urllib.parse import urlencode
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutronclient.common import exceptions as n_exc
|
||||
|
@ -72,7 +69,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
LOG.debug("Already existing CRD %s", crd_name)
|
||||
sg_id = crd['spec']['securityGroupId']
|
||||
# Fetch existing SG rules from kuryrnetpolicy CRD
|
||||
existing_sg_rules = None
|
||||
existing_sg_rules = []
|
||||
existing_i_rules = crd['spec'].get('ingressSgRules')
|
||||
existing_e_rules = crd['spec'].get('egressSgRules')
|
||||
if existing_i_rules or existing_e_rules:
|
||||
|
@ -158,7 +155,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
for e_rule in e_rules:
|
||||
sgr_id = self._create_security_group_rule(e_rule)
|
||||
e_rule['security_group_rule']['id'] = sgr_id
|
||||
except n_exc.NeutronClientException:
|
||||
except (n_exc.NeutronClientException, exceptions.ResourceNotReady):
|
||||
LOG.exception("Error creating security group for network policy "
|
||||
" %s", policy['metadata']['name'])
|
||||
# If there's any issue creating sg rules, remove them
|
||||
|
@ -183,36 +180,161 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
LOG.exception('Error annotating network policy')
|
||||
raise
|
||||
|
||||
def _get_namespaces_cidr(self, namespace_selector):
|
||||
def _get_pods_ips(self, pod_selector, namespace=None,
|
||||
namespace_selector=None):
|
||||
ips = []
|
||||
matching_pods = []
|
||||
if namespace_selector:
|
||||
matching_namespaces = utils.get_namespaces(namespace_selector)
|
||||
for ns in matching_namespaces.get('items'):
|
||||
matching_pods = utils.get_pods(pod_selector,
|
||||
ns['metadata']['name'])
|
||||
else:
|
||||
matching_pods = utils.get_pods(pod_selector, namespace)
|
||||
for pod in matching_pods.get('items'):
|
||||
if pod['status']['podIP']:
|
||||
ips.append(pod['status']['podIP'])
|
||||
return ips
|
||||
|
||||
def _get_namespace_subnet_cidr(self, namespace):
|
||||
try:
|
||||
ns_annotations = namespace['metadata']['annotations']
|
||||
ns_name = ns_annotations[constants.K8S_ANNOTATION_NET_CRD]
|
||||
except KeyError:
|
||||
LOG.exception('Namespace handler must be enabled to support '
|
||||
'Network Policies with namespaceSelector')
|
||||
raise exceptions.ResourceNotReady(namespace)
|
||||
try:
|
||||
net_crd = self.kubernetes.get('{}/kuryrnets/{}'.format(
|
||||
constants.K8S_API_CRD, ns_name))
|
||||
except exceptions.K8sClientException:
|
||||
LOG.exception("Kubernetes Client Exception.")
|
||||
raise
|
||||
return net_crd['spec']['subnetCIDR']
|
||||
|
||||
def _get_namespaces_cidr(self, namespace_selector, namespace=None):
|
||||
cidrs = []
|
||||
namespace_label = urlencode(namespace_selector[
|
||||
'matchLabels'])
|
||||
# NOTE(maysams): K8s API does not accept &, so we need to replace
|
||||
# it with ',' or '%2C' instead
|
||||
namespace_label = namespace_label.replace('&', ',')
|
||||
matching_namespaces = self.kubernetes.get(
|
||||
'{}/namespaces?labelSelector={}'.format(
|
||||
constants.K8S_API_BASE, namespace_label)).get('items')
|
||||
for ns in matching_namespaces:
|
||||
# NOTE(ltomasbo): This requires the namespace handler to be
|
||||
# also enabled
|
||||
try:
|
||||
ns_annotations = ns['metadata']['annotations']
|
||||
ns_name = ns_annotations[constants.K8S_ANNOTATION_NET_CRD]
|
||||
except KeyError:
|
||||
LOG.exception('Namespace handler must be enabled to support '
|
||||
'Network Policies with namespaceSelector')
|
||||
raise
|
||||
try:
|
||||
net_crd = self.kubernetes.get('{}/kuryrnets/{}'.format(
|
||||
constants.K8S_API_CRD, ns_name))
|
||||
except exceptions.K8sClientException:
|
||||
LOG.exception("Kubernetes Client Exception.")
|
||||
raise
|
||||
ns_cidr = net_crd['spec']['subnetCIDR']
|
||||
if not namespace_selector and namespace:
|
||||
ns = self.kubernetes.get(
|
||||
'{}/namespaces/{}'.format(constants.K8S_API_BASE, namespace))
|
||||
ns_cidr = self._get_namespace_subnet_cidr(ns)
|
||||
cidrs.append(ns_cidr)
|
||||
else:
|
||||
matching_namespaces = utils.get_namespaces(namespace_selector)
|
||||
for ns in matching_namespaces.get('items'):
|
||||
# NOTE(ltomasbo): This requires the namespace handler to be
|
||||
# also enabled
|
||||
ns_cidr = self._get_namespace_subnet_cidr(ns)
|
||||
cidrs.append(ns_cidr)
|
||||
return cidrs
|
||||
|
||||
def _parse_selectors(self, rule_block, rule_direction, policy_namespace):
|
||||
allowed_cidrs = []
|
||||
allow_all = False
|
||||
selectors = False
|
||||
for rule in rule_block.get(rule_direction, []):
|
||||
namespace_selector = rule.get('namespaceSelector')
|
||||
pod_selector = rule.get('podSelector')
|
||||
if namespace_selector == {}:
|
||||
selectors = True
|
||||
if pod_selector:
|
||||
# allow matching pods in all namespaces
|
||||
allowed_cidrs.extend(self._get_pods_ips(
|
||||
pod_selector))
|
||||
else:
|
||||
# allow from all
|
||||
allow_all = True
|
||||
elif namespace_selector:
|
||||
selectors = True
|
||||
if pod_selector:
|
||||
# allow matching pods on maching namespaces
|
||||
allowed_cidrs.extend(self._get_pods_ips(
|
||||
pod_selector,
|
||||
namespace_selector=namespace_selector))
|
||||
else:
|
||||
# allow from/to all on the maching namespaces
|
||||
allowed_cidrs.extend(self._get_namespaces_cidr(
|
||||
namespace_selector))
|
||||
else:
|
||||
if pod_selector == {}:
|
||||
# allow from/to all pods on the network policy
|
||||
# namespace
|
||||
selectors = True
|
||||
allowed_cidrs.extend(self._get_namespaces_cidr(
|
||||
None,
|
||||
namespace=policy_namespace))
|
||||
elif pod_selector:
|
||||
# allow matching pods on the network policy
|
||||
# namespace
|
||||
selectors = True
|
||||
allowed_cidrs.extend(self._get_pods_ips(
|
||||
pod_selector,
|
||||
namespace=policy_namespace))
|
||||
return allow_all, selectors, allowed_cidrs
|
||||
|
||||
def _parse_sg_rules(self, sg_rule_body_list, direction, policy, sg_id):
|
||||
rule_list = policy['spec'].get(direction)
|
||||
if not rule_list:
|
||||
return
|
||||
|
||||
policy_namespace = policy['metadata']['namespace']
|
||||
rule_direction = 'from'
|
||||
if direction == 'egress':
|
||||
rule_direction = 'to'
|
||||
|
||||
if rule_list[0] == {}:
|
||||
LOG.debug('Applying default all open policy from %s',
|
||||
policy['metadata']['selfLink'])
|
||||
rule = self._create_security_group_rule_body(
|
||||
sg_id, direction, port_range_min=1, port_range_max=65535)
|
||||
sg_rule_body_list.append(rule)
|
||||
|
||||
for rule_block in rule_list:
|
||||
LOG.debug('Parsing %(dir)s Rule %(rule)s', {'dir': direction,
|
||||
'rule': rule_block})
|
||||
allow_all, selectors, allowed_cidrs = self._parse_selectors(
|
||||
rule_block, rule_direction, policy_namespace)
|
||||
|
||||
if 'ports' in rule_block:
|
||||
for port in rule_block['ports']:
|
||||
if allowed_cidrs or allow_all or selectors:
|
||||
for cidr in allowed_cidrs:
|
||||
rule = self._create_security_group_rule_body(
|
||||
sg_id, direction, port.get('port'),
|
||||
protocol=port.get('protocol'),
|
||||
cidr=cidr)
|
||||
sg_rule_body_list.append(rule)
|
||||
if allow_all:
|
||||
rule = self._create_security_group_rule_body(
|
||||
sg_id, direction, port.get('port'),
|
||||
protocol=port.get('protocol'))
|
||||
sg_rule_body_list.append(rule)
|
||||
else:
|
||||
rule = self._create_security_group_rule_body(
|
||||
sg_id, direction, port.get('port'),
|
||||
protocol=port.get('protocol'))
|
||||
sg_rule_body_list.append(rule)
|
||||
elif allowed_cidrs or allow_all or selectors:
|
||||
for cidr in allowed_cidrs:
|
||||
rule = self._create_security_group_rule_body(
|
||||
sg_id, direction,
|
||||
port_range_min=1,
|
||||
port_range_max=65535,
|
||||
cidr=cidr)
|
||||
sg_rule_body_list.append(rule)
|
||||
if allow_all:
|
||||
rule = self._create_security_group_rule_body(
|
||||
sg_id, direction,
|
||||
port_range_min=1,
|
||||
port_range_max=65535)
|
||||
sg_rule_body_list.append(rule)
|
||||
else:
|
||||
LOG.debug('This network policy specifies no %(direction)s '
|
||||
'%(rule_direction)s and no ports: %(policy)s',
|
||||
{'direction': direction,
|
||||
'rule_direction': rule_direction,
|
||||
'policy': policy['metadata']['selfLink']})
|
||||
|
||||
def parse_network_policy_rules(self, policy, sg_id):
|
||||
"""Create security group rule bodies out of network policies.
|
||||
|
||||
|
@ -221,94 +343,13 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
|
|||
ingress and egress ports blocks.
|
||||
"""
|
||||
LOG.debug('Parsing Network Policy %s' % policy['metadata']['name'])
|
||||
ingress_rule_list = policy['spec'].get('ingress')
|
||||
egress_rule_list = policy['spec'].get('egress')
|
||||
ingress_sg_rule_body_list = []
|
||||
egress_sg_rule_body_list = []
|
||||
|
||||
if ingress_rule_list:
|
||||
if ingress_rule_list[0] == {}:
|
||||
LOG.debug('Applying default all open policy from %s',
|
||||
policy['metadata']['selfLink'])
|
||||
i_rule = self._create_security_group_rule_body(
|
||||
sg_id, 'ingress', port_range_min=1, port_range_max=65535)
|
||||
ingress_sg_rule_body_list.append(i_rule)
|
||||
for ingress_rule in ingress_rule_list:
|
||||
LOG.debug('Parsing Ingress Rule %s', ingress_rule)
|
||||
allowed_cidrs = []
|
||||
for from_rule in ingress_rule.get('from', []):
|
||||
namespace_selector = from_rule.get('namespaceSelector')
|
||||
if namespace_selector:
|
||||
allowed_cidrs = self._get_namespaces_cidr(
|
||||
namespace_selector)
|
||||
if 'ports' in ingress_rule:
|
||||
for port in ingress_rule['ports']:
|
||||
if allowed_cidrs:
|
||||
for cidr in allowed_cidrs:
|
||||
i_rule = self._create_security_group_rule_body(
|
||||
sg_id, 'ingress', port.get('port'),
|
||||
protocol=port.get('protocol'),
|
||||
cidr=cidr)
|
||||
ingress_sg_rule_body_list.append(i_rule)
|
||||
else:
|
||||
i_rule = self._create_security_group_rule_body(
|
||||
sg_id, 'ingress', port.get('port'),
|
||||
protocol=port.get('protocol'))
|
||||
ingress_sg_rule_body_list.append(i_rule)
|
||||
elif allowed_cidrs:
|
||||
for cidr in allowed_cidrs:
|
||||
i_rule = self._create_security_group_rule_body(
|
||||
sg_id, 'ingress',
|
||||
port_range_min=1,
|
||||
port_range_max=65535,
|
||||
cidr=cidr)
|
||||
ingress_sg_rule_body_list.append(i_rule)
|
||||
else:
|
||||
LOG.debug('This network policy specifies no ingress from '
|
||||
'and no ports: %s',
|
||||
policy['metadata']['selfLink'])
|
||||
|
||||
if egress_rule_list:
|
||||
if egress_rule_list[0] == {}:
|
||||
LOG.debug('Applying default all open policy from %s',
|
||||
policy['metadata']['selfLink'])
|
||||
e_rule = self._create_security_group_rule_body(
|
||||
sg_id, 'egress', port_range_min=1, port_range_max=65535)
|
||||
egress_sg_rule_body_list.append(e_rule)
|
||||
for egress_rule in egress_rule_list:
|
||||
LOG.debug('Parsing Egress Rule %s', egress_rule)
|
||||
allowed_cidrs = []
|
||||
for from_rule in egress_rule.get('to', []):
|
||||
namespace_selector = from_rule.get('namespaceSelector')
|
||||
if namespace_selector:
|
||||
allowed_cidrs = self._get_namespaces_cidr(
|
||||
namespace_selector)
|
||||
if 'ports' in egress_rule:
|
||||
for port in egress_rule['ports']:
|
||||
if allowed_cidrs:
|
||||
for cidr in allowed_cidrs:
|
||||
e_rule = self._create_security_group_rule_body(
|
||||
sg_id, 'egress', port.get('port'),
|
||||
protocol=port.get('protocol'),
|
||||
cidr=cidr)
|
||||
egress_sg_rule_body_list.append(e_rule)
|
||||
else:
|
||||
e_rule = self._create_security_group_rule_body(
|
||||
sg_id, 'egress', port.get('port'),
|
||||
protocol=port.get('protocol'))
|
||||
egress_sg_rule_body_list.append(e_rule)
|
||||
elif allowed_cidrs:
|
||||
for cidr in allowed_cidrs:
|
||||
e_rule = self._create_security_group_rule_body(
|
||||
sg_id, 'egress',
|
||||
port_range_min=1,
|
||||
port_range_max=65535,
|
||||
cidr=cidr)
|
||||
egress_sg_rule_body_list.append(e_rule)
|
||||
else:
|
||||
LOG.debug('This network policy specifies no egrees to '
|
||||
'and no ports: %s',
|
||||
policy['metadata']['selfLink'])
|
||||
self._parse_sg_rules(ingress_sg_rule_body_list, 'ingress', policy,
|
||||
sg_id)
|
||||
self._parse_sg_rules(egress_sg_rule_body_list, 'egress', policy,
|
||||
sg_id)
|
||||
|
||||
return ingress_sg_rule_body_list, egress_sg_rule_body_list
|
||||
|
||||
|
|
|
@ -66,6 +66,18 @@ def is_host_network(pod):
|
|||
|
||||
|
||||
def get_pods(selector, namespace):
|
||||
"""Return a k8s object list with the pods matching the selector.
|
||||
|
||||
It accepts an optional parameter to state the namespace where the pod
|
||||
selector will be apply. If empty namespace is passed, then the pod
|
||||
selector is applied in all namespaces.
|
||||
|
||||
param selector: k8s selector of types matchLabels or matchExpressions
|
||||
param namespace: namespace name where the selector will be applied. If
|
||||
None, the pod selector is applied in all namespaces
|
||||
return: k8s list objec containing all matching pods
|
||||
|
||||
"""
|
||||
kubernetes = clients.get_kubernetes_client()
|
||||
labels = selector.get('matchLabels', None)
|
||||
if labels:
|
||||
|
@ -83,13 +95,45 @@ def get_pods(selector, namespace):
|
|||
else:
|
||||
labels = parse.quote(exps)
|
||||
|
||||
pods = kubernetes.get(
|
||||
'{}/namespaces/{}/pods?labelSelector={}'.format(
|
||||
constants.K8S_API_BASE, namespace, labels))
|
||||
if namespace:
|
||||
pods = kubernetes.get(
|
||||
'{}/namespaces/{}/pods?labelSelector={}'.format(
|
||||
constants.K8S_API_BASE, namespace, labels))
|
||||
else:
|
||||
pods = kubernetes.get(
|
||||
'{}/pods?labelSelector={}'.format(constants.K8S_API_BASE, labels))
|
||||
|
||||
return pods
|
||||
|
||||
|
||||
def get_namespaces(selector):
|
||||
"""Return a k8s object list with the namespaces matching the selector.
|
||||
|
||||
param selector: k8s selector of types matchLabels or matchExpressions
|
||||
return: k8s list objec containing all matching namespaces
|
||||
|
||||
"""
|
||||
kubernetes = clients.get_kubernetes_client()
|
||||
labels = selector.get('matchLabels', None)
|
||||
if labels:
|
||||
labels = replace_encoded_characters(labels)
|
||||
|
||||
exps = selector.get('matchExpressions', None)
|
||||
if exps:
|
||||
exps = ', '.join(format_expression(exp) for exp in exps)
|
||||
if labels:
|
||||
expressions = parse.quote("," + exps)
|
||||
labels += expressions
|
||||
else:
|
||||
labels = parse.quote(exps)
|
||||
|
||||
namespaces = kubernetes.get(
|
||||
'{}/namespaces?labelSelector={}'.format(
|
||||
constants.K8S_API_BASE, labels))
|
||||
|
||||
return namespaces
|
||||
|
||||
|
||||
def format_expression(expression):
|
||||
key = expression['key']
|
||||
operator = expression['operator'].lower()
|
||||
|
|
|
@ -306,7 +306,8 @@ class TestNetworkPolicyDriver(test_base.TestCase):
|
|||
pod = get_pod_obj()
|
||||
self.kubernetes.get.return_value = {'items': [pod]}
|
||||
|
||||
self.assertRaises(KeyError, self._driver._get_namespaces_cidr,
|
||||
self.assertRaises(exceptions.ResourceNotReady,
|
||||
self._driver._get_namespaces_cidr,
|
||||
namespace_selector)
|
||||
self.kubernetes.get.assert_called_once()
|
||||
|
||||
|
|
Loading…
Reference in New Issue