From 82dce858cf6a077d85fbad8ae356006e9fa715a3 Mon Sep 17 00:00:00 2001 From: Jaume Devesa Date: Wed, 24 Aug 2016 15:19:26 +0200 Subject: [PATCH] Add asyncio eventloop. This commits introduces the asyncio event loop as well as the base abstract class to define watchers. It is in a very simple approach (it does not reschedule watchers if they fail) but it lets you to see the proposal of hierarchy in watchers as well as the methods that a watcher has to implement (see the pod module). Partial-Implements: blueprint kuryr-k8s-integration Co-Authored-By: Taku Fukushima Co-Authored-By: Antoni Segura Puimedon Change-Id: I91975dd197213c1a6b0e171c1ae218a547722eeb --- kuryr_kubernetes/server.py | 33 +++++-- kuryr_kubernetes/utils.py | 22 +++++ kuryr_kubernetes/watchers/__init__.py | 0 kuryr_kubernetes/watchers/base.py | 123 ++++++++++++++++++++++++++ kuryr_kubernetes/watchers/pod.py | 38 ++++++++ 5 files changed, 208 insertions(+), 8 deletions(-) create mode 100644 kuryr_kubernetes/utils.py create mode 100644 kuryr_kubernetes/watchers/__init__.py create mode 100644 kuryr_kubernetes/watchers/base.py create mode 100644 kuryr_kubernetes/watchers/pod.py diff --git a/kuryr_kubernetes/server.py b/kuryr_kubernetes/server.py index 954fe849b..6015534b0 100644 --- a/kuryr_kubernetes/server.py +++ b/kuryr_kubernetes/server.py @@ -10,34 +10,51 @@ # License for the specific language governing permissions and limitations # under the License. +import asyncio import sys -import time from kuryr.lib._i18n import _LI from oslo_log import log as logging from oslo_service import service from kuryr_kubernetes import config +from kuryr_kubernetes.watchers import pod LOG = logging.getLogger(__name__) class KuryrK8sService(service.Service): + """Kuryr-Kubernetes base service. + + This class extends the oslo_service.service.Service class to provide an + asynchronous event loop. It assumes that all the elements of the + `_watchers` list has a method called `watch` (normally, implemented by the + class `kuryr_kubernetes.watchers.base.AbstractBaseWatcher`). + + The event loop is the default used by asyncio (asyncio.SelectorEventLoop) + """ def __init__(self): super(KuryrK8sService, self).__init__() + self._event_loop = asyncio.new_event_loop() + self._watchers = [ + pod.PodWatcher + ] def start(self): - # TODO(devvesa): Remove this line as soon as it does anything - LOG.info(_LI("I am doing nothing")) + LOG.info(_LI("Service '%(class_name)s' started"), + {'class_name': self.__class__.__name__}) + + for watcher in self._watchers: + instance = watcher(self._event_loop) + self._event_loop.create_task(instance.watch()) try: - while(True): - time.sleep(5) - # TODO(devvesa): Remove this line as soon as does anything - LOG.info(_LI("Keep doing nothing")) - finally: + self._event_loop.run_forever() + self._event_loop.close() + except Exception: sys.exit(1) + sys.exit(0) def wait(self): """Waits for K8sController to complete.""" diff --git a/kuryr_kubernetes/utils.py b/kuryr_kubernetes/utils.py new file mode 100644 index 000000000..461043b09 --- /dev/null +++ b/kuryr_kubernetes/utils.py @@ -0,0 +1,22 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_serialization import jsonutils + + +def utf8_json_decoder(byte_data): + """Deserializes the bytes into UTF-8 encoded JSON. + + :param byte_data: The bytes to be converted into the UTF-8 encoded JSON. + :returns: The UTF-8 encoded JSON represented by Python dictionary format. + """ + return jsonutils.loads(byte_data.decode('utf8')) diff --git a/kuryr_kubernetes/watchers/__init__.py b/kuryr_kubernetes/watchers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kuryr_kubernetes/watchers/base.py b/kuryr_kubernetes/watchers/base.py new file mode 100644 index 000000000..032be1b8f --- /dev/null +++ b/kuryr_kubernetes/watchers/base.py @@ -0,0 +1,123 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import requests + +from kuryr.lib._i18n import _LE +from oslo_log import log as logging + +from kuryr_kubernetes.aio import headers as aio_headers +from kuryr_kubernetes.aio import methods as aio_methods +from kuryr_kubernetes import config +from kuryr_kubernetes import utils + +LOG = logging.getLogger(__name__) + +ADDED_EVENT = 'ADDED' +DELETED_EVENT = 'DELETED' +MODIFIED_EVENT = 'MODIFIED' + + +class AbstractBaseWatcher(object): + """Base abstract watcher. + + This class implements the default interface for the KuryrK8sService task + scheduler, which is the `watch` (no parameters) interface. + + It also define a serie of abstract methods that actual watchers have to + implement in order to deal directly with events without worrying about + connection and serialization details. + + These methods are: + + * get_endpoint(self): return a resource URL exposed on Kubernetes API + as a string (such as "/api/v1/pods") + * on_add/on_modify/on_delete: actions to do according to each event + type. + + These methods have to follow the async/await python3.5 syntax + """ + + def __init__(self, event_loop): + self._event_loop = event_loop + + @abc.abstractmethod + def get_api_endpoint(self): + pass + + @property + def api_endpoint(self): + k8s_root = config.CONF.kubernetes.api_root + return k8s_root + self.get_api_endpoint() + "?watch=true" + + async def _on_event(self, event): # flake8: noqa + + event_type = event['type'] + if event_type == ADDED_EVENT: + await self.on_add(event) + elif event_type == DELETED_EVENT: + await self.on_delete(event) + elif event_type == MODIFIED_EVENT: + await self.on_modify(event) + else: + LOG.warning(_LW("Unhandled event type '%(event_type)s'"), + {'event_type': event}) + + @abc.abstractmethod + async def on_add(self, event): + pass + + @abc.abstractmethod + async def on_modify(self, event): + pass + + @abc.abstractmethod + async def on_delete(self, event): + pass + + async def watch(self): + """Watches the endpoint and calls the callback with its response. + + This is an endless task that keeps the event loop running forever + """ + response = await self._get_chunked_response() + while True: + content = await response.read_line() + LOG.debug('Received new event from %(watcher)s:\n\n\t' + '%(event)s.\n\n', + {'watcher': self.__class__.__name__, + 'event': str(content)}) + await self._on_event(content) + + async def _get_chunked_response(self): + """Get the response from Kubernetes API.""" + response = await aio_methods.get( + endpoint=self.api_endpoint, + loop=self._event_loop, + decoder=utils.utf8_json_decoder) + + status, reason, hdrs = await response.read_headers() + if status != requests.codes.ok: # Function returns 200 + LOG.error(_LE('GET request to endpoint %(ep)s failed with ' + 'status %(status)s and reason %(reason)s'), + {'ep': endpoint, 'status': status, 'reason': reason}) + raise requests.exceptions.HTTPError('{}: {}. Endpoint {}'.format( + status, reason, endpoint)) + if hdrs.get(aio_headers.TRANSFER_ENCODING) != 'chunked': + LOG.error(_LE('watcher GET request to endpoint %(ep)s is not ' + 'chunked. headers: %(hdrs)s'), + {'ep': endpoint, 'hdrs': hdrs}) + raise IOError(_('Can only watch endpoints that returned chunked ' + 'encoded transfers')) + + return response diff --git a/kuryr_kubernetes/watchers/pod.py b/kuryr_kubernetes/watchers/pod.py new file mode 100644 index 000000000..03670cb41 --- /dev/null +++ b/kuryr_kubernetes/watchers/pod.py @@ -0,0 +1,38 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kuryr.lib._i18n import _LI +from oslo_log import log as logging + +from kuryr_kubernetes.watchers import base + +LOG = logging.getLogger(__name__) + + +class PodWatcher(base.AbstractBaseWatcher): + + ENDPOINT = "/api/v1/pods" + + def __init__(self, event_loop): + super().__init__(event_loop) + + def get_api_endpoint(self): + return self.ENDPOINT + + async def on_add(self, event): # flake8: noqa + LOG.info(_LI('Received an ADDED event on a Pod')) + + async def on_modify(self, event): + LOG.info(_LI('Received a MODIFIED event on a Pod')) + + async def on_delete(self, event): + LOG.info(_LI('Received a DELETED event on a Pod'))