Support KuryrLoadbalancer CRD

This commit includes the support for
KuryrLoadBalancer CRD on services tests.

Change-Id: I62a9e301d90ce5aff87b14ac4f1af02fe46436a5
This commit is contained in:
Maysa Macedo 2020-07-27 12:38:50 +00:00 committed by Michał Dulko
parent 5dc54def33
commit 2b388b8a93
2 changed files with 119 additions and 35 deletions

View File

@ -73,6 +73,8 @@ kuryr_k8s_opts = [
"cloud provider is set"),
cfg.BoolOpt("validate_crd", default=False, help="Whether or not kuryr "
"CRDs should be validated"),
cfg.BoolOpt("kuryrloadbalancers", default=False, help="Whether or not "
"kuryrloadbalancers CRDs are used"),
cfg.BoolOpt("kuryrnetworks", default=False, help="Whether or not "
"kuryrnetworks CRDs are used"),
cfg.BoolOpt("new_kuryrnetworkpolicy_crd", default=False,

View File

@ -44,6 +44,7 @@ KURYR_CRD_VERSION = 'v1'
KURYR_NET_CRD_PLURAL = 'kuryrnets'
KURYR_NETWORK_CRD_PLURAL = 'kuryrnetworks'
KURYR_PORT_CRD_PLURAL = 'kuryrports'
KURYR_LOAD_BALANCER_CRD_PLURAL = 'kuryrloadbalancers'
K8S_ANNOTATION_PREFIX = 'openstack.org/kuryr'
K8S_ANNOTATION_LBAAS_STATE = K8S_ANNOTATION_PREFIX + '-lbaas-state'
K8S_ANNOTATION_LBAAS_RT_STATE = K8S_ANNOTATION_PREFIX + '-lbaas-route-state'
@ -410,45 +411,89 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
def get_service_ip(
cls, service_name, spec_type="ClusterIP", namespace="default"):
api = cls.k8s_client.CoreV1Api()
while True:
service = api.read_namespaced_service(service_name, namespace)
if spec_type == "LoadBalancer":
# In case of a cloud provider not being configured, OpenShift
# allocates an external IP and overwrites the service
# status/ingress/IP set by Kuryr-controller.
# In this case, we should retrieve the external IP from
# Kuryr's annotation.
if CONF.kuryr_kubernetes.kuryrloadbalancers:
return cls.get_svc_ip_on_crd(service_name, namespace)
else:
return cls.get_svc_ip_on_annotation(service_name, namespace)
elif spec_type == "ClusterIP":
return service.spec.cluster_ip
else:
raise lib_exc.NotImplemented()
@classmethod
def get_svc_ip_on_annotation(cls, service_name, namespace):
api = cls.k8s_client.CoreV1Api()
start = time.time()
while time.time() - start < consts.LB_TIMEOUT:
time.sleep(5)
service = api.read_namespaced_service(service_name, namespace)
if spec_type == "LoadBalancer":
# In case of a cloud provider not being configured, OpenShift
# allocates an external IP and overwrites the service
# status/ingress/IP set by Kuryr-controller.
# In this case, we should retrieve the external IP from
# Kuryr's annotation.
if service.status.load_balancer.ingress:
endpoints = api.read_namespaced_endpoints(
service_name, namespace)
annotations = endpoints.metadata.annotations
try:
ann_dict = json.loads(
annotations[K8S_ANNOTATION_LBAAS_STATE])
ann_lb_ip = (
ann_dict["versioned_object.data"]
["service_pub_ip_info"]
["versioned_object.data"]
["ip_addr"])
except KeyError:
LOG.info("Waiting till LB's IP appears in annotation "
"(ingress.ip=%s)",
service.status.load_balancer.ingress[0].ip)
continue
if ann_lb_ip != service.status.load_balancer.ingress[0].ip:
LOG.warning(
'Annotated pub_ip(%s) != ingress.ip(%s).',
ann_lb_ip,
service.status.load_balancer.ingress[0].ip)
if not CONF.kuryr_kubernetes.cloud_provider:
return ann_lb_ip
return service.status.load_balancer.ingress[0].ip
msg = "Timed out waiting for endpoints annotation %s" % service_name
raise lib_exc.TimeoutException(msg)
if service.status.load_balancer.ingress:
endpoints = api.read_namespaced_endpoints(
@classmethod
def get_svc_ip_on_crd(cls, service_name, namespace):
api = cls.k8s_client.CoreV1Api()
start = time.time()
while time.time() - start < consts.LB_TIMEOUT:
time.sleep(5)
service = api.read_namespaced_service(service_name, namespace)
if service.status.load_balancer.ingress:
try:
klb_crd = cls.get_kuryr_loadbalancer_crds(
service_name, namespace)
annotations = endpoints.metadata.annotations
except kubernetes.client.rest.ApiException:
continue
ingress_ip = service.status.load_balancer.ingress[0].ip
klb_status = klb_crd.get('status')
if (klb_status and klb_status.get('service_pub_ip_info')):
try:
ann_dict = json.loads(
annotations[K8S_ANNOTATION_LBAAS_STATE])
ann_lb_ip = (
ann_dict["versioned_object.data"]
["service_pub_ip_info"]
["versioned_object.data"]
["ip_addr"])
crd_lb_ip = klb_status['service_pub_ip_info'][
'ip_addr']
except KeyError:
LOG.info("Waiting till LB's IP appears in annotation "
"(ingress.ip=%s)",
service.status.load_balancer.ingress[0].ip)
LOG.info("Waiting till LB's IP appears in CRD "
"(ingress.ip=%s)", ingress_ip)
continue
if ann_lb_ip != service.status.load_balancer.ingress[0].ip:
LOG.warning(
'Annotated pub_ip(%s) != ingress.ip(%s).',
ann_lb_ip,
service.status.load_balancer.ingress[0].ip)
if not CONF.kuryr_kubernetes.cloud_provider:
return ann_lb_ip
return service.status.load_balancer.ingress[0].ip
elif spec_type == "ClusterIP":
return service.spec.cluster_ip
else:
raise lib_exc.NotImplemented()
if crd_lb_ip != ingress_ip:
LOG.warning(
'LB CRD pub_ip(%s) != ingress.ip(%s).',
crd_lb_ip, ingress_ip)
if not CONF.kuryr_kubernetes.cloud_provider:
return crd_lb_ip
return ingress_ip
msg = "Timed out waiting for lb crd status %s" % service_name
raise lib_exc.TimeoutException(msg)
@classmethod
def wait_kuryr_annotation(cls, group, version, plural, annotation,
@ -678,6 +723,13 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
namespace=namespace, plural=KURYR_NETWORK_CRD_PLURAL,
name=namespace)
@classmethod
def get_kuryr_loadbalancer_crds(cls, name, namespace):
return cls.k8s_client.CustomObjectsApi().get_namespaced_custom_object(
group=KURYR_CRD_GROUP, version=KURYR_CRD_VERSION,
namespace=namespace, plural=KURYR_LOAD_BALANCER_CRD_PLURAL,
name=name)
def get_pod_list(self, namespace='default', label_selector=''):
return self.k8s_client.CoreV1Api().list_namespaced_pod(
namespace=namespace, label_selector=label_selector).items
@ -826,9 +878,39 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
@classmethod
def verify_lbaas_endpoints_configured(cls, ep_name, pod_num,
namespace='default'):
cls._verify_endpoints_annotation(
ep_name=ep_name, ann_string=K8S_ANNOTATION_LBAAS_STATE,
poll_interval=5, namespace=namespace, pod_num=pod_num)
if CONF.kuryr_kubernetes.kuryrloadbalancers:
cls._verify_klb_crd(
ep_name, poll_interval=5, namespace=namespace,
pod_num=pod_num)
else:
cls._verify_endpoints_annotation(
ep_name=ep_name, ann_string=K8S_ANNOTATION_LBAAS_STATE,
poll_interval=5, namespace=namespace, pod_num=pod_num)
@classmethod
def _verify_klb_crd(cls, name, poll_interval=1, namespace='default',
timeout_period=consts.LB_TIMEOUT, pod_num=None):
start = time.time()
klb_crd_has_status = False
while time.time() - start < timeout_period:
time.sleep(poll_interval)
try:
klb_crd = cls.get_kuryr_loadbalancer_crds(
name, namespace)
except kubernetes.client.rest.ApiException:
continue
if (klb_crd.get('status') and
klb_crd['status'].get('members')):
members_num = len(klb_crd['status'].get('members'))
if pod_num and pod_num != members_num:
LOG.info("members num(%d) != pod_num(%d)",
members_num, pod_num)
continue
klb_crd_has_status = True
break
if not klb_crd_has_status:
msg = "Timed out waiting for klb crd %s creation" % name
raise lib_exc.TimeoutException(msg)
@classmethod
def _verify_endpoints_annotation(cls, ep_name, ann_string,