Ensure only affected services are updated on Pod/NetworkPolicy events

When Pods or Network Policies are created/updated/deleted, only the affected
service(s) should have the SG updated. Right now, all the services are updated.

This commit fixes the issue, on the Network Policy side, by checking if any of
the pods selected by a Service is also selected by a Network Policy, and if so
update the SG of that LBaaS.
And on the Pods side, by matching the Service selectors and Network Policy
selectors, when this NP got the pointed pods SG updated. If the selectors
match the LBaaS SG is updated.

Closes-Bug: 1818203
Change-Id: Id996651a7d03bc7621e57b46825ddfa9d98e48ce
This commit is contained in:
Maysa Macedo 2019-02-28 22:46:22 +00:00
parent ba89bd027f
commit 660bbf039a
7 changed files with 75 additions and 14 deletions

View File

@ -251,6 +251,8 @@ class PodSecurityGroupsDriver(DriverBase):
"""Create security group rules for a pod.
:param pod: dict containing Kubernetes Pod object
:return: a list containing podSelectors of CRDs
that had security group rules created
"""
raise NotImplementedError()
@ -258,6 +260,8 @@ class PodSecurityGroupsDriver(DriverBase):
"""Delete security group rules for a pod
:param pod: dict containing Kubernetes Pod object
:return: a list containing podSelectors of CRDs
that had security group rules deleted
"""
raise NotImplementedError()
@ -265,6 +269,8 @@ class PodSecurityGroupsDriver(DriverBase):
"""Update security group rules for a pod
:param pod: dict containing Kubernetes Pod object
:return: a list containing podSelectors of CRDs
that had security group rules updated
"""
raise NotImplementedError()

View File

@ -215,6 +215,7 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
def create_sg_rules(self, pod):
LOG.debug("Creating sg rule for pod: %s", pod['metadata']['name'])
crd_pod_selectors = []
knp_crds = driver_utils.get_kuryrnetpolicy_crds()
for crd in knp_crds.get('items'):
crd_selector = crd['spec'].get('podSelector')
@ -225,11 +226,13 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
if i_matched or e_matched:
driver_utils.patch_kuryr_crd(crd, i_rules,
e_rules, crd_selector)
crd_pod_selectors.append(crd_selector)
return crd_pod_selectors
def delete_sg_rules(self, pod):
LOG.debug("Deleting sg rule for pod: %s", pod['metadata']['name'])
pod_ip = driver_utils.get_pod_ip(pod)
crd_pod_selectors = []
knp_crds = driver_utils.get_kuryrnetpolicy_crds()
for crd in knp_crds.get('items'):
crd_selector = crd['spec'].get('podSelector')
@ -264,11 +267,15 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
if matched:
driver_utils.patch_kuryr_crd(crd, i_rules, e_rules,
crd_selector)
crd_pod_selectors.append(crd_selector)
return crd_pod_selectors
def update_sg_rules(self, pod):
LOG.debug("Updating sg rule for pod: %s", pod['metadata']['name'])
self.delete_sg_rules(pod)
self.create_sg_rules(pod)
crd_pod_selectors = []
crd_pod_selectors.extend(self.delete_sg_rules(pod))
crd_pod_selectors.extend(self.create_sg_rules(pod))
return crd_pod_selectors
def delete_namespace_sg_rules(self, namespace):
ns_name = namespace['metadata']['name']

View File

@ -405,3 +405,21 @@ def get_services(namespace):
'namespace %s', namespace)
raise
return services
def service_matches_affected_pods(service, pod_selectors):
"""Returns if the service is affected by the pod selectors
Checks if the service selector matches the labelSelectors of
NetworkPolicies.
param service: k8s service
param pod_selectors: a list of kubernetes labelSelectors
return: True if the service is selected by any of the labelSelectors
and False otherwise.
"""
svc_selector = service['spec'].get('selector')
for selector in pod_selectors:
if match_selector(selector, svc_selector):
return True
return False

View File

@ -39,9 +39,11 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
super(PodLabelHandler, self).__init__()
self._drv_project = drivers.PodProjectDriver.get_instance()
self._drv_sg = drivers.PodSecurityGroupsDriver.get_instance()
self._drv_svc_sg = drivers.ServiceSecurityGroupsDriver.get_instance()
self._drv_vif_pool = drivers.VIFPoolDriver.get_instance(
specific_driver='multi_pool')
self._drv_vif_pool.set_vif_driver()
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
def on_present(self, pod):
if driver_utils.is_host_network(pod) or not self._has_pod_state(pod):
@ -57,13 +59,16 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
if current_pod_labels == previous_pod_labels:
return
self._drv_sg.update_sg_rules(pod)
crd_pod_selectors = self._drv_sg.update_sg_rules(pod)
project_id = self._drv_project.get_project(pod)
security_groups = self._drv_sg.get_security_groups(pod, project_id)
self._drv_vif_pool.update_vif_sgs(pod, security_groups)
self._set_pod_labels(pod, current_pod_labels)
services = driver_utils.get_services(pod['metadata']['namespace'])
self._update_services(services, crd_pod_selectors, project_id)
def _get_pod_labels(self, pod):
try:
annotations = pod['metadata']['annotations']
@ -94,3 +99,13 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
except KeyError:
return False
return True
def _update_services(self, services, crd_pod_selectors, project_id):
for service in services.get('items'):
if (service['metadata']['name'] == 'kubernetes' or not
driver_utils.service_matches_affected_pods(
service, crd_pod_selectors)):
continue
sgs = self._drv_svc_sg.get_security_groups(service,
project_id)
self._drv_lbaas.update_lbaas_sg(service, sgs)

