Catch exceptions and reconnect if MySQL goes away.

Catch exception from the db layer and reconnect if
MySQL connection goes away.

Change-Id: Ib931100f4ee6ed78f0839d7ad0ed82bd277c23c0
This commit is contained in:
Monsyne Dragon 2015-06-25 17:26:11 +00:00
parent b6dc37aad3
commit a15b95dbf3
3 changed files with 33 additions and 12 deletions

View File

@ -1,3 +1,3 @@
from winchester.db.interface import DuplicateError, LockError
from winchester.db.interface import DuplicateError, LockError, DatabaseConnectionError
from winchester.db.interface import NoSuchEventError, NoSuchStreamError
from winchester.db.interface import DBInterface

View File

@ -19,7 +19,9 @@ import logging
import sqlalchemy
from sqlalchemy import and_
from sqlalchemy.exc import DisconnectionError
from sqlalchemy.exc import IntegrityError
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import sessionmaker
from winchester.config import ConfigItem
@ -33,6 +35,10 @@ ENGINES = dict()
SESSIONMAKERS = dict()
class DatabaseConnectionError(models.DBException):
pass
class DuplicateError(models.DBException):
pass
@ -110,6 +116,11 @@ class DBInterface(object):
except IntegrityError:
session.rollback()
raise DuplicateError("Duplicate unique value detected!")
except (OperationalError, DisconnectionError):
session.rollback()
self.close()
logger.warn("Database Connection Lost!")
raise DatabaseConnectionError()
except Exception:
session.rollback()
raise

View File

@ -22,6 +22,7 @@ import time
from winchester.config import ConfigItem
from winchester.config import ConfigManager
from winchester.db import DatabaseConnectionError
from winchester.db import DBInterface
from winchester.db import LockError
from winchester.definition import TriggerDefinition
@ -396,16 +397,25 @@ class PipelineManager(object):
def run(self):
while True:
fire_ct = self.process_ready_streams(
self.pipeline_worker_batch_size)
expire_ct = self.process_ready_streams(
self.pipeline_worker_batch_size,
expire=True)
try:
fire_ct = self.process_ready_streams(
self.pipeline_worker_batch_size)
expire_ct = self.process_ready_streams(
self.pipeline_worker_batch_size,
expire=True)
if ((self.current_time() - self.last_status).seconds
> self.statistics_period):
self._log_statistics()
if ((self.current_time() - self.last_status).seconds
> self.statistics_period):
self._log_statistics()
if not fire_ct and not expire_ct:
logger.debug("No streams to fire or expire. Sleeping...")
time.sleep(self.pipeline_worker_delay)
if not fire_ct and not expire_ct:
logger.debug("No streams to fire or expire. Sleeping...")
time.sleep(self.pipeline_worker_delay)
except DatabaseConnectionError:
logger.warn("Database Connection went away. Reconnecting...")
time.sleep(5)
# DB layer will reconnect automatically. We just need to
# retry the operation. (mdragon)
except Exception:
logger.exception("Unknown Error in pipeline worker!")
raise