diff --git a/winchester/models.py b/winchester/models.py index d29c482..fa1b9cb 100644 --- a/winchester/models.py +++ b/winchester/models.py @@ -362,7 +362,7 @@ class Stream(ProxiedDictMixin, Base): self.fire_timestamp = fire_timestamp if state is None: state = StreamState.active - self.state = state + self.state = int(state) if state_serial_no is None: state_serial_no = 0 self.state_serial_no = state_serial_no diff --git a/winchester/pipeline_handler.py b/winchester/pipeline_handler.py index e934a59..507967a 100644 --- a/winchester/pipeline_handler.py +++ b/winchester/pipeline_handler.py @@ -80,7 +80,7 @@ class LoggingHandler(PipelineHandlerBase): def handle_events(self, events, env): emsg = ', '.join("%s: %s" % (event['event_type'], event['message_id']) for event in events) - logger.info("Received %s events: \n%s" % (len(events)), emsg) + logger.info("Received %s events: \n%s" % (len(events), emsg)) return events def commit(self): diff --git a/winchester/pipeline_manager.py b/winchester/pipeline_manager.py index 24370ae..72f5391 100644 --- a/winchester/pipeline_manager.py +++ b/winchester/pipeline_manager.py @@ -217,6 +217,7 @@ class PipelineManager(object): except LockError: logger.debug("Stream %s locked. Moving on..." % stream.id) return False + logger.debug("Firing Stream %s." % stream.id) trigger_def = self.trigger_map.get(stream.name) if trigger_def is None: logger.error("Stream %s has unknown trigger definition %s" % ( @@ -246,6 +247,7 @@ class PipelineManager(object): except LockError: logger.debug("Stream %s locked. Moving on..." % stream.id) return False + logger.debug("Expiring Stream %s." % stream.id) trigger_def = self.trigger_map.get(stream.name) if trigger_def is None: logger.error("Stream %s has unknown trigger definition %s" % ( diff --git a/winchester/trigger_manager.py b/winchester/trigger_manager.py index 6ec4978..8181e48 100644 --- a/winchester/trigger_manager.py +++ b/winchester/trigger_manager.py @@ -24,6 +24,8 @@ class EventCondenser(condenser.CondenserBase): self.timestamp = None def add_trait(self, name, trait_type, value): + if isinstance(value, datetime.datetime): + value = self._fix_time(value) self.traits[name] = value def add_envelope_info(self, event_type, message_id, when): @@ -34,10 +36,20 @@ class EventCondenser(condenser.CondenserBase): def get_event(self): event = self.traits.copy() event['message_id'] = self.message_id - event['timestamp'] = self.timestamp + event['timestamp'] = self._fix_time(self.timestamp) event['event_type'] = self.event_type return event + def _fix_time(self, dt): + """Stackdistiller converts all times to utc. + We store timestamps as utc datetime. However, the explicit + UTC timezone on incoming datetimes causes comparison issues + deep in sqlalchemy. We fix this by converting all datetimes + to naive utc timestamps""" + if dt.tzinfo is not None: + dt = dt.replace(tzinfo=None) + return dt + def validate(self): if self.event_type is None: return False