From 8c1224e513f16d4b2c59b98995670e4823edb35f Mon Sep 17 00:00:00 2001 From: jiaopengju Date: Mon, 20 Nov 2017 17:20:26 +0800 Subject: [PATCH] Make time trigger scheduling strategy configurable Currently, Karbor uses the multi-node scheduling strategy instead of the original single-node scheduling strategy. It's better to keep both scheduling strategies in the code and make the scheduling strategy configurable, so that users can choose which one to use. Change-Id: I13693da334041b58745040a1cbda1007e3cc2853 Implements: blueprint make-time-trigger-scheduling-strategy-configurable --- karbor/common/opts.py | 2 +- .../engine/triggers/__init__.py | 8 +- .../engine/triggers/timetrigger/__init__.py | 41 ++ .../triggers/timetrigger/time_trigger.py | 414 +++++++----------- .../timetrigger/time_trigger_multi_node.py | 253 +++++++++++ .../engine/triggers/timetrigger/utils.py | 123 ++++++ .../triggers/timetrigger/test_time_trigger.py | 158 +++---- .../test_time_trigger_multi_node.py | 281 ++++++++++++ 8 files changed, 928 insertions(+), 352 deletions(-) create mode 100644 karbor/services/operationengine/engine/triggers/timetrigger/time_trigger_multi_node.py create mode 100644 karbor/services/operationengine/engine/triggers/timetrigger/utils.py create mode 100644 karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger_multi_node.py diff --git a/karbor/common/opts.py b/karbor/common/opts.py index 94f7dc70..3a2e10a1 100644 --- a/karbor/common/opts.py +++ b/karbor/common/opts.py @@ -22,7 +22,7 @@ import karbor.exception import karbor.service import karbor.services.operationengine.engine.executors.green_thread_executor as green_thread_executor # noqa import karbor.services.operationengine.engine.executors.thread_pool_executor as thread_pool_executor # noqa -import karbor.services.operationengine.engine.triggers.timetrigger.time_trigger as time_trigger # noqa +import karbor.services.operationengine.engine.triggers.timetrigger as time_trigger # noqa import 
karbor.services.operationengine.karbor_client import karbor.services.operationengine.manager import karbor.services.operationengine.operations.base as base diff --git a/karbor/services/operationengine/engine/triggers/__init__.py b/karbor/services/operationengine/engine/triggers/__init__.py index 07329e8d..a2db08c8 100644 --- a/karbor/services/operationengine/engine/triggers/__init__.py +++ b/karbor/services/operationengine/engine/triggers/__init__.py @@ -22,6 +22,7 @@ class BaseTrigger(object): """Trigger base class that all Triggers should inherit from""" TRIGGER_TYPE = "" + IS_ENABLED = True def __init__(self, trigger_id, trigger_property, executor): super(BaseTrigger, self).__init__() @@ -68,4 +69,9 @@ class TriggerHandler(loadables.BaseLoader): def all_triggers(): """Get all trigger classes.""" - return TriggerHandler().get_all_classes() + all_classes = TriggerHandler().get_all_classes() + for trigger_class in all_classes[:]: + if trigger_class.TRIGGER_TYPE == 'time' and ( + not trigger_class.IS_ENABLED): + all_classes.remove(trigger_class) + return all_classes diff --git a/karbor/services/operationengine/engine/triggers/timetrigger/__init__.py b/karbor/services/operationengine/engine/triggers/timetrigger/__init__.py index e69de29b..2ef3d810 100644 --- a/karbor/services/operationengine/engine/triggers/timetrigger/__init__.py +++ b/karbor/services/operationengine/engine/triggers/timetrigger/__init__.py @@ -0,0 +1,41 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from oslo_config import cfg + +time_trigger_opts = [ + cfg.IntOpt('min_interval', + default=60 * 60, + help='The minimum interval of two adjacent time points. ' + 'min_interval >= (max_window_time * 2)'), + + cfg.IntOpt('min_window_time', + default=900, + help='The minimum window time'), + + cfg.IntOpt('max_window_time', + default=1800, + help='The maximum window time'), + + cfg.IntOpt('trigger_poll_interval', + default=15, + help='Interval, in seconds, in which Karbor will poll for ' + 'trigger events'), + + cfg.StrOpt('scheduling_strategy', + default='multi_node', + help='Time trigger scheduling strategy ' + ) +] + +CONF = cfg.CONF +CONF.register_opts(time_trigger_opts) diff --git a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py index b1fabc9d..e9e96c2a 100644 --- a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py +++ b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py @@ -12,48 +12,84 @@ from datetime import datetime from datetime import timedelta +import eventlet +import functools + from oslo_config import cfg from oslo_log import log as logging -from oslo_service import loopingcall from oslo_utils import timeutils -import six -from stevedore import driver as import_driver -from karbor import context as karbor_context -from karbor import db from karbor import exception from karbor.i18n import _ from karbor.services.operationengine.engine import triggers - -time_trigger_opts = [ - cfg.IntOpt('min_interval', - default=60 * 60, - help='The minimum interval of two adjacent time points. 
' - 'min_interval >= (max_window_time * 2)'), - - cfg.IntOpt('min_window_time', - default=900, - help='The minimum window time'), - - cfg.IntOpt('max_window_time', - default=1800, - help='The maximum window time'), - - cfg.IntOpt('trigger_poll_interval', - default=15, - help='Interval, in seconds, in which Karbor will poll for ' - 'trigger events'), -] +from karbor.services.operationengine.engine.triggers.timetrigger import utils CONF = cfg.CONF -CONF.register_opts(time_trigger_opts) LOG = logging.getLogger(__name__) +class TriggerOperationGreenThread(object): + def __init__(self, first_run_time, function): + super(TriggerOperationGreenThread, self).__init__() + self._is_sleeping = True + self._pre_run_time = None + self._running = False + self._thread = None + + self._function = function + + self._start(first_run_time) + + def kill(self): + self._running = False + if self._is_sleeping: + self._thread.kill() + + @property + def running(self): + return self._running + + @property + def pre_run_time(self): + return self._pre_run_time + + def _start(self, first_run_time): + self._running = True + + now = datetime.utcnow() + initial_delay = 0 if first_run_time <= now else ( + int(timeutils.delta_seconds(now, first_run_time))) + + self._thread = eventlet.spawn_after( + initial_delay, self._run, first_run_time) + self._thread.link(self._on_done) + + def _on_done(self, gt, *args, **kwargs): + self._is_sleeping = True + self._pre_run_time = None + self._running = False + self._thread = None + + def _run(self, expect_run_time): + while self._running: + self._is_sleeping = False + self._pre_run_time = expect_run_time + + expect_run_time = self._function(expect_run_time) + if expect_run_time is None or not self._running: + break + + self._is_sleeping = True + + now = datetime.utcnow() + idle_time = 0 if expect_run_time <= now else int( + timeutils.delta_seconds(now, expect_run_time)) + eventlet.sleep(idle_time) + + class TimeTrigger(triggers.BaseTrigger): TRIGGER_TYPE = 
"time" - _loopingcall = None - _triggers = {} + IS_ENABLED = (CONF.scheduling_strategy == 'default') def __init__(self, trigger_id, trigger_property, executor): super(TimeTrigger, self).__init__( @@ -62,136 +98,30 @@ class TimeTrigger(triggers.BaseTrigger): self._trigger_property = self.check_trigger_definition( trigger_property) - timer = self._get_timer(self._trigger_property) - first_run_time = self._compute_next_run_time( - datetime.utcnow(), self._trigger_property['end_time'], timer) - LOG.debug("first_run_time: %s", first_run_time) - - self._trigger_execution_new(self._id, first_run_time) - - if not self.__class__._loopingcall: - self.__class__._loopingcall = loopingcall.FixedIntervalLoopingCall( - self._loop) - self.__class__._loopingcall.start( - interval=CONF.trigger_poll_interval, - stop_on_exception=False, - ) - - self._register() - - def _register(self): - self.__class__._triggers[self._id] = self - - def _unregister(self): - del self.__class__._triggers[self._id] - - @classmethod - def _loop(cls): - while True: - now = datetime.utcnow() - exec_to_handle = cls._trigger_execution_get_next() - if not exec_to_handle: - LOG.debug("No next trigger executions") - break - - trigger_id = exec_to_handle.trigger_id - execution_time = exec_to_handle.execution_time - trigger = cls._triggers.get(trigger_id) - if not trigger: - LOG.warning("Unable to find trigger %s", trigger_id) - res = cls._trigger_execution_delete( - execution_id=exec_to_handle.id) - continue - - if now < execution_time: - LOG.debug("Time trigger not yet due") - break - - trigger_property = trigger._trigger_property - timer = cls._get_timer(trigger_property) - window = trigger_property.get("window") - end_time_to_run = execution_time + timedelta( - seconds=window) - - if now > end_time_to_run: - LOG.debug("Time trigger (%s) out of window",) - execute = False - else: - LOG.debug("Time trigger (%s) is due", trigger_id) - execute = True - - next_exec_time = cls._compute_next_run_time( - now, - 
trigger_property['end_time'], - timer, - ) - res = False - if not next_exec_time: - LOG.debug("No more planned executions for trigger (%s)", - trigger_id) - res = cls._trigger_execution_delete( - execution_id=exec_to_handle.id) - else: - LOG.debug("Rescheduling (%s) from %s to %s", - trigger_id, - execution_time, - next_exec_time) - res = cls._trigger_execution_update( - exec_to_handle.id, - execution_time, - next_exec_time, - ) - - if not res: - LOG.info("Trigger probably handled by another node") - continue - - if execute: - cls._trigger_operations(trigger_id, execution_time, window) - - @classmethod - def _trigger_execution_new(cls, trigger_id, time): - # Find the first time. - # We don't known when using this trigger first time. - ctxt = karbor_context.get_admin_context() - try: - db.trigger_execution_create(ctxt, trigger_id, time) - return True - except Exception: - return False - - @classmethod - def _trigger_execution_update(cls, id, current_time, next_time): - ctxt = karbor_context.get_admin_context() - return db.trigger_execution_update(ctxt, id, current_time, next_time) - - @classmethod - def _trigger_execution_delete(cls, execution_id=None, trigger_id=None): - if execution_id is None and trigger_id is None: - raise exception.InvalidParameterValue('supply at least one id') - - ctxt = karbor_context.get_admin_context() - num_deleted = db.trigger_execution_delete(ctxt, execution_id, - trigger_id) - return num_deleted > 0 - - @classmethod - def _trigger_execution_get_next(cls): - ctxt = karbor_context.get_admin_context() - return db.trigger_execution_get_next(ctxt) + self._greenthread = None def shutdown(self): - self._unregister() + self._kill_greenthread() def register_operation(self, operation_id, **kwargs): if operation_id in self._operation_ids: msg = (_("The operation_id(%s) is exist") % operation_id) raise exception.ScheduledOperationExist(msg) + if self._greenthread and not self._greenthread.running: + raise 
exception.TriggerIsInvalid(trigger_id=self._id) + self._operation_ids.add(operation_id) + if self._greenthread is None: + self._start_greenthread() def unregister_operation(self, operation_id, **kwargs): - self._operation_ids.discard(operation_id) + if operation_id not in self._operation_ids: + return + + self._operation_ids.remove(operation_id) + if 0 == len(self._operation_ids): + self._kill_greenthread() def update_trigger_property(self, trigger_property): valid_trigger_property = self.check_trigger_definition( @@ -202,36 +132,82 @@ class TimeTrigger(triggers.BaseTrigger): timer = self._get_timer(valid_trigger_property) first_run_time = self._compute_next_run_time( - datetime.utcnow(), valid_trigger_property['end_time'], timer) - + datetime.utcnow(), trigger_property['end_time'], timer) if not first_run_time: msg = (_("The new trigger property is invalid, " "Can not find the first run time")) raise exception.InvalidInput(msg) + if self._greenthread is not None: + pre_run_time = self._greenthread.pre_run_time + if pre_run_time: + end_time = pre_run_time + timedelta( + seconds=self._trigger_property['window']) + if first_run_time <= end_time: + msg = (_("The new trigger property is invalid, " + "First run time%(t1)s must be after %(t2)s") % + {'t1': first_run_time, 't2': end_time}) + raise exception.InvalidInput(msg) + self._trigger_property = valid_trigger_property - self._trigger_execution_delete(trigger_id=self._id) - self._trigger_execution_new(self._id, first_run_time) - @classmethod - def _trigger_operations(cls, trigger_id, expect_run_time, window): - """Trigger operations once""" + if len(self._operation_ids) > 0: + # Restart greenthread to take the change of trigger property + # effect immediately + self._kill_greenthread() + self._create_green_thread(first_run_time, timer) - # The executor execute_operation may have I/O operation. 
+ def _kill_greenthread(self): + if self._greenthread: + self._greenthread.kill() + self._greenthread = None + + def _start_greenthread(self): + # Find the first time. + # We don't known when using this trigger first time. + timer = self._get_timer(self._trigger_property) + first_run_time = self._compute_next_run_time( + datetime.utcnow(), self._trigger_property['end_time'], timer) + if not first_run_time: + raise exception.TriggerIsInvalid(trigger_id=self._id) + + self._create_green_thread(first_run_time, timer) + + def _create_green_thread(self, first_run_time, timer): + func = functools.partial( + self._trigger_operations, + trigger_property=self._trigger_property.copy(), + timer=timer) + + self._greenthread = TriggerOperationGreenThread( + first_run_time, func) + + def _trigger_operations(self, expect_run_time, trigger_property, timer): + """Trigger operations once + + returns: wait time for next run + """ + + # Just for robustness, actually expect_run_time always <= now + # but, if the scheduling of eventlet is not accurate, then we + # can do some adjustments. + entry_time = datetime.utcnow() + if entry_time < expect_run_time and ( + int(timeutils.delta_seconds(entry_time, expect_run_time)) > 0): + return expect_run_time + + # The self._executor.execute_operation may have I/O operation. # If it is, this green thread will be switched out during looping # operation_ids. In order to avoid changing self._operation_ids # during the green thread is switched out, copy self._operation_ids # as the iterative object. 
- trigger = cls._triggers.get(trigger_id) - if not trigger: - LOG.warning("Can't find trigger: %s" % trigger_id) - return - operations_ids = trigger._operation_ids.copy() + operation_ids = self._operation_ids.copy() sent_ops = set() + window = trigger_property.get("window") end_time = expect_run_time + timedelta(seconds=window) - for operation_id in operations_ids: - if operation_id not in trigger._operation_ids: + for operation_id in operation_ids: + if operation_id not in self._operation_ids: # Maybe, when traversing this operation_id, it has been # removed by self.unregister_operation LOG.warning("Execute operation %s which is not exist, " @@ -242,13 +218,15 @@ class TimeTrigger(triggers.BaseTrigger): if now >= end_time: LOG.error("Can not trigger operations to run. Because it is " "out of window time. now=%(now)s, " - "end time=%(end_time)s, waiting operations=%(ops)s", + "end time=%(end_time)s, expect run time=%(expect)s," + " wating operations=%(ops)s", {'now': now, 'end_time': end_time, - 'ops': operations_ids - sent_ops}) + 'expect': expect_run_time, + 'ops': operation_ids - sent_ops}) break try: - trigger._executor.execute_operation( + self._executor.execute_operation( operation_id, now, expect_run_time, window) except Exception: LOG.exception("Submit operation to executor failed, operation" @@ -256,104 +234,30 @@ class TimeTrigger(triggers.BaseTrigger): sent_ops.add(operation_id) + next_time = self._compute_next_run_time( + expect_run_time, trigger_property['end_time'], timer) + now = datetime.utcnow() + if next_time and next_time <= now: + LOG.error("Next run time:%(next_time)s <= now:%(now)s. 
Maybe the " + "entry time=%(entry)s is too late, even exceeds the end" + " time of window=%(end)s, or it was blocked where " + "sending the operation to executor.", + {'next_time': next_time, 'now': now, + 'entry': entry_time, 'end': end_time}) + return next_time + @classmethod def check_trigger_definition(cls, trigger_definition): - """Check trigger definition - - All the time instances of trigger_definition are in UTC, - including start_time, end_time - """ - tf_cls = cls._get_time_format_class() - - pattern = trigger_definition.get("pattern", None) - tf_cls.check_time_format(pattern) - - start_time = trigger_definition.get("start_time", None) - if not start_time: - msg = _("The trigger\'s start time is unknown") - raise exception.InvalidInput(msg) - start_time = cls._check_and_get_datetime(start_time, "start_time") - - interval = tf_cls(start_time, pattern).get_min_interval() - if interval is not None and interval < CONF.min_interval: - msg = (_("The interval of two adjacent time points " - "is less than %d") % CONF.min_interval) - raise exception.InvalidInput(msg) - - window = trigger_definition.get("window", CONF.min_window_time) - if not isinstance(window, int): - try: - window = int(window) - except Exception: - msg = (_("The trigger windows(%s) is not integer") % window) - raise exception.InvalidInput(msg) - - if window < CONF.min_window_time or window > CONF.max_window_time: - msg = (_("The trigger windows %(window)d must be between " - "%(min_window)d and %(max_window)d") % - {"window": window, - "min_window": CONF.min_window_time, - "max_window": CONF.max_window_time}) - raise exception.InvalidInput(msg) - - end_time = trigger_definition.get("end_time", None) - end_time = cls._check_and_get_datetime(end_time, "end_time") - - valid_trigger_property = trigger_definition.copy() - valid_trigger_property['window'] = window - valid_trigger_property['start_time'] = start_time - valid_trigger_property['end_time'] = end_time - return valid_trigger_property - - 
@classmethod - def _check_and_get_datetime(cls, time, time_name): - if not time: - return None - - if isinstance(time, datetime): - return time - - if not isinstance(time, six.string_types): - msg = (_("The trigger %(name)s(type = %(vtype)s) is not an " - "instance of string") % - {"name": time_name, "vtype": type(time)}) - raise exception.InvalidInput(msg) - - try: - time = timeutils.parse_strtime(time, fmt='%Y-%m-%d %H:%M:%S') - except Exception: - msg = (_("The format of trigger %s is not correct") % time_name) - raise exception.InvalidInput(msg) - - return time + return utils.check_trigger_definition(trigger_definition) @classmethod def _compute_next_run_time(cls, start_time, end_time, timer): - next_time = timer.compute_next_time(start_time) - - if next_time and (not end_time or next_time <= end_time): - return next_time - return None - - @classmethod - def _get_time_format_class(cls): - return import_driver.DriverManager( - 'karbor.operationengine.engine.timetrigger.time_format', - CONF.time_format).driver + return utils.compute_next_run_time(start_time, end_time, timer) @classmethod def _get_timer(cls, trigger_property): - tf_cls = cls._get_time_format_class() - timer = tf_cls(trigger_property['start_time'], - trigger_property['pattern']) - return timer + return utils.get_timer(trigger_property) @classmethod def check_configuration(cls): - min_window = CONF.min_window_time - max_window = CONF.max_window_time - min_interval = CONF.min_interval - - if not (min_window < max_window and (max_window * 2 <= min_interval)): - msg = (_('Configurations of time trigger are invalid')) - raise exception.InvalidInput(msg) + utils.check_configuration() diff --git a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger_multi_node.py b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger_multi_node.py new file mode 100644 index 00000000..5ebf54e6 --- /dev/null +++ 
b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger_multi_node.py @@ -0,0 +1,253 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from datetime import datetime +from datetime import timedelta + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_service import loopingcall + +from karbor import context as karbor_context +from karbor import db +from karbor import exception +from karbor.i18n import _ +from karbor.services.operationengine.engine import triggers +from karbor.services.operationengine.engine.triggers.timetrigger import utils + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class TimeTrigger(triggers.BaseTrigger): + + TRIGGER_TYPE = "time" + IS_ENABLED = (CONF.scheduling_strategy == 'multi_node') + + _loopingcall = None + _triggers = {} + + def __init__(self, trigger_id, trigger_property, executor): + super(TimeTrigger, self).__init__( + trigger_id, trigger_property, executor) + + self._trigger_property = self.check_trigger_definition( + trigger_property) + + timer = self._get_timer(self._trigger_property) + first_run_time = self._compute_next_run_time( + datetime.utcnow(), self._trigger_property['end_time'], timer) + LOG.debug("first_run_time: %s", first_run_time) + + self._trigger_execution_new(self._id, first_run_time) + + if not self.__class__._loopingcall: + self.__class__._loopingcall = loopingcall.FixedIntervalLoopingCall( + self._loop) + self.__class__._loopingcall.start( + 
interval=CONF.trigger_poll_interval, + stop_on_exception=False, + ) + + self._register() + + def _register(self): + self.__class__._triggers[self._id] = self + + def _unregister(self): + del self.__class__._triggers[self._id] + + @classmethod + def _loop(cls): + while True: + now = datetime.utcnow() + exec_to_handle = cls._trigger_execution_get_next() + if not exec_to_handle: + LOG.debug("No next trigger executions") + break + + trigger_id = exec_to_handle.trigger_id + execution_time = exec_to_handle.execution_time + trigger = cls._triggers.get(trigger_id) + if not trigger: + LOG.warning("Unable to find trigger %s", trigger_id) + res = cls._trigger_execution_delete( + execution_id=exec_to_handle.id) + continue + + if now < execution_time: + LOG.debug("Time trigger not yet due") + break + + trigger_property = trigger._trigger_property + timer = cls._get_timer(trigger_property) + window = trigger_property.get("window") + end_time_to_run = execution_time + timedelta( + seconds=window) + + if now > end_time_to_run: + LOG.debug("Time trigger (%s) out of window",) + execute = False + else: + LOG.debug("Time trigger (%s) is due", trigger_id) + execute = True + + next_exec_time = cls._compute_next_run_time( + now, + trigger_property['end_time'], + timer, + ) + if not next_exec_time: + LOG.debug("No more planned executions for trigger (%s)", + trigger_id) + res = cls._trigger_execution_delete( + execution_id=exec_to_handle.id) + else: + LOG.debug("Rescheduling (%s) from %s to %s", + trigger_id, + execution_time, + next_exec_time) + res = cls._trigger_execution_update( + exec_to_handle.id, + execution_time, + next_exec_time, + ) + + if not res: + LOG.info("Trigger probably handled by another node") + continue + + if execute: + cls._trigger_operations(trigger_id, execution_time, window) + + @classmethod + def _trigger_execution_new(cls, trigger_id, time): + # Find the first time. + # We don't known when using this trigger first time. 
+ ctxt = karbor_context.get_admin_context() + try: + db.trigger_execution_create(ctxt, trigger_id, time) + return True + except Exception: + return False + + @classmethod + def _trigger_execution_update(cls, id, current_time, next_time): + ctxt = karbor_context.get_admin_context() + return db.trigger_execution_update(ctxt, id, current_time, next_time) + + @classmethod + def _trigger_execution_delete(cls, execution_id=None, trigger_id=None): + if execution_id is None and trigger_id is None: + raise exception.InvalidParameterValue('supply at least one id') + + ctxt = karbor_context.get_admin_context() + num_deleted = db.trigger_execution_delete(ctxt, execution_id, + trigger_id) + return num_deleted > 0 + + @classmethod + def _trigger_execution_get_next(cls): + ctxt = karbor_context.get_admin_context() + return db.trigger_execution_get_next(ctxt) + + def shutdown(self): + self._unregister() + + def register_operation(self, operation_id, **kwargs): + if operation_id in self._operation_ids: + msg = (_("The operation_id(%s) is exist") % operation_id) + raise exception.ScheduledOperationExist(msg) + + self._operation_ids.add(operation_id) + + def unregister_operation(self, operation_id, **kwargs): + self._operation_ids.discard(operation_id) + + def update_trigger_property(self, trigger_property): + valid_trigger_property = self.check_trigger_definition( + trigger_property) + + if valid_trigger_property == self._trigger_property: + return + + timer = self._get_timer(valid_trigger_property) + first_run_time = self._compute_next_run_time( + datetime.utcnow(), valid_trigger_property['end_time'], timer) + + if not first_run_time: + msg = (_("The new trigger property is invalid, " + "Can not find the first run time")) + raise exception.InvalidInput(msg) + + self._trigger_property = valid_trigger_property + self._trigger_execution_delete(trigger_id=self._id) + self._trigger_execution_new(self._id, first_run_time) + + @classmethod + def _trigger_operations(cls, trigger_id, 
expect_run_time, window): + """Trigger operations once""" + + # The executor execute_operation may have I/O operation. + # If it is, this green thread will be switched out during looping + # operation_ids. In order to avoid changing self._operation_ids + # during the green thread is switched out, copy self._operation_ids + # as the iterative object. + trigger = cls._triggers.get(trigger_id) + if not trigger: + LOG.warning("Can't find trigger: %s" % trigger_id) + return + operations_ids = trigger._operation_ids.copy() + sent_ops = set() + end_time = expect_run_time + timedelta(seconds=window) + + for operation_id in operations_ids: + if operation_id not in trigger._operation_ids: + # Maybe, when traversing this operation_id, it has been + # removed by self.unregister_operation + LOG.warning("Execute operation %s which is not exist, " + "ignore it", operation_id) + continue + + now = datetime.utcnow() + if now >= end_time: + LOG.error("Can not trigger operations to run. Because it is " + "out of window time. 
now=%(now)s, " + "end time=%(end_time)s, waiting operations=%(ops)s", + {'now': now, 'end_time': end_time, + 'ops': operations_ids - sent_ops}) + break + + try: + trigger._executor.execute_operation( + operation_id, now, expect_run_time, window) + except Exception: + LOG.exception("Submit operation to executor failed, operation" + " id=%s", operation_id) + + sent_ops.add(operation_id) + + @classmethod + def check_trigger_definition(cls, trigger_definition): + return utils.check_trigger_definition(trigger_definition) + + @classmethod + def _compute_next_run_time(cls, start_time, end_time, timer): + return utils.compute_next_run_time(start_time, end_time, timer) + + @classmethod + def _get_timer(cls, trigger_property): + return utils.get_timer(trigger_property) + + @classmethod + def check_configuration(cls): + utils.check_configuration() diff --git a/karbor/services/operationengine/engine/triggers/timetrigger/utils.py b/karbor/services/operationengine/engine/triggers/timetrigger/utils.py new file mode 100644 index 00000000..e2dcfec4 --- /dev/null +++ b/karbor/services/operationengine/engine/triggers/timetrigger/utils.py @@ -0,0 +1,123 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from datetime import datetime +from oslo_config import cfg +from oslo_utils import timeutils +import six +from stevedore import driver as import_driver + +from karbor import exception +from karbor.i18n import _ + +CONF = cfg.CONF + + +def get_time_format_class(): + return import_driver.DriverManager( + 'karbor.operationengine.engine.timetrigger.time_format', + CONF.time_format).driver + + +def compute_next_run_time(start_time, end_time, timer): + next_time = timer.compute_next_time(start_time) + if next_time and (not end_time or next_time <= end_time): + return next_time + return None + + +def check_and_get_datetime(time, time_name): + if not time: + return None + + if isinstance(time, datetime): + return time + + if not isinstance(time, six.string_types): + msg = (_("The trigger %(name)s(type = %(vtype)s) is " + "not an instance of string") % + {"name": time_name, "vtype": type(time)}) + raise exception.InvalidInput(msg) + + try: + time = timeutils.parse_strtime(time, fmt='%Y-%m-%d %H:%M:%S') + except Exception: + msg = (_("The format of trigger %s is not correct") % time_name) + raise exception.InvalidInput(msg) + + return time + + +def check_trigger_definition(trigger_definition): + """Check trigger definition + + All the time instances of trigger_definition are in UTC, + including start_time, end_time + """ + tf_cls = get_time_format_class() + + pattern = trigger_definition.get("pattern", None) + tf_cls.check_time_format(pattern) + + start_time = trigger_definition.get("start_time", None) + if not start_time: + msg = _("The trigger\'s start time is unknown") + raise exception.InvalidInput(msg) + start_time = check_and_get_datetime(start_time, "start_time") + + interval = tf_cls(start_time, pattern).get_min_interval() + if interval is not None and interval < CONF.min_interval: + msg = (_("The interval of two adjacent time points " + "is less than %d") % CONF.min_interval) + raise exception.InvalidInput(msg) + + window = trigger_definition.get("window", 
CONF.min_window_time) + if not isinstance(window, int): + try: + window = int(window) + except Exception: + msg = (_("The trigger windows(%s) is not integer") % window) + raise exception.InvalidInput(msg) + + if window < CONF.min_window_time or window > CONF.max_window_time: + msg = (_("The trigger windows %(window)d must be between " + "%(min_window)d and %(max_window)d") % + {"window": window, + "min_window": CONF.min_window_time, + "max_window": CONF.max_window_time}) + raise exception.InvalidInput(msg) + + end_time = trigger_definition.get("end_time", None) + end_time = check_and_get_datetime(end_time, "end_time") + + valid_trigger_property = trigger_definition.copy() + valid_trigger_property['window'] = window + valid_trigger_property['start_time'] = start_time + valid_trigger_property['end_time'] = end_time + return valid_trigger_property + + +def check_configuration(): + min_window = CONF.min_window_time + max_window = CONF.max_window_time + min_interval = CONF.min_interval + + if not (min_window < max_window and (max_window * 2 <= min_interval)): + msg = (_('Configurations of time trigger are invalid')) + raise exception.InvalidInput(msg) + + +def get_timer(trigger_property): + tf_cls = get_time_format_class() + timer = tf_cls(trigger_property['start_time'], + trigger_property['pattern']) + return timer diff --git a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py index 6ed63f1f..96fb1c08 100644 --- a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py +++ b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py @@ -9,28 +9,19 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. 
- -from collections import namedtuple from datetime import datetime from datetime import timedelta import eventlet -import functools -import heapq import mock from oslo_config import cfg -from oslo_utils import uuidutils -from karbor import context as karbor_context from karbor import exception -from karbor.services.operationengine.engine.triggers.timetrigger \ - import time_trigger as tt +from karbor.services.operationengine.engine.triggers.timetrigger.time_trigger \ + import TimeTrigger +from karbor.services.operationengine.engine.triggers.timetrigger import utils from karbor.tests import base -TriggerExecution = namedtuple('TriggerExecution', - ['execution_time', 'id', 'trigger_id']) - - class FakeTimeFormat(object): def __init__(self, start_time, pattern): super(FakeTimeFormat, self).__init__() @@ -58,78 +49,30 @@ class FakeExecutor(object): self._ops[operation_id] += 1 eventlet.sleep(0.5) - -class FakeTimeTrigger(object): - @classmethod - def get_time_format(cls, *args, **kwargs): - return FakeTimeFormat - - -class FakeDb(object): - def __init__(self): - self._db = [] - - def trigger_execution_get_next(self, context): - if len(self._db) == 0: - return None - return self._db[0] - - def trigger_execution_create(self, context, trigger_id, time): - element = TriggerExecution(time, uuidutils.generate_uuid(), trigger_id) - heapq.heappush(self._db, element) - - def trigger_execution_update(self, context, id, current_time, new_time): - for idx, element in enumerate(self._db): - if element.id == id: - if element.execution_time != current_time: - return False - self._db[idx] = TriggerExecution(new_time, element.id, - element.trigger_id) - break - heapq.heapify(self._db) - return True - - def trigger_execution_delete(self, context, id, trigger_id): - removed_ids = [] - for idx, element in enumerate(self._db): - if (id and element.id == id) or (trigger_id and - element.trigger_id == trigger_id): - removed_ids.append(idx) - - for idx in reversed(removed_ids): - 
self._db.pop(idx) - heapq.heapify(self._db) - return len(removed_ids) - - -def time_trigger_test(func): - @functools.wraps(func) - @mock.patch.object(tt, 'db', FakeDb()) - @mock.patch.object(karbor_context, 'get_admin_context', lambda: None) - @mock.patch.object(tt.TimeTrigger, '_get_time_format_class', - FakeTimeTrigger.get_time_format) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - - return wrapper + def clear(self): + self._ops.clear() class TimeTriggerTestCase(base.TestCase): - _tid = 0 - _default_executor = FakeExecutor() def setUp(self): super(TimeTriggerTestCase, self).setUp() + self._set_configuration() + mock_obj = mock.Mock() + mock_obj.return_value = FakeTimeFormat + utils.get_time_format_class = mock_obj + + self._default_executor = FakeExecutor() + def test_check_configuration(self): self._set_configuration(10, 20, 30) self.assertRaisesRegex(exception.InvalidInput, "Configurations of time trigger are invalid", - tt.TimeTrigger.check_configuration) + TimeTrigger.check_configuration) self._set_configuration() - @time_trigger_test def test_check_trigger_property_start_time(self): trigger_property = { "pattern": "", @@ -138,23 +81,22 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The trigger\'s start time is unknown", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) trigger_property['start_time'] = 'abc' self.assertRaisesRegex(exception.InvalidInput, "The format of trigger .* is not correct", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) trigger_property['start_time'] = 123 self.assertRaisesRegex(exception.InvalidInput, "The trigger .* is not an instance of string", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) @mock.patch.object(FakeTimeFormat, 'get_min_interval') - @time_trigger_test def test_check_trigger_property_interval(self, 
get_min_interval): get_min_interval.return_value = 0 @@ -164,10 +106,9 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The interval of two adjacent time points .*", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) - @time_trigger_test def test_check_trigger_property_window(self): trigger_property = { "window": "abc", @@ -176,16 +117,15 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The trigger window.* is not integer", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) trigger_property['window'] = 1000 self.assertRaisesRegex(exception.InvalidInput, "The trigger windows .* must be between .*", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) - @time_trigger_test def test_check_trigger_property_end_time(self): trigger_property = { "window": 15, @@ -195,24 +135,27 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The format of trigger .* is not correct", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) - @time_trigger_test def test_register_operation(self): trigger = self._generate_trigger() operation_id = "1" trigger.register_operation(operation_id) - eventlet.sleep(2) + eventlet.sleep(0.3) - self.assertGreaterEqual(self._default_executor._ops[operation_id], 1) + self.assertGreaterEqual(trigger._executor._ops[operation_id], 1) self.assertRaisesRegex(exception.ScheduledOperationExist, "The operation_id.* is exist", trigger.register_operation, operation_id) - @time_trigger_test + eventlet.sleep(0.3) + self.assertRaises(exception.TriggerIsInvalid, + trigger.register_operation, + "2") + def test_unregister_operation(self): trigger = self._generate_trigger() operation_id = "2" @@ -221,9 +164,26 @@ class TimeTriggerTestCase(base.TestCase): 
self.assertIn(operation_id, trigger._operation_ids) trigger.unregister_operation(operation_id) - self.assertNotIn(trigger._id, trigger._operation_ids) + self.assertNotIn(operation_id, trigger._operation_ids) + + def test_unregister_operation_when_scheduling(self): + trigger = self._generate_trigger() + + for op_id in ['1', '2', '3']: + trigger.register_operation(op_id) + self.assertIn(op_id, trigger._operation_ids) + eventlet.sleep(0.5) + + for op_id in ['2', '3']: + trigger.unregister_operation(op_id) + self.assertNotIn(op_id, trigger._operation_ids) + eventlet.sleep(0.6) + + self.assertGreaterEqual(trigger._executor._ops['1'], 1) + + self.assertTrue(('2' not in trigger._executor._ops) or ( + '3' not in trigger._executor._ops)) - @time_trigger_test def test_update_trigger_property(self): trigger = self._generate_trigger() @@ -239,10 +199,18 @@ class TimeTriggerTestCase(base.TestCase): trigger.update_trigger_property, trigger_property) - @time_trigger_test + trigger.register_operation('1') + eventlet.sleep(0.2) + trigger_property['end_time'] = ( + datetime.utcnow() + timedelta(seconds=1)) + self.assertRaisesRegex(exception.InvalidInput, + ".*First run time.* must be after.*", + trigger.update_trigger_property, + trigger_property) + def test_update_trigger_property_success(self): trigger = self._generate_trigger() - trigger.register_operation('7') + trigger.register_operation('1') eventlet.sleep(0.2) trigger_property = { @@ -253,8 +221,12 @@ class TimeTriggerTestCase(base.TestCase): } with mock.patch.object(FakeTimeFormat, 'compute_next_time') as c: c.return_value = datetime.utcnow() + timedelta(seconds=20) + old_id = id(trigger._greenthread) + trigger.update_trigger_property(trigger_property) + self.assertNotEqual(old_id, id(trigger._greenthread)) + def _generate_trigger(self, end_time=None): if not end_time: end_time = datetime.utcnow() + timedelta(seconds=1) @@ -266,15 +238,11 @@ class TimeTriggerTestCase(base.TestCase): "end_time": end_time } - return 
tt.TimeTrigger( - uuidutils.generate_uuid(), - trigger_property, - self._default_executor, - ) + self._default_executor.clear() + return TimeTrigger("123", trigger_property, self._default_executor) def _set_configuration(self, min_window=15, - max_window=30, min_interval=60, poll_interval=1): + max_window=30, min_interval=60): self.override_config('min_interval', min_interval) self.override_config('min_window_time', min_window) self.override_config('max_window_time', max_window) - self.override_config('trigger_poll_interval', poll_interval) diff --git a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger_multi_node.py b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger_multi_node.py new file mode 100644 index 00000000..501ed36d --- /dev/null +++ b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger_multi_node.py @@ -0,0 +1,281 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from collections import namedtuple +from datetime import datetime +from datetime import timedelta +import eventlet +import functools +import heapq +import mock +from oslo_config import cfg +from oslo_utils import uuidutils + +from karbor import context as karbor_context +from karbor import exception +from karbor.services.operationengine.engine.triggers.timetrigger import \ + time_trigger_multi_node as tt +from karbor.services.operationengine.engine.triggers.timetrigger import utils +from karbor.tests import base + + +TriggerExecution = namedtuple('TriggerExecution', + ['execution_time', 'id', 'trigger_id']) + + +class FakeTimeFormat(object): + def __init__(self, start_time, pattern): + super(FakeTimeFormat, self).__init__() + + @classmethod + def check_time_format(cls, pattern): + pass + + def compute_next_time(self, current_time): + return current_time + timedelta(seconds=0.5) + + def get_min_interval(self): + return cfg.CONF.min_interval + + +class FakeExecutor(object): + def __init__(self): + super(FakeExecutor, self).__init__() + self._ops = {} + + def execute_operation(self, operation_id, triggered_time, + expect_start_time, window): + if operation_id not in self._ops: + self._ops[operation_id] = 0 + self._ops[operation_id] += 1 + eventlet.sleep(0.5) + + +class FakeTimeTrigger(object): + @classmethod + def get_time_format(cls, *args, **kwargs): + return FakeTimeFormat + + +class FakeDb(object): + def __init__(self): + self._db = [] + + def trigger_execution_get_next(self, context): + if len(self._db) == 0: + return None + return self._db[0] + + def trigger_execution_create(self, context, trigger_id, time): + element = TriggerExecution(time, uuidutils.generate_uuid(), trigger_id) + heapq.heappush(self._db, element) + + def trigger_execution_update(self, context, id, current_time, new_time): + for idx, element in enumerate(self._db): + if element.id == id: + if element.execution_time != current_time: + return False + self._db[idx] = TriggerExecution(new_time, 
element.id, + element.trigger_id) + break + heapq.heapify(self._db) + return True + + def trigger_execution_delete(self, context, id, trigger_id): + removed_ids = [] + for idx, element in enumerate(self._db): + if (id and element.id == id) or (trigger_id and + element.trigger_id == trigger_id): + removed_ids.append(idx) + + for idx in reversed(removed_ids): + self._db.pop(idx) + heapq.heapify(self._db) + return len(removed_ids) + + +def time_trigger_test(func): + @functools.wraps(func) + @mock.patch.object(tt, 'db', FakeDb()) + @mock.patch.object(karbor_context, 'get_admin_context', lambda: None) + @mock.patch.object(utils, 'get_time_format_class', + FakeTimeTrigger.get_time_format) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + +class TimeTriggerTestCase(base.TestCase): + _tid = 0 + _default_executor = FakeExecutor() + + def setUp(self): + super(TimeTriggerTestCase, self).setUp() + self._set_configuration() + + def test_check_configuration(self): + self._set_configuration(10, 20, 30) + self.assertRaisesRegex(exception.InvalidInput, + "Configurations of time trigger are invalid", + tt.TimeTrigger.check_configuration) + self._set_configuration() + + @time_trigger_test + def test_check_trigger_property_start_time(self): + trigger_property = { + "pattern": "", + "start_time": "" + } + + self.assertRaisesRegex(exception.InvalidInput, + "The trigger\'s start time is unknown", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + trigger_property['start_time'] = 'abc' + self.assertRaisesRegex(exception.InvalidInput, + "The format of trigger .* is not correct", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + trigger_property['start_time'] = 123 + self.assertRaisesRegex(exception.InvalidInput, + "The trigger .* is not an instance of string", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + @mock.patch.object(FakeTimeFormat, 'get_min_interval') + @time_trigger_test + def 
test_check_trigger_property_interval(self, get_min_interval): + get_min_interval.return_value = 0 + + trigger_property = { + "start_time": '2016-8-18 01:03:04' + } + + self.assertRaisesRegex(exception.InvalidInput, + "The interval of two adjacent time points .*", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + @time_trigger_test + def test_check_trigger_property_window(self): + trigger_property = { + "window": "abc", + "start_time": '2016-8-18 01:03:04' + } + + self.assertRaisesRegex(exception.InvalidInput, + "The trigger window.* is not integer", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + trigger_property['window'] = 1000 + self.assertRaisesRegex(exception.InvalidInput, + "The trigger windows .* must be between .*", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + @time_trigger_test + def test_check_trigger_property_end_time(self): + trigger_property = { + "window": 15, + "start_time": '2016-8-18 01:03:04', + "end_time": "abc" + } + + self.assertRaisesRegex(exception.InvalidInput, + "The format of trigger .* is not correct", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + @time_trigger_test + def test_register_operation(self): + trigger = self._generate_trigger() + + operation_id = "1" + trigger.register_operation(operation_id) + eventlet.sleep(2) + + self.assertGreaterEqual(self._default_executor._ops[operation_id], 1) + self.assertRaisesRegex(exception.ScheduledOperationExist, + "The operation_id.* is exist", + trigger.register_operation, + operation_id) + + @time_trigger_test + def test_unregister_operation(self): + trigger = self._generate_trigger() + operation_id = "2" + + trigger.register_operation(operation_id) + self.assertIn(operation_id, trigger._operation_ids) + + trigger.unregister_operation(operation_id) + self.assertNotIn(trigger._id, trigger._operation_ids) + + @time_trigger_test + def test_update_trigger_property(self): + trigger = self._generate_trigger() + + 
trigger_property = { + "pattern": "", + "window": 15, + "start_time": datetime.utcnow(), + "end_time": datetime.utcnow() + } + + self.assertRaisesRegex(exception.InvalidInput, + ".*Can not find the first run tim", + trigger.update_trigger_property, + trigger_property) + + @time_trigger_test + def test_update_trigger_property_success(self): + trigger = self._generate_trigger() + trigger.register_operation('7') + eventlet.sleep(0.2) + + trigger_property = { + "pattern": "", + "window": 15, + "start_time": datetime.utcnow(), + "end_time": '' + } + with mock.patch.object(FakeTimeFormat, 'compute_next_time') as c: + c.return_value = datetime.utcnow() + timedelta(seconds=20) + trigger.update_trigger_property(trigger_property) + + def _generate_trigger(self, end_time=None): + if not end_time: + end_time = datetime.utcnow() + timedelta(seconds=1) + + trigger_property = { + "pattern": "", + "window": 15, + "start_time": datetime.utcnow(), + "end_time": end_time + } + + return tt.TimeTrigger( + uuidutils.generate_uuid(), + trigger_property, + self._default_executor, + ) + + def _set_configuration(self, min_window=15, + max_window=30, min_interval=60, poll_interval=1): + self.override_config('min_interval', min_interval) + self.override_config('min_window_time', min_window) + self.override_config('max_window_time', max_window) + self.override_config('trigger_poll_interval', poll_interval)