Make time trigger scheduling strategy configurable

Currently, karbor use the multi node scheduling strategy
instead of the original single node scheduling strategy.
It's better to keep the two scheduling strategies in code
and make scheduling strategy configurable, so that users
can choose which one to use.

Change-Id: I13693da334041b58745040a1cbda1007e3cc2853
Implements: blueprint make-time-trigger-scheduling-strategy-configurable
This commit is contained in:
jiaopengju 2017-11-20 17:20:26 +08:00
parent 4897261d8a
commit 8c1224e513
8 changed files with 928 additions and 352 deletions

View File

@ -22,7 +22,7 @@ import karbor.exception
import karbor.service
import karbor.services.operationengine.engine.executors.green_thread_executor as green_thread_executor # noqa
import karbor.services.operationengine.engine.executors.thread_pool_executor as thread_pool_executor # noqa
import karbor.services.operationengine.engine.triggers.timetrigger.time_trigger as time_trigger # noqa
import karbor.services.operationengine.engine.triggers.timetrigger as time_trigger # noqa
import karbor.services.operationengine.karbor_client
import karbor.services.operationengine.manager
import karbor.services.operationengine.operations.base as base

View File

@ -22,6 +22,7 @@ class BaseTrigger(object):
"""Trigger base class that all Triggers should inherit from"""
TRIGGER_TYPE = ""
IS_ENABLED = True
def __init__(self, trigger_id, trigger_property, executor):
super(BaseTrigger, self).__init__()
@ -68,4 +69,9 @@ class TriggerHandler(loadables.BaseLoader):
def all_triggers():
"""Get all trigger classes."""
return TriggerHandler().get_all_classes()
all_classes = TriggerHandler().get_all_classes()
for trigger_class in all_classes[:]:
if trigger_class.TRIGGER_TYPE == 'time' and (
not trigger_class.IS_ENABLED):
all_classes.remove(trigger_class)
return all_classes

View File

@ -0,0 +1,41 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
time_trigger_opts = [
cfg.IntOpt('min_interval',
default=60 * 60,
help='The minimum interval of two adjacent time points. '
'min_interval >= (max_window_time * 2)'),
cfg.IntOpt('min_window_time',
default=900,
help='The minimum window time'),
cfg.IntOpt('max_window_time',
default=1800,
help='The maximum window time'),
cfg.IntOpt('trigger_poll_interval',
default=15,
help='Interval, in seconds, in which Karbor will poll for '
'trigger events'),
cfg.StrOpt('scheduling_strategy',
default='multi_node',
help='Time trigger scheduling strategy '
)
]
CONF = cfg.CONF
CONF.register_opts(time_trigger_opts)

View File

