From 8add0260f2af1b82c9cc77ec69895652b4d5c467 Mon Sep 17 00:00:00 2001 From: Jedrzej Nowak Date: Mon, 7 Mar 2016 16:14:42 +0100 Subject: [PATCH] Implement better and more solar specific pool for PG Supported parameters - pool_size how big is the pool - pool_overflow how many overflow connections should be allowed Change-Id: Iba92eb94754ef7314bc3d4bf0e413e7d61e027f8 --- bootstrap/playbooks/postgres.yaml | 2 +- solar/dblayer/__init__.py | 1 + solar/dblayer/sql_client.py | 11 +++ solar/dblayer/sql_pool.py | 120 ++++++++++++++++++++++++++++++ 4 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 solar/dblayer/sql_pool.py diff --git a/bootstrap/playbooks/postgres.yaml b/bootstrap/playbooks/postgres.yaml index 696d5a96..8d133508 100644 --- a/bootstrap/playbooks/postgres.yaml +++ b/bootstrap/playbooks/postgres.yaml @@ -18,6 +18,6 @@ - shell: docker exec vagrant_pg_1 psql -h localhost -U solar postgres -c "CREATE DATABASE solar WITH owner=postgres LC_COLLATE='C' TEMPLATE template0;" - lineinfile: dest: /.solar_config_override - line: "solar_db: postgresql://solar:solar@127.0.0.1:5432/solar" + line: "solar_db: postgresql://solar:solar@127.0.0.1:5432/solar?solar_pool=True" state: present create: yes diff --git a/solar/dblayer/__init__.py b/solar/dblayer/__init__.py index 5077e8d9..6ba7aeb6 100644 --- a/solar/dblayer/__init__.py +++ b/solar/dblayer/__init__.py @@ -81,6 +81,7 @@ elif _connection.mode == 'postgresql': opts["password"] = _connection.password # TODO: allow set Postgresql classes from playhouse opts.setdefault('db_class', 'PostgresqlDatabase') + opts.setdefault('solar_pool', True) client = SqlClient(_connection.database, **opts) else: diff --git a/solar/dblayer/sql_client.py b/solar/dblayer/sql_client.py index d4539cf2..3eafdba9 100644 --- a/solar/dblayer/sql_client.py +++ b/solar/dblayer/sql_client.py @@ -18,6 +18,7 @@ import json import sys import uuid +import yaml from peewee import BlobField from peewee import CharField @@ -416,6 +417,16 @@ class SqlClient(object): fromlist = db_class_str __import__(mod, fromlist=[fromlist]) db_class = getattr(sys.modules[mod], fromlist) + args = map(yaml.safe_load, args) + kwargs = {k: yaml.safe_load(v) if isinstance(v, basestring) else v + for k, v in kwargs.items()} + # use_pool is defaulted to True with PG backend + use_pool = kwargs.pop('solar_pool', False) + if use_pool: + from solar.dblayer.sql_pool import get_pooled_db + db_class = get_pooled_db(db_class, + kwargs.pop('solar_pool_size', 7), + kwargs.pop('solar_pool_overflow', 20)) session = db_class(*args, **kwargs) self._sql_session = session self.buckets = {} diff --git a/solar/dblayer/sql_pool.py b/solar/dblayer/sql_pool.py new file mode 100644 index 00000000..95fd9295 --- /dev/null +++ b/solar/dblayer/sql_pool.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import gevent + +from collections import deque +from gevent.lock import RLock +from gevent.lock import Semaphore + + +class FakeLock(object): + """A lock that does not lock, used to replace peewee connection lock""" + + def acquire(self, *args, **kwargs): + return True + + def release(self, *args, **kwargs): + return True + + def __enter__(self): + return True + + def __exit__(self, *args, **kwargs): + return True + + def __repr__(self): + return "<%s at 0x%x>" % (self.__class__.__name__, id(self)) + + +def get_pooled_db(orig_db, pool_size, pool_overflow): + """Wrapper that makes enables pool in DB + +This is wrapper for peewee database that has separate own logic of +connection pooling, adjusted for our use case +""" + db_lock = RLock() + pool = deque() + pool_size_overflow = pool_size + pool_overflow + + _tmp_lock = RLock() + _conn_count = [0, 0, 0] + s = Semaphore(pool_size_overflow) + + def close_conn(g): + g._db_obj._close(g._db_conn) + + def pooled__connect(obj, *args, **kwargs): + ret = None + n = 10 + max_tries = n - 1 + for i in xrange(n): + if not s.acquire(blocking=True, timeout=0.1): + continue + try: + try: + ret = pool.pop() + c = ret.cursor() + try: + c.execute('SELECT 1;') + finally: + c.close() + except IndexError: + with _tmp_lock: + _conn_count[0] += 1 + _conn_count[1] += 1 + ret = obj._orig__connect(*args, **kwargs) + except Exception: + s.release() + if i == max_tries: + raise + else: + if ret is not None: + break + if ret is None: + raise Exception('To many connections') + + cg = gevent.getcurrent() + if hasattr(cg, 'link'): + cg._db_conn = ret + cg._db_obj = obj + cg.link(close_conn) + return ret + + def pooled__close(obj, conn): + with _tmp_lock: + _conn_count[2] += 1 + + with db_lock: + if len(pool) < pool_size: + conn.rollback() # if anything left, rollback + pool.append(conn) + s.release() + return + else: + s.release() + with _tmp_lock: + _conn_count[0] -= 1 + + return obj._orig__close(conn) + + l = locals() + for x in ('_connect', '_close'): + orig = getattr(orig_db, x) + setattr(orig_db, "_orig_%s" % x, orig) + setattr(orig_db, x, l['pooled_%s' % x]) + + orig_db._conn_lock = FakeLock() + return orig_db