Operation Engine: Support Trigger Multi Node
In multi operation engine nodes scenario, each trigger must be handled by one node. If a trigger is raised in more than 1 node, the same operation will start execution in the same time. Create TriggerExecutions table in database, which will be used to sync the next trigger executions. Each node tries to atomically update the next execution time, and only a node which succeeds can move forward on handling the actual trigger. Change-Id: I8f276f4612e8b3a0dc86bc0d300e55c40ee9c0f2
This commit is contained in:
parent
e3a9dd30dc
commit
c8af5d225c
|
@ -192,6 +192,25 @@ def trigger_get_all_by_filters_sort(context, filters, limit=None,
|
|||
###################
|
||||
|
||||
|
||||
def trigger_execution_create(context, trigger_id, time):
|
||||
return IMPL.trigger_execution_create(context, trigger_id, time)
|
||||
|
||||
|
||||
def trigger_execution_get_next(context):
|
||||
return IMPL.trigger_execution_get_next(context)
|
||||
|
||||
|
||||
def trigger_execution_delete(context, id, trigger_id):
|
||||
return IMPL.trigger_execution_delete(context, id, trigger_id)
|
||||
|
||||
|
||||
def trigger_execution_update(context, id, current_time, new_time):
|
||||
return IMPL.trigger_execution_update(context, id, current_time, new_time)
|
||||
|
||||
|
||||
###################
|
||||
|
||||
|
||||
def scheduled_operation_get(context, id, columns_to_join=[]):
|
||||
"""Get a scheduled operation by its id.
|
||||
|
||||
|
|
|
@ -445,6 +445,109 @@ def trigger_get_all_by_filters_sort(context, filters, limit=None, marker=None,
|
|||
###################
|
||||
|
||||
|
||||
def _trigger_execution_list_query(context, session, **kwargs):
|
||||
return model_query(context, models.TriggerExecution, session=session)
|
||||
|
||||
|
||||
def _trigger_execution_list_process_filters(query, filters):
|
||||
exact_match_filter_names = ['id', 'trigger_id', 'execution_time']
|
||||
query = _list_common_process_exact_filter(models.Trigger, query, filters,
|
||||
exact_match_filter_names)
|
||||
return query
|
||||
|
||||
|
||||
def _trigger_execution_get(context, id, session=None):
|
||||
result = model_query(context, models.TriggerExecution,
|
||||
session=session).filter_by(id=id)
|
||||
result = result.first()
|
||||
|
||||
if not result:
|
||||
raise exception.TriggerNotFound(id=id)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def trigger_execution_update(context, id, old_time, new_time):
|
||||
session = get_session()
|
||||
try:
|
||||
with session.begin():
|
||||
result = model_query(
|
||||
context, models.TriggerExecution, session=session
|
||||
).filter_by(
|
||||
id=id, execution_time=old_time
|
||||
).update({"execution_time": new_time})
|
||||
except Exception as e:
|
||||
LOG.warning("Unable to update trigger execution (%(execution)s): "
|
||||
"%(exc)s",
|
||||
{"execution": id, "exc": e})
|
||||
return False
|
||||
else:
|
||||
LOG.debug("Updated trigger execution (%(execution)s) from %(old_time)s"
|
||||
" to %(new_time)s",
|
||||
{"execution": id, "old_time": old_time, "new_time": new_time}
|
||||
)
|
||||
return result == 1
|
||||
|
||||
|
||||
def trigger_execution_create(context, trigger_id, time):
|
||||
trigger_ex_ref = models.TriggerExecution()
|
||||
trigger_ex_ref.update({
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'trigger_id': trigger_id,
|
||||
'execution_time': time,
|
||||
})
|
||||
trigger_ex_ref.save(get_session())
|
||||
return trigger_ex_ref
|
||||
|
||||
|
||||
def trigger_execution_delete(context, id, trigger_id):
|
||||
filters = {}
|
||||
if id:
|
||||
filters['id'] = id
|
||||
if trigger_id:
|
||||
filters['trigger_id'] = trigger_id
|
||||
|
||||
session = get_session()
|
||||
try:
|
||||
with session.begin():
|
||||
deleted = model_query(
|
||||
context, models.TriggerExecution, session=session
|
||||
).filter_by(**filters).delete()
|
||||
except Exception as e:
|
||||
LOG.warning("Unable to delete trigger (%(trigger)s) execution "
|
||||
"(%(execution)s): %(exc)s",
|
||||
{"trigger": trigger_id, "execution": id, "exc": e})
|
||||
return False
|
||||
else:
|
||||
LOG.debug("Deleted trigger (%(trigger)s) execution (%(execution)s)",
|
||||
{"trigger": trigger_id, "execution": id})
|
||||
return deleted == 1
|
||||
|
||||
|
||||
def trigger_execution_get_next(context):
|
||||
session = get_session()
|
||||
try:
|
||||
with session.begin():
|
||||
query = _generate_paginate_query(
|
||||
context, session,
|
||||
marker=None,
|
||||
limit=1,
|
||||
sort_keys=('execution_time', ),
|
||||
sort_dirs=('asc', ),
|
||||
filters=None,
|
||||
paginate_type=models.TriggerExecution,
|
||||
)
|
||||
result = query.first()
|
||||
except Exception as e:
|
||||
LOG.warning("Unable to get next trigger execution %s", e)
|
||||
return None
|
||||
else:
|
||||
return result
|
||||
|
||||
|
||||
###################
|
||||
|
||||
|
||||
def scheduled_operation_get(context, id, columns_to_join=[]):
|
||||
return _scheduled_operation_get(context, id,
|
||||
columns_to_join=columns_to_join)
|
||||
|
@ -1484,6 +1587,9 @@ PAGINATION_HELPERS = {
|
|||
_restore_get),
|
||||
models.Trigger: (_trigger_list_query, _trigger_list_process_filters,
|
||||
_trigger_get),
|
||||
models.TriggerExecution: (_trigger_execution_list_query,
|
||||
_trigger_execution_list_process_filters,
|
||||
_trigger_execution_get),
|
||||
models.ScheduledOperation: (_scheduled_operation_list_query,
|
||||
_scheduled_operation_list_process_filters,
|
||||
_scheduled_operation_get),
|
||||
|
|
|
@ -121,6 +121,19 @@ def define_tables(meta):
|
|||
mysql_engine='InnoDB'
|
||||
)
|
||||
|
||||
trigger_executions = Table(
|
||||
'trigger_executions', meta,
|
||||
Column('created_at', DateTime),
|
||||
Column('updated_at', DateTime),
|
||||
Column('deleted_at', DateTime),
|
||||
Column('deleted', Boolean, nullable=False),
|
||||
Column('id', String(length=36), primary_key=True, nullable=False),
|
||||
Column('trigger_id', String(length=36), unique=True, nullable=False,
|
||||
index=True),
|
||||
Column('execution_time', DateTime, nullable=False, index=True),
|
||||
mysql_engine='InnoDB'
|
||||
)
|
||||
|
||||
scheduled_operations = Table(
|
||||
'scheduled_operations', meta,
|
||||
Column('created_at', DateTime),
|
||||
|
@ -206,6 +219,7 @@ def define_tables(meta):
|
|||
restores,
|
||||
operation_logs,
|
||||
triggers,
|
||||
trigger_executions,
|
||||
scheduled_operations,
|
||||
scheduled_operation_states,
|
||||
scheduled_operation_logs,
|
||||
|
|
|
@ -74,6 +74,16 @@ class Trigger(BASE, KarborBase):
|
|||
properties = Column(Text, nullable=False)
|
||||
|
||||
|
||||
class TriggerExecution(BASE, KarborBase):
|
||||
"""Represents a future trigger execition"""
|
||||
|
||||
__tablename__ = 'trigger_executions'
|
||||
|
||||
id = Column(String(36), primary_key=True, nullable=False)
|
||||
trigger_id = Column(String(36), unique=True, nullable=False, index=True)
|
||||
execution_time = Column(DateTime, nullable=False, index=True)
|
||||
|
||||
|
||||
class ScheduledOperation(BASE, KarborBase):
|
||||
"""Represents a scheduled operation."""
|
||||
|
||||
|
@ -235,6 +245,7 @@ def register_models():
|
|||
Plan,
|
||||
Resource,
|
||||
Trigger,
|
||||
TriggerExecution,
|
||||
ScheduledOperation,
|
||||
ScheduledOperationState,
|
||||
ScheduledOperationLog,
|
||||
|
|
|
@ -12,14 +12,15 @@
|
|||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
import eventlet
|
||||
import functools
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
from stevedore import driver as import_driver
|
||||
|
||||
from karbor import context as karbor_context
|
||||
from karbor import db
|
||||
from karbor import exception
|
||||
from karbor.i18n import _
|
||||
from karbor.services.operationengine.engine import triggers
|
||||
|
@ -41,7 +42,11 @@ time_trigger_opts = [
|
|||
cfg.StrOpt('time_format',
|
||||
default='calendar',
|
||||
choices=['crontab', 'calendar'],
|
||||
help='The type of time format which is used to compute time')
|
||||
help='The type of time format which is used to compute time'),
|
||||
cfg.IntOpt('trigger_poll_interval',
|
||||
default=15,
|
||||
help='Interval, in seconds, in which Karbor will poll for '
|
||||
'trigger events'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -49,67 +54,10 @@ CONF.register_opts(time_trigger_opts)
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TriggerOperationGreenThread(object):
|
||||
def __init__(self, first_run_time, function):
|
||||
super(TriggerOperationGreenThread, self).__init__()
|
||||
self._is_sleeping = True
|
||||
self._pre_run_time = None
|
||||
self._running = False
|
||||
self._thread = None
|
||||
|
||||
self._function = function
|
||||
|
||||
self._start(first_run_time)
|
||||
|
||||
def kill(self):
|
||||
self._running = False
|
||||
if self._is_sleeping:
|
||||
self._thread.kill()
|
||||
|
||||
@property
|
||||
def running(self):
|
||||
return self._running
|
||||
|
||||
@property
|
||||
def pre_run_time(self):
|
||||
return self._pre_run_time
|
||||
|
||||
def _start(self, first_run_time):
|
||||
self._running = True
|
||||
|
||||
now = datetime.utcnow()
|
||||
initial_delay = 0 if first_run_time <= now else (
|
||||
int(timeutils.delta_seconds(now, first_run_time)))
|
||||
|
||||
self._thread = eventlet.spawn_after(
|
||||
initial_delay, self._run, first_run_time)
|
||||
self._thread.link(self._on_done)
|
||||
|
||||
def _on_done(self, gt, *args, **kwargs):
|
||||
self._is_sleeping = True
|
||||
self._pre_run_time = None
|
||||
self._running = False
|
||||
self._thread = None
|
||||
|
||||
def _run(self, expect_run_time):
|
||||
while self._running:
|
||||
self._is_sleeping = False
|
||||
self._pre_run_time = expect_run_time
|
||||
|
||||
expect_run_time = self._function(expect_run_time)
|
||||
if expect_run_time is None or not self._running:
|
||||
break
|
||||
|
||||
self._is_sleeping = True
|
||||
|
||||
now = datetime.utcnow()
|
||||
idle_time = 0 if expect_run_time <= now else int(
|
||||
timeutils.delta_seconds(now, expect_run_time))
|
||||
eventlet.sleep(idle_time)
|
||||
|
||||
|
||||
class TimeTrigger(triggers.BaseTrigger):
|
||||
TRIGGER_TYPE = "time"
|
||||
_loopingcall = None
|
||||
_triggers = {}
|
||||
|
||||
def __init__(self, trigger_id, trigger_property, executor):
|
||||
super(TimeTrigger, self).__init__(
|
||||
|
@ -118,30 +66,136 @@ class TimeTrigger(triggers.BaseTrigger):
|
|||
self._trigger_property = self.check_trigger_definition(
|
||||
trigger_property)
|
||||
|
||||
self._greenthread = None
|
||||
timer = self._get_timer(self._trigger_property)
|
||||
first_run_time = self._compute_next_run_time(
|
||||
datetime.utcnow(), self._trigger_property['end_time'], timer)
|
||||
LOG.debug("first_run_time: %s", first_run_time)
|
||||
|
||||
self._trigger_execution_new(self._id, first_run_time)
|
||||
|
||||
if not self.__class__._loopingcall:
|
||||
self.__class__._loopingcall = loopingcall.FixedIntervalLoopingCall(
|
||||
self._loop)
|
||||
self.__class__._loopingcall.start(
|
||||
interval=CONF.trigger_poll_interval,
|
||||
stop_on_exception=False,
|
||||
)
|
||||
|
||||
self._register()
|
||||
|
||||
def _register(self):
|
||||
self.__class__._triggers[self._id] = self
|
||||
|
||||
def _unregister(self):
|
||||
del self.__class__._triggers[self._id]
|
||||
|
||||
@classmethod
|
||||
def _loop(cls):
|
||||
while True:
|
||||
now = datetime.utcnow()
|
||||
exec_to_handle = cls._trigger_execution_get_next()
|
||||
if not exec_to_handle:
|
||||
LOG.debug("No next trigger executions")
|
||||
break
|
||||
|
||||
trigger_id = exec_to_handle.trigger_id
|
||||
execution_time = exec_to_handle.execution_time
|
||||
trigger = cls._triggers.get(trigger_id)
|
||||
if not trigger:
|
||||
LOG.warning("Unable to find trigger %s", trigger_id)
|
||||
res = cls._trigger_execution_delete(
|
||||
execution_id=exec_to_handle.id)
|
||||
continue
|
||||
|
||||
if now < execution_time:
|
||||
LOG.debug("Time trigger not yet due")
|
||||
break
|
||||
|
||||
trigger_property = trigger._trigger_property
|
||||
timer = cls._get_timer(trigger_property)
|
||||
window = trigger_property.get("window")
|
||||
end_time_to_run = execution_time + timedelta(
|
||||
seconds=window)
|
||||
|
||||
if now > end_time_to_run:
|
||||
LOG.debug("Time trigger (%s) out of window",)
|
||||
execute = False
|
||||
else:
|
||||
LOG.debug("Time trigger (%s) is due", trigger_id)
|
||||
execute = True
|
||||
|
||||
next_exec_time = cls._compute_next_run_time(
|
||||
now,
|
||||
trigger_property['end_time'],
|
||||
timer,
|
||||
)
|
||||
res = False
|
||||
if not next_exec_time:
|
||||
LOG.debug("No more planned executions for trigger (%s)",
|
||||
trigger_id)
|
||||
res = cls._trigger_execution_delete(
|
||||
execution_id=exec_to_handle.id)
|
||||
else:
|
||||
LOG.debug("Rescheduling (%s) from %s to %s",
|
||||
trigger_id,
|
||||
execution_time,
|
||||
next_exec_time)
|
||||
res = cls._trigger_execution_update(
|
||||
exec_to_handle.id,
|
||||
execution_time,
|
||||
next_exec_time,
|
||||
)
|
||||
|
||||
if not res:
|
||||
LOG.info("Trigger probably handled by another node")
|
||||
continue
|
||||
|
||||
if execute:
|
||||
cls._trigger_operations(trigger_id, execution_time, window)
|
||||
|
||||
@classmethod
|
||||
def _trigger_execution_new(cls, trigger_id, time):
|
||||
# Find the first time.
|
||||
# We don't known when using this trigger first time.
|
||||
ctxt = karbor_context.get_admin_context()
|
||||
try:
|
||||
db.trigger_execution_create(ctxt, trigger_id, time)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def _trigger_execution_update(cls, id, current_time, next_time):
|
||||
ctxt = karbor_context.get_admin_context()
|
||||
return db.trigger_execution_update(ctxt, id, current_time, next_time)
|
||||
|
||||
@classmethod
|
||||
def _trigger_execution_delete(cls, execution_id=None, trigger_id=None):
|
||||
if execution_id is None and trigger_id is None:
|
||||
raise exception.InvalidParameterValue('supply at least one id')
|
||||
|
||||
ctxt = karbor_context.get_admin_context()
|
||||
num_deleted = db.trigger_execution_delete(ctxt, execution_id,
|
||||
trigger_id)
|
||||
return num_deleted > 0
|
||||
|
||||
@classmethod
|
||||
def _trigger_execution_get_next(cls):
|
||||
ctxt = karbor_context.get_admin_context()
|
||||
return db.trigger_execution_get_next(ctxt)
|
||||
|
||||
def shutdown(self):
|
||||
self._kill_greenthread()
|
||||
self._unregister()
|
||||
|
||||
def register_operation(self, operation_id, **kwargs):
|
||||
if operation_id in self._operation_ids:
|
||||
msg = (_("The operation_id(%s) is exist") % operation_id)
|
||||
raise exception.ScheduledOperationExist(msg)
|
||||
|
||||
if self._greenthread and not self._greenthread.running:
|
||||
raise exception.TriggerIsInvalid(trigger_id=self._id)
|
||||
|
||||
self._operation_ids.add(operation_id)
|
||||
if self._greenthread is None:
|
||||
self._start_greenthread()
|
||||
|
||||
def unregister_operation(self, operation_id, **kwargs):
|
||||
if operation_id not in self._operation_ids:
|
||||
return
|
||||
|
||||
self._operation_ids.remove(operation_id)
|
||||
if 0 == len(self._operation_ids):
|
||||
self._kill_greenthread()
|
||||
self._operation_ids.discard(operation_id)
|
||||
|
||||
def update_trigger_property(self, trigger_property):
|
||||
valid_trigger_property = self.check_trigger_definition(
|
||||
|
@ -150,82 +204,38 @@ class TimeTrigger(triggers.BaseTrigger):
|
|||
if valid_trigger_property == self._trigger_property:
|
||||
return
|
||||
|
||||
timer, first_run_time = self._get_timer_and_first_run_time(
|
||||
valid_trigger_property)
|
||||
timer = self._get_timer(valid_trigger_property)
|
||||
first_run_time = self._compute_next_run_time(
|
||||
datetime.utcnow(), valid_trigger_property['end_time'], timer)
|
||||
|
||||
if not first_run_time:
|
||||
msg = (_("The new trigger property is invalid, "
|
||||
"Can not find the first run time"))
|
||||
raise exception.InvalidInput(msg)
|
||||
|
||||
if self._greenthread is not None:
|
||||
pre_run_time = self._greenthread.pre_run_time
|
||||
if pre_run_time:
|
||||
end_time = pre_run_time + timedelta(
|
||||
seconds=self._trigger_property['window'])
|
||||
if first_run_time <= end_time:
|
||||
msg = (_("The new trigger property is invalid, "
|
||||
"First run time%(t1)s must be after %(t2)s") %
|
||||
{'t1': first_run_time, 't2': end_time})
|
||||
raise exception.InvalidInput(msg)
|
||||
|
||||
self._trigger_property = valid_trigger_property
|
||||
self._trigger_execution_delete(trigger_id=self._id)
|
||||
self._trigger_execution_new(self._id, first_run_time)
|
||||
|
||||
if len(self._operation_ids) > 0:
|
||||
# Restart greenthread to take the change of trigger property
|
||||
# effect immediately
|
||||
self._kill_greenthread()
|
||||
self._create_green_thread(first_run_time, timer)
|
||||
@classmethod
|
||||
def _trigger_operations(cls, trigger_id, expect_run_time, window):
|
||||
"""Trigger operations once"""
|
||||
|
||||
def _kill_greenthread(self):
|
||||
if self._greenthread:
|
||||
self._greenthread.kill()
|
||||
self._greenthread = None
|
||||
|
||||
def _start_greenthread(self):
|
||||
# Find the first time.
|
||||
# We don't known when using this trigger first time.
|
||||
timer, first_run_time = self._get_timer_and_first_run_time(
|
||||
self._trigger_property)
|
||||
if not first_run_time:
|
||||
raise exception.TriggerIsInvalid(trigger_id=self._id)
|
||||
|
||||
self._create_green_thread(first_run_time, timer)
|
||||
|
||||
def _create_green_thread(self, first_run_time, timer):
|
||||
func = functools.partial(
|
||||
self._trigger_operations,
|
||||
trigger_property=self._trigger_property.copy(),
|
||||
timer=timer)
|
||||
|
||||
self._greenthread = TriggerOperationGreenThread(
|
||||
first_run_time, func)
|
||||
|
||||
def _trigger_operations(self, expect_run_time, trigger_property, timer):
|
||||
"""Trigger operations once
|
||||
|
||||
returns: wait time for next run
|
||||
"""
|
||||
|
||||
# Just for robustness, actually expect_run_time always <= now
|
||||
# but, if the scheduling of eventlet is not accurate, then we
|
||||
# can do some adjustments.
|
||||
entry_time = datetime.utcnow()
|
||||
if entry_time < expect_run_time and (
|
||||
int(timeutils.delta_seconds(entry_time, expect_run_time)) > 0):
|
||||
return expect_run_time
|
||||
|
||||
# The self._executor.execute_operation may have I/O operation.
|
||||
# The executor execute_operation may have I/O operation.
|
||||
# If it is, this green thread will be switched out during looping
|
||||
# operation_ids. In order to avoid changing self._operation_ids
|
||||
# during the green thread is switched out, copy self._operation_ids
|
||||
# as the iterative object.
|
||||
operation_ids = self._operation_ids.copy()
|
||||
trigger = cls._triggers.get(trigger_id)
|
||||
if not trigger:
|
||||
LOG.warning("Can't find trigger: %s" % trigger_id)
|
||||
return
|
||||
operations_ids = trigger._operation_ids.copy()
|
||||
sent_ops = set()
|
||||
window = trigger_property.get("window")
|
||||
end_time = expect_run_time + timedelta(seconds=window)
|
||||
|
||||
for operation_id in operation_ids:
|
||||
if operation_id not in self._operation_ids:
|
||||
for operation_id in operations_ids:
|
||||
if operation_id not in trigger._operation_ids:
|
||||
# Maybe, when traversing this operation_id, it has been
|
||||
# removed by self.unregister_operation
|
||||
LOG.warning("Execute operation %s which is not exist, "
|
||||
|
@ -236,15 +246,13 @@ class TimeTrigger(triggers.BaseTrigger):
|
|||
if now >= end_time:
|
||||
LOG.error("Can not trigger operations to run. Because it is "
|
||||
"out of window time. now=%(now)s, "
|
||||
"end time=%(end_time)s, expect run time=%(expect)s,"
|
||||
" wating operations=%(ops)s",
|
||||
"end time=%(end_time)s, waiting operations=%(ops)s",
|
||||
{'now': now, 'end_time': end_time,
|
||||
'expect': expect_run_time,
|
||||
'ops': operation_ids - sent_ops})
|
||||
'ops': operations_ids - sent_ops})
|
||||
break
|
||||
|
||||
try:
|
||||
self._executor.execute_operation(
|
||||
trigger._executor.execute_operation(
|
||||
operation_id, now, expect_run_time, window)
|
||||
except Exception:
|
||||
LOG.exception("Submit operation to executor failed, operation"
|
||||
|
@ -252,18 +260,6 @@ class TimeTrigger(triggers.BaseTrigger):
|
|||
|
||||
sent_ops.add(operation_id)
|
||||
|
||||
next_time = self._compute_next_run_time(
|
||||
expect_run_time, trigger_property['end_time'], timer)
|
||||
now = datetime.utcnow()
|
||||
if next_time and next_time <= now:
|
||||
LOG.error("Next run time:%(next_time)s <= now:%(now)s. Maybe the "
|
||||
"entry time=%(entry)s is too late, even exceeds the end"
|
||||
" time of window=%(end)s, or it was blocked where "
|
||||
"sending the operation to executor.",
|
||||
{'next_time': next_time, 'now': now,
|
||||
'entry': entry_time, 'end': end_time})
|
||||
return next_time
|
||||
|
||||
@classmethod
|
||||
def check_trigger_definition(cls, trigger_definition):
|
||||
"""Check trigger definition
|
||||
|
@ -350,14 +346,11 @@ class TimeTrigger(triggers.BaseTrigger):
|
|||
CONF.time_format).driver
|
||||
|
||||
@classmethod
|
||||
def _get_timer_and_first_run_time(cls, trigger_property):
|
||||
def _get_timer(cls, trigger_property):
|
||||
tf_cls = cls._get_time_format_class()
|
||||
timer = tf_cls(trigger_property['start_time'],
|
||||
trigger_property['pattern'])
|
||||
first_run_time = cls._compute_next_run_time(
|
||||
datetime.utcnow(), trigger_property['end_time'], timer)
|
||||
|
||||
return timer, first_run_time
|
||||
return timer
|
||||
|
||||
@classmethod
|
||||
def check_configuration(cls):
|
||||
|
|
|
@ -221,15 +221,19 @@ class OperationEngineManager(manager.Manager):
|
|||
|
||||
@messaging.expected_exceptions(exception.InvalidInput)
|
||||
def create_trigger(self, context, trigger):
|
||||
LOG.debug('Creating trigger (id: "%s" type: "%s")',
|
||||
trigger.id, trigger.type)
|
||||
self.trigger_manager.add_trigger(trigger.id, trigger.type,
|
||||
trigger.properties)
|
||||
|
||||
@messaging.expected_exceptions(exception.TriggerNotFound,
|
||||
exception.DeleteTriggerNotAllowed)
|
||||
def delete_trigger(self, context, trigger_id):
|
||||
LOG.debug('Deleting trigger (id: "%s")', trigger_id)
|
||||
self.trigger_manager.remove_trigger(trigger_id)
|
||||
|
||||
@messaging.expected_exceptions(exception.TriggerNotFound,
|
||||
exception.InvalidInput)
|
||||
def update_trigger(self, context, trigger):
|
||||
LOG.debug('Updating trigger (id: "%s")', trigger.id)
|
||||
self.trigger_manager.update_trigger(trigger.id, trigger.properties)
|
||||
|
|
|
@ -64,10 +64,13 @@ class OperationEngineAPI(object):
|
|||
trigger_id=trigger_id)
|
||||
|
||||
def create_trigger(self, ctxt, trigger):
|
||||
return self._client.call(ctxt, 'create_trigger', trigger=trigger)
|
||||
self._client.prepare(fanout=True).cast(ctxt, 'create_trigger',
|
||||
trigger=trigger)
|
||||
|
||||
def delete_trigger(self, ctxt, trigger_id):
|
||||
return self._client.call(ctxt, 'delete_trigger', trigger_id=trigger_id)
|
||||
self._client.prepare(fanout=True).cast(ctxt, 'delete_trigger',
|
||||
trigger_id=trigger_id)
|
||||
|
||||
def update_trigger(self, ctxt, trigger):
|
||||
return self._client.call(ctxt, 'update_trigger', trigger=trigger)
|
||||
self._client.prepare(fanout=True).cast(ctxt, 'update_trigger',
|
||||
trigger=trigger)
|
||||
|
|
|
@ -109,12 +109,14 @@ class ScheduledOperationsTest(karbor_base.KarborBaseTest):
|
|||
|
||||
def test_scheduled_operations_create_and_scheduled(self):
|
||||
freq = 2
|
||||
eventlet_grace = 20
|
||||
pattern = "BEGIN:VEVENT\nRRULE:FREQ=MINUTELY;INTERVAL=5;\nEND:VEVENT"
|
||||
cur_property = {'pattern': pattern, 'format': 'calendar'}
|
||||
|
||||
operation = self.store(self._create_for_volume(cur_property))
|
||||
start_time = datetime.now().replace(microsecond=0)
|
||||
sleep_time = self._wait_timestamp(pattern, start_time, freq)
|
||||
sleep_time += eventlet_grace
|
||||
self.assertNotEqual(0, sleep_time)
|
||||
eventlet.sleep(sleep_time)
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
# under the License.
|
||||
|
||||
|
||||
from datetime import datetime
|
||||
from karbor.tests.fullstack import karbor_base
|
||||
from karbor.tests.fullstack import karbor_objects as objects
|
||||
|
||||
|
@ -40,6 +41,29 @@ class TriggersTest(karbor_base.KarborBaseTest):
|
|||
trigger = self.karbor_client.triggers.get(trigger.id)
|
||||
self.assertEqual(trigger_name, trigger.name)
|
||||
|
||||
def test_triggers_update(self):
|
||||
trigger_name = "FullStack Trigger Test Update"
|
||||
pattern1 = "BEGIN:VEVENT\nRRULE:FREQ=WEEKLY;INTERVAL=1;\nEND:VEVENT"
|
||||
pattern2 = "BEGIN:VEVENT\nRRULE:FREQ=DAILY;INTERVAL=1;\nEND:VEVENT"
|
||||
trigger = self.store(objects.Trigger())
|
||||
trigger.create('time', {'pattern': pattern1, 'format': 'calendar'},
|
||||
name=trigger_name)
|
||||
properties = {
|
||||
'properties': {
|
||||
'pattern': pattern2,
|
||||
'format': 'calendar',
|
||||
'start_time': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
|
||||
}
|
||||
}
|
||||
|
||||
self.karbor_client.triggers.update(
|
||||
trigger.id,
|
||||
properties,
|
||||
)
|
||||
|
||||
trigger = self.karbor_client.triggers.get(trigger.id)
|
||||
self.assertEqual(trigger.properties['pattern'], pattern2)
|
||||
|
||||
def test_triggers_delete(self):
|
||||
pattern = "BEGIN:VEVENT\nRRULE:FREQ=WEEKLY;INTERVAL=1;\nEND:VEVENT"
|
||||
trigger = objects.Trigger()
|
||||
|
|
|
@ -43,3 +43,4 @@ def set_defaults(conf):
|
|||
conf.set_default('username', 'karbor', group='trustee')
|
||||
conf.set_default('password', 'password', group='trustee')
|
||||
conf.set_default('user_domain_id', 'default', group='trustee')
|
||||
conf.set_default('trigger_poll_interval', 1)
|
||||
|
|
|
@ -10,18 +10,27 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from collections import namedtuple
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
import eventlet
|
||||
import functools
|
||||
import heapq
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from karbor import context as karbor_context
|
||||
from karbor import exception
|
||||
from karbor.services.operationengine.engine.triggers.timetrigger.time_trigger \
|
||||
import TimeTrigger
|
||||
from karbor.services.operationengine.engine.triggers.timetrigger \
|
||||
import time_trigger as tt
|
||||
from karbor.tests import base
|
||||
|
||||
|
||||
TriggerExecution = namedtuple('TriggerExecution',
|
||||
['execution_time', 'id', 'trigger_id'])
|
||||
|
||||
|
||||
class FakeTimeFormat(object):
|
||||
def __init__(self, start_time, pattern):
|
||||
super(FakeTimeFormat, self).__init__()
|
||||
|
@ -49,30 +58,78 @@ class FakeExecutor(object):
|
|||
self._ops[operation_id] += 1
|
||||
eventlet.sleep(0.5)
|
||||
|
||||
def clear(self):
|
||||
self._ops.clear()
|
||||
|
||||
class FakeTimeTrigger(object):
|
||||
@classmethod
|
||||
def get_time_format(cls, *args, **kwargs):
|
||||
return FakeTimeFormat
|
||||
|
||||
|
||||
class FakeDb(object):
|
||||
def __init__(self):
|
||||
self._db = []
|
||||
|
||||
def trigger_execution_get_next(self, context):
|
||||
if len(self._db) == 0:
|
||||
return None
|
||||
return self._db[0]
|
||||
|
||||
def trigger_execution_create(self, context, trigger_id, time):
|
||||
element = TriggerExecution(time, uuidutils.generate_uuid(), trigger_id)
|
||||
heapq.heappush(self._db, element)
|
||||
|
||||
def trigger_execution_update(self, context, id, current_time, new_time):
|
||||
for idx, element in enumerate(self._db):
|
||||
if element.id == id:
|
||||
if element.execution_time != current_time:
|
||||
return False
|
||||
self._db[idx] = TriggerExecution(new_time, element.id,
|
||||
element.trigger_id)
|
||||
break
|
||||
heapq.heapify(self._db)
|
||||
return True
|
||||
|
||||
def trigger_execution_delete(self, context, id, trigger_id):
|
||||
removed_ids = []
|
||||
for idx, element in enumerate(self._db):
|
||||
if (id and element.id == id) or (trigger_id and
|
||||
element.trigger_id == trigger_id):
|
||||
removed_ids.append(idx)
|
||||
|
||||
for idx in reversed(removed_ids):
|
||||
self._db.pop(idx)
|
||||
heapq.heapify(self._db)
|
||||
return len(removed_ids)
|
||||
|
||||
|
||||
def time_trigger_test(func):
|
||||
@functools.wraps(func)
|
||||
@mock.patch.object(tt, 'db', FakeDb())
|
||||
@mock.patch.object(karbor_context, 'get_admin_context', lambda: None)
|
||||
@mock.patch.object(tt.TimeTrigger, '_get_time_format_class',
|
||||
FakeTimeTrigger.get_time_format)
|
||||
def wrapper(*args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class TimeTriggerTestCase(base.TestCase):
|
||||
_tid = 0
|
||||
_default_executor = FakeExecutor()
|
||||
|
||||
def setUp(self):
|
||||
super(TimeTriggerTestCase, self).setUp()
|
||||
|
||||
self._set_configuration()
|
||||
|
||||
mock_obj = mock.Mock()
|
||||
mock_obj.return_value = FakeTimeFormat
|
||||
TimeTrigger._get_time_format_class = mock_obj
|
||||
|
||||
self._default_executor = FakeExecutor()
|
||||
|
||||
def test_check_configuration(self):
|
||||
self._set_configuration(10, 20, 30)
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"Configurations of time trigger are invalid",
|
||||
TimeTrigger.check_configuration)
|
||||
tt.TimeTrigger.check_configuration)
|
||||
self._set_configuration()
|
||||
|
||||
@time_trigger_test
|
||||
def test_check_trigger_property_start_time(self):
|
||||
trigger_property = {
|
||||
"pattern": "",
|
||||
|
@ -81,22 +138,23 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The trigger\'s start time is unknown",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
trigger_property['start_time'] = 'abc'
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The format of trigger .* is not correct",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
trigger_property['start_time'] = 123
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The trigger .* is not an instance of string",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
@mock.patch.object(FakeTimeFormat, 'get_min_interval')
|
||||
@time_trigger_test
|
||||
def test_check_trigger_property_interval(self, get_min_interval):
|
||||
get_min_interval.return_value = 0
|
||||
|
||||
|
@ -106,9 +164,10 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The interval of two adjacent time points .*",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
@time_trigger_test
|
||||
def test_check_trigger_property_window(self):
|
||||
trigger_property = {
|
||||
"window": "abc",
|
||||
|
@ -117,15 +176,16 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The trigger window.* is not integer",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
trigger_property['window'] = 1000
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The trigger windows .* must be between .*",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
@time_trigger_test
|
||||
def test_check_trigger_property_end_time(self):
|
||||
trigger_property = {
|
||||
"window": 15,
|
||||
|
@ -135,27 +195,24 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
"The format of trigger .* is not correct",
|
||||
TimeTrigger.check_trigger_definition,
|
||||
tt.TimeTrigger.check_trigger_definition,
|
||||
trigger_property)
|
||||
|
||||
@time_trigger_test
|
||||
def test_register_operation(self):
|
||||
trigger = self._generate_trigger()
|
||||
|
||||
operation_id = "1"
|
||||
trigger.register_operation(operation_id)
|
||||
eventlet.sleep(0.3)
|
||||
eventlet.sleep(2)
|
||||
|
||||
self.assertGreaterEqual(trigger._executor._ops[operation_id], 1)
|
||||
self.assertGreaterEqual(self._default_executor._ops[operation_id], 1)
|
||||
self.assertRaisesRegex(exception.ScheduledOperationExist,
|
||||
"The operation_id.* is exist",
|
||||
trigger.register_operation,
|
||||
operation_id)
|
||||
|
||||
eventlet.sleep(0.3)
|
||||
self.assertRaises(exception.TriggerIsInvalid,
|
||||
trigger.register_operation,
|
||||
"2")
|
||||
|
||||
@time_trigger_test
|
||||
def test_unregister_operation(self):
|
||||
trigger = self._generate_trigger()
|
||||
operation_id = "2"
|
||||
|
@ -164,26 +221,9 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
self.assertIn(operation_id, trigger._operation_ids)
|
||||
|
||||
trigger.unregister_operation(operation_id)
|
||||
self.assertNotIn(operation_id, trigger._operation_ids)
|
||||
|
||||
def test_unregister_operation_when_scheduling(self):
|
||||
trigger = self._generate_trigger()
|
||||
|
||||
for op_id in ['1', '2', '3']:
|
||||
trigger.register_operation(op_id)
|
||||
self.assertIn(op_id, trigger._operation_ids)
|
||||
eventlet.sleep(0.5)
|
||||
|
||||
for op_id in ['2', '3']:
|
||||
trigger.unregister_operation(op_id)
|
||||
self.assertNotIn(op_id, trigger._operation_ids)
|
||||
eventlet.sleep(0.6)
|
||||
|
||||
self.assertGreaterEqual(trigger._executor._ops['1'], 1)
|
||||
|
||||
self.assertTrue(('2' not in trigger._executor._ops) or (
|
||||
'3' not in trigger._executor._ops))
|
||||
self.assertNotIn(trigger._id, trigger._operation_ids)
|
||||
|
||||
@time_trigger_test
|
||||
def test_update_trigger_property(self):
|
||||
trigger = self._generate_trigger()
|
||||
|
||||
|
@ -199,18 +239,10 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
trigger.update_trigger_property,
|
||||
trigger_property)
|
||||
|
||||
trigger.register_operation('1')
|
||||
eventlet.sleep(0.2)
|
||||
trigger_property['end_time'] = (
|
||||
datetime.utcnow() + timedelta(seconds=1))
|
||||
self.assertRaisesRegex(exception.InvalidInput,
|
||||
".*First run time.* must be after.*",
|
||||
trigger.update_trigger_property,
|
||||
trigger_property)
|
||||
|
||||
@time_trigger_test
|
||||
def test_update_trigger_property_success(self):
|
||||
trigger = self._generate_trigger()
|
||||
trigger.register_operation('1')
|
||||
trigger.register_operation('7')
|
||||
eventlet.sleep(0.2)
|
||||
|
||||
trigger_property = {
|
||||
|
@ -221,12 +253,8 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
}
|
||||
with mock.patch.object(FakeTimeFormat, 'compute_next_time') as c:
|
||||
c.return_value = datetime.utcnow() + timedelta(seconds=20)
|
||||
old_id = id(trigger._greenthread)
|
||||
|
||||
trigger.update_trigger_property(trigger_property)
|
||||
|
||||
self.assertNotEqual(old_id, id(trigger._greenthread))
|
||||
|
||||
def _generate_trigger(self, end_time=None):
|
||||
if not end_time:
|
||||
end_time = datetime.utcnow() + timedelta(seconds=1)
|
||||
|
@ -238,11 +266,15 @@ class TimeTriggerTestCase(base.TestCase):
|
|||
"end_time": end_time
|
||||
}
|
||||
|
||||
self._default_executor.clear()
|
||||
return TimeTrigger("123", trigger_property, self._default_executor)
|
||||
return tt.TimeTrigger(
|
||||
uuidutils.generate_uuid(),
|
||||
trigger_property,
|
||||
self._default_executor,
|
||||
)
|
||||
|
||||
def _set_configuration(self, min_window=15,
|
||||
max_window=30, min_interval=60):
|
||||
max_window=30, min_interval=60, poll_interval=1):
|
||||
self.override_config('min_interval', min_interval)
|
||||
self.override_config('min_window_time', min_window)
|
||||
self.override_config('max_window_time', max_window)
|
||||
self.override_config('trigger_poll_interval', poll_interval)
|
||||
|
|
Loading…
Reference in New Issue