@ -12,48 +12,84 @@
from datetime import datetime
from datetime import timedelta
import eventlet
import functools
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import timeutils
import six
from stevedore import driver as import_driver
from karbor import context as karbor_context
from karbor import db
from karbor import exception
from karbor.i18n import _
from karbor.services.operationengine.engine import triggers
time_trigger_opts = [
cfg.IntOpt('min_interval',
default=60 * 60,
help='The minimum interval of two adjacent time points. '
'min_interval >= (max_window_time * 2)'),
cfg.IntOpt('min_window_time',
default=900,
help='The minimum window time'),
cfg.IntOpt('max_window_time',
default=1800,
help='The maximum window time'),
cfg.IntOpt('trigger_poll_interval',
default=15,
help='Interval, in seconds, in which Karbor will poll for '
'trigger events'),
]
from karbor.services.operationengine.engine.triggers.timetrigger import utils
CONF = cfg.CONF
CONF.register_opts(time_trigger_opts)
LOG = logging.getLogger(__name__)
class TriggerOperationGreenThread(object):
def __init__(self, first_run_time, function):
super(TriggerOperationGreenThread, self).__init__()
self._is_sleeping = True
self._pre_run_time = None
self._running = False
self._thread = None
self._function = function
self._start(first_run_time)
def kill(self):
self._running = False
if self._is_sleeping:
self._thread.kill()
@property
def running(self):
return self._running
@property
def pre_run_time(self):
return self._pre_run_time
def _start(self, first_run_time):
self._running = True
now = datetime.utcnow()
initial_delay = 0 if first_run_time <= now else (
int(timeutils.delta_seconds(now, first_run_time)))
self._thread = eventlet.spawn_after(
initial_delay, self._run, first_run_time)
self._thread.link(self._on_done)
def _on_done(self, gt, *args, **kwargs):
self._is_sleeping = True
self._pre_run_time = None
self._running = False
self._thread = None
def _run(self, expect_run_time):
while self._running:
self._is_sleeping = False
self._pre_run_time = expect_run_time
expect_run_time = self._function(expect_run_time)
if expect_run_time is None or not self._running:
break
self._is_sleeping = True
now = datetime.utcnow()
idle_time = 0 if expect_run_time <= now else int(
timeutils.delta_seconds(now, expect_run_time))
eventlet.sleep(idle_time)
class TimeTrigger(triggers.BaseTrigger):
TRIGGER_TYPE = "time"
_loopingcall = None
_triggers = {}
IS_ENABLED = (CONF.scheduling_strategy == 'default')
def __init__(self, trigger_id, trigger_property, executor):
super(TimeTrigger, self).__init__(
@ -62,136 +98,30 @@ class TimeTrigger(triggers.BaseTrigger):
self._trigger_property = self.check_trigger_definition(
trigger_property)
timer = self._get_timer(self._trigger_property)
first_run_time = self._compute_next_run_time(
datetime.utcnow(), self._trigger_property['end_time'], timer)
LOG.debug("first_run_time: %s", first_run_time)
self._trigger_execution_new(self._id, first_run_time)
if not self.__class__._loopingcall:
self.__class__._loopingcall = loopingcall.FixedIntervalLoopingCall(
self._loop)
self.__class__._loopingcall.start(
interval=CONF.trigger_poll_interval,
stop_on_exception=False,
)
self._register()
def _register(self):
self.__class__._triggers[self._id] = self
def _unregister(self):
del self.__class__._triggers[self._id]
@classmethod
def _loop(cls):
while True:
now = datetime.utcnow()
exec_to_handle = cls._trigger_execution_get_next()
if not exec_to_handle:
LOG.debug("No next trigger executions")
break
trigger_id = exec_to_handle.trigger_id
execution_time = exec_to_handle.execution_time
trigger = cls._triggers.get(trigger_id)
if not trigger:
LOG.warning("Unable to find trigger %s", trigger_id)
res = cls._trigger_execution_delete(
execution_id=exec_to_handle.id)
continue
if now < execution_time:
LOG.debug("Time trigger not yet due")
break
trigger_property = trigger._trigger_property
timer = cls._get_timer(trigger_property)
window = trigger_property.get("window")
end_time_to_run = execution_time + timedelta(
seconds=window)
if now > end_time_to_run:
LOG.debug("Time trigger (%s) out of window",)
execute = False
else:
LOG.debug("Time trigger (%s) is due", trigger_id)
execute = True
next_exec_time = cls._compute_next_run_time(
now,
trigger_property['end_time'],
timer,
)
res = False
if not next_exec_time:
LOG.debug("No more planned executions for trigger (%s)",
trigger_id)
res = cls._trigger_execution_delete(
execution_id=exec_to_handle.id)
else:
LOG.debug("Rescheduling (%s) from %s to %s",
trigger_id,
execution_time,
next_exec_time)
res = cls._trigger_execution_update(
exec_to_handle.id,
execution_time,
next_exec_time,
)
if not res:
LOG.info("Trigger probably handled by another node")
continue
if execute:
cls._trigger_operations(trigger_id, execution_time, window)
@classmethod
def _trigger_execution_new(cls, trigger_id, time):
# Find the first time.
# We don't known when using this trigger first time.
ctxt = karbor_context.get_admin_context()
try:
db.trigger_execution_create(ctxt, trigger_id, time)
return True
except Exception:
return False
@classmethod
def _trigger_execution_update(cls, id, current_time, next_time):
ctxt = karbor_context.get_admin_context()
return db.trigger_execution_update(ctxt, id, current_time, next_time)
@classmethod
def _trigger_execution_delete(cls, execution_id=None, trigger_id=None):
if execution_id is None and trigger_id is None:
raise exception.InvalidParameterValue('supply at least one id')
ctxt = karbor_context.get_admin_context()
num_deleted = db.trigger_execution_delete(ctxt, execution_id,
trigger_id)
return num_deleted > 0
@classmethod
def _trigger_execution_get_next(cls):
ctxt = karbor_context.get_admin_context()
return db.trigger_execution_get_next(ctxt)
self._greenthread = None
def shutdown(self):
self._unregister()
self._kill_greenthread()
def register_operation(self, operation_id, **kwargs):
if operation_id in self._operation_ids:
msg = (_("The operation_id(%s) is exist") % operation_id)
raise exception.ScheduledOperationExist(msg)
if self._greenthread and not self._greenthread.running:
raise exception.TriggerIsInvalid(trigger_id=self._id)
self._operation_ids.add(operation_id)
if self._greenthread is None:
self._start_greenthread()
def unregister_operation(self, operation_id, **kwargs):
self._operation_ids.discard(operation_id)
if operation_id not in self._operation_ids:
return
self._operation_ids.remove(operation_id)
if 0 == len(self._operation_ids):
self._kill_greenthread()
def update_trigger_property(self, trigger_property):
valid_trigger_property = self.check_trigger_definition(
@ -202,36 +132,82 @@ class TimeTrigger(triggers.BaseTrigger):
timer = self._get_timer(valid_trigger_property)
first_run_time = self._compute_next_run_time(
datetime.utcnow(), valid_trigger_property['end_time'], timer)
datetime.utcnow(), trigger_property['end_time'], timer)
if not first_run_time:
msg = (_("The new trigger property is invalid, "
"Can not find the first run time"))
raise exception.InvalidInput(msg)
if self._greenthread is not None:
pre_run_time = self._greenthread.pre_run_time
if pre_run_time:
end_time = pre_run_time + timedelta(
seconds=self._trigger_property['window'])
if first_run_time <= end_time:
msg = (_("The new trigger property is invalid, "
"First run time%(t1)s must be after %(t2)s") %
{'t1': first_run_time, 't2': end_time})
raise exception.InvalidInput(msg)
self._trigger_property = valid_trigger_property
self._trigger_execution_delete(trigger_id=self._id)
self._trigger_execution_new(self._id, first_run_time)
@classmethod
def _trigger_operations(cls, trigger_id, expect_run_time, window):
"""Trigger operations once"""
if len(self._operation_ids) > 0:
# Restart greenthread to take the change of trigger property
# effect immediately
self._kill_greenthread()
self._create_green_thread(first_run_time, timer)
# The executor execute_operation may have I/O operation.
def _kill_greenthread(self):
if self._greenthread:
self._greenthread.kill()
self._greenthread = None
def _start_greenthread(self):
# Find the first time.
# We don't known when using this trigger first time.
timer = self._get_timer(self._trigger_property)
first_run_time = self._compute_next_run_time(
datetime.utcnow(), self._trigger_property['end_time'], timer)
if not first_run_time:
raise exception.TriggerIsInvalid(trigger_id=self._id)
self._create_green_thread(first_run_time, timer)
def _create_green_thread(self, first_run_time, timer):
func = functools.partial(
self._trigger_operations,
trigger_property=self._trigger_property.copy(),
timer=timer)
self._greenthread = TriggerOperationGreenThread(
first_run_time, func)
def _trigger_operations(self, expect_run_time, trigger_property, timer):
"""Trigger operations once
returns: wait time for next run
"""
# Just for robustness, actually expect_run_time always <= now
# but, if the scheduling of eventlet is not accurate, then we
# can do some adjustments.
entry_time = datetime.utcnow()
if entry_time < expect_run_time and (
int(timeutils.delta_seconds(entry_time, expect_run_time)) > 0):
return expect_run_time
# The self._executor.execute_operation may have I/O operation.
# If it is, this green thread will be switched out during looping
# operation_ids. In order to avoid changing self._operation_ids
# during the green thread is switched out, copy self._operation_ids
# as the iterative object.
trigger = cls._triggers.get(trigger_id)
if not trigger:
LOG.warning("Can't find trigger: %s" % trigger_id)
return
operations_ids = trigger._operation_ids.copy()
operation_ids = self._operation_ids.copy()
sent_ops = set()
window = trigger_property.get("window")
end_time = expect_run_time + timedelta(seconds=window)
for operation_id in operations_ids:
if operation_id not in trigger._operation_ids:
for operation_id in operation_ids:
if operation_id not in self._operation_ids:
# Maybe, when traversing this operation_id, it has been
# removed by self.unregister_operation
LOG.warning("Execute operation %s which is not exist, "
@ -242,13 +218,15 @@ class TimeTrigger(triggers.BaseTrigger):
if now >= end_time:
LOG.error("Can not trigger operations to run. Because it is "
"out of window time. now=%(now)s, "
"end time=%(end_time)s, waiting operations=%(ops)s",
"end time=%(end_time)s, expect run time=%(expect)s,"
" wating operations=%(ops)s",
{'now': now, 'end_time': end_time,
'ops': operations_ids - sent_ops})
'expect': expect_run_time,
'ops': operation_ids - sent_ops})
break
try:
trigger._executor.execute_operation(
self._executor.execute_operation(
operation_id, now, expect_run_time, window)
except Exception:
LOG.exception("Submit operation to executor failed, operation"
@ -256,104 +234,30 @@ class TimeTrigger(triggers.BaseTrigger):
sent_ops.add(operation_id)
next_time = self._compute_next_run_time(
expect_run_time, trigger_property['end_time'], timer)
now = datetime.utcnow()
if next_time and next_time <= now:
LOG.error("Next run time:%(next_time)s <= now:%(now)s. Maybe the "
"entry time=%(entry)s is too late, even exceeds the end"
" time of window=%(end)s, or it was blocked where "
"sending the operation to executor.",
{'next_time': next_time, 'now': now,
'entry': entry_time, 'end': end_time})
return next_time
@classmethod
def check_trigger_definition(cls, trigger_definition):
"""Check trigger definition
All the time instances of trigger_definition are in UTC,
including start_time, end_time
"""
tf_cls = cls._get_time_format_class()
pattern = trigger_definition.get("pattern", None)
tf_cls.check_time_format(pattern)
start_time = trigger_definition.get("start_time", None)
if not start_time:
msg = _("The trigger\'s start time is unknown")
raise exception.InvalidInput(msg)
start_time = cls._check_and_get_datetime(start_time, "start_time")
interval = tf_cls(start_time, pattern).get_min_interval()
if interval is not None and interval < CONF.min_interval:
msg = (_("The interval of two adjacent time points "
"is less than %d") % CONF.min_interval)
raise exception.InvalidInput(msg)
window = trigger_definition.get("window", CONF.min_window_time)
if not isinstance(window, int):
try:
window = int(window)
except Exception:
msg = (_("The trigger windows(%s) is not integer") % window)
raise exception.InvalidInput(msg)
if window < CONF.min_window_time or window > CONF.max_window_time:
msg = (_("The trigger windows %(window)d must be between "
"%(min_window)d and %(max_window)d") %
{"window": window,
"min_window": CONF.min_window_time,
"max_window": CONF.max_window_time})
raise exception.InvalidInput(msg)
end_time = trigger_definition.get("end_time", None)
end_time = cls._check_and_get_datetime(end_time, "end_time")
valid_trigger_property = trigger_definition.copy()
valid_trigger_property['window'] = window
valid_trigger_property['start_time'] = start_time
valid_trigger_property['end_time'] = end_time
return valid_trigger_property
@classmethod
def _check_and_get_datetime(cls, time, time_name):
if not time:
return None
if isinstance(time, datetime):
return time
if not isinstance(time, six.string_types):
msg = (_("The trigger %(name)s(type = %(vtype)s) is not an "
"instance of string") %
{"name": time_name, "vtype": type(time)})
raise exception.InvalidInput(msg)
try:
time = timeutils.parse_strtime(time, fmt='%Y-%m-%d %H:%M:%S')
except Exception:
msg = (_("The format of trigger %s is not correct") % time_name)
raise exception.InvalidInput(msg)
return time
return utils.check_trigger_definition(trigger_definition)
@classmethod
def _compute_next_run_time(cls, start_time, end_time, timer):
next_time = timer.compute_next_time(start_time)
if next_time and (not end_time or next_time <= end_time):
return next_time
return None
@classmethod
def _get_time_format_class(cls):
return import_driver.DriverManager(
'karbor.operationengine.engine.timetrigger.time_format',
CONF.time_format).driver
return utils.compute_next_run_time(start_time, end_time, timer)
@classmethod
def _get_timer(cls, trigger_property):
tf_cls = cls._get_time_format_class()
timer = tf_cls(trigger_property['start_time'],
trigger_property['pattern'])
return timer
return utils.get_timer(trigger_property)
@classmethod
def check_configuration(cls):
min_window = CONF.min_window_time
max_window = CONF.max_window_time
min_interval = CONF.min_interval
if not (min_window < max_window and (max_window * 2 <= min_interval)):
msg = (_('Configurations of time trigger are invalid'))
raise exception.InvalidInput(msg)
utils.check_configuration()

View File

@ -0,0 +1,253 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from datetime import timedelta
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from karbor import context as karbor_context
from karbor import db
from karbor import exception
from karbor.i18n import _
from karbor.services.operationengine.engine import triggers
from karbor.services.operationengine.engine.triggers.timetrigger import utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class TimeTrigger(triggers.BaseTrigger):
TRIGGER_TYPE = "time"
IS_ENABLED = (CONF.scheduling_strategy == 'multi_node')
_loopingcall = None
_triggers = {}
def __init__(self, trigger_id, trigger_property, executor):
super(TimeTrigger, self).__init__(
trigger_id, trigger_property, executor)
self._trigger_property = self.check_trigger_definition(
trigger_property)
timer = self._get_timer(self._trigger_property)
first_run_time = self._compute_next_run_time(
datetime.utcnow(), self._trigger_property['end_time'], timer)
LOG.debug("first_run_time: %s", first_run_time)
self._trigger_execution_new(self._id, first_run_time)
if not self.__class__._loopingcall:
self.__class__._loopingcall = loopingcall.FixedIntervalLoopingCall(
self._loop)
self.__class__._loopingcall.start(
interval=CONF.trigger_poll_interval,
stop_on_exception=False,
)
self._register()
def _register(self):
self.__class__._triggers[self._id] = self
def _unregister(self):
del self.__class__._triggers[self._id]
@classmethod
def _loop(cls):
while True:
now = datetime.utcnow()
exec_to_handle = cls._trigger_execution_get_next()
if not exec_to_handle:
LOG.debug("No next trigger executions")
break
trigger_id = exec_to_handle.trigger_id
execution_time = exec_to_handle.execution_time
trigger = cls._triggers.get(trigger_id)
if not trigger:
LOG.warning("Unable to find trigger %s", trigger_id)
res = cls._trigger_execution_delete(
execution_id=exec_to_handle.id)
continue
if now < execution_time:
LOG.debug("Time trigger not yet due")
break
trigger_property = trigger._trigger_property
timer = cls._get_timer(trigger_property)
window = trigger_property.get("window")
end_time_to_run = execution_time + timedelta(
seconds=window)
if now > end_time_to_run:
LOG.debug("Time trigger (%s) out of window",)
execute = False
else:
LOG.debug("Time trigger (%s) is due", trigger_id)
execute = True
next_exec_time = cls._compute_next_run_time(
now,
trigger_property['end_time'],
timer,
)
if not next_exec_time:
LOG.debug("No more planned executions for trigger (%s)",
trigger_id)
res = cls._trigger_execution_delete(
execution_id=exec_to_handle.id)
else:
LOG.debug("Rescheduling (%s) from %s to %s",
trigger_id,
execution_time,
next_exec_time)
res = cls._trigger_execution_update(
exec_to_handle.id,
execution_time,
next_exec_time,
)
if not res:
LOG.info("Trigger probably handled by another node")
continue
if execute:
cls._trigger_operations(trigger_id, execution_time, window)
@classmethod
def _trigger_execution_new(cls, trigger_id, time):
# Find the first time.
# We don't known when using this trigger first time.
ctxt = karbor_context.get_admin_context()
try:
db.trigger_execution_create(ctxt, trigger_id, time)
return True
except Exception:
return False
@classmethod
def _trigger_execution_update(cls, id, current_time, next_time):
ctxt = karbor_context.get_admin_context()
return db.trigger_execution_update(ctxt, id, current_time, next_time)
@classmethod
def _trigger_execution_delete(cls, execution_id=None, trigger_id=None):
if execution_id is None and trigger_id is None:
raise exception.InvalidParameterValue('supply at least one id')
ctxt = karbor_context.get_admin_context()
num_deleted = db.trigger_execution_delete(ctxt, execution_id,
trigger_id)
return num_deleted > 0
@classmethod
def _trigger_execution_get_next(cls):
ctxt = karbor_context.get_admin_context()
return db.trigger_execution_get_next(ctxt)
def shutdown(self):
self._unregister()
def register_operation(self, operation_id, **kwargs):
if operation_id in self._operation_ids:
msg = (_("The operation_id(%s) is exist") % operation_id)
raise exception.ScheduledOperationExist(msg)
self._operation_ids.add(operation_id)
def unregister_operation(self, operation_id, **kwargs):
self._operation_ids.discard(operation_id)
def update_trigger_property(self, trigger_property):
valid_trigger_property = self.check_trigger_definition(
trigger_property)
if valid_trigger_property == self._trigger_property:
return
timer = self._get_timer(valid_trigger_property)
first_run_time = self._compute_next_run_time(
datetime.utcnow(), valid_trigger_property['end_time'], timer)
if not first_run_time:
msg = (_("The new trigger property is invalid, "
"Can not find the first run time"))
raise exception.InvalidInput(msg)
self._trigger_property = valid_trigger_property
self._trigger_execution_delete(trigger_id=self._id)
self._trigger_execution_new(self._id, first_run_time)
@classmethod
def _trigger_operations(cls, trigger_id, expect_run_time, window):
"""Trigger operations once"""
# The executor execute_operation may have I/O operation.
# If it is, this green thread will be switched out during looping
# operation_ids. In order to avoid changing self._operation_ids
# during the green thread is switched out, copy self._operation_ids
# as the iterative object.
trigger = cls._triggers.get(trigger_id)
if not trigger:
LOG.warning("Can't find trigger: %s" % trigger_id)
return
operations_ids = trigger._operation_ids.copy()
sent_ops = set()
end_time = expect_run_time + timedelta(seconds=window)
for operation_id in operations_ids:
if operation_id not in trigger._operation_ids:
# Maybe, when traversing this operation_id, it has been
# removed by self.unregister_operation
LOG.warning("Execute operation %s which is not exist, "
"ignore it", operation_id)
continue
now = datetime.utcnow()
if now >= end_time:
LOG.error("Can not trigger operations to run. Because it is "
"out of window time. now=%(now)s, "
"end time=%(end_time)s, waiting operations=%(ops)s",
{'now': now, 'end_time': end_time,
'ops': operations_ids - sent_ops})
break
try:
trigger._executor.execute_operation(
operation_id, now, expect_run_time, window)
except Exception:
LOG.exception("Submit operation to executor failed, operation"
" id=%s", operation_id)
sent_ops.add(operation_id)
@classmethod
def check_trigger_definition(cls, trigger_definition):
return utils.check_trigger_definition(trigger_definition)
@classmethod
def _compute_next_run_time(cls, start_time, end_time, timer):
return utils.compute_next_run_time(start_time, end_time, timer)
@classmethod
def _get_timer(cls, trigger_property):
return utils.get_timer(trigger_property)
@classmethod
def check_configuration(cls):
utils.check_configuration()

View File

@ -0,0 +1,123 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from oslo_config import cfg
from oslo_utils import timeutils
import six
from stevedore import driver as import_driver
from karbor import exception
from karbor.i18n import _
CONF = cfg.CONF
def get_time_format_class():
return import_driver.DriverManager(
'karbor.operationengine.engine.timetrigger.time_format',
CONF.time_format).driver
def compute_next_run_time(start_time, end_time, timer):
next_time = timer.compute_next_time(start_time)
if next_time and (not end_time or next_time <= end_time):
return next_time
return None
def check_and_get_datetime(time, time_name):
if not time:
return None
if isinstance(time, datetime):
return time
if not isinstance(time, six.string_types):
msg = (_("The trigger %(name)s(type = %(vtype)s) is "
"not an instance of string") %
{"name": time_name, "vtype": type(time)})
raise exception.InvalidInput(msg)
try:
time = timeutils.parse_strtime(time, fmt='%Y-%m-%d %H:%M:%S')
except Exception:
msg = (_("The format of trigger %s is not correct") % time_name)
raise exception.InvalidInput(msg)
return time
def check_trigger_definition(trigger_definition):
"""Check trigger definition
All the time instances of trigger_definition are in UTC,
including start_time, end_time
"""
tf_cls = get_time_format_class()
pattern = trigger_definition.get("pattern", None)
tf_cls.check_time_format(pattern)
start_time = trigger_definition.get("start_time", None)
if not start_time:
msg = _("The trigger\'s start time is unknown")
raise exception.InvalidInput(msg)
start_time = check_and_get_datetime(start_time, "start_time")
interval = tf_cls(start_time, pattern).get_min_interval()
if interval is not None and interval < CONF.min_interval:
msg = (_("The interval of two adjacent time points "
"is less than %d") % CONF.min_interval)
raise exception.InvalidInput(msg)
window = trigger_definition.get("window", CONF.min_window_time)
if not isinstance(window, int):
try:
window = int(window)
except Exception:
msg = (_("The trigger windows(%s) is not integer") % window)
raise exception.InvalidInput(msg)
if window < CONF.min_window_time or window > CONF.max_window_time:
msg = (_("The trigger windows %(window)d must be between "
"%(min_window)d and %(max_window)d") %
{"window": window,
"min_window": CONF.min_window_time,
"max_window": CONF.max_window_time})
raise exception.InvalidInput(msg)
end_time = trigger_definition.get("end_time", None)
end_time = check_and_get_datetime(end_time, "end_time")
valid_trigger_property = trigger_definition.copy()
valid_trigger_property['window'] = window
valid_trigger_property['start_time'] = start_time
valid_trigger_property['end_time'] = end_time
return valid_trigger_property
def check_configuration():
min_window = CONF.min_window_time
max_window = CONF.max_window_time
min_interval = CONF.min_interval
if not (min_window < max_window and (max_window * 2 <= min_interval)):
msg = (_('Configurations of time trigger are invalid'))
raise exception.InvalidInput(msg)
def get_timer(trigger_property):
tf_cls = get_time_format_class()
timer = tf_cls(trigger_property['start_time'],
trigger_property['pattern'])
return timer

View File

@ -9,28 +9,19 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
from datetime import datetime
from datetime import timedelta
import eventlet
import functools
import heapq
import mock
from oslo_config import cfg
from oslo_utils import uuidutils
from karbor import context as karbor_context
from karbor import exception
from karbor.services.operationengine.engine.triggers.timetrigger \
import time_trigger as tt
from karbor.services.operationengine.engine.triggers.timetrigger.time_trigger \
import TimeTrigger
from karbor.services.operationengine.engine.triggers.timetrigger import utils
from karbor.tests import base
TriggerExecution = namedtuple('TriggerExecution',
['execution_time', 'id', 'trigger_id'])
class FakeTimeFormat(object):
def __init__(self, start_time, pattern):
super(FakeTimeFormat, self).__init__()
@ -58,78 +49,30 @@ class FakeExecutor(object):
self._ops[operation_id] += 1
eventlet.sleep(0.5)
class FakeTimeTrigger(object):
@classmethod
def get_time_format(cls, *args, **kwargs):
return FakeTimeFormat
class FakeDb(object):
def __init__(self):
self._db = []
def trigger_execution_get_next(self, context):
if len(self._db) == 0:
return None
return self._db[0]
def trigger_execution_create(self, context, trigger_id, time):
element = TriggerExecution(time, uuidutils.generate_uuid(), trigger_id)
heapq.heappush(self._db, element)
def trigger_execution_update(self, context, id, current_time, new_time):
for idx, element in enumerate(self._db):
if element.id == id:
if element.execution_time != current_time:
return False
self._db[idx] = TriggerExecution(new_time, element.id,
element.trigger_id)
break
heapq.heapify(self._db)
return True
def trigger_execution_delete(self, context, id, trigger_id):
removed_ids = []
for idx, element in enumerate(self._db):
if (id and element.id == id) or (trigger_id and
element.trigger_id == trigger_id):
removed_ids.append(idx)
for idx in reversed(removed_ids):
self._db.pop(idx)
heapq.heapify(self._db)
return len(removed_ids)
def time_trigger_test(func):
@functools.wraps(func)
@mock.patch.object(tt, 'db', FakeDb())
@mock.patch.object(karbor_context, 'get_admin_context', lambda: None)
@mock.patch.object(tt.TimeTrigger, '_get_time_format_class',
FakeTimeTrigger.get_time_format)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
def clear(self):
self._ops.clear()
class TimeTriggerTestCase(base.TestCase):
_tid = 0
_default_executor = FakeExecutor()
def setUp(self):
super(TimeTriggerTestCase, self).setUp()
self._set_configuration()
mock_obj = mock.Mock()
mock_obj.return_value = FakeTimeFormat
utils.get_time_format_class = mock_obj
self._default_executor = FakeExecutor()
def test_check_configuration(self):
self._set_configuration(10, 20, 30)
self.assertRaisesRegex(exception.InvalidInput,
"Configurations of time trigger are invalid",
tt.TimeTrigger.check_configuration)
TimeTrigger.check_configuration)
self._set_configuration()
@time_trigger_test
def test_check_trigger_property_start_time(self):
trigger_property = {
"pattern": "",
@ -138,23 +81,22 @@ class TimeTriggerTestCase(base.TestCase):
self.assertRaisesRegex(exception.InvalidInput,
"The trigger\'s start time is unknown",
tt.TimeTrigger.check_trigger_definition,
TimeTrigger.check_trigger_definition,
trigger_property)
trigger_property['start_time'] = 'abc'
self.assertRaisesRegex(exception.InvalidInput,
"The format of trigger .* is not correct",
tt.TimeTrigger.check_trigger_definition,
TimeTrigger.check_trigger_definition,
trigger_property)
trigger_property['start_time'] = 123
self.assertRaisesRegex(exception.InvalidInput,
"The trigger .* is not an instance of string",
tt.TimeTrigger.check_trigger_definition,
TimeTrigger.check_trigger_definition,
trigger_property)
@mock.patch.object(FakeTimeFormat, 'get_min_interval')
@time_trigger_test
def test_check_trigger_property_interval(self, get_min_interval):
get_min_interval.return_value = 0
@ -164,10 +106,9 @@ class TimeTriggerTestCase(base.TestCase):
self.assertRaisesRegex(exception.InvalidInput,
"The interval of two adjacent time points .*",
tt.TimeTrigger.check_trigger_definition,
TimeTrigger.check_trigger_definition,
trigger_property)
@time_trigger_test
def test_check_trigger_property_window(self):
trigger_property = {
"window": "abc",
@ -176,16 +117,15 @@ class TimeTriggerTestCase(base.TestCase):
self.assertRaisesRegex(exception.InvalidInput,
"The trigger window.* is not integer",
tt.TimeTrigger.check_trigger_definition,
TimeTrigger.check_trigger_definition,
trigger_property)
trigger_property['window'] = 1000
self.assertRaisesRegex(exception.InvalidInput,
"The trigger windows .* must be between .*",
tt.TimeTrigger.check_trigger_definition,
TimeTrigger.check_trigger_definition,
trigger_property)
@time_trigger_test
def test_check_trigger_property_end_time(self):
trigger_property = {
"window": 15,
@ -195,24 +135,27 @@ class TimeTriggerTestCase(base.TestCase):
self.assertRaisesRegex(exception.InvalidInput,
"The format of trigger .* is not correct",
tt.TimeTrigger.check_trigger_definition,
TimeTrigger.check_trigger_definition,
trigger_property)
@time_trigger_test
def test_register_operation(self):
trigger = self._generate_trigger()
operation_id = "1"
trigger.register_operation(operation_id)
eventlet.sleep(2)
eventlet.sleep(0.3)
self.assertGreaterEqual(self._default_executor._ops[operation_id], 1)
self.assertGreaterEqual(trigger._executor._ops[operation_id], 1)
self.assertRaisesRegex(exception.ScheduledOperationExist,
"The operation_id.* is exist",
trigger.register_operation,
operation_id)
@time_trigger_test
eventlet.sleep(0.3)
self.assertRaises(exception.TriggerIsInvalid,
trigger.register_operation,
"2")
def test_unregister_operation(self):
trigger = self._generate_trigger()
operation_id = "2"
@ -221,9 +164,26 @@ class TimeTriggerTestCase(base.TestCase):
self.assertIn(operation_id, trigger._operation_ids)
trigger.unregister_operation(operation_id)
self.assertNotIn(trigger._id, trigger._operation_ids)
self.assertNotIn(operation_id, trigger._operation_ids)
def test_unregister_operation_when_scheduling(self):
trigger = self._generate_trigger()
for op_id in ['1', '2', '3']:
trigger.register_operation(op_id)
self.assertIn(op_id, trigger._operation_ids)
eventlet.sleep(0.5)
for op_id in ['2', '3']:
trigger.unregister_operation(op_id)
self.assertNotIn(op_id, trigger._operation_ids)
eventlet.sleep(0.6)
self.assertGreaterEqual(trigger._executor._ops['1'], 1)
self.assertTrue(('2' not in trigger._executor._ops) or (
'3' not in trigger._executor._ops))
@time_trigger_test
def test_update_trigger_property(self):
trigger = self._generate_trigger()
@ -239,10 +199,18 @@ class TimeTriggerTestCase(base.TestCase):
trigger.update_trigger_property,
trigger_property)
@time_trigger_test
trigger.register_operation('1')
eventlet.sleep(0.2)
trigger_property['end_time'] = (
datetime.utcnow() + timedelta(seconds=1))
self.assertRaisesRegex(exception.InvalidInput,
".*First run time.* must be after.*",
trigger.update_trigger_property,
trigger_property)
def test_update_trigger_property_success(self):
trigger = self._generate_trigger()
trigger.register_operation('7')
trigger.register_operation('1')
eventlet.sleep(0.2)
trigger_property = {
@ -253,8 +221,12 @@ class TimeTriggerTestCase(base.TestCase):
}
with mock.patch.object(FakeTimeFormat, 'compute_next_time') as c:
c.return_value = datetime.utcnow() + timedelta(seconds=20)
old_id = id(trigger._greenthread)
trigger.update_trigger_property(trigger_property)
self.assertNotEqual(old_id, id(trigger._greenthread))
def _generate_trigger(self, end_time=None):
if not end_time:
end_time = datetime.utcnow() + timedelta(seconds=1)
@ -266,15 +238,11 @@ class TimeTriggerTestCase(base.TestCase):
"end_time": end_time
}
return tt.TimeTrigger(
uuidutils.generate_uuid(),
trigger_property,
self._default_executor,
)
self._default_executor.clear()
return TimeTrigger("123", trigger_property, self._default_executor)
def _set_configuration(self, min_window=15,
max_window=30, min_interval=60, poll_interval=1):
max_window=30, min_interval=60):
self.override_config('min_interval', min_interval)
self.override_config('min_window_time', min_window)
self.override_config('max_window_time', max_window)
self.override_config('trigger_poll_interval', poll_interval)

View File

@ -0,0 +1,281 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
from datetime import datetime
from datetime import timedelta
import eventlet
import functools
import heapq
import mock
from oslo_config import cfg
from oslo_utils import uuidutils
from karbor import context as karbor_context
from karbor import exception
from karbor.services.operationengine.engine.triggers.timetrigger import \
time_trigger_multi_node as tt
from karbor.services.operationengine.engine.triggers.timetrigger import utils
from karbor.tests import base
TriggerExecution = namedtuple('TriggerExecution',
['execution_time', 'id', 'trigger_id'])
class FakeTimeFormat(object):
def __init__(self, start_time, pattern):
super(FakeTimeFormat, self).__init__()
@classmethod
def check_time_format(cls, pattern):
pass
def compute_next_time(self, current_time):
return current_time + timedelta(seconds=0.5)
def get_min_interval(self):
return cfg.CONF.min_interval
class FakeExecutor(object):
def __init__(self):
super(FakeExecutor, self).__init__()
self._ops = {}
def execute_operation(self, operation_id, triggered_time,
expect_start_time, window):
if operation_id not in self._ops:
self._ops[operation_id] = 0
self._ops[operation_id] += 1
eventlet.sleep(0.5)
class FakeTimeTrigger(object):
@classmethod
def get_time_format(cls, *args, **kwargs):
return FakeTimeFormat
class FakeDb(object):
def __init__(self):
self._db = []
def trigger_execution_get_next(self, context):
if len(self._db) == 0:
return None
return self._db[0]
def trigger_execution_create(self, context, trigger_id, time):
element = TriggerExecution(time, uuidutils.generate_uuid(), trigger_id)
heapq.heappush(self._db, element)
def trigger_execution_update(self, context, id, current_time, new_time):
for idx, element in enumerate(self._db):
if element.id == id:
if element.execution_time != current_time:
return False
self._db[idx] = TriggerExecution(new_time, element.id,
element.trigger_id)
break
heapq.heapify(self._db)
return True
def trigger_execution_delete(self, context, id, trigger_id):
removed_ids = []
for idx, element in enumerate(self._db):
if (id and element.id == id) or (trigger_id and
element.trigger_id == trigger_id):
removed_ids.append(idx)
for idx in reversed(removed_ids):
self._db.pop(idx)
heapq.heapify(self._db)
return len(removed_ids)
def time_trigger_test(func):
@functools.wraps(func)
@mock.patch.object(tt, 'db', FakeDb())
@mock.patch.object(karbor_context, 'get_admin_context', lambda: None)
@mock.patch.object(utils, 'get_time_format_class',
FakeTimeTrigger.get_time_format)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
class TimeTriggerTestCase(base.TestCase):
_tid = 0
_default_executor = FakeExecutor()
def setUp(self):
super(TimeTriggerTestCase, self).setUp()
self._set_configuration()
def test_check_configuration(self):
self._set_configuration(10, 20, 30)
self.assertRaisesRegex(exception.InvalidInput,
"Configurations of time trigger are invalid",
tt.TimeTrigger.check_configuration)
self._set_configuration()
@time_trigger_test
def test_check_trigger_property_start_time(self):
trigger_property = {
"pattern": "",
"start_time": ""
}
self.assertRaisesRegex(exception.InvalidInput,
"The trigger\'s start time is unknown",
tt.TimeTrigger.check_trigger_definition,
trigger_property)
trigger_property['start_time'] = 'abc'
self.assertRaisesRegex(exception.InvalidInput,
"The format of trigger .* is not correct",
tt.TimeTrigger.check_trigger_definition,
trigger_property)
trigger_property['start_time'] = 123
self.assertRaisesRegex(exception.InvalidInput,
"The trigger .* is not an instance of string",
tt.TimeTrigger.check_trigger_definition,
trigger_property)
@mock.patch.object(FakeTimeFormat, 'get_min_interval')
@time_trigger_test
def test_check_trigger_property_interval(self, get_min_interval):
get_min_interval.return_value = 0
trigger_property = {
"start_time": '2016-8-18 01:03:04'
}
self.assertRaisesRegex(exception.InvalidInput,
"The interval of two adjacent time points .*",
tt.TimeTrigger.check_trigger_definition,
trigger_property)
@time_trigger_test
def test_check_trigger_property_window(self):
trigger_property = {
"window": "abc",
"start_time": '2016-8-18 01:03:04'
}
self.assertRaisesRegex(exception.InvalidInput,
"The trigger window.* is not integer",
tt.TimeTrigger.check_trigger_definition,
trigger_property)
trigger_property['window'] = 1000
self.assertRaisesRegex(exception.InvalidInput,
"The trigger windows .* must be between .*",
tt.TimeTrigger.check_trigger_definition,
trigger_property)
@time_trigger_test
def test_check_trigger_property_end_time(self):
trigger_property = {
"window": 15,
"start_time": '2016-8-18 01:03:04',
"end_time": "abc"
}
self.assertRaisesRegex(exception.InvalidInput,
"The format of trigger .* is not correct",
tt.TimeTrigger.check_trigger_definition,
trigger_property)
@time_trigger_test
def test_register_operation(self):
trigger = self._generate_trigger()
operation_id = "1"
trigger.register_operation(operation_id)
eventlet.sleep(2)
self.assertGreaterEqual(self._default_executor._ops[operation_id], 1)
self.assertRaisesRegex(exception.ScheduledOperationExist,
"The operation_id.* is exist",
trigger.register_operation,
operation_id)
@time_trigger_test
def test_unregister_operation(self):
trigger = self._generate_trigger()
operation_id = "2"
trigger.register_operation(operation_id)
self.assertIn(operation_id, trigger._operation_ids)
trigger.unregister_operation(operation_id)
self.assertNotIn(trigger._id, trigger._operation_ids)
@time_trigger_test
def test_update_trigger_property(self):
trigger = self._generate_trigger()
trigger_property = {
"pattern": "",
"window": 15,
"start_time": datetime.utcnow(),
"end_time": datetime.utcnow()
}
self.assertRaisesRegex(exception.InvalidInput,
".*Can not find the first run tim",
trigger.update_trigger_property,
trigger_property)
@time_trigger_test
def test_update_trigger_property_success(self):
trigger = self._generate_trigger()
trigger.register_operation('7')
eventlet.sleep(0.2)
trigger_property = {
"pattern": "",
"window": 15,
"start_time": datetime.utcnow(),
"end_time": ''
}
with mock.patch.object(FakeTimeFormat, 'compute_next_time') as c:
c.return_value = datetime.utcnow() + timedelta(seconds=20)
trigger.update_trigger_property(trigger_property)
def _generate_trigger(self, end_time=None):
if not end_time:
end_time = datetime.utcnow() + timedelta(seconds=1)
trigger_property = {
"pattern": "",
"window": 15,
"start_time": datetime.utcnow(),
"end_time": end_time
}
return tt.TimeTrigger(
uuidutils.generate_uuid(),
trigger_property,
self._default_executor,
)
def _set_configuration(self, min_window=15,
max_window=30, min_interval=60, poll_interval=1):
self.override_config('min_interval', min_interval)
self.override_config('min_window_time', min_window)
self.override_config('max_window_time', max_window)
self.override_config('trigger_poll_interval', poll_interval)