256 lines
10 KiB
Python
256 lines
10 KiB
Python
# Copyright (c) 2017 RedHat, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from kuryr.lib._i18n import _
|
|
|
|
from kuryr_kubernetes import clients
|
|
from kuryr_kubernetes import constants as k_const
|
|
from kuryr_kubernetes.controller.drivers import base as drv_base
|
|
from kuryr_kubernetes.controller.ingress import ingress_ctl
|
|
from kuryr_kubernetes import exceptions as k_exc
|
|
from kuryr_kubernetes.handlers import k8s_base
|
|
from kuryr_kubernetes.objects import lbaas as obj_lbaas
|
|
from kuryr_kubernetes.objects import route as obj_route
|
|
from kuryr_kubernetes.platform import constants as ocp_const
|
|
from oslo_log import log as logging
|
|
from oslo_serialization import jsonutils
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class OcpRouteHandler(k8s_base.ResourceEventHandler):
|
|
"""OcpRouteHandler handles OCP route events.
|
|
|
|
An OpenShift route allows service to be externally-reachable via host name.
|
|
This host name is then used to route traffic to the service.
|
|
The OcpRouteHandler is responsible for processing all route resource
|
|
events.
|
|
|
|
"""
|
|
|
|
OBJECT_KIND = ocp_const.OCP_OBJ_ROUTE
|
|
OBJECT_WATCH_PATH = "%s/%s" % (ocp_const.OCP_API_BASE, "routes")
|
|
|
|
def __init__(self):
|
|
self._drv_lbaas = drv_base.LBaaSDriver.get_instance()
|
|
self._l7_router = None
|
|
self._l7_router_listeners = None
|
|
|
|
def on_present(self, route):
|
|
if not self._l7_router or not self._l7_router_listeners:
|
|
ing_ctl = ingress_ctl.IngressCtrlr.get_instance()
|
|
self._l7_router, self._l7_router_listeners = (
|
|
ing_ctl.get_router_and_listener())
|
|
if not self._l7_router or not self._l7_router_listeners:
|
|
LOG.info("No L7 router found - do nothing")
|
|
return
|
|
|
|
route_spec = self._get_route_spec(route)
|
|
if not route_spec:
|
|
route_spec = obj_route.RouteSpec()
|
|
|
|
if self._should_ignore(route, route_spec):
|
|
return
|
|
route_state = self._get_route_state(route)
|
|
if not route_state:
|
|
route_state = obj_route.RouteState()
|
|
|
|
self._sync_router_pool(route, route_spec, route_state)
|
|
self._sync_l7_policy(route, route_spec, route_state)
|
|
self._sync_host_l7_rule(route, route_spec, route_state)
|
|
self._sync_path_l7_rule(route, route_spec, route_state)
|
|
|
|
self._set_route_state(route, route_state)
|
|
self._set_route_spec(route, route_spec)
|
|
self._send_route_notification_to_ep(
|
|
route, route_spec.to_service)
|
|
|
|
def _get_endpoints_link_by_route(self, route_link, ep_name):
|
|
route_link = route_link.replace(
|
|
ocp_const.OCP_API_BASE, k_const.K8S_API_BASE)
|
|
link_parts = route_link.split('/')
|
|
if link_parts[-2] != 'routes':
|
|
raise k_exc.IntegrityError(_(
|
|
"Unsupported route link: %(link)s") % {
|
|
'link': route_link})
|
|
link_parts[-2] = 'endpoints'
|
|
link_parts[-1] = ep_name
|
|
return "/".join(link_parts)
|
|
|
|
def _send_route_notification_to_ep(self, route, ep_name):
|
|
route_link = route['metadata']['selfLink']
|
|
ep_link = self._get_endpoints_link_by_route(route_link, ep_name)
|
|
k8s = clients.get_kubernetes_client()
|
|
try:
|
|
k8s.get(ep_link)
|
|
except k_exc.K8sClientException:
|
|
LOG.debug("Failed to get EP link : %s", ep_link)
|
|
return
|
|
|
|
route_notifier = obj_lbaas.LBaaSRouteNotifier()
|
|
route_notifier.routes.append(
|
|
obj_lbaas.LBaaSRouteNotifEntry(
|
|
route_id=route['metadata']['uid'], msg='RouteChanged'))
|
|
route_notifier.obj_reset_changes(recursive=True)
|
|
LOG.debug("Setting LBaaSRouteNotifier annotation: %r", route_notifier)
|
|
annotation = jsonutils.dumps(route_notifier.obj_to_primitive(),
|
|
sort_keys=True)
|
|
k8s.annotate(
|
|
ep_link,
|
|
{k_const.K8S_ANNOTATION_LBAAS_RT_NOTIF: annotation},
|
|
resource_version=route['metadata']['resourceVersion'])
|
|
|
|
def _should_ignore(self, route, route_spec):
|
|
spec = route['spec']
|
|
return ((not self._l7_router)
|
|
or
|
|
((spec.get('host') == route_spec.host) and
|
|
(spec.get('path') == route_spec.path) and
|
|
(spec['to'].get('name') == route_spec.to_service)))
|
|
|
|
def on_deleted(self, route):
|
|
if not self._l7_router:
|
|
LOG.info("No L7 router found - do nothing")
|
|
return
|
|
|
|
route_state = self._get_route_state(route)
|
|
if not route_state:
|
|
return
|
|
# NOTE(yboaron): deleting l7policy deletes also l7rules
|
|
if route_state.l7_policy:
|
|
self._drv_lbaas.release_l7_policy(
|
|
self._l7_router, route_state.l7_policy)
|
|
|
|
if route_state.router_pool:
|
|
if self._drv_lbaas.is_pool_used_by_other_l7policies(
|
|
route_state.l7_policy, route_state.router_pool):
|
|
LOG.debug("Can't delete pool (pointed by another route)")
|
|
else:
|
|
self._drv_lbaas.release_pool(
|
|
self._l7_router, route_state.router_pool)
|
|
# no more routes pointing to this pool/ep - update ep
|
|
spec = route['spec']
|
|
self._send_route_notification_to_ep(
|
|
route, spec['to'].get('name'))
|
|
|
|
def _sync_router_pool(self, route, route_spec, route_state):
|
|
if route_state.router_pool:
|
|
return
|
|
|
|
pool_name = self._drv_lbaas.get_loadbalancer_pool_name(
|
|
self._l7_router, route['metadata']['namespace'],
|
|
route['spec']['to']['name'])
|
|
pool = self._drv_lbaas.get_pool_by_name(
|
|
pool_name, self._l7_router.project_id)
|
|
if not pool:
|
|
pool = self._drv_lbaas.ensure_pool_attached_to_lb(
|
|
self._l7_router, route['metadata']['namespace'],
|
|
route['spec']['to']['name'], protocol='HTTP')
|
|
|
|
route_state.router_pool = pool
|
|
route_spec.to_service = route['spec']['to']['name']
|
|
|
|
def _sync_l7_policy(self, route, route_spec, route_state):
|
|
if route_state.l7_policy:
|
|
return
|
|
# TBD , take care of listener HTTPS
|
|
listener = self._l7_router_listeners[k_const.KURYR_L7_ROUTER_HTTP_PORT]
|
|
|
|
route_state.l7_policy = self._drv_lbaas.ensure_l7_policy(
|
|
route['metadata']['namespace'], route['metadata']['name'],
|
|
self._l7_router, route_state.router_pool, listener.id)
|
|
|
|
def _sync_host_l7_rule(self, route, route_spec, route_state):
|
|
if route_spec.host == route['spec']['host']:
|
|
return
|
|
if not route_spec.host:
|
|
route_state.h_l7_rule = self._drv_lbaas.ensure_l7_rule(
|
|
self._l7_router, route_state.l7_policy,
|
|
'EQUAL_TO', 'HOST_NAME', route['spec']['host'])
|
|
else:
|
|
self._drv_lbaas.update_l7_rule(
|
|
route_state.h_l7_rule, route['spec']['host'])
|
|
route_state.h_l7_rule.value = route['spec']['host']
|
|
|
|
route_spec.host = route['spec']['host']
|
|
|
|
def _sync_path_l7_rule(self, route, route_spec, route_state):
|
|
if route_spec.path == route['spec'].get('path'):
|
|
return
|
|
if not route_spec.path:
|
|
route_state.p_l7_rule = self._drv_lbaas.ensure_l7_rule(
|
|
self._l7_router, route_state.l7_policy,
|
|
'EQUAL_TO', 'PATH', route['spec']['path'])
|
|
else:
|
|
if route['spec']['path']:
|
|
self._drv_lbaas.update_l7_rule(
|
|
route_state.p_l7_rule, route['spec']['path'])
|
|
route_state.p_l7_rule.value = route['spec']['path']
|
|
else:
|
|
self._drv_lbaas.release_l7_rule(route_state.p_l7_rule)
|
|
route_state.p_l7_rule = None
|
|
|
|
route_spec.path = route['spec']['path']
|
|
|
|
def _get_route_spec(self, route):
|
|
try:
|
|
annotations = route['metadata']['annotations']
|
|
annotation = annotations[k_const.K8S_ANNOTATION_ROUTE_SPEC]
|
|
except KeyError:
|
|
return obj_route.RouteSpec()
|
|
obj_dict = jsonutils.loads(annotation)
|
|
obj = obj_route.RouteSpec.obj_from_primitive(obj_dict)
|
|
LOG.debug("Got RouteSpec from annotation: %r", obj)
|
|
return obj
|
|
|
|
def _set_route_spec(self, route, route_spec):
|
|
if route_spec is None:
|
|
LOG.debug("Removing RouteSpec annotation: %r", route_spec)
|
|
annotation = None
|
|
else:
|
|
route_spec.obj_reset_changes(recursive=True)
|
|
LOG.debug("Setting RouteSpec annotation: %r", route_spec)
|
|
annotation = jsonutils.dumps(route_spec.obj_to_primitive(),
|
|
sort_keys=True)
|
|
k8s = clients.get_kubernetes_client()
|
|
k8s.annotate(route['metadata']['selfLink'],
|
|
{k_const.K8S_ANNOTATION_ROUTE_SPEC: annotation},
|
|
resource_version=route['metadata']['resourceVersion'])
|
|
|
|
def _get_route_state(self, route):
|
|
try:
|
|
annotations = route['metadata']['annotations']
|
|
annotation = annotations[k_const.K8S_ANNOTATION_ROUTE_STATE]
|
|
except KeyError:
|
|
return obj_route.RouteState()
|
|
obj_dict = jsonutils.loads(annotation)
|
|
obj = obj_route.RouteState.obj_from_primitive(obj_dict)
|
|
LOG.debug("Got RouteState from annotation: %r", obj)
|
|
return obj
|
|
|
|
def _set_route_state(self, route, route_state):
|
|
if route_state is None:
|
|
LOG.debug("Removing RouteState annotation: %r", route_state)
|
|
annotation = None
|
|
else:
|
|
route_state.obj_reset_changes(recursive=True)
|
|
LOG.debug("Setting RouteState annotation: %r", route_state)
|
|
annotation = jsonutils.dumps(route_state.obj_to_primitive(),
|
|
sort_keys=True)
|
|
k8s = clients.get_kubernetes_client()
|
|
k8s.annotate(route['metadata']['selfLink'],
|
|
{k_const.K8S_ANNOTATION_ROUTE_STATE: annotation},
|
|
resource_version=route['metadata']['resourceVersion'])
|