View File

@ -85,7 +85,8 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
for service in services.get('items'):
# TODO(ltomasbo): Skip other services that are not affected
# by the policy
if service['metadata']['name'] == 'kubernetes':
if (service['metadata']['name'] == 'kubernetes' or not
self._is_service_affected(service, pods_to_update)):
continue
sgs = self._drv_svc_sg.get_security_groups(service,
project_id)
@ -119,7 +120,8 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
services = driver_utils.get_services(
policy['metadata']['namespace'])
for service in services.get('items'):
if service['metadata']['name'] == 'kubernetes':
if (service['metadata']['name'] == 'kubernetes' or not
self._is_service_affected(service, pods_to_update)):
continue
sgs = self._drv_svc_sg.get_security_groups(service,
project_id)
@ -138,3 +140,10 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
if utils.has_limit(sg_quota):
return utils.is_available('security_groups', sg_quota, sg_func)
return True
def _is_service_affected(self, service, affected_pods):
svc_namespace = service['metadata']['namespace']
svc_selector = service['spec'].get('selector')
svc_pods = driver_utils.get_pods({'selector': svc_selector},
svc_namespace).get('items')
return any(pod in svc_pods for pod in affected_pods)

View File

@ -126,12 +126,13 @@ class VIFHandler(k8s_base.ResourceEventHandler):
changed = True
if changed:
self._set_pod_state(pod, state)
self._drv_sg.create_sg_rules(pod)
crd_pod_selectors = self._drv_sg.create_sg_rules(pod)
if self._is_network_policy_enabled():
services = driver_utils.get_services(
pod['metadata']['namespace'])
self._update_services(services, project_id)
self._update_services(
services, crd_pod_selectors, project_id)
def on_deleted(self, pod):
if driver_utils.is_host_network(pod):
@ -139,7 +140,7 @@ class VIFHandler(k8s_base.ResourceEventHandler):
services = driver_utils.get_services(pod['metadata']['namespace'])
project_id = self._drv_project.get_project(pod)
self._drv_sg.delete_sg_rules(pod)
crd_pod_selectors = self._drv_sg.delete_sg_rules(pod)
try:
security_groups = self._drv_sg.get_security_groups(pod, project_id)
except k_exc.ResourceNotReady:
@ -158,7 +159,7 @@ class VIFHandler(k8s_base.ResourceEventHandler):
self._drv_vif_pool.release_vif(pod, vif, project_id,
security_groups)
if self._is_network_policy_enabled():
self._update_services(services, project_id)
self._update_services(services, crd_pod_selectors, project_id)
@MEMOIZE
def is_ready(self, quota):
@ -208,9 +209,11 @@ class VIFHandler(k8s_base.ResourceEventHandler):
constants.K8S_ANNOTATION_LABEL: labels_annotation},
resource_version=pod['metadata']['resourceVersion'])
def _update_services(self, services, project_id):
def _update_services(self, services, crd_pod_selectors, project_id):
for service in services.get('items'):
if service['metadata']['name'] == 'kubernetes':
if (service['metadata']['name'] == 'kubernetes' or not
driver_utils.service_matches_affected_pods(
service, crd_pod_selectors)):
continue
sgs = self._drv_svc_sg.get_security_groups(service,
project_id)

View File

@ -32,7 +32,8 @@ class TestPodLabelHandler(test_base.TestCase):
self._pod_link = mock.sentinel.pod_link
self._pod = {
'metadata': {'resourceVersion': self._pod_version,
'selfLink': self._pod_link},
'selfLink': self._pod_link,
'namespace': 'default'},
'status': {'phase': k_const.K8S_POD_STATUS_PENDING},
'spec': {'hostNetwork': False,
'nodeName': 'hostname'}
@ -72,7 +73,9 @@ class TestPodLabelHandler(test_base.TestCase):
self.assertEqual(sg_driver, handler._drv_sg)
self.assertEqual(vif_pool_driver, handler._drv_vif_pool)
def test_on_present(self):
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
def test_on_present(self, m_get_services):
m_get_services.return_value = {"items": []}
self._has_pod_state.return_value = True
self._get_pod_labels.return_value = {'test1': 'test'}