(fix) Guard against DB connection leaks

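- Use connection context managers (`with self.db_engine.connect() as conn:`)
  so that database connections are released back to the pool even when a
  query raises, instead of relying on an explicit `conn.close()` call.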
- Make the database connection pool size configurable via the new `pool_size`
  option; it defaults to 15.

Change-Id: Id8011fbf45a1b3c87835ff5f47ebfa9334488319
This commit is contained in:
Scott Hussey 2018-06-26 14:38:40 -05:00
parent fec226fc66
commit 7e7739ce2c
2 changed files with 104 additions and 116 deletions

View File

@ -91,6 +91,10 @@ class DrydockConfig(object):
cfg.StrOpt(
'database_connect_string',
help='The URI database connect string.'),
cfg.IntOpt(
'pool_size',
default=15,
help='The SQLalchemy database connection pool size.'),
]
# Options for the boot action framework

View File

@ -42,7 +42,8 @@ class DrydockState(object):
def connect_db(self):
"""Connect the state manager to the persistent DB."""
self.db_engine = create_engine(
config.config_mgr.conf.database.database_connect_string)
config.config_mgr.conf.database.database_connect_string,
pool_size=config.config_mgr.conf.database.pool_size)
self.db_metadata = MetaData(bind=self.db_engine)
self.tasks_tbl = tables.Tasks(self.db_metadata)
@ -67,12 +68,11 @@ class DrydockState(object):
'build_data',
]
conn = self.db_engine.connect()
for t in table_names:
query_text = sql.text(
"TRUNCATE TABLE %s" % t).execution_options(autocommit=True)
conn.execute(query_text)
conn.close()
with self.db_engine.connect() as conn:
for t in table_names:
query_text = sql.text(
"TRUNCATE TABLE %s" % t).execution_options(autocommit=True)
conn.execute(query_text)
def get_design_documents(self, design_ref):
return ReferenceResolver.resolve_reference(design_ref)
@ -80,19 +80,17 @@ class DrydockState(object):
def get_tasks(self):
"""Get all tasks in the database."""
try:
conn = self.db_engine.connect()
query = sql.select([self.tasks_tbl])
rs = conn.execute(query)
with self.db_engine.connect() as conn:
query = sql.select([self.tasks_tbl])
rs = conn.execute(query)
task_list = [objects.Task.from_db(dict(r)) for r in rs]
task_list = [objects.Task.from_db(dict(r)) for r in rs]
self._assemble_tasks(task_list=task_list)
self._assemble_tasks(task_list=task_list)
# add reference to this state manager to each task
for t in task_list:
t.statemgr = self
conn.close()
# add reference to this state manager to each task
for t in task_list:
t.statemgr = self
return task_list
except Exception as ex:
@ -164,25 +162,24 @@ class DrydockState(object):
:param allowed_actions: list of string action names
"""
try:
conn = self.db_engine.connect()
if allowed_actions is None:
query = self.tasks_tbl.select().where(
self.tasks_tbl.c.status ==
hd_fields.TaskStatus.Queued).order_by(
self.tasks_tbl.c.created.asc())
rs = conn.execute(query)
else:
query = sql.text("SELECT * FROM tasks WHERE "
"status = :queued_status AND "
"action = ANY(:actions) "
"ORDER BY created ASC")
rs = conn.execute(
query,
queued_status=hd_fields.TaskStatus.Queued,
actions=allowed_actions)
with self.db_engine.connect() as conn:
if allowed_actions is None:
query = self.tasks_tbl.select().where(
self.tasks_tbl.c.status ==
hd_fields.TaskStatus.Queued).order_by(
self.tasks_tbl.c.created.asc())
rs = conn.execute(query)
else:
query = sql.text("SELECT * FROM tasks WHERE "
"status = :queued_status AND "
"action = ANY(:actions) "
"ORDER BY created ASC")
rs = conn.execute(
query,
queued_status=hd_fields.TaskStatus.Queued,
actions=allowed_actions)
r = rs.first()
conn.close()
r = rs.first()
if r is not None:
task = objects.Task.from_db(dict(r))
@ -203,12 +200,11 @@ class DrydockState(object):
:param task_id: uuid.UUID of a task_id to query against
"""
try:
conn = self.db_engine.connect()
query = self.tasks_tbl.select().where(
self.tasks_tbl.c.task_id == task_id.bytes)
rs = conn.execute(query)
r = rs.fetchone()
with self.db_engine.connect() as conn:
query = self.tasks_tbl.select().where(
self.tasks_tbl.c.task_id == task_id.bytes)
rs = conn.execute(query)
r = rs.fetchone()
task = objects.Task.from_db(dict(r))
@ -217,8 +213,6 @@ class DrydockState(object):
self._assemble_tasks(task_list=[task])
task.statemgr = self
conn.close()
return task
except Exception as ex:
@ -234,11 +228,10 @@ class DrydockState(object):
:param msg: instance of objects.TaskStatusMessage
"""
try:
conn = self.db_engine.connect()
query = self.result_message_tbl.insert().values(
task_id=task_id.bytes, **(msg.to_db()))
conn.execute(query)
conn.close()
with self.db_engine.connect() as conn:
query = self.result_message_tbl.insert().values(
task_id=task_id.bytes, **(msg.to_db()))
conn.execute(query)
return True
except Exception as ex:
self.logger.error("Error inserting result message for task %s: %s"
@ -253,24 +246,22 @@ class DrydockState(object):
if task_list is None:
return None
conn = self.db_engine.connect()
query = sql.select([
self.result_message_tbl
]).where(self.result_message_tbl.c.task_id == sql.bindparam(
'task_id')).order_by(self.result_message_tbl.c.sequence.asc())
query.compile(self.db_engine)
with self.db_engine.connect() as conn:
query = sql.select([
self.result_message_tbl
]).where(self.result_message_tbl.c.task_id == sql.bindparam(
'task_id')).order_by(self.result_message_tbl.c.sequence.asc())
query.compile(self.db_engine)
for t in task_list:
rs = conn.execute(query, task_id=t.task_id.bytes)
error_count = 0
for r in rs:
msg = objects.TaskStatusMessage.from_db(dict(r))
if msg.error:
error_count = error_count + 1
t.result.message_list.append(msg)
t.result.error_count = error_count
conn.close()
for t in task_list:
rs = conn.execute(query, task_id=t.task_id.bytes)
error_count = 0
for r in rs:
msg = objects.TaskStatusMessage.from_db(dict(r))
if msg.error:
error_count = error_count + 1
t.result.message_list.append(msg)
t.result.error_count = error_count
def post_task(self, task):
"""Insert a task into the database.
@ -280,11 +271,10 @@ class DrydockState(object):
:param task: instance of objects.Task to insert into the database.
"""
try:
conn = self.db_engine.connect()
query = self.tasks_tbl.insert().values(**(
task.to_db(include_id=True)))
conn.execute(query)
conn.close()
with self.db_engine.connect() as conn:
query = self.tasks_tbl.insert().values(**(
task.to_db(include_id=True)))
conn.execute(query)
return True
except Exception as ex:
self.logger.error(
@ -297,17 +287,15 @@ class DrydockState(object):
:param task: objects.Task instance to reference for update values
"""
try:
conn = self.db_engine.connect()
query = self.tasks_tbl.update().where(
self.tasks_tbl.c.task_id == task.task_id.bytes).values(**(
task.to_db(include_id=False)))
rs = conn.execute(query)
if rs.rowcount == 1:
conn.close()
return True
else:
conn.close()
return False
with self.db_engine.connect() as conn:
query = self.tasks_tbl.update().where(
self.tasks_tbl.c.task_id == task.task_id.bytes).values(**(
task.to_db(include_id=False)))
rs = conn.execute(query)
if rs.rowcount == 1:
return True
else:
return False
except Exception as ex:
self.logger.error(
"Error updating task %s: %s" % (str(task.task_id), str(ex)))
@ -325,17 +313,16 @@ class DrydockState(object):
"WHERE task_id = :task_id").execution_options(autocommit=True)
try:
conn = self.db_engine.connect()
rs = conn.execute(
query_string,
new_subtask=subtask_id.bytes,
task_id=task_id.bytes)
rc = rs.rowcount
conn.close()
if rc == 1:
return True
else:
return False
with self.db_engine.connect() as conn:
rs = conn.execute(
query_string,
new_subtask=subtask_id.bytes,
task_id=task_id.bytes)
rc = rs.rowcount
if rc == 1:
return True
else:
return False
except Exception as ex:
self.logger.error("Error appending subtask %s to task %s: %s" %
(str(subtask_id), str(task_id), str(ex)))
@ -347,18 +334,17 @@ class DrydockState(object):
:param leader_id: uuid.UUID ID of the leader
"""
try:
conn = self.db_engine.connect()
query = self.active_instance_tbl.update().where(
self.active_instance_tbl.c.identity == leader_id.bytes).values(
last_ping=datetime.utcnow())
rs = conn.execute(query)
rc = rs.rowcount
conn.close()
with self.db_engine.connect() as conn:
query = self.active_instance_tbl.update().where(
self.active_instance_tbl.c.identity == leader_id.bytes).values(
last_ping=datetime.utcnow())
rs = conn.execute(query)
rc = rs.rowcount
if rc == 1:
return True
else:
return False
if rc == 1:
return True
else:
return False
except Exception as ex:
self.logger.error("Error maintaining leadership: %s" % str(ex))
@ -385,13 +371,12 @@ class DrydockState(object):
autocommit=True)
try:
conn = self.db_engine.connect()
conn.execute(query_string, instance_id=leader_id.bytes)
check_query = self.active_instance_tbl.select().where(
self.active_instance_tbl.c.identity == leader_id.bytes)
rs = conn.execute(check_query)
r = rs.fetchone()
conn.close()
with self.db_engine.connect() as conn:
conn.execute(query_string, instance_id=leader_id.bytes)
check_query = self.active_instance_tbl.select().where(
self.active_instance_tbl.c.identity == leader_id.bytes)
rs = conn.execute(check_query)
r = rs.fetchone()
if r is not None:
return True
else:
@ -406,12 +391,11 @@ class DrydockState(object):
:param leader_id: a uuid.UUID instance identifying the instance giving up leadership
"""
try:
conn = self.db_engine.connect()
query = self.active_instance_tbl.delete().where(
self.active_instance_tbl.c.identity == leader_id.bytes)
rs = conn.execute(query)
rc = rs.rowcount
conn.close()
with self.db_engine.connect() as conn:
query = self.active_instance_tbl.delete().where(
self.active_instance_tbl.c.identity == leader_id.bytes)
rs = conn.execute(query)
rc = rs.rowcount
if rc == 1:
return True