diff --git a/karbor/common/opts.py b/karbor/common/opts.py index 94f7dc70..3a2e10a1 100644 --- a/karbor/common/opts.py +++ b/karbor/common/opts.py @@ -22,7 +22,7 @@ import karbor.exception import karbor.service import karbor.services.operationengine.engine.executors.green_thread_executor as green_thread_executor # noqa import karbor.services.operationengine.engine.executors.thread_pool_executor as thread_pool_executor # noqa -import karbor.services.operationengine.engine.triggers.timetrigger.time_trigger as time_trigger # noqa +import karbor.services.operationengine.engine.triggers.timetrigger as time_trigger # noqa import karbor.services.operationengine.karbor_client import karbor.services.operationengine.manager import karbor.services.operationengine.operations.base as base diff --git a/karbor/services/operationengine/engine/triggers/__init__.py b/karbor/services/operationengine/engine/triggers/__init__.py index 07329e8d..a2db08c8 100644 --- a/karbor/services/operationengine/engine/triggers/__init__.py +++ b/karbor/services/operationengine/engine/triggers/__init__.py @@ -22,6 +22,7 @@ class BaseTrigger(object): """Trigger base class that all Triggers should inherit from""" TRIGGER_TYPE = "" + IS_ENABLED = True def __init__(self, trigger_id, trigger_property, executor): super(BaseTrigger, self).__init__() @@ -68,4 +69,9 @@ class TriggerHandler(loadables.BaseLoader): def all_triggers(): """Get all trigger classes.""" - return TriggerHandler().get_all_classes() + all_classes = TriggerHandler().get_all_classes() + for trigger_class in all_classes[:]: + if trigger_class.TRIGGER_TYPE == 'time' and ( + not trigger_class.IS_ENABLED): + all_classes.remove(trigger_class) + return all_classes diff --git a/karbor/services/operationengine/engine/triggers/timetrigger/__init__.py b/karbor/services/operationengine/engine/triggers/timetrigger/__init__.py index e69de29b..2ef3d810 100644 --- a/karbor/services/operationengine/engine/triggers/timetrigger/__init__.py +++ b/karbor/services/operationengine/engine/triggers/timetrigger/__init__.py @@ -0,0 +1,41 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg + +time_trigger_opts = [ + cfg.IntOpt('min_interval', + default=60 * 60, + help='The minimum interval of two adjacent time points. ' + 'min_interval >= (max_window_time * 2)'), + + cfg.IntOpt('min_window_time', + default=900, + help='The minimum window time'), + + cfg.IntOpt('max_window_time', + default=1800, + help='The maximum window time'), + + cfg.IntOpt('trigger_poll_interval', + default=15, + help='Interval, in seconds, in which Karbor will poll for ' + 'trigger events'), + + cfg.StrOpt('scheduling_strategy', + default='multi_node', + help='Time trigger scheduling strategy ' + ) +] + +CONF = cfg.CONF +CONF.register_opts(time_trigger_opts) diff --git a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py index b1fabc9d..e9e96c2a 100644 --- a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py +++ b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py @@ -12,48 +12,84 @@ from datetime import datetime from datetime import timedelta +import eventlet +import functools + from oslo_config import cfg from oslo_log import log as logging -from oslo_service import loopingcall from oslo_utils import timeutils -import six -from stevedore import driver as import_driver -from karbor import context as karbor_context -from karbor import db from karbor import exception from karbor.i18n import _ from karbor.services.operationengine.engine import triggers - -time_trigger_opts = [ - cfg.IntOpt('min_interval', - default=60 * 60, - help='The minimum interval of two adjacent time points. ' - 'min_interval >= (max_window_time * 2)'), - - cfg.IntOpt('min_window_time', - default=900, - help='The minimum window time'), - - cfg.IntOpt('max_window_time', - default=1800, - help='The maximum window time'), - - cfg.IntOpt('trigger_poll_interval', - default=15, - help='Interval, in seconds, in which Karbor will poll for ' - 'trigger events'), -] +from karbor.services.operationengine.engine.triggers.timetrigger import utils CONF = cfg.CONF -CONF.register_opts(time_trigger_opts) LOG = logging.getLogger(__name__) +class TriggerOperationGreenThread(object): + def __init__(self, first_run_time, function): + super(TriggerOperationGreenThread, self).__init__() + self._is_sleeping = True + self._pre_run_time = None + self._running = False + self._thread = None + + self._function = function + + self._start(first_run_time) + + def kill(self): + self._running = False + if self._is_sleeping: + self._thread.kill() + + @property + def running(self): + return self._running + + @property + def pre_run_time(self): + return self._pre_run_time + + def _start(self, first_run_time): + self._running = True + + now = datetime.utcnow() + initial_delay = 0 if first_run_time <= now else ( + int(timeutils.delta_seconds(now, first_run_time))) + + self._thread = eventlet.spawn_after( + initial_delay, self._run, first_run_time) + self._thread.link(self._on_done) + + def _on_done(self, gt, *args, **kwargs): + self._is_sleeping = True + self._pre_run_time = None + self._running = False + self._thread = None + + def _run(self, expect_run_time): + while self._running: + self._is_sleeping = False + self._pre_run_time = expect_run_time + + expect_run_time = self._function(expect_run_time) + if expect_run_time is None or not self._running: + break + + self._is_sleeping = True + + now = datetime.utcnow() + idle_time = 0 if expect_run_time <= now else int( + timeutils.delta_seconds(now, expect_run_time)) + eventlet.sleep(idle_time) + + class TimeTrigger(triggers.BaseTrigger): TRIGGER_TYPE = "time" - _loopingcall = None - _triggers = {} + IS_ENABLED = (CONF.scheduling_strategy == 'default') def __init__(self, trigger_id, trigger_property, executor): super(TimeTrigger, self).__init__( @@ -62,136 +98,30 @@ class TimeTrigger(triggers.BaseTrigger): self._trigger_property = self.check_trigger_definition( trigger_property) - timer = self._get_timer(self._trigger_property) - first_run_time = self._compute_next_run_time( - datetime.utcnow(), self._trigger_property['end_time'], timer) - LOG.debug("first_run_time: %s", first_run_time) - - self._trigger_execution_new(self._id, first_run_time) - - if not self.__class__._loopingcall: - self.__class__._loopingcall = loopingcall.FixedIntervalLoopingCall( - self._loop) - self.__class__._loopingcall.start( - interval=CONF.trigger_poll_interval, - stop_on_exception=False, - ) - - self._register() - - def _register(self): - self.__class__._triggers[self._id] = self - - def _unregister(self): - del self.__class__._triggers[self._id] - - @classmethod - def _loop(cls): - while True: - now = datetime.utcnow() - exec_to_handle = cls._trigger_execution_get_next() - if not exec_to_handle: - LOG.debug("No next trigger executions") - break - - trigger_id = exec_to_handle.trigger_id - execution_time = exec_to_handle.execution_time - trigger = cls._triggers.get(trigger_id) - if not trigger: - LOG.warning("Unable to find trigger %s", trigger_id) - res = cls._trigger_execution_delete( - execution_id=exec_to_handle.id) - continue - - if now < execution_time: - LOG.debug("Time trigger not yet due") - break - - trigger_property = trigger._trigger_property - timer = cls._get_timer(trigger_property) - window = trigger_property.get("window") - end_time_to_run = execution_time + timedelta( - seconds=window) - - if now > end_time_to_run: - LOG.debug("Time trigger (%s) out of window",) - execute = False - else: - LOG.debug("Time trigger (%s) is due", trigger_id) - execute = True - - next_exec_time = cls._compute_next_run_time( - now, - trigger_property['end_time'], - timer, - ) - res = False - if not next_exec_time: - LOG.debug("No more planned executions for trigger (%s)", - trigger_id) - res = cls._trigger_execution_delete( - execution_id=exec_to_handle.id) - else: - LOG.debug("Rescheduling (%s) from %s to %s", - trigger_id, - execution_time, - next_exec_time) - res = cls._trigger_execution_update( - exec_to_handle.id, - execution_time, - next_exec_time, - ) - - if not res: - LOG.info("Trigger probably handled by another node") - continue - - if execute: - cls._trigger_operations(trigger_id, execution_time, window) - - @classmethod - def _trigger_execution_new(cls, trigger_id, time): - # Find the first time. - # We don't known when using this trigger first time. - ctxt = karbor_context.get_admin_context() - try: - db.trigger_execution_create(ctxt, trigger_id, time) - return True - except Exception: - return False - - @classmethod - def _trigger_execution_update(cls, id, current_time, next_time): - ctxt = karbor_context.get_admin_context() - return db.trigger_execution_update(ctxt, id, current_time, next_time) - - @classmethod - def _trigger_execution_delete(cls, execution_id=None, trigger_id=None): - if execution_id is None and trigger_id is None: - raise exception.InvalidParameterValue('supply at least one id') - - ctxt = karbor_context.get_admin_context() - num_deleted = db.trigger_execution_delete(ctxt, execution_id, - trigger_id) - return num_deleted > 0 - - @classmethod - def _trigger_execution_get_next(cls): - ctxt = karbor_context.get_admin_context() - return db.trigger_execution_get_next(ctxt) + self._greenthread = None def shutdown(self): - self._unregister() + self._kill_greenthread() def register_operation(self, operation_id, **kwargs): if operation_id in self._operation_ids: msg = (_("The operation_id(%s) is exist") % operation_id) raise exception.ScheduledOperationExist(msg) + if self._greenthread and not self._greenthread.running: + raise exception.TriggerIsInvalid(trigger_id=self._id) + self._operation_ids.add(operation_id) + if self._greenthread is None: + self._start_greenthread() def unregister_operation(self, operation_id, **kwargs): - self._operation_ids.discard(operation_id) + if operation_id not in self._operation_ids: + return + + self._operation_ids.remove(operation_id) + if 0 == len(self._operation_ids): + self._kill_greenthread() def update_trigger_property(self, trigger_property): valid_trigger_property = self.check_trigger_definition( @@ -202,36 +132,82 @@ class TimeTrigger(triggers.BaseTrigger): timer = self._get_timer(valid_trigger_property) first_run_time = self._compute_next_run_time( - datetime.utcnow(), valid_trigger_property['end_time'], timer) - + datetime.utcnow(), trigger_property['end_time'], timer) if not first_run_time: msg = (_("The new trigger property is invalid, " "Can not find the first run time")) raise exception.InvalidInput(msg) + if self._greenthread is not None: + pre_run_time = self._greenthread.pre_run_time + if pre_run_time: + end_time = pre_run_time + timedelta( + seconds=self._trigger_property['window']) + if first_run_time <= end_time: + msg = (_("The new trigger property is invalid, " + "First run time%(t1)s must be after %(t2)s") % + {'t1': first_run_time, 't2': end_time}) + raise exception.InvalidInput(msg) + self._trigger_property = valid_trigger_property - self._trigger_execution_delete(trigger_id=self._id) - self._trigger_execution_new(self._id, first_run_time) - @classmethod - def _trigger_operations(cls, trigger_id, expect_run_time, window): - """Trigger operations once""" + if len(self._operation_ids) > 0: + # Restart greenthread to take the change of trigger property + # effect immediately + self._kill_greenthread() + self._create_green_thread(first_run_time, timer) - # The executor execute_operation may have I/O operation. + def _kill_greenthread(self): + if self._greenthread: + self._greenthread.kill() + self._greenthread = None + + def _start_greenthread(self): + # Find the first time. + # We don't known when using this trigger first time. + timer = self._get_timer(self._trigger_property) + first_run_time = self._compute_next_run_time( + datetime.utcnow(), self._trigger_property['end_time'], timer) + if not first_run_time: + raise exception.TriggerIsInvalid(trigger_id=self._id) + + self._create_green_thread(first_run_time, timer) + + def _create_green_thread(self, first_run_time, timer): + func = functools.partial( + self._trigger_operations, + trigger_property=self._trigger_property.copy(), + timer=timer) + + self._greenthread = TriggerOperationGreenThread( + first_run_time, func) + + def _trigger_operations(self, expect_run_time, trigger_property, timer): + """Trigger operations once + + returns: wait time for next run + """ + + # Just for robustness, actually expect_run_time always <= now + # but, if the scheduling of eventlet is not accurate, then we + # can do some adjustments. + entry_time = datetime.utcnow() + if entry_time < expect_run_time and ( + int(timeutils.delta_seconds(entry_time, expect_run_time)) > 0): + return expect_run_time + + # The self._executor.execute_operation may have I/O operation. # If it is, this green thread will be switched out during looping # operation_ids. In order to avoid changing self._operation_ids # during the green thread is switched out, copy self._operation_ids # as the iterative object. - trigger = cls._triggers.get(trigger_id) - if not trigger: - LOG.warning("Can't find trigger: %s" % trigger_id) - return - operations_ids = trigger._operation_ids.copy() + operation_ids = self._operation_ids.copy() sent_ops = set() + window = trigger_property.get("window") end_time = expect_run_time + timedelta(seconds=window) - for operation_id in operations_ids: - if operation_id not in trigger._operation_ids: + for operation_id in operation_ids: + if operation_id not in self._operation_ids: # Maybe, when traversing this operation_id, it has been # removed by self.unregister_operation LOG.warning("Execute operation %s which is not exist, " @@ -242,13 +218,15 @@ class TimeTrigger(triggers.BaseTrigger): if now >= end_time: LOG.error("Can not trigger operations to run. Because it is " "out of window time. now=%(now)s, " - "end time=%(end_time)s, waiting operations=%(ops)s", + "end time=%(end_time)s, expect run time=%(expect)s," + " wating operations=%(ops)s", {'now': now, 'end_time': end_time, - 'ops': operations_ids - sent_ops}) + 'expect': expect_run_time, + 'ops': operation_ids - sent_ops}) break try: - trigger._executor.execute_operation( + self._executor.execute_operation( operation_id, now, expect_run_time, window) except Exception: LOG.exception("Submit operation to executor failed, operation" @@ -256,104 +234,30 @@ class TimeTrigger(triggers.BaseTrigger): sent_ops.add(operation_id) + next_time = self._compute_next_run_time( + expect_run_time, trigger_property['end_time'], timer) + now = datetime.utcnow() + if next_time and next_time <= now: + LOG.error("Next run time:%(next_time)s <= now:%(now)s. Maybe the " + "entry time=%(entry)s is too late, even exceeds the end" + " time of window=%(end)s, or it was blocked where " + "sending the operation to executor.", + {'next_time': next_time, 'now': now, + 'entry': entry_time, 'end': end_time}) + return next_time + @classmethod def check_trigger_definition(cls, trigger_definition): - """Check trigger definition - - All the time instances of trigger_definition are in UTC, - including start_time, end_time - """ - tf_cls = cls._get_time_format_class() - - pattern = trigger_definition.get("pattern", None) - tf_cls.check_time_format(pattern) - - start_time = trigger_definition.get("start_time", None) - if not start_time: - msg = _("The trigger\'s start time is unknown") - raise exception.InvalidInput(msg) - start_time = cls._check_and_get_datetime(start_time, "start_time") - - interval = tf_cls(start_time, pattern).get_min_interval() - if interval is not None and interval < CONF.min_interval: - msg = (_("The interval of two adjacent time points " - "is less than %d") % CONF.min_interval) - raise exception.InvalidInput(msg) - - window = trigger_definition.get("window", CONF.min_window_time) - if not isinstance(window, int): - try: - window = int(window) - except Exception: - msg = (_("The trigger windows(%s) is not integer") % window) - raise exception.InvalidInput(msg) - - if window < CONF.min_window_time or window > CONF.max_window_time: - msg = (_("The trigger windows %(window)d must be between " - "%(min_window)d and %(max_window)d") % - {"window": window, - "min_window": CONF.min_window_time, - "max_window": CONF.max_window_time}) - raise exception.InvalidInput(msg) - - end_time = trigger_definition.get("end_time", None) - end_time = cls._check_and_get_datetime(end_time, "end_time") - - valid_trigger_property = trigger_definition.copy() - valid_trigger_property['window'] = window - valid_trigger_property['start_time'] = start_time - valid_trigger_property['end_time'] = end_time - return valid_trigger_property - - @classmethod - def _check_and_get_datetime(cls, time, time_name): - if not time: - return None - - if isinstance(time, datetime): - return time - - if not isinstance(time, six.string_types): - msg = (_("The trigger %(name)s(type = %(vtype)s) is not an " - "instance of string") % - {"name": time_name, "vtype": type(time)}) - raise exception.InvalidInput(msg) - - try: - time = timeutils.parse_strtime(time, fmt='%Y-%m-%d %H:%M:%S') - except Exception: - msg = (_("The format of trigger %s is not correct") % time_name) - raise exception.InvalidInput(msg) - - return time + return utils.check_trigger_definition(trigger_definition) @classmethod def _compute_next_run_time(cls, start_time, end_time, timer): - next_time = timer.compute_next_time(start_time) - - if next_time and (not end_time or next_time <= end_time): - return next_time - return None - - @classmethod - def _get_time_format_class(cls): - return import_driver.DriverManager( - 'karbor.operationengine.engine.timetrigger.time_format', - CONF.time_format).driver + return utils.compute_next_run_time(start_time, end_time, timer) @classmethod def _get_timer(cls, trigger_property): - tf_cls = cls._get_time_format_class() - timer = tf_cls(trigger_property['start_time'], - trigger_property['pattern']) - return timer + return utils.get_timer(trigger_property) @classmethod def check_configuration(cls): - min_window = CONF.min_window_time - max_window = CONF.max_window_time - min_interval = CONF.min_interval - - if not (min_window < max_window and (max_window * 2 <= min_interval)): - msg = (_('Configurations of time trigger are invalid')) - raise exception.InvalidInput(msg) + utils.check_configuration() diff --git a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger_multi_node.py b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger_multi_node.py new file mode 100644 index 00000000..5ebf54e6 --- /dev/null +++ b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger_multi_node.py @@ -0,0 +1,253 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from datetime import datetime +from datetime import timedelta + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_service import loopingcall + +from karbor import context as karbor_context +from karbor import db +from karbor import exception +from karbor.i18n import _ +from karbor.services.operationengine.engine import triggers +from karbor.services.operationengine.engine.triggers.timetrigger import utils + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class TimeTrigger(triggers.BaseTrigger): + + TRIGGER_TYPE = "time" + IS_ENABLED = (CONF.scheduling_strategy == 'multi_node') + + _loopingcall = None + _triggers = {} + + def __init__(self, trigger_id, trigger_property, executor): + super(TimeTrigger, self).__init__( + trigger_id, trigger_property, executor) + + self._trigger_property = self.check_trigger_definition( + trigger_property) + + timer = self._get_timer(self._trigger_property) + first_run_time = self._compute_next_run_time( + datetime.utcnow(), self._trigger_property['end_time'], timer) + LOG.debug("first_run_time: %s", first_run_time) + + self._trigger_execution_new(self._id, first_run_time) + + if not self.__class__._loopingcall: + self.__class__._loopingcall = loopingcall.FixedIntervalLoopingCall( + self._loop) + self.__class__._loopingcall.start( + interval=CONF.trigger_poll_interval, + stop_on_exception=False, + ) + + self._register() + + def _register(self): + self.__class__._triggers[self._id] = self + + def _unregister(self): + del self.__class__._triggers[self._id] + + @classmethod + def _loop(cls): + while True: + now = datetime.utcnow() + exec_to_handle = cls._trigger_execution_get_next() + if not exec_to_handle: + LOG.debug("No next trigger executions") + break + + trigger_id = exec_to_handle.trigger_id + execution_time = exec_to_handle.execution_time + trigger = cls._triggers.get(trigger_id) + if not trigger: + LOG.warning("Unable to find trigger %s", trigger_id) + res = cls._trigger_execution_delete( + execution_id=exec_to_handle.id) + continue + + if now < execution_time: + LOG.debug("Time trigger not yet due") + break + + trigger_property = trigger._trigger_property + timer = cls._get_timer(trigger_property) + window = trigger_property.get("window") + end_time_to_run = execution_time + timedelta( + seconds=window) + + if now > end_time_to_run: + LOG.debug("Time trigger (%s) out of window",) + execute = False + else: + LOG.debug("Time trigger (%s) is due", trigger_id) + execute = True + + next_exec_time = cls._compute_next_run_time( + now, + trigger_property['end_time'], + timer, + ) + if not next_exec_time: + LOG.debug("No more planned executions for trigger (%s)", + trigger_id) + res = cls._trigger_execution_delete( + execution_id=exec_to_handle.id) + else: + LOG.debug("Rescheduling (%s) from %s to %s", + trigger_id, + execution_time, + next_exec_time) + res = cls._trigger_execution_update( + exec_to_handle.id, + execution_time, + next_exec_time, + ) + + if not res: + LOG.info("Trigger probably handled by another node") + continue + + if execute: + cls._trigger_operations(trigger_id, execution_time, window) + + @classmethod + def _trigger_execution_new(cls, trigger_id, time): + # Find the first time. + # We don't known when using this trigger first time. + ctxt = karbor_context.get_admin_context() + try: + db.trigger_execution_create(ctxt, trigger_id, time) + return True + except Exception: + return False + + @classmethod + def _trigger_execution_update(cls, id, current_time, next_time): + ctxt = karbor_context.get_admin_context() + return db.trigger_execution_update(ctxt, id, current_time, next_time) + + @classmethod + def _trigger_execution_delete(cls, execution_id=None, trigger_id=None): + if execution_id is None and trigger_id is None: + raise exception.InvalidParameterValue('supply at least one id') + + ctxt = karbor_context.get_admin_context() + num_deleted = db.trigger_execution_delete(ctxt, execution_id, + trigger_id) + return num_deleted > 0 + + @classmethod + def _trigger_execution_get_next(cls): + ctxt = karbor_context.get_admin_context() + return db.trigger_execution_get_next(ctxt) + + def shutdown(self): + self._unregister() + + def register_operation(self, operation_id, **kwargs): + if operation_id in self._operation_ids: + msg = (_("The operation_id(%s) is exist") % operation_id) + raise exception.ScheduledOperationExist(msg) + + self._operation_ids.add(operation_id) + + def unregister_operation(self, operation_id, **kwargs): + self._operation_ids.discard(operation_id) + + def update_trigger_property(self, trigger_property): + valid_trigger_property = self.check_trigger_definition( + trigger_property) + + if valid_trigger_property == self._trigger_property: + return + + timer = self._get_timer(valid_trigger_property) + first_run_time = self._compute_next_run_time( + datetime.utcnow(), valid_trigger_property['end_time'], timer) + + if not first_run_time: + msg = (_("The new trigger property is invalid, " + "Can not find the first run time")) + raise exception.InvalidInput(msg) + + self._trigger_property = valid_trigger_property + self._trigger_execution_delete(trigger_id=self._id) + self._trigger_execution_new(self._id, first_run_time) + + @classmethod + def _trigger_operations(cls, trigger_id, expect_run_time, window): + """Trigger operations once""" + + # The executor execute_operation may have I/O operation. + # If it is, this green thread will be switched out during looping + # operation_ids. In order to avoid changing self._operation_ids + # during the green thread is switched out, copy self._operation_ids + # as the iterative object. + trigger = cls._triggers.get(trigger_id) + if not trigger: + LOG.warning("Can't find trigger: %s" % trigger_id) + return + operations_ids = trigger._operation_ids.copy() + sent_ops = set() + end_time = expect_run_time + timedelta(seconds=window) + + for operation_id in operations_ids: + if operation_id not in trigger._operation_ids: + # Maybe, when traversing this operation_id, it has been + # removed by self.unregister_operation + LOG.warning("Execute operation %s which is not exist, " + "ignore it", operation_id) + continue + + now = datetime.utcnow() + if now >= end_time: + LOG.error("Can not trigger operations to run. Because it is " + "out of window time. now=%(now)s, " + "end time=%(end_time)s, waiting operations=%(ops)s", + {'now': now, 'end_time': end_time, + 'ops': operations_ids - sent_ops}) + break + + try: + trigger._executor.execute_operation( + operation_id, now, expect_run_time, window) + except Exception: + LOG.exception("Submit operation to executor failed, operation" + " id=%s", operation_id) + + sent_ops.add(operation_id) + + @classmethod + def check_trigger_definition(cls, trigger_definition): + return utils.check_trigger_definition(trigger_definition) + + @classmethod + def _compute_next_run_time(cls, start_time, end_time, timer): + return utils.compute_next_run_time(start_time, end_time, timer) + + @classmethod + def _get_timer(cls, trigger_property): + return utils.get_timer(trigger_property) + + @classmethod + def check_configuration(cls): + utils.check_configuration() diff --git a/karbor/services/operationengine/engine/triggers/timetrigger/utils.py b/karbor/services/operationengine/engine/triggers/timetrigger/utils.py new file mode 100644 index 00000000..e2dcfec4 --- /dev/null +++ b/karbor/services/operationengine/engine/triggers/timetrigger/utils.py @@ -0,0 +1,123 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from datetime import datetime +from oslo_config import cfg +from oslo_utils import timeutils +import six +from stevedore import driver as import_driver + +from karbor import exception +from karbor.i18n import _ + +CONF = cfg.CONF + + +def get_time_format_class(): + return import_driver.DriverManager( + 'karbor.operationengine.engine.timetrigger.time_format', + CONF.time_format).driver + + +def compute_next_run_time(start_time, end_time, timer): + next_time = timer.compute_next_time(start_time) + if next_time and (not end_time or next_time <= end_time): + return next_time + return None + + +def check_and_get_datetime(time, time_name): + if not time: + return None + + if isinstance(time, datetime): + return time + + if not isinstance(time, six.string_types): + msg = (_("The trigger %(name)s(type = %(vtype)s) is " + "not an instance of string") % + {"name": time_name, "vtype": type(time)}) + raise exception.InvalidInput(msg) + + try: + time = timeutils.parse_strtime(time, fmt='%Y-%m-%d %H:%M:%S') + except Exception: + msg = (_("The format of trigger %s is not correct") % time_name) + raise exception.InvalidInput(msg) + + return time + + +def check_trigger_definition(trigger_definition): + """Check trigger definition + + All the time instances of trigger_definition are in UTC, + including start_time, end_time + """ + tf_cls = get_time_format_class() + + pattern = trigger_definition.get("pattern", None) + tf_cls.check_time_format(pattern) + + start_time = trigger_definition.get("start_time", None) + if not start_time: + msg = _("The trigger\'s start time is unknown") + raise exception.InvalidInput(msg) + start_time = check_and_get_datetime(start_time, "start_time") + + interval = tf_cls(start_time, pattern).get_min_interval() + if interval is not None and interval < CONF.min_interval: + msg = (_("The interval of two adjacent time points " + "is less than %d") % CONF.min_interval) + raise exception.InvalidInput(msg) + + window = trigger_definition.get("window", CONF.min_window_time) + if not isinstance(window, int): + try: + window = int(window) + except Exception: + msg = (_("The trigger windows(%s) is not integer") % window) + raise exception.InvalidInput(msg) + + if window < CONF.min_window_time or window > CONF.max_window_time: + msg = (_("The trigger windows %(window)d must be between " + "%(min_window)d and %(max_window)d") % + {"window": window, + "min_window": CONF.min_window_time, + "max_window": CONF.max_window_time}) + raise exception.InvalidInput(msg) + + end_time = trigger_definition.get("end_time", None) + end_time = check_and_get_datetime(end_time, "end_time") + + valid_trigger_property = trigger_definition.copy() + valid_trigger_property['window'] = window + valid_trigger_property['start_time'] = start_time + valid_trigger_property['end_time'] = end_time + return valid_trigger_property + + +def check_configuration(): + min_window = CONF.min_window_time + max_window = CONF.max_window_time + min_interval = CONF.min_interval + + if not (min_window < max_window and (max_window * 2 <= min_interval)): + msg = (_('Configurations of time trigger are invalid')) + raise exception.InvalidInput(msg) + + +def get_timer(trigger_property): + tf_cls = get_time_format_class() + timer = tf_cls(trigger_property['start_time'], + trigger_property['pattern']) + return timer diff --git a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py index 6ed63f1f..96fb1c08 100644 --- a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py +++ b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py @@ -9,28 +9,19 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -from collections import namedtuple from datetime import datetime from datetime import timedelta import eventlet -import functools -import heapq import mock from oslo_config import cfg -from oslo_utils import uuidutils -from karbor import context as karbor_context from karbor import exception -from karbor.services.operationengine.engine.triggers.timetrigger \ - import time_trigger as tt +from karbor.services.operationengine.engine.triggers.timetrigger.time_trigger \ + import TimeTrigger +from karbor.services.operationengine.engine.triggers.timetrigger import utils from karbor.tests import base -TriggerExecution = namedtuple('TriggerExecution', - ['execution_time', 'id', 'trigger_id']) - - class FakeTimeFormat(object): def __init__(self, start_time, pattern): super(FakeTimeFormat, self).__init__() @@ -58,78 +49,30 @@ class FakeExecutor(object): self._ops[operation_id] += 1 eventlet.sleep(0.5) - -class FakeTimeTrigger(object): - @classmethod - def get_time_format(cls, *args, **kwargs): - return FakeTimeFormat - - -class FakeDb(object): - def __init__(self): - self._db = [] - - def trigger_execution_get_next(self, context): - if len(self._db) == 0: - return None - return self._db[0] - - def trigger_execution_create(self, context, trigger_id, time): - element = TriggerExecution(time, uuidutils.generate_uuid(), trigger_id) - heapq.heappush(self._db, element) - - def trigger_execution_update(self, context, id, current_time, new_time): - for idx, element in enumerate(self._db): - if element.id == id: - if element.execution_time != current_time: - return False - self._db[idx] = TriggerExecution(new_time, element.id, - element.trigger_id) - break - heapq.heapify(self._db) - return True - - def trigger_execution_delete(self, context, id, trigger_id): - removed_ids = [] - for idx, element in enumerate(self._db): - if (id and element.id == id) or (trigger_id and - element.trigger_id == trigger_id): - removed_ids.append(idx) - - for idx in reversed(removed_ids): - self._db.pop(idx) - heapq.heapify(self._db) - return len(removed_ids) - - -def time_trigger_test(func): - @functools.wraps(func) - @mock.patch.object(tt, 'db', FakeDb()) - @mock.patch.object(karbor_context, 'get_admin_context', lambda: None) - @mock.patch.object(tt.TimeTrigger, '_get_time_format_class', - FakeTimeTrigger.get_time_format) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - - return wrapper + def clear(self): + self._ops.clear() class TimeTriggerTestCase(base.TestCase): - _tid = 0 - _default_executor = FakeExecutor() def setUp(self): super(TimeTriggerTestCase, self).setUp() + self._set_configuration() + mock_obj = mock.Mock() + mock_obj.return_value = FakeTimeFormat + utils.get_time_format_class = mock_obj + + self._default_executor = FakeExecutor() + def test_check_configuration(self): self._set_configuration(10, 20, 30) self.assertRaisesRegex(exception.InvalidInput, "Configurations of time trigger are invalid", - tt.TimeTrigger.check_configuration) + TimeTrigger.check_configuration) self._set_configuration() - @time_trigger_test def test_check_trigger_property_start_time(self): trigger_property = { "pattern": "", @@ -138,23 +81,22 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The trigger\'s start time is unknown", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) trigger_property['start_time'] = 'abc' self.assertRaisesRegex(exception.InvalidInput, "The format of trigger .* is not correct", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) trigger_property['start_time'] = 123 self.assertRaisesRegex(exception.InvalidInput, "The trigger .* is not an instance of string", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) @mock.patch.object(FakeTimeFormat, 'get_min_interval') - @time_trigger_test def test_check_trigger_property_interval(self, get_min_interval): get_min_interval.return_value = 0 @@ -164,10 +106,9 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The interval of two adjacent time points .*", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) - @time_trigger_test def test_check_trigger_property_window(self): trigger_property = { "window": "abc", @@ -176,16 +117,15 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The trigger window.* is not integer", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) trigger_property['window'] = 1000 self.assertRaisesRegex(exception.InvalidInput, "The trigger windows .* must be between .*", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) - @time_trigger_test def test_check_trigger_property_end_time(self): trigger_property = { "window": 15, @@ -195,24 +135,27 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The format of trigger .* is not correct", - tt.TimeTrigger.check_trigger_definition, + TimeTrigger.check_trigger_definition, trigger_property) - @time_trigger_test def test_register_operation(self): trigger = self._generate_trigger() operation_id = "1" trigger.register_operation(operation_id) - eventlet.sleep(2) + eventlet.sleep(0.3) - self.assertGreaterEqual(self._default_executor._ops[operation_id], 1) + self.assertGreaterEqual(trigger._executor._ops[operation_id], 1) self.assertRaisesRegex(exception.ScheduledOperationExist, "The operation_id.* is exist", trigger.register_operation, operation_id) - @time_trigger_test + eventlet.sleep(0.3) + self.assertRaises(exception.TriggerIsInvalid, + trigger.register_operation, + "2") + def test_unregister_operation(self): trigger = self._generate_trigger() operation_id = "2" @@ -221,9 +164,26 @@ class TimeTriggerTestCase(base.TestCase): self.assertIn(operation_id, trigger._operation_ids) trigger.unregister_operation(operation_id) - self.assertNotIn(trigger._id, trigger._operation_ids) + self.assertNotIn(operation_id, trigger._operation_ids) + + def test_unregister_operation_when_scheduling(self): + trigger = self._generate_trigger() + + for op_id in ['1', '2', '3']: + trigger.register_operation(op_id) + self.assertIn(op_id, trigger._operation_ids) + eventlet.sleep(0.5) + + for op_id in ['2', '3']: + trigger.unregister_operation(op_id) + self.assertNotIn(op_id, trigger._operation_ids) + eventlet.sleep(0.6) + + self.assertGreaterEqual(trigger._executor._ops['1'], 1) + + self.assertTrue(('2' not in trigger._executor._ops) or ( + '3' not in trigger._executor._ops)) - @time_trigger_test def test_update_trigger_property(self): trigger = self._generate_trigger() @@ -239,10 +199,18 @@ class TimeTriggerTestCase(base.TestCase): trigger.update_trigger_property, trigger_property) - @time_trigger_test + trigger.register_operation('1') + eventlet.sleep(0.2) + trigger_property['end_time'] = ( + datetime.utcnow() + timedelta(seconds=1)) + self.assertRaisesRegex(exception.InvalidInput, + ".*First run time.* must be after.*", + trigger.update_trigger_property, + trigger_property) + def test_update_trigger_property_success(self): trigger = self._generate_trigger() - trigger.register_operation('7') + trigger.register_operation('1') eventlet.sleep(0.2) trigger_property = { @@ -253,8 +221,12 @@ class TimeTriggerTestCase(base.TestCase): } with mock.patch.object(FakeTimeFormat, 'compute_next_time') as c: c.return_value = datetime.utcnow() + timedelta(seconds=20) + old_id = id(trigger._greenthread) + trigger.update_trigger_property(trigger_property) + self.assertNotEqual(old_id, id(trigger._greenthread)) + def _generate_trigger(self, end_time=None): if not end_time: end_time = datetime.utcnow() + timedelta(seconds=1) @@ -266,15 +238,11 @@ class TimeTriggerTestCase(base.TestCase): "end_time": end_time } - return tt.TimeTrigger( - uuidutils.generate_uuid(), - trigger_property, - self._default_executor, - ) + self._default_executor.clear() + return TimeTrigger("123", trigger_property, self._default_executor) def _set_configuration(self, min_window=15, - max_window=30, min_interval=60, poll_interval=1): + max_window=30, min_interval=60): self.override_config('min_interval', min_interval) self.override_config('min_window_time', min_window) self.override_config('max_window_time', max_window) - self.override_config('trigger_poll_interval', poll_interval) diff --git a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger_multi_node.py b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger_multi_node.py new file mode 100644 index 00000000..501ed36d --- /dev/null +++ b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger_multi_node.py @@ -0,0 +1,281 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from collections import namedtuple +from datetime import datetime +from datetime import timedelta +import eventlet +import functools +import heapq +import mock +from oslo_config import cfg +from oslo_utils import uuidutils + +from karbor import context as karbor_context +from karbor import exception +from karbor.services.operationengine.engine.triggers.timetrigger import \ + time_trigger_multi_node as tt +from karbor.services.operationengine.engine.triggers.timetrigger import utils +from karbor.tests import base + + +TriggerExecution = namedtuple('TriggerExecution', + ['execution_time', 'id', 'trigger_id']) + + +class FakeTimeFormat(object): + def __init__(self, start_time, pattern): + super(FakeTimeFormat, self).__init__() + + @classmethod + def check_time_format(cls, pattern): + pass + + def compute_next_time(self, current_time): + return current_time + timedelta(seconds=0.5) + + def get_min_interval(self): + return cfg.CONF.min_interval + + +class FakeExecutor(object): + def __init__(self): + super(FakeExecutor, self).__init__() + self._ops = {} + + def execute_operation(self, operation_id, triggered_time, + expect_start_time, window): + if operation_id not in self._ops: + self._ops[operation_id] = 0 + self._ops[operation_id] += 1 + eventlet.sleep(0.5) + + +class FakeTimeTrigger(object): + @classmethod + def get_time_format(cls, *args, **kwargs): + return FakeTimeFormat + + +class FakeDb(object): + def __init__(self): + self._db = [] + + def trigger_execution_get_next(self, context): + if len(self._db) == 0: + return None + return self._db[0] + + def trigger_execution_create(self, context, trigger_id, time): + element = TriggerExecution(time, uuidutils.generate_uuid(), trigger_id) + heapq.heappush(self._db, element) + + def trigger_execution_update(self, context, id, current_time, new_time): + for idx, element in enumerate(self._db): + if element.id == id: + if element.execution_time != current_time: + return False + self._db[idx] = TriggerExecution(new_time, element.id, + element.trigger_id) + break + heapq.heapify(self._db) + return True + + def trigger_execution_delete(self, context, id, trigger_id): + removed_ids = [] + for idx, element in enumerate(self._db): + if (id and element.id == id) or (trigger_id and + element.trigger_id == trigger_id): + removed_ids.append(idx) + + for idx in reversed(removed_ids): + self._db.pop(idx) + heapq.heapify(self._db) + return len(removed_ids) + + +def time_trigger_test(func): + @functools.wraps(func) + @mock.patch.object(tt, 'db', FakeDb()) + @mock.patch.object(karbor_context, 'get_admin_context', lambda: None) + @mock.patch.object(utils, 'get_time_format_class', + FakeTimeTrigger.get_time_format) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + +class TimeTriggerTestCase(base.TestCase): + _tid = 0 + _default_executor = FakeExecutor() + + def setUp(self): + super(TimeTriggerTestCase, self).setUp() + self._set_configuration() + + def test_check_configuration(self): + self._set_configuration(10, 20, 30) + self.assertRaisesRegex(exception.InvalidInput, + "Configurations of time trigger are invalid", + tt.TimeTrigger.check_configuration) + self._set_configuration() + + @time_trigger_test + def test_check_trigger_property_start_time(self): + trigger_property = { + "pattern": "", + "start_time": "" + } + + self.assertRaisesRegex(exception.InvalidInput, + "The trigger\'s start time is unknown", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + trigger_property['start_time'] = 'abc' + self.assertRaisesRegex(exception.InvalidInput, + "The format of trigger .* is not correct", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + trigger_property['start_time'] = 123 + self.assertRaisesRegex(exception.InvalidInput, + "The trigger .* is not an instance of string", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + @mock.patch.object(FakeTimeFormat, 'get_min_interval') + @time_trigger_test + def test_check_trigger_property_interval(self, get_min_interval): + get_min_interval.return_value = 0 + + trigger_property = { + "start_time": '2016-8-18 01:03:04' + } + + self.assertRaisesRegex(exception.InvalidInput, + "The interval of two adjacent time points .*", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + @time_trigger_test + def test_check_trigger_property_window(self): + trigger_property = { + "window": "abc", + "start_time": '2016-8-18 01:03:04' + } + + self.assertRaisesRegex(exception.InvalidInput, + "The trigger window.* is not integer", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + trigger_property['window'] = 1000 + self.assertRaisesRegex(exception.InvalidInput, + "The trigger windows .* must be between .*", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + @time_trigger_test + def test_check_trigger_property_end_time(self): + trigger_property = { + "window": 15, + "start_time": '2016-8-18 01:03:04', + "end_time": "abc" + } + + self.assertRaisesRegex(exception.InvalidInput, + "The format of trigger .* is not correct", + tt.TimeTrigger.check_trigger_definition, + trigger_property) + + @time_trigger_test + def test_register_operation(self): + trigger = self._generate_trigger() + + operation_id = "1" + trigger.register_operation(operation_id) + eventlet.sleep(2) + + self.assertGreaterEqual(self._default_executor._ops[operation_id], 1) + self.assertRaisesRegex(exception.ScheduledOperationExist, + "The operation_id.* is exist", + trigger.register_operation, + operation_id) + + @time_trigger_test + def test_unregister_operation(self): + trigger = self._generate_trigger() + operation_id = "2" + + trigger.register_operation(operation_id) + self.assertIn(operation_id, trigger._operation_ids) + + trigger.unregister_operation(operation_id) + self.assertNotIn(trigger._id, trigger._operation_ids) + + @time_trigger_test + def test_update_trigger_property(self): + trigger = self._generate_trigger() + + trigger_property = { + "pattern": "", + "window": 15, + "start_time": datetime.utcnow(), + "end_time": datetime.utcnow() + } + + self.assertRaisesRegex(exception.InvalidInput, + ".*Can not find the first run tim", + trigger.update_trigger_property, + trigger_property) + + @time_trigger_test + def test_update_trigger_property_success(self): + trigger = self._generate_trigger() + trigger.register_operation('7') + eventlet.sleep(0.2) + + trigger_property = { + "pattern": "", + "window": 15, + "start_time": datetime.utcnow(), + "end_time": '' + } + with mock.patch.object(FakeTimeFormat, 'compute_next_time') as c: + c.return_value = datetime.utcnow() + timedelta(seconds=20) + trigger.update_trigger_property(trigger_property) + + def _generate_trigger(self, end_time=None): + if not end_time: + end_time = datetime.utcnow() + timedelta(seconds=1) + + trigger_property = { + "pattern": "", + "window": 15, + "start_time": datetime.utcnow(), + "end_time": end_time + } + + return tt.TimeTrigger( + uuidutils.generate_uuid(), + trigger_property, + self._default_executor, + ) + + def _set_configuration(self, min_window=15, + max_window=30, min_interval=60, poll_interval=1): + self.override_config('min_interval', min_interval) + self.override_config('min_window_time', min_window) + self.override_config('max_window_time', max_window) + self.override_config('trigger_poll_interval', poll_interval)