From a15b95dbf3cebe629ab5e83aac6910dc568c8773 Mon Sep 17 00:00:00 2001 From: Monsyne Dragon Date: Thu, 25 Jun 2015 17:26:11 +0000 Subject: [PATCH] Catch exceptions and reconnect if MySQL goes away. Catch exception from the db layer and reconnect if MySQL connection goes away. Change-Id: Ib931100f4ee6ed78f0839d7ad0ed82bd277c23c0 --- winchester/db/__init__.py | 2 +- winchester/db/interface.py | 11 +++++++++++ winchester/pipeline_manager.py | 32 +++++++++++++++++++++----------- 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/winchester/db/__init__.py b/winchester/db/__init__.py index 6fd6b33..ae83e70 100644 --- a/winchester/db/__init__.py +++ b/winchester/db/__init__.py @@ -1,3 +1,3 @@ -from winchester.db.interface import DuplicateError, LockError +from winchester.db.interface import DuplicateError, LockError, DatabaseConnectionError from winchester.db.interface import NoSuchEventError, NoSuchStreamError from winchester.db.interface import DBInterface diff --git a/winchester/db/interface.py b/winchester/db/interface.py index ba20ea5..8393f2a 100644 --- a/winchester/db/interface.py +++ b/winchester/db/interface.py @@ -19,7 +19,9 @@ import logging import sqlalchemy from sqlalchemy import and_ +from sqlalchemy.exc import DisconnectionError from sqlalchemy.exc import IntegrityError +from sqlalchemy.exc import OperationalError from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.orm import sessionmaker from winchester.config import ConfigItem @@ -33,6 +35,10 @@ ENGINES = dict() SESSIONMAKERS = dict() +class DatabaseConnectionError(models.DBException): + pass + + class DuplicateError(models.DBException): pass @@ -110,6 +116,11 @@ class DBInterface(object): except IntegrityError: session.rollback() raise DuplicateError("Duplicate unique value detected!") + except (OperationalError, DisconnectionError): + session.rollback() + self.close() + logger.warn("Database Connection Lost!") + raise DatabaseConnectionError() except Exception: session.rollback() raise diff --git a/winchester/pipeline_manager.py b/winchester/pipeline_manager.py index d038824..cba0a07 100644 --- a/winchester/pipeline_manager.py +++ b/winchester/pipeline_manager.py @@ -22,6 +22,7 @@ import time from winchester.config import ConfigItem from winchester.config import ConfigManager +from winchester.db import DatabaseConnectionError from winchester.db import DBInterface from winchester.db import LockError from winchester.definition import TriggerDefinition @@ -396,16 +397,25 @@ class PipelineManager(object): def run(self): while True: - fire_ct = self.process_ready_streams( - self.pipeline_worker_batch_size) - expire_ct = self.process_ready_streams( - self.pipeline_worker_batch_size, - expire=True) + try: + fire_ct = self.process_ready_streams( + self.pipeline_worker_batch_size) + expire_ct = self.process_ready_streams( + self.pipeline_worker_batch_size, + expire=True) - if ((self.current_time() - self.last_status).seconds - > self.statistics_period): - self._log_statistics() + if ((self.current_time() - self.last_status).seconds + > self.statistics_period): + self._log_statistics() - if not fire_ct and not expire_ct: - logger.debug("No streams to fire or expire. Sleeping...") - time.sleep(self.pipeline_worker_delay) + if not fire_ct and not expire_ct: + logger.debug("No streams to fire or expire. Sleeping...") + time.sleep(self.pipeline_worker_delay) + except DatabaseConnectionError: + logger.warn("Database Connection went away. Reconnecting...") + time.sleep(5) + # DB layer will reconnect automatically. We just need to + # retry the operation. (mdragon) + except Exception: + logger.exception("Unknown Error in pipeline worker!") + raise