From 0010803288748fcd3ce7dba212a54bffe7a61a0c Mon Sep 17 00:00:00 2001 From: Yuriy Taraday Date: Thu, 28 Aug 2014 14:27:58 +0400 Subject: [PATCH] Add a pool of memcached clients This patchset adds a pool of memcache clients. This pool allows for reuse of a client object, prevents too many client object from being instantiated, and maintains proper tracking of dead servers so as to limit delays when a server (or all servers) become unavailable. The new memcache pool backend is available either by being set as the memcache backend or by using keystone.token.persistence.backends.memcache_pool.Token for the Token memcache persistence driver. [memcache] servers = 127.0.0.1:11211 dead_retry = 300 socket_timeout = 3 pool_maxsize = 10 pool_unused_timeout = 60 Where: - servers - comma-separated list of host:port pairs (was already there); - dead_retry - number of seconds memcached server is considered dead before it is tried again; - socket_timeout - timeout in seconds for every call to a server; - pool_maxsize - max total number of open connections in the pool; - pool_unused_timeout - number of seconds a connection is held unused in the pool before it is closed; The new memcache pool backend can be used as the driver for the Keystone caching layer. To use it as caching driver, set 'keystone.cache.memcache_pool' as the value of the [cache]\backend option, the other options are the same as above, but with 'memcache_' prefix: [cache] backend = keystone.cache.memcache_pool memcache_servers = 127.0.0.1:11211 memcache_dead_retry = 300 memcache_socket_timeout = 3 memcache_pool_maxsize = 10 memcache_pool_unused_timeout = 60 Co-Authored-By: Morgan Fainberg Closes-bug: #1332058 Closes-bug: #1360446 Change-Id: I3544894482b30a47fcd4fac8948d03136fd83f14 --- doc/source/configuration.rst | 42 ++++ etc/keystone.conf.sample | 68 +++++- keystone/common/cache/_memcache_pool.py | 196 ++++++++++++++++++ .../common/cache/backends/memcache_pool.py | 61 ++++++ keystone/common/cache/core.py | 14 ++ keystone/common/config.py | 64 +++++- keystone/common/kvs/backends/memcached.py | 9 +- keystone/tests/core.py | 1 + .../tests/unit/common/test_connection_pool.py | 119 +++++++++++ .../token/persistence/backends/memcache.py | 2 + .../persistence/backends/memcache_pool.py | 27 +++ 11 files changed, 587 insertions(+), 16 deletions(-) create mode 100644 keystone/common/cache/_memcache_pool.py create mode 100644 keystone/common/cache/backends/memcache_pool.py create mode 100644 keystone/tests/unit/common/test_connection_pool.py create mode 100644 keystone/token/persistence/backends/memcache_pool.py diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 5cefbc14cc..62e819ee9b 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -296,6 +296,40 @@ disables external authentication. For more details, refer to :doc:`External Authentication `. +Token Persistence Driver +------------------------ + +Keystone supports customizable token persistence drivers. These can be specified +in the ``[token]`` section of the configuration file. Keystone provides three +non-test persistence backends. These can be set with the ``[token]\driver`` +configuration option. + +The drivers Keystone provides are: + +* ``keystone.token.persistence.backends.sql.Token`` - The SQL-based (default) + token persistence engine. This backend stores all token data in the same SQL + store that is used for Identity/Assignment/etc. + +* ``keystone.token.persistence.backends.memcache.Token`` - The memcached based + token persistence backend. This backend relies on ``dogpile.cache`` and stores + the token data in a set of memcached servers. The servers urls are specified + in the ``[memcache]\servers`` configuration option in the Keystone config. + +* ``keystone.token.persistence.backends.memcache_pool.Token`` - The pooled memcached + token persistence engine. This backend supports the concept of pooled memcache + client object (allowing for the re-use of the client objects). This backend has + a number of extra tunable options in the ``[memcache]`` section of the config. + + +.. WARNING:: + It is recommended you use the ``keystone.token.persistence.backend.memcache_pool.Token`` + backend instead of ``keystone.token.persistence.backend.memcache.Token`` as the token + persistence driver if you are deploying Keystone under eventlet instead of + Apache + mod_wsgi. This recommendation are due to known issues with the use of + ``thread.local`` under eventlet that can allow the leaking of memcache client objects + and consumption of extra sockets. + + Token Provider -------------- @@ -372,6 +406,8 @@ behavior is that subsystem caching is enabled, but the global toggle is set to d * ``dogpile.cache.dbm`` - local DBM file backend * ``dogpile.cache.memory`` - in-memory cache * ``keystone.cache.mongo`` - MongoDB as caching backend + * ``keystone.cache.memcache_pool`` - An eventlet safe implementation of ``dogpile.cache.memcached``. + This implementation also provides client connection re-use. .. WARNING:: ``dogpile.cache.memory`` is not suitable for use outside of unit testing @@ -383,6 +419,12 @@ behavior is that subsystem caching is enabled, but the global toggle is set to d when using ``Keystone`` and the ``dogpile.cache.memory`` backend under any real workload. + .. WARNING:: + Do not use ``dogpile.cache.memcached`` backend if you are deploying + Keystone under eventlet. There are known issues with the use of ``thread.local`` + under eventlet that can allow the leaking of memcache client objects and + consumption of extra sockets. + * ``expiration_time`` - int, the default length of time to cache a specific value. A value of ``0`` indicates to not cache anything. It is recommended that the ``enabled`` option be used to disable cache instead of setting this to ``0``. diff --git a/etc/keystone.conf.sample b/etc/keystone.conf.sample index cfd970ac62..c058a030c7 100644 --- a/etc/keystone.conf.sample +++ b/etc/keystone.conf.sample @@ -507,10 +507,10 @@ #expiration_time=600 # Dogpile.cache backend module. It is recommended that -# Memcache (dogpile.cache.memcached) or Redis -# (dogpile.cache.redis) be used in production deployments. -# Small workloads (single process) like devstack can use the -# dogpile.cache.memory backend. (string value) +# Memcache with pooling (keystone.cache.memcache_pool) or +# Redis (dogpile.cache.redis) be used in production +# deployments. Small workloads (single process) like devstack +# can use the dogpile.cache.memory backend. (string value) #backend=keystone.common.cache.noop # Arguments supplied to the backend module. Specify this @@ -534,6 +534,35 @@ # false. (boolean value) #debug_cache_backend=false +# Memcache servers in the format of "host:port". +# (dogpile.cache.memcache and keystone.cache.memcache_pool +# backends only) (list value) +#memcache_servers=localhost:11211 + +# Number of seconds memcached server is considered dead before +# it is tried again. (dogpile.cache.memcache and +# keystone.cache.memcache_pool backends only) (integer value) +#memcache_dead_retry=300 + +# Timeout in seconds for every call to a server. +# (dogpile.cache.memcache and keystone.cache.memcache_pool +# backends only) (integer value) +#memcache_socket_timeout=3 + +# Max total number of open connections to every memcached +# server. (keystone.cache.memcache_pool backend only) (integer +# value) +#memcache_pool_maxsize=10 + +# Number of seconds a connection to memcached is held unused +# in the pool before it is closed. +# (keystone.cache.memcache_pool backend only) (integer value) +#memcache_pool_unused_timeout=60 + +# Number of seconds that an operation will wait to get a +# memcache client connection. (integer value) +#memcache_pool_connection_get_timeout=10 + [catalog] @@ -1212,10 +1241,33 @@ # Memcache servers in the format of "host:port". (list value) #servers=localhost:11211 -# Number of compare-and-set attempts to make when using -# compare-and-set in the token memcache back end. (integer -# value) -#max_compare_and_set_retry=16 +# Number of seconds memcached server is considered dead before +# it is tried again. This is used by the key value store +# system (e.g. token pooled memcached persistence backend). +# (integer value) +#dead_retry=300 + +# Timeout in seconds for every call to a server. This is used +# by the key value store system (e.g. token pooled memcached +# persistence backend). (integer value) +#socket_timeout=3 + +# Max total number of open connections to every memcached +# server. This is used by the key value store system (e.g. +# token pooled memcached persistence backend). (integer value) +#pool_maxsize=10 + +# Number of seconds a connection to memcached is held unused +# in the pool before it is closed. This is used by the key +# value store system (e.g. token pooled memcached persistence +# backend). (integer value) +#pool_unused_timeout=60 + +# Number of seconds that an operation will wait to get a +# memcache client connection. This is used by the key value +# store system (e.g. token pooled memcached persistence +# backend). (integer value) +#pool_connection_get_timeout=10 [oauth1] diff --git a/keystone/common/cache/_memcache_pool.py b/keystone/common/cache/_memcache_pool.py new file mode 100644 index 0000000000..70b86b684f --- /dev/null +++ b/keystone/common/cache/_memcache_pool.py @@ -0,0 +1,196 @@ +# Copyright 2014 Mirantis Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Thread-safe connection pool for python-memcached.""" + +# NOTE(yorik-sar): this file is copied between keystone and keystonemiddleware +# and should be kept in sync until we can use external library for this. + +import collections +import contextlib +import itertools +import logging +import threading +import time + +import memcache +from six.moves import queue + +from keystone import exception +from keystone.i18n import _ +from keystone.openstack.common import log + + +LOG = log.getLogger(__name__) + +# NOTE(morganfainberg): This is used as the maximum number of seconds a get +# of a new connection will wait for before raising an exception indicating +# a serious / most likely non-recoverable delay has occurred. +CONNECTION_GET_TIMEOUT = 120 + +# This 'class' is taken from http://stackoverflow.com/a/22520633/238308 +# Don't inherit client from threading.local so that we can reuse clients in +# different threads +_MemcacheClient = type('_MemcacheClient', (object,), + dict(memcache.Client.__dict__)) + +_PoolItem = collections.namedtuple('_PoolItem', ['ttl', 'connection']) + + +class ConnectionPool(queue.Queue): + """Base connection pool class + + This class implements the basic connection pool logic as an abstract base + class. + """ + def __init__(self, maxsize, unused_timeout, conn_get_timeout=None): + """Initialize the connection pool. + + :param maxsize: maximum number of client connections for the pool + :type maxsize: int + :param unused_timeout: idle time to live for unused clients (in + seconds). If a client connection object has been + in the pool and idle for longer than the + unused_timeout, it will be reaped. This is to + ensure resources are released as utilization + goes down. + :type unused_timeout: int + :param conn_get_timeout: maximum time in seconds to wait for a + connection. If set to `None` timeout is + indefinite. + :type conn_get_timeout: int + """ + queue.Queue.__init__(self, maxsize) + self._unused_timeout = unused_timeout + self._connection_get_timeout = conn_get_timeout + self._acquired = 0 + + def _create_connection(self): + raise NotImplementedError + + def _destroy_connection(self, conn): + raise NotImplementedError + + def _debug_logger(self, msg, *args, **kwargs): + if LOG.isEnabledFor(logging.DEBUG): + thread_id = threading.current_thread().ident + args = (id(self), thread_id) + args + prefix = 'Memcached pool %s, thread %s: ' + LOG.debug(prefix + msg, *args, **kwargs) + + @contextlib.contextmanager + def acquire(self): + self._debug_logger('Acquiring connection') + try: + conn = self.get(timeout=self._connection_get_timeout) + except queue.Empty: + raise exception.UnexpectedError( + _('Unable to get a connection from pool id %(id)s after ' + '%(seconds)s seconds.') % + {'id': id(self), 'seconds': self._connection_get_timeout}) + self._debug_logger('Acquired connection %s', id(conn)) + try: + yield conn + finally: + self._debug_logger('Releasing connection %s', id(conn)) + self.put(conn) + + def _qsize(self): + return self.maxsize - self._acquired + + if not hasattr(queue.Queue, '_qsize'): + qsize = _qsize + + def _get(self): + if self.queue: + conn = self.queue.pop().connection + else: + conn = self._create_connection() + self._acquired += 1 + return conn + + def _put(self, conn): + self.queue.append(_PoolItem( + ttl=time.time() + self._unused_timeout, + connection=conn, + )) + self._acquired -= 1 + # Drop all expired connections from the right end of the queue + now = time.time() + while self.queue and self.queue[0].ttl < now: + conn = self.queue.popleft().connection + self._debug_logger('Reaping connection %s', id(conn)) + self._destroy_connection(conn) + + +class MemcacheClientPool(ConnectionPool): + def __init__(self, urls, arguments, **kwargs): + ConnectionPool.__init__(self, **kwargs) + self.urls = urls + self._arguments = arguments + # NOTE(morganfainberg): The host objects expect an int for the + # deaduntil value. Initialize this at 0 for each host with 0 indicating + # the host is not dead. + self._hosts_deaduntil = [0] * len(urls) + + def _create_connection(self): + return _MemcacheClient(self.urls, **self._arguments) + + def _destroy_connection(self, conn): + conn.disconnect_all() + + def _get(self): + conn = ConnectionPool._get(self) + try: + # Propagate host state known to us to this client's list + now = time.time() + for deaduntil, host in zip(self._hosts_deaduntil, conn.servers): + if deaduntil > now and host.deaduntil <= now: + host.mark_dead('propagating death mark from the pool') + host.deaduntil = deaduntil + except Exception: + # We need to be sure that connection doesn't leak from the pool. + # This code runs before we enter context manager's try-finally + # block, so we need to explicitly release it here + ConnectionPool._put(self, conn) + raise + return conn + + def _put(self, conn): + try: + # If this client found that one of the hosts is dead, mark it as + # such in our internal list + now = time.time() + for i, deaduntil, host in zip(itertools.count(), + self._hosts_deaduntil, + conn.servers): + # Do nothing if we already know this host is dead + if deaduntil <= now: + if host.deaduntil > now: + self._hosts_deaduntil[i] = host.deaduntil + self._debug_logger( + 'Marked host %s dead until %s', + self.urls[i], host.deaduntil) + else: + self._hosts_deaduntil[i] = 0 + # If all hosts are dead we should forget that they're dead. This + # way we won't get completely shut off until dead_retry seconds + # pass, but will be checking servers as frequent as we can (over + # way smaller socket_timeout) + if all(deaduntil > now for deaduntil in self._hosts_deaduntil): + self._debug_logger('All hosts are dead. Marking them as live.') + self._hosts_deaduntil[:] = [0] * len(self._hosts_deaduntil) + finally: + ConnectionPool._put(self, conn) diff --git a/keystone/common/cache/backends/memcache_pool.py b/keystone/common/cache/backends/memcache_pool.py new file mode 100644 index 0000000000..f3990b1264 --- /dev/null +++ b/keystone/common/cache/backends/memcache_pool.py @@ -0,0 +1,61 @@ +# Copyright 2014 Mirantis Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""dogpile.cache backend that uses Memcached connection pool""" + +import functools +import logging + +from dogpile.cache.backends import memcached as memcached_backend + +from keystone.common.cache import _memcache_pool + + +LOG = logging.getLogger(__name__) + + +# Helper to ease backend refactoring +class ClientProxy(object): + def __init__(self, client_pool): + self.client_pool = client_pool + + def _run_method(self, __name, *args, **kwargs): + with self.client_pool.acquire() as client: + return getattr(client, __name)(*args, **kwargs) + + def __getattr__(self, name): + return functools.partial(self._run_method, name) + + +class PooledMemcachedBackend(memcached_backend.MemcachedBackend): + # Composed from GenericMemcachedBackend's and MemcacheArgs's __init__ + def __init__(self, arguments): + super(PooledMemcachedBackend, self).__init__(arguments) + self.client_pool = _memcache_pool.MemcacheClientPool( + self.url, + arguments={ + 'dead_retry': arguments.get('dead_retry', 5 * 60), + 'socket_timeout': arguments.get('socket_timeout', 3), + }, + maxsize=arguments.get('pool_maxsize', 10), + unused_timeout=arguments.get('pool_unused_timeout', 60), + conn_get_timeout=arguments.get('pool_connection_get_timeout', 10), + ) + + # Since all methods in backend just call one of methods of client, this + # lets us avoid need to hack it too much + @property + def client(self): + return ClientProxy(self.client_pool) diff --git a/keystone/common/cache/core.py b/keystone/common/cache/core.py index 0fc1353bf8..3ba528747b 100644 --- a/keystone/common/cache/core.py +++ b/keystone/common/cache/core.py @@ -40,6 +40,11 @@ dogpile.cache.register_backend( 'keystone.common.cache.backends.mongo', 'MongoCacheBackend') +dogpile.cache.register_backend( + 'keystone.cache.memcache_pool', + 'keystone.common.cache.backends.memcache_pool', + 'PooledMemcachedBackend') + class DebugProxy(proxy.ProxyBackend): """Extra Logging ProxyBackend.""" @@ -102,6 +107,15 @@ def build_cache_config(): conf_dict[arg_key] = argvalue LOG.debug('Keystone Cache Config: %s', conf_dict) + # NOTE(yorik-sar): these arguments will be used for memcache-related + # backends. Use setdefault for url to support old-style setting through + # backend_argument=url:127.0.0.1:11211 + conf_dict.setdefault('%s.arguments.url' % prefix, + CONF.cache.memcache_servers) + for arg in ('dead_retry', 'socket_timeout', 'pool_maxsize', + 'pool_unused_timeout', 'pool_connection_get_timeout'): + value = getattr(CONF.cache, 'memcache_' + arg) + conf_dict['%s.arguments.%s' % (prefix, arg)] = value return conf_dict diff --git a/keystone/common/config.py b/keystone/common/config.py index fee1fe525d..d7f9dd8118 100644 --- a/keystone/common/config.py +++ b/keystone/common/config.py @@ -307,7 +307,8 @@ FILE_OPTIONS = { # backend. cfg.StrOpt('backend', default='keystone.common.cache.noop', help='Dogpile.cache backend module. It is recommended ' - 'that Memcache (dogpile.cache.memcached) or Redis ' + 'that Memcache with pooling ' + '(keystone.cache.memcache_pool) or Redis ' '(dogpile.cache.redis) be used in production ' 'deployments. Small workloads (single process) ' 'like devstack can use the dogpile.cache.memory ' @@ -332,6 +333,34 @@ FILE_OPTIONS = { 'cache-backend get/set/delete calls with the ' 'keys/values. Typically this should be left set ' 'to false.'), + cfg.ListOpt('memcache_servers', default=['localhost:11211'], + help='Memcache servers in the format of "host:port".' + ' (dogpile.cache.memcache and keystone.cache.memcache_pool' + ' backends only)'), + cfg.IntOpt('memcache_dead_retry', + default=5 * 60, + help='Number of seconds memcached server is considered dead' + ' before it is tried again. (dogpile.cache.memcache and' + ' keystone.cache.memcache_pool backends only)'), + cfg.IntOpt('memcache_socket_timeout', + default=3, + help='Timeout in seconds for every call to a server.' + ' (dogpile.cache.memcache and keystone.cache.memcache_pool' + ' backends only)'), + cfg.IntOpt('memcache_pool_maxsize', + default=10, + help='Max total number of open connections to every' + ' memcached server. (keystone.cache.memcache_pool backend' + ' only)'), + cfg.IntOpt('memcache_pool_unused_timeout', + default=60, + help='Number of seconds a connection to memcached is held' + ' unused in the pool before it is closed.' + ' (keystone.cache.memcache_pool backend only)'), + cfg.IntOpt('memcache_pool_connection_get_timeout', + default=10, + help='Number of seconds that an operation will wait to get ' + 'a memcache client connection.'), ], 'ssl': [ cfg.BoolOpt('enable', default=False, @@ -771,10 +800,35 @@ FILE_OPTIONS = { 'memcache': [ cfg.ListOpt('servers', default=['localhost:11211'], help='Memcache servers in the format of "host:port".'), - cfg.IntOpt('max_compare_and_set_retry', default=16, - help='Number of compare-and-set attempts to make when ' - 'using compare-and-set in the token memcache back ' - 'end.'), + cfg.IntOpt('dead_retry', + default=5 * 60, + help='Number of seconds memcached server is considered dead' + ' before it is tried again. This is used by the key ' + 'value store system (e.g. token ' + 'pooled memcached persistence backend).'), + cfg.IntOpt('socket_timeout', + default=3, + help='Timeout in seconds for every call to a server. This ' + 'is used by the key value store system (e.g. token ' + 'pooled memcached persistence backend).'), + cfg.IntOpt('pool_maxsize', + default=10, + help='Max total number of open connections to every' + ' memcached server. This is used by the key value ' + 'store system (e.g. token pooled memcached ' + 'persistence backend).'), + cfg.IntOpt('pool_unused_timeout', + default=60, + help='Number of seconds a connection to memcached is held' + ' unused in the pool before it is closed. This is used' + ' by the key value store system (e.g. token pooled ' + 'memcached persistence backend).'), + cfg.IntOpt('pool_connection_get_timeout', + default=10, + help='Number of seconds that an operation will wait to get ' + 'a memcache client connection. This is used by the ' + 'key value store system (e.g. token pooled memcached ' + 'persistence backend).'), ], 'catalog': [ cfg.StrOpt('template_file', diff --git a/keystone/common/kvs/backends/memcached.py b/keystone/common/kvs/backends/memcached.py index 795082789d..0d8eeb8b58 100644 --- a/keystone/common/kvs/backends/memcached.py +++ b/keystone/common/kvs/backends/memcached.py @@ -22,6 +22,7 @@ import time from dogpile.cache import api from dogpile.cache.backends import memcached +from keystone.common.cache.backends import memcache_pool from keystone.common import manager from keystone import config from keystone import exception @@ -34,9 +35,11 @@ LOG = log.getLogger(__name__) NO_VALUE = api.NO_VALUE -VALID_DOGPILE_BACKENDS = dict(pylibmc=memcached.PylibmcBackend, - bmemcached=memcached.BMemcachedBackend, - memcached=memcached.MemcachedBackend) +VALID_DOGPILE_BACKENDS = dict( + pylibmc=memcached.PylibmcBackend, + bmemcached=memcached.BMemcachedBackend, + memcached=memcached.MemcachedBackend, + pooled_memcached=memcache_pool.PooledMemcachedBackend) class MemcachedLock(object): diff --git a/keystone/tests/core.py b/keystone/tests/core.py index 787847fd7b..a22afdff3b 100644 --- a/keystone/tests/core.py +++ b/keystone/tests/core.py @@ -395,6 +395,7 @@ class TestCase(BaseTestCase): 'routes.middleware=INFO', 'stevedore.extension=INFO', 'keystone.notifications=INFO', + 'keystone.common._memcache_pool=INFO', ]) self.auth_plugin_config_override() diff --git a/keystone/tests/unit/common/test_connection_pool.py b/keystone/tests/unit/common/test_connection_pool.py new file mode 100644 index 0000000000..f7d6664316 --- /dev/null +++ b/keystone/tests/unit/common/test_connection_pool.py @@ -0,0 +1,119 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +import mock +from six.moves import queue +import testtools +from testtools import matchers + +from keystone.common.cache import _memcache_pool +from keystone import exception +from keystone.tests import core + + +class _TestConnectionPool(_memcache_pool.ConnectionPool): + destroyed_value = 'destroyed' + + def _create_connection(self): + return mock.MagicMock() + + def _destroy_connection(self, conn): + conn(self.destroyed_value) + + +class TestConnectionPool(core.TestCase): + def setUp(self): + super(TestConnectionPool, self).setUp() + self.unused_timeout = 10 + self.maxsize = 2 + self.connection_pool = _TestConnectionPool( + maxsize=self.maxsize, + unused_timeout=self.unused_timeout) + self.addCleanup(self.cleanup_instance('connection_pool')) + + def test_get_context_manager(self): + self.assertThat(self.connection_pool.queue, matchers.HasLength(0)) + with self.connection_pool.acquire() as conn: + self.assertEqual(1, self.connection_pool._acquired) + self.assertEqual(0, self.connection_pool._acquired) + self.assertThat(self.connection_pool.queue, matchers.HasLength(1)) + self.assertEqual(conn, self.connection_pool.queue[0].connection) + + def test_cleanup_pool(self): + self.test_get_context_manager() + newtime = time.time() + self.unused_timeout * 2 + non_expired_connection = _memcache_pool._PoolItem( + ttl=(newtime * 2), + connection=mock.MagicMock()) + self.connection_pool.queue.append(non_expired_connection) + self.assertThat(self.connection_pool.queue, matchers.HasLength(2)) + with mock.patch.object(time, 'time', return_value=newtime): + conn = self.connection_pool.queue[0].connection + with self.connection_pool.acquire(): + pass + conn.assert_has_calls( + [mock.call(self.connection_pool.destroyed_value)]) + self.assertThat(self.connection_pool.queue, matchers.HasLength(1)) + self.assertEqual(0, non_expired_connection.connection.call_count) + + def test_acquire_conn_exception_returns_acquired_count(self): + class TestException(Exception): + pass + + with mock.patch.object(_TestConnectionPool, '_create_connection', + side_effect=TestException): + with testtools.ExpectedException(TestException): + with self.connection_pool.acquire(): + pass + self.assertThat(self.connection_pool.queue, + matchers.HasLength(0)) + self.assertEqual(0, self.connection_pool._acquired) + + def test_connection_pool_limits_maximum_connections(self): + # NOTE(morganfainberg): To ensure we don't lockup tests until the + # job limit, explicitly call .get_nowait() and .put_nowait() in this + # case. + conn1 = self.connection_pool.get_nowait() + conn2 = self.connection_pool.get_nowait() + + # Use a nowait version to raise an Empty exception indicating we would + # not get another connection until one is placed back into the queue. + self.assertRaises(queue.Empty, self.connection_pool.get_nowait) + + # Place the connections back into the pool. + self.connection_pool.put_nowait(conn1) + self.connection_pool.put_nowait(conn2) + + # Make sure we can get a connection out of the pool again. + self.connection_pool.get_nowait() + + def test_connection_pool_maximum_connection_get_timeout(self): + connection_pool = _TestConnectionPool( + maxsize=1, + unused_timeout=self.unused_timeout, + conn_get_timeout=0) + + def _acquire_connection(): + with connection_pool.acquire(): + pass + + # Make sure we've consumed the only available connection from the pool + conn = connection_pool.get_nowait() + + self.assertRaises(exception.UnexpectedError, _acquire_connection) + + # Put the connection back and ensure we can acquire the connection + # after it is available. + connection_pool.put_nowait(conn) + _acquire_connection() diff --git a/keystone/token/persistence/backends/memcache.py b/keystone/token/persistence/backends/memcache.py index 01d9f0674a..53293c01f9 100644 --- a/keystone/token/persistence/backends/memcache.py +++ b/keystone/token/persistence/backends/memcache.py @@ -22,8 +22,10 @@ CONF = config.CONF class Token(kvs.Token): kvs_backend = 'openstack.kvs.Memcached' + memcached_backend = 'memcached' def __init__(self, *args, **kwargs): + kwargs['memcached_backend'] = self.memcached_backend kwargs['no_expiry_keys'] = [self.revocation_key] kwargs['memcached_expire_time'] = CONF.token.expiration kwargs['url'] = CONF.memcache.servers diff --git a/keystone/token/persistence/backends/memcache_pool.py b/keystone/token/persistence/backends/memcache_pool.py new file mode 100644 index 0000000000..3b1bdb4249 --- /dev/null +++ b/keystone/token/persistence/backends/memcache_pool.py @@ -0,0 +1,27 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from keystone.common import config +from keystone.token.persistence.backends import memcache + + +CONF = config.CONF + + +class Token(memcache.Token): + memcached_backend = 'pooled_memcached' + + def __init__(self, *args, **kwargs): + for arg in ('dead_retry', 'socket_timeout', 'pool_maxsize', + 'pool_unused_timeout', 'pool_connection_get_timeout'): + kwargs[arg] = getattr(CONF.memcache, arg) + super(Token, self).__init__(*args, **kwargs)