From 2d47fa6f6e0de0a54975ff92fc87785a052d4371 Mon Sep 17 00:00:00 2001 From: Monsyne Dragon Date: Mon, 8 Sep 2014 23:02:52 +0000 Subject: [PATCH] Add reset stream method. --- ChangeLog | 10 +++++++++ tests/test_db.py | 44 ++++++++++++++++++++++++++++++++++++-- winchester/db/interface.py | 14 +++++++++++- winchester/models.py | 2 ++ 4 files changed, 67 insertions(+), 3 deletions(-) diff --git a/ChangeLog b/ChangeLog index ff69abc..11c9e52 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,13 @@ +commit ca0d09f7bc017ef9e372ae29a0c19bf20b68aca5 +Author: Monsyne Dragon +Date: Mon Sep 8 19:57:24 2014 +0000 + + Save newly generated events from pipeline + + Save newly created events from pipeline run if pipeline commits. + Refactor trigger manager api wart, move save_event call into add_event + to make add_event and add_notification symmetric. + commit 0c619c133d3c248d62a2c5f6441d4fae0bf7042a Author: Monsyne Dragon Date: Sun Sep 7 04:07:20 2014 +0000 diff --git a/tests/test_db.py b/tests/test_db.py index 9b1e72a..860a6d2 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -71,6 +71,34 @@ TEST_DATA = [ expire_timestamp=datetime.datetime(2014,8,16,0,0,0,42), state=int(models.StreamState.active), state_serial_no=0), + dict(id=5, first_event=datetime.datetime(2014,8,1,15,25,45,453201), + last_event=datetime.datetime(2014,8,1,15,25,45,453201), + name='reset_test_trigger', + fire_timestamp=datetime.datetime(2014,8,11,6,0,0,42), + expire_timestamp=datetime.datetime(2014,8,16,0,0,0,42), + state=int(models.StreamState.error), + state_serial_no=0), + dict(id=6, first_event=datetime.datetime(2014,8,1,15,25,45,453201), + last_event=datetime.datetime(2014,8,1,15,25,45,453201), + name='reset_test_trigger', + fire_timestamp=datetime.datetime(2014,8,11,6,0,0,42), + expire_timestamp=datetime.datetime(2014,8,16,0,0,0,42), + state=int(models.StreamState.expire_error), + state_serial_no=0), + dict(id=7, first_event=datetime.datetime(2014,8,1,15,25,45,453201), + last_event=datetime.datetime(2014,8,1,15,25,45,453201), + name='reset_test_trigger', + fire_timestamp=datetime.datetime(2014,8,11,6,0,0,42), + expire_timestamp=datetime.datetime(2014,8,16,0,0,0,42), + state=int(models.StreamState.retry_fire), + state_serial_no=0), + dict(id=8, first_event=datetime.datetime(2014,8,1,15,25,45,453201), + last_event=datetime.datetime(2014,8,1,15,25,45,453201), + name='reset_test_trigger', + fire_timestamp=datetime.datetime(2014,8,11,6,0,0,42), + expire_timestamp=datetime.datetime(2014,8,16,0,0,0,42), + state=int(models.StreamState.retry_expire), + state_serial_no=0), ]}, {'streamevent': [ dict(stream_id=1, event_id=3), @@ -351,10 +379,11 @@ class TestDB(unittest.TestCase): def test_get_ready_streams_fire(self): current_time = datetime.datetime(2014,8,12,0,0,0,42) streams = self.db.get_ready_streams(10, current_time) - self.assertEqual(len(streams), 2) + self.assertEqual(len(streams), 3) stream_ids = [stream.id for stream in streams] self.assertIn(3, stream_ids) self.assertIn(4, stream_ids) + self.assertIn(7, stream_ids) current_time = datetime.datetime(2014,8,10,12,0,0,42) streams = self.db.get_ready_streams(10, current_time) @@ -369,12 +398,13 @@ class TestDB(unittest.TestCase): def test_get_ready_streams_expire(self): current_time = datetime.datetime(2014,8,17,0,0,0,42) streams = self.db.get_ready_streams(10, current_time, expire=True) - self.assertEqual(len(streams), 4) + self.assertEqual(len(streams), 5) stream_ids = [stream.id for stream in streams] self.assertIn(1, stream_ids) self.assertIn(2, stream_ids) self.assertIn(3, stream_ids) self.assertIn(4, stream_ids) + self.assertIn(8, stream_ids) current_time = datetime.datetime(2014,8,10,12,0,0,42) streams = self.db.get_ready_streams(10, current_time, expire=True) @@ -399,3 +429,13 @@ class TestDB(unittest.TestCase): self.db.set_stream_state(stream, models.StreamState.firing) with self.assertRaises(db.LockError): self.db.set_stream_state(stream, models.StreamState.firing) + + def test_reset_stream_fire(self): + stream = self.db.get_stream_by_id(5) + stream = self.db.reset_stream(stream) + self.assertEqual(stream.state, models.StreamState.retry_fire) + + def test_reset_stream_expire(self): + stream = self.db.get_stream_by_id(6) + stream = self.db.reset_stream(stream) + self.assertEqual(stream.state, models.StreamState.retry_expire) diff --git a/winchester/db/interface.py b/winchester/db/interface.py index 0fd75bc..7b63183 100644 --- a/winchester/db/interface.py +++ b/winchester/db/interface.py @@ -185,7 +185,12 @@ class DBInterface(object): @sessioned def get_ready_streams(self, batch_size, current_time, expire=False, session=None): q = session.query(models.Stream) - q = q.filter(models.Stream.state == int(models.StreamState.active)) + if expire: + states = (int(models.StreamState.active), int(models.StreamState.retry_expire)) + else: + states = (int(models.StreamState.active), int(models.StreamState.retry_fire)) + + q = q.filter(models.Stream.state.in_(states)) if expire: q = q.filter(models.Stream.expire_timestamp < current_time) else: @@ -205,3 +210,10 @@ class DBInterface(object): if ct != 1: raise LockError("Optimistic Lock failed!") return self.get_stream_by_id(stream_id) + + def reset_stream(self, stream): + if stream.state == models.StreamState.error: + return self.set_stream_state(stream, models.StreamState.retry_fire) + if stream.state == models.StreamState.expire_error: + return self.set_stream_state(stream, models.StreamState.retry_expire) + return stream diff --git a/winchester/models.py b/winchester/models.py index fa1b9cb..21d6309 100644 --- a/winchester/models.py +++ b/winchester/models.py @@ -39,6 +39,8 @@ class StreamState(IntEnum): error = 4 expire_error = 5 completed = 6 + retry_fire = 7 + retry_expire = 8 class DBException(Exception):