Prevent conflicting events from running concurrently

If two leases have compute hosts in common, and the second lease starts
exactly when the first lease ends, there is the possibility of a race.
The Blazar manager can first run the start_lease event of the second
lease. This event would fail since the end_lease event of the first
lease would still be UNDONE, and the compute hosts in common would still
be in the aggregate associated with the first lease, instead of being in
the freepool.

This patch changes event execution code so that events are executed
concurrently if possible, with the following constraints:

- events are executed strictly in order, i.e. events are started only
  after all previous events have completed
- when events are at the same time, we first execute before_end_lease
  events (unless there is a start_lease at the same time), then
  end_lease events, followed by start_lease events, ensuring the bug
  described above does not happen. Finally, we run any before_end_lease
  which had a corresponding start_lease event at the same time.

It also has the side effect of providing better stack traces for event
execution failures, since we call wait() on all GreenThread objects.

Co-Authored-By: Jason Anderson <jasonanderson@uchicago.edu>
Change-Id: Ie2339db18e8baee379fbea082f1238ec44fca6b1
Closes-Bug: #1785841
This commit is contained in:
Pierre Riteau 2018-08-08 12:46:28 +02:00 committed by Jason Anderson
parent 368403ea80
commit c92edb8a17
No known key found for this signature in database
GPG Key ID: 9207452BF63947DD
2 changed files with 164 additions and 33 deletions

View File

