Query multiple events in the event loop
The blazar manager queries only one event once a loop. The interval of the loop is 10 seconds. So if more than 7 events start at same time some events delay to start. This patch enables the event loop queries multiple events at once. Change-Id: Ib0eabb34f4ed367de5096a741be5ece8307d3d97 Closes-Bug: #1747560
This commit is contained in:
parent
c2ab836293
commit
8dd7a70a37
|
@ -317,6 +317,18 @@ def _event_get_sorted_by_filters(sort_key, sort_dir, filters):
|
|||
if 'event_type' in filters:
|
||||
events_query = events_query.filter(models.Event.event_type ==
|
||||
filters['event_type'])
|
||||
if 'time' in filters:
|
||||
border = filters['time']['border']
|
||||
if filters['time']['op'] == 'lt':
|
||||
events_query = events_query.filter(models.Event.time < border)
|
||||
elif filters['time']['op'] == 'le':
|
||||
events_query = events_query.filter(models.Event.time <= border)
|
||||
elif filters['time']['op'] == 'gt':
|
||||
events_query = events_query.filter(models.Event.time > border)
|
||||
elif filters['time']['op'] == 'ge':
|
||||
events_query = events_query.filter(models.Event.time >= border)
|
||||
elif filters['time']['op'] == 'eq':
|
||||
events_query = events_query.filter(models.Event.time == border)
|
||||
|
||||
events_query = events_query.order_by(
|
||||
sort_fn[sort_dir](getattr(models.Event, sort_key))
|
||||
|
|
|
@ -140,16 +140,19 @@ class ManagerService(service_utils.RPCServer):
|
|||
status to 'DONE'.
|
||||
"""
|
||||
LOG.debug('Trying to get event from DB.')
|
||||
event = db_api.event_get_first_sorted_by_filters(
|
||||
events = db_api.event_get_all_sorted_by_filters(
|
||||
sort_key='time',
|
||||
sort_dir='asc',
|
||||
filters={'status': status.event.UNDONE}
|
||||
filters={'status': status.event.UNDONE,
|
||||
'time': {'op': 'le',
|
||||
'border': datetime.datetime.utcnow()}}
|
||||
)
|
||||
|
||||
if not event:
|
||||
if not events:
|
||||
return
|
||||
|
||||
if event['time'] < datetime.datetime.utcnow():
|
||||
LOG.info("Trying to execute events: %s", events)
|
||||
for event in events:
|
||||
db_api.event_update(event['id'],
|
||||
{'status': status.event.IN_PROGRESS})
|
||||
try:
|
||||
|
@ -159,7 +162,8 @@ class ManagerService(service_utils.RPCServer):
|
|||
except Exception:
|
||||
db_api.event_update(event['id'],
|
||||
{'status': status.event.ERROR})
|
||||
LOG.exception('Error occurred while event handling.')
|
||||
LOG.exception('Error occurred while event %s handling.',
|
||||
event['id'])
|
||||
|
||||
def _exec_event(self, event):
|
||||
"""Execute an event function"""
|
||||
|
|
|
@ -935,6 +935,32 @@ class SQLAlchemyDBApiTestCase(tests.DBTestCase):
|
|||
sort_key=sort_key,
|
||||
sort_dir=sort_dir))
|
||||
|
||||
def test_event_get_sorted_asc_by_time_filter(self):
|
||||
def check_query(border, op, expected_ids):
|
||||
filtered_events = db_api.event_get_all_sorted_by_filters(
|
||||
sort_key=sort_key,
|
||||
sort_dir=sort_dir,
|
||||
filters={'time': {'border': _get_datetime(border),
|
||||
'op': op}})
|
||||
filtered_event_ids = [e.id for e in filtered_events]
|
||||
self.assertListEqual(expected_ids, filtered_event_ids)
|
||||
|
||||
time1 = _get_datetime('2030-01-01 01:00')
|
||||
time2 = _get_datetime('2030-01-01 02:00')
|
||||
time3 = _get_datetime('2030-01-01 03:00')
|
||||
sort_key = 'time'
|
||||
sort_dir = 'asc'
|
||||
|
||||
db_api.event_create(_get_fake_event_values(id='1', time=time1))
|
||||
db_api.event_create(_get_fake_event_values(id='2', time=time2))
|
||||
db_api.event_create(_get_fake_event_values(id='3', time=time3))
|
||||
|
||||
check_query('2030-01-01 02:00', 'lt', ['1'])
|
||||
check_query('2030-01-01 02:00', 'le', ['1', '2'])
|
||||
check_query('2030-01-01 02:00', 'gt', ['3'])
|
||||
check_query('2030-01-01 02:00', 'ge', ['2', '3'])
|
||||
check_query('2030-01-01 02:00', 'eq', ['2'])
|
||||
|
||||
def test_event_get_sorted_desc_by_event_type_filter(self):
|
||||
fake_event_type = 'test_event'
|
||||
sort_dir = 'desc'
|
||||
|
@ -1014,3 +1040,29 @@ class SQLAlchemyDBApiTestCase(tests.DBTestCase):
|
|||
self.assertTrue(is_result_sorted_correctly(filtered_events,
|
||||
sort_key=sort_key,
|
||||
sort_dir=sort_dir))
|
||||
|
||||
def test_event_get_sorted_desc_by_time_filter(self):
|
||||
def check_query(border, op, expected_ids):
|
||||
filtered_events = db_api.event_get_all_sorted_by_filters(
|
||||
sort_key=sort_key,
|
||||
sort_dir=sort_dir,
|
||||
filters={'time': {'border': _get_datetime(border),
|
||||
'op': op}})
|
||||
filtered_event_ids = [e.id for e in filtered_events]
|
||||
self.assertListEqual(expected_ids, filtered_event_ids)
|
||||
|
||||
time1 = _get_datetime('2030-01-01 01:00')
|
||||
time2 = _get_datetime('2030-01-01 02:00')
|
||||
time3 = _get_datetime('2030-01-01 03:00')
|
||||
sort_key = 'time'
|
||||
sort_dir = 'desc'
|
||||
|
||||
db_api.event_create(_get_fake_event_values(id='1', time=time1))
|
||||
db_api.event_create(_get_fake_event_values(id='2', time=time2))
|
||||
db_api.event_create(_get_fake_event_values(id='3', time=time3))
|
||||
|
||||
check_query('2030-01-01 02:00', 'lt', ['1'])
|
||||
check_query('2030-01-01 02:00', 'le', ['2', '1'])
|
||||
check_query('2030-01-01 02:00', 'gt', ['3'])
|
||||
check_query('2030-01-01 02:00', 'ge', ['3', '2'])
|
||||
check_query('2030-01-01 02:00', 'eq', ['2'])
|
||||
|
|
|
@ -218,7 +218,7 @@ class ServiceTestCase(tests.TestCase):
|
|||
self.assertEqual(actions, self.manager._setup_actions())
|
||||
|
||||
def test_no_events(self):
|
||||
events = self.patch(self.db_api, 'event_get_first_sorted_by_filters')
|
||||
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
|
||||
event_update = self.patch(self.db_api, 'event_update')
|
||||
events.return_value = None
|
||||
|
||||
|
@ -227,23 +227,23 @@ class ServiceTestCase(tests.TestCase):
|
|||
self.assertFalse(event_update.called)
|
||||
|
||||
def test_event_success(self):
|
||||
events = self.patch(self.db_api, 'event_get_first_sorted_by_filters')
|
||||
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
|
||||
event_update = self.patch(self.db_api, 'event_update')
|
||||
events.return_value = {'id': '111-222-333',
|
||||
'time': self.good_date}
|
||||
events.return_value = [{'id': '111-222-333', 'time': self.good_date},
|
||||
{'id': '444-555-666', 'time': self.good_date}]
|
||||
self.patch(eventlet, 'spawn_n')
|
||||
|
||||
self.manager._event()
|
||||
|
||||
event_update.assert_called_once_with(
|
||||
'111-222-333', {'status': status.event.IN_PROGRESS})
|
||||
event_update.assert_has_calls([
|
||||
mock.call('111-222-333', {'status': status.event.IN_PROGRESS}),
|
||||
mock.call('444-555-666', {'status': status.event.IN_PROGRESS})])
|
||||
|
||||
def test_event_spawn_fail(self):
|
||||
events = self.patch(self.db_api, 'event_get_first_sorted_by_filters')
|
||||
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
|
||||
event_update = self.patch(self.db_api, 'event_update')
|
||||
self.patch(eventlet, 'spawn_n').side_effect = Exception
|
||||
events.return_value = {'id': '111-222-333',
|
||||
'time': self.good_date}
|
||||
events.return_value = [{'id': '111-222-333', 'time': self.good_date}]
|
||||
|
||||
self.manager._event()
|
||||
|
||||
|
|
Loading…
Reference in New Issue