kuryr-kubernetes/kuryr_kubernetes/handlers/asynchronous.py

112 lines
4.6 KiB
Python

# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from six.moves import queue as six_queue
import time
from oslo_log import log as logging
from kuryr_kubernetes.handlers import base
LOG = logging.getLogger(__name__)
DEFAULT_QUEUE_DEPTH = 100
DEFAULT_GRACE_PERIOD = 5
STALE_PERIOD = 0.5
class Async(base.EventHandler):
"""Handles events asynchronously.
`Async` can be used to decorate another `handler` to be run asynchronously
using the specified `thread_group`. `Async` distinguishes *related* and
*unrelated* events (based on the result of `group_by`(`event`) function)
and handles *unrelated* events concurrently while *related* events are
handled serially and in the same order they arrived to `Async`.
"""
def __init__(self, handler, thread_group, group_by,
queue_depth=DEFAULT_QUEUE_DEPTH,
grace_period=DEFAULT_GRACE_PERIOD):
self._handler = handler
self._thread_group = thread_group
self._group_by = group_by
self._queue_depth = queue_depth
self._grace_period = grace_period
self._queues = {}
def __call__(self, event):
group = self._group_by(event)
try:
queue = self._queues[group]
except KeyError:
queue = six_queue.Queue(self._queue_depth)
self._queues[group] = queue
thread = self._thread_group.add_thread(self._run, group, queue)
thread.link(self._done, group)
queue.put(event)
def _run(self, group, queue):
LOG.debug("Asynchronous handler started processing %s", group)
for _ in itertools.count():
# NOTE(ivc): this is a mock-friendly replacement for 'while True'
# to allow more controlled environment for unit-tests (e.g. to
# avoid tests getting stuck in infinite loops)
try:
event = queue.get(timeout=self._grace_period)
except six_queue.Empty:
break
# FIXME(ivc): temporary workaround to skip stale events
# If K8s updates resource while the handler is processing it,
# when the handler finishes its work it can fail to update an
# annotation due to the 'resourceVersion' conflict. K8sClient
# was updated to allow *new* annotations to be set ignoring
# 'resourceVersion', but it leads to another problem as the
# Handler will receive old events (i.e. before annotation is set)
# and will start processing the event 'from scratch'.
# It has negative effect on handlers' performance (VIFHandler
# creates ports only to later delete them and LBaaS handler also
# produces some excess requests to Neutron, although with lesser
# impact).
# Possible solutions (can be combined):
# - use K8s ThirdPartyResources to store data/annotations instead
# of native K8s resources (assuming Kuryr-K8s will own those
# resources and no one else would update them)
# - use the resulting 'resourceVersion' received from K8sClient's
# 'annotate' to provide feedback to Async to skip all events
# until that version
# - stick to the 'get-or-create' behaviour in handlers and
# also introduce cache for long operations
time.sleep(STALE_PERIOD)
while not queue.empty():
event = queue.get()
if queue.empty():
time.sleep(STALE_PERIOD)
self._handler(event)
def _done(self, thread, group):
LOG.debug("Asynchronous handler stopped processing %s", group)
queue = self._queues.pop(group)
if not queue.empty():
LOG.critical("Asynchronous handler terminated abnormally; "
"%(count)s events dropped for %(group)s",
{'count': queue.qsize(), 'group': group})
if not self._queues:
LOG.debug("Asynchronous handler is idle")