Query multiple events in the event loop

The blazar manager queries only one event once a loop. The interval
of the loop is 10 seconds. So if more than 7 events start at same
time some events delay to start.

This patch enables the event loop queries multiple events at once.

Change-Id: Ib0eabb34f4ed367de5096a741be5ece8307d3d97
Closes-Bug: #1747560
This commit is contained in:
Masahito Muroi 2018-06-01 12:15:57 +09:00
parent c2ab836293
commit 8dd7a70a37
4 changed files with 82 additions and 14 deletions

View File

@ -317,6 +317,18 @@ def _event_get_sorted_by_filters(sort_key, sort_dir, filters):
if 'event_type' in filters:
events_query = events_query.filter(models.Event.event_type ==
filters['event_type'])
if 'time' in filters:
border = filters['time']['border']
if filters['time']['op'] == 'lt':
events_query = events_query.filter(models.Event.time < border)
elif filters['time']['op'] == 'le':
events_query = events_query.filter(models.Event.time <= border)
elif filters['time']['op'] == 'gt':
events_query = events_query.filter(models.Event.time > border)
elif filters['time']['op'] == 'ge':
events_query = events_query.filter(models.Event.time >= border)
elif filters['time']['op'] == 'eq':
events_query = events_query.filter(models.Event.time == border)
events_query = events_query.order_by(
sort_fn[sort_dir](getattr(models.Event, sort_key))

View File

@ -140,16 +140,19 @@ class ManagerService(service_utils.RPCServer):
status to 'DONE'.
"""
LOG.debug('Trying to get event from DB.')
event = db_api.event_get_first_sorted_by_filters(
events = db_api.event_get_all_sorted_by_filters(
sort_key='time',
sort_dir='asc',
filters={'status': status.event.UNDONE}
filters={'status': status.event.UNDONE,
'time': {'op': 'le',
'border': datetime.datetime.utcnow()}}
)
if not event:
if not events:
return
if event['time'] < datetime.datetime.utcnow():
LOG.info("Trying to execute events: %s", events)
for event in events:
db_api.event_update(event['id'],
{'status': status.event.IN_PROGRESS})
try:
@ -159,7 +162,8 @@ class ManagerService(service_utils.RPCServer):
except Exception:
db_api.event_update(event['id'],
{'status': status.event.ERROR})
LOG.exception('Error occurred while event handling.')
LOG.exception('Error occurred while event %s handling.',
event['id'])
def _exec_event(self, event):
"""Execute an event function"""

View File

@ -935,6 +935,32 @@ class SQLAlchemyDBApiTestCase(tests.DBTestCase):
sort_key=sort_key,
sort_dir=sort_dir))
def test_event_get_sorted_asc_by_time_filter(self):
def check_query(border, op, expected_ids):
filtered_events = db_api.event_get_all_sorted_by_filters(
sort_key=sort_key,
sort_dir=sort_dir,
filters={'time': {'border': _get_datetime(border),
'op': op}})
filtered_event_ids = [e.id for e in filtered_events]
self.assertListEqual(expected_ids, filtered_event_ids)
time1 = _get_datetime('2030-01-01 01:00')
time2 = _get_datetime('2030-01-01 02:00')
time3 = _get_datetime('2030-01-01 03:00')
sort_key = 'time'
sort_dir = 'asc'
db_api.event_create(_get_fake_event_values(id='1', time=time1))
db_api.event_create(_get_fake_event_values(id='2', time=time2))
db_api.event_create(_get_fake_event_values(id='3', time=time3))
check_query('2030-01-01 02:00', 'lt', ['1'])
check_query('2030-01-01 02:00', 'le', ['1', '2'])
check_query('2030-01-01 02:00', 'gt', ['3'])
check_query('2030-01-01 02:00', 'ge', ['2', '3'])
check_query('2030-01-01 02:00', 'eq', ['2'])
def test_event_get_sorted_desc_by_event_type_filter(self):
fake_event_type = 'test_event'
sort_dir = 'desc'
@ -1014,3 +1040,29 @@ class SQLAlchemyDBApiTestCase(tests.DBTestCase):
self.assertTrue(is_result_sorted_correctly(filtered_events,
sort_key=sort_key,
sort_dir=sort_dir))
def test_event_get_sorted_desc_by_time_filter(self):
def check_query(border, op, expected_ids):
filtered_events = db_api.event_get_all_sorted_by_filters(
sort_key=sort_key,
sort_dir=sort_dir,
filters={'time': {'border': _get_datetime(border),
'op': op}})
filtered_event_ids = [e.id for e in filtered_events]
self.assertListEqual(expected_ids, filtered_event_ids)
time1 = _get_datetime('2030-01-01 01:00')
time2 = _get_datetime('2030-01-01 02:00')
time3 = _get_datetime('2030-01-01 03:00')
sort_key = 'time'
sort_dir = 'desc'
db_api.event_create(_get_fake_event_values(id='1', time=time1))
db_api.event_create(_get_fake_event_values(id='2', time=time2))
db_api.event_create(_get_fake_event_values(id='3', time=time3))
check_query('2030-01-01 02:00', 'lt', ['1'])
check_query('2030-01-01 02:00', 'le', ['2', '1'])
check_query('2030-01-01 02:00', 'gt', ['3'])
check_query('2030-01-01 02:00', 'ge', ['3', '2'])
check_query('2030-01-01 02:00', 'eq', ['2'])

View File

@ -218,7 +218,7 @@ class ServiceTestCase(tests.TestCase):
self.assertEqual(actions, self.manager._setup_actions())
def test_no_events(self):
events = self.patch(self.db_api, 'event_get_first_sorted_by_filters')
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
event_update = self.patch(self.db_api, 'event_update')
events.return_value = None
@ -227,23 +227,23 @@ class ServiceTestCase(tests.TestCase):
self.assertFalse(event_update.called)
def test_event_success(self):
events = self.patch(self.db_api, 'event_get_first_sorted_by_filters')
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
event_update = self.patch(self.db_api, 'event_update')
events.return_value = {'id': '111-222-333',
'time': self.good_date}
events.return_value = [{'id': '111-222-333', 'time': self.good_date},
{'id': '444-555-666', 'time': self.good_date}]
self.patch(eventlet, 'spawn_n')
self.manager._event()
event_update.assert_called_once_with(
'111-222-333', {'status': status.event.IN_PROGRESS})
event_update.assert_has_calls([
mock.call('111-222-333', {'status': status.event.IN_PROGRESS}),
mock.call('444-555-666', {'status': status.event.IN_PROGRESS})])
def test_event_spawn_fail(self):
events = self.patch(self.db_api, 'event_get_first_sorted_by_filters')
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
event_update = self.patch(self.db_api, 'event_update')
self.patch(eventlet, 'spawn_n').side_effect = Exception
events.return_value = {'id': '111-222-333',
'time': self.good_date}
events.return_value = [{'id': '111-222-333', 'time': self.good_date}]
self.manager._event()