@ -13,7 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
import datetime
from operator import itemgetter
import eventlet
from oslo_config import cfg
@ -77,7 +79,7 @@ class ManagerService(service_utils.RPCServer):
# NOTE(jakecoll): stop_on_exception=False was added because database
# exceptions would prevent threads from being scheduled again.
# TODO(jakecoll): Find a way to test this.
self.tg.add_timer_args(EVENT_INTERVAL, self._event,
self.tg.add_timer_args(EVENT_INTERVAL, self._process_events,
stop_on_exception=False)
for m in self.monitors:
m.start_monitoring()
@ -139,25 +141,12 @@ class ManagerService(service_utils.RPCServer):
return actions
@service_utils.with_empty_context
def _event(self):
"""Tries to commit event.
If there is an event in Blazar DB to be done, do it and change its
status to 'DONE'.
"""
LOG.debug('Trying to get event from DB.')
events = db_api.event_get_all_sorted_by_filters(
sort_key='time',
sort_dir='asc',
filters={'status': status.event.UNDONE,
'time': {'op': 'le',
'border': datetime.datetime.utcnow()}}
)
def _process_events_concurrently(self, events):
if not events:
return
LOG.info("Trying to execute events: %s", events)
event_threads = {}
for event in events:
if not status.LeaseStatus.is_stable(event['lease_id']):
LOG.info("Skip event %s because the status of the lease %s "
@ -166,15 +155,89 @@ class ManagerService(service_utils.RPCServer):
db_api.event_update(event['id'],
{'status': status.event.IN_PROGRESS})
try:
eventlet.spawn_n(
event_thread = eventlet.spawn(
service_utils.with_empty_context(self._exec_event),
event)
event_threads[event['id']] = event_thread
except Exception:
db_api.event_update(event['id'],
{'status': status.event.ERROR})
LOG.exception('Error occurred while event %s handling.',
LOG.exception('Error occurred while spawning event %s.',
event['id'])
for event_id, event_thread in event_threads.items():
try:
event_thread.wait()
except Exception:
db_api.event_update(event['id'],
{'status': status.event.ERROR})
LOG.exception('Error occurred while handling event %s.',
event_id)
def _select_for_execution(self, events):
"""Orders the events such that they can be safely executed concurrently.
Events are selected to be executed concurrently if they are of the same
type, while keeping strict time ordering and the following priority of
event types: before_end_lease, end_lease, and start_lease (except for
before_end_lease events where there is a start_lease event for the same
lease at the same time).
We ensure that:
- the before_end_lease event of a lease is executed after the
start_lease event and before the end_lease event of the same lease,
- for two reservations using the same hosts back to back, the end_lease
event is executed before the start_lease event.
"""
if not events:
return []
events_by_lease = defaultdict(list)
events_by_type = defaultdict(list)
for e in sorted(events, key=itemgetter('time')):
events_by_lease[e['lease_id']].append(e)
events_by_type[e['event_type']].append(e)
# If there is a start_lease event for the same lease, we run it first.
deferred_before_end_events = []
deferred_end_events = []
for start_event in events_by_type['start_lease']:
for e in events_by_lease[start_event['lease_id']]:
if e['event_type'] == 'before_end_lease':
events_by_type['before_end_lease'].remove(e)
deferred_before_end_events.append(e)
elif e['event_type'] == 'end_lease':
events_by_type['end_lease'].remove(e)
deferred_end_events.append(e)
return [
events_by_type['before_end_lease'],
events_by_type['end_lease'],
events_by_type['start_lease'],
deferred_before_end_events,
deferred_end_events
]
def _process_events(self):
"""Tries to execute events.
If there is any event in Blazar DB to be executed, do it and change its
status to 'DONE'. Events are executed concurrently if possible.
"""
LOG.debug('Trying to get events from DB.')
events = db_api.event_get_all_sorted_by_filters(
sort_key='time',
sort_dir='asc',
filters={'status': status.event.UNDONE,
'time': {'op': 'le',
'border': datetime.datetime.utcnow()}}
)
for batch in self._select_for_execution(events):
self._process_events_concurrently(batch)
def _exec_event(self, event):
"""Execute an event function"""
event_fn = getattr(self, event['event_type'], None)

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
from unittest import mock
@ -235,36 +236,102 @@ class ServiceTestCase(tests.TestCase):
event_update = self.patch(self.db_api, 'event_update')
events.return_value = None
self.manager._event()
self.manager._process_events()
self.assertFalse(event_update.called)
def test_event_success(self):
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
event_update = self.patch(self.db_api, 'event_update')
events.return_value = [{'id': '111-222-333',
'lease_id': 'lease_id1',
'time': self.good_date},
{'id': '444-555-666',
'lease_id': 'lease_id2',
'time': self.good_date}]
self.patch(eventlet, 'spawn_n')
events.return_value = [{'id': '111-222-333', 'time': self.good_date,
'lease_id': 'aaa-bbb-ccc',
'event_type': 'start_lease'},
{'id': '444-555-666', 'time': self.good_date,
'lease_id': 'bbb-ccc-ddd',
'event_type': 'start_lease'}]
self.patch(eventlet, 'spawn')
self.manager._event()
self.manager._process_events()
event_update.assert_has_calls([
mock.call('111-222-333', {'status': status.event.IN_PROGRESS}),
mock.call('444-555-666', {'status': status.event.IN_PROGRESS})])
def test_concurrent_events(self):
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
self.patch(self.db_api, 'event_update')
events.return_value = [{'id': '111-222-333', 'time': self.good_date,
'lease_id': 'aaa-bbb-ccc',
'event_type': 'start_lease'},
{'id': '222-333-444', 'time': self.good_date,
'lease_id': 'bbb-ccc-ddd',
'event_type': 'end_lease'},
{'id': '333-444-555', 'time': self.good_date,
'lease_id': 'bbb-ccc-ddd',
'event_type': 'before_end_lease'},
{'id': '444-555-666', 'time': self.good_date,
# Same lease as start_lease event above
'lease_id': 'aaa-bbb-ccc',
'event_type': 'before_end_lease'},
{'id': '444-555-666', 'time': self.good_date,
# Same lease as start_lease event above
'lease_id': 'aaa-bbb-ccc',
'event_type': 'end_lease'},
{'id': '555-666-777', 'time': self.good_date,
'lease_id': 'ccc-ddd-eee',
'event_type': 'end_lease'},
{'id': '666-777-888',
'time': self.good_date + datetime.timedelta(
minutes=1),
'lease_id': 'ddd-eee-fff',
'event_type': 'end_lease'}]
events_values = copy.copy(events.return_value)
_process_events_concurrently = self.patch(
self.manager, '_process_events_concurrently')
self.manager._process_events()
_process_events_concurrently.assert_has_calls([
# First execute the before_end_lease event which doesn't have a
# corresponding start_lease
mock.call([events_values[2]]),
# Then end_lease events
mock.call([events_values[1], events_values[5], events_values[6]]),
# Then the start_lease event
mock.call([events_values[0]]),
# Then the before_end_lease which is for the same lease as the
# previous start_lease event
mock.call([events_values[3]]),
# Then the end_lease which is for the same lease as the previous
# start_lease event
mock.call([events_values[4]])])
def test_process_events_concurrently(self):
events = [{'id': '111-222-333', 'time': self.good_date,
'lease_id': 'aaa-bbb-ccc',
'event_type': 'start_lease'},
{'id': '222-333-444', 'time': self.good_date,
'lease_id': 'bbb-ccc-ddd',
'event_type': 'start_lease'},
{'id': '333-444-555', 'time': self.good_date,
'lease_id': 'ccc-ddd-eee',
'event_type': 'start_lease'}]
spawn = self.patch(eventlet, 'spawn')
self.manager._process_events_concurrently(events)
spawn.assert_has_calls([
mock.call(mock.ANY, events[0]),
mock.call(mock.ANY, events[1]),
mock.call(mock.ANY, events[2])])
def test_event_spawn_fail(self):
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
event_update = self.patch(self.db_api, 'event_update')
self.patch(eventlet, 'spawn_n').side_effect = Exception
events.return_value = [{'id': '111-222-333',
'lease_id': self.lease_id,
'time': self.good_date}]
self.patch(eventlet, 'spawn').side_effect = Exception
events.return_value = [{'id': '111-222-333', 'time': self.good_date,
'lease_id': 'aaa-bbb-ccc',
'event_type': 'start_lease'}]
self.manager._event()
self.manager._process_events()
event_update.assert_has_calls([
mock.call('111-222-333', {'status': status.event.IN_PROGRESS}),
@ -274,6 +341,7 @@ class ServiceTestCase(tests.TestCase):
events = self.patch(self.db_api, 'event_get_all_sorted_by_filters')
events.return_value = [{'id': '111-222-333',
'lease_id': self.lease_id,
'event_type': 'start_lease',
'time': self.good_date}]
self.lease_get = self.patch(self.db_api, 'lease_get')
@ -283,7 +351,7 @@ class ServiceTestCase(tests.TestCase):
event_update = self.patch(self.db_api, 'event_update')
self.manager._event()
self.manager._process_events()
event_update.assert_not_called()