Add an optional advanced pool of memcached clients
This patchset adds an advanced eventlet safe pool of memcache clients. This allows the deployer to configure auth_token middleware to utilize the new pool by simply setting 'memcache_use_advanced_pool' to true. Optional tunables for the memcache pool have also been added. Co-Authored-By: Morgan Fainberg <morgan.fainberg@gmail.com> Closes-bug: #1332058 Closes-bug: #1360446 Change-Id: I08082b46ce692cf4df449d48dac94718f1e98a6c
This commit is contained in:
parent
f56e5fef3a
commit
045cddcea2
|
@ -0,0 +1,182 @@
|
|||
# Copyright 2014 Mirantis Inc
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Thread-safe connection pool for python-memcached."""
|
||||
|
||||
# NOTE(yorik-sar): this file is copied between keystone and keystonemiddleware
|
||||
# and should be kept in sync until we can use external library for this.
|
||||
|
||||
import collections
|
||||
import contextlib
|
||||
import itertools
|
||||
import logging
|
||||
import time
|
||||
|
||||
from six.moves import queue
|
||||
|
||||
|
||||
_PoolItem = collections.namedtuple('_PoolItem', ['ttl', 'connection'])
|
||||
|
||||
|
||||
class ConnectionGetTimeoutException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ConnectionPool(queue.Queue):
|
||||
"""Base connection pool class
|
||||
|
||||
This class implements the basic connection pool logic as an abstract base
|
||||
class.
|
||||
"""
|
||||
def __init__(self, maxsize, unused_timeout, conn_get_timeout=None):
|
||||
"""Initialize the connection pool.
|
||||
|
||||
:param maxsize: maximum number of client connections for the pool
|
||||
:type maxsize: int
|
||||
:param unused_timeout: idle time to live for unused clients (in
|
||||
seconds). If a client connection object has been
|
||||
in the pool and idle for longer than the
|
||||
unused_timeout, it will be reaped. This is to
|
||||
ensure resources are released as utilization
|
||||
goes down.
|
||||
:type unused_timeout: int
|
||||
:param conn_get_timeout: maximum time in seconds to wait for a
|
||||
connection. If set to `None` timeout is
|
||||
indefinite.
|
||||
:type conn_get_timeout: int
|
||||
"""
|
||||
queue.Queue.__init__(self, maxsize)
|
||||
self._unused_timeout = unused_timeout
|
||||
self._connection_get_timeout = conn_get_timeout
|
||||
self._acquired = 0
|
||||
self._LOG = logging.getLogger(__name__)
|
||||
|
||||
def _create_connection(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def _destroy_connection(self, conn):
|
||||
raise NotImplementedError
|
||||
|
||||
@contextlib.contextmanager
|
||||
def acquire(self):
|
||||
try:
|
||||
conn = self.get(timeout=self._connection_get_timeout)
|
||||
except queue.Empty:
|
||||
self._LOG.critical('Unable to get a connection from pool id '
|
||||
'%(id)s after %(seconds)s seconds.',
|
||||
{'id': id(self),
|
||||
'seconds': self._connection_get_timeout})
|
||||
raise ConnectionGetTimeoutException()
|
||||
try:
|
||||
yield conn
|
||||
finally:
|
||||
self.put(conn)
|
||||
|
||||
def _qsize(self):
|
||||
return self.maxsize - self._acquired
|
||||
|
||||
if not hasattr(queue.Queue, '_qsize'):
|
||||
qsize = _qsize
|
||||
|
||||
def _get(self):
|
||||
if self.queue:
|
||||
conn = self.queue.pop().connection
|
||||
else:
|
||||
conn = self._create_connection()
|
||||
self._acquired += 1
|
||||
return conn
|
||||
|
||||
def _put(self, conn):
|
||||
self.queue.append(_PoolItem(
|
||||
ttl=time.time() + self._unused_timeout,
|
||||
connection=conn,
|
||||
))
|
||||
self._acquired -= 1
|
||||
# Drop all expired connections from the right end of the queue
|
||||
now = time.time()
|
||||
while self.queue and self.queue[0].ttl < now:
|
||||
conn = self.queue.popleft().connection
|
||||
self._destroy_connection(conn)
|
||||
|
||||
|
||||
class MemcacheClientPool(ConnectionPool):
|
||||
def __init__(self, urls, arguments, **kwargs):
|
||||
ConnectionPool.__init__(self, **kwargs)
|
||||
self._urls = urls
|
||||
self._arguments = arguments
|
||||
# NOTE(morganfainberg): The host objects expect an int for the
|
||||
# deaduntil value. Initialize this at 0 for each host with 0 indicating
|
||||
# the host is not dead.
|
||||
self._hosts_deaduntil = [0] * len(urls)
|
||||
|
||||
# NOTE(morganfainberg): Lazy import to allow middleware to work with
|
||||
# python 3k even if memcache will not due to python 3k
|
||||
# incompatibilities within the python-memcache library.
|
||||
global memcache
|
||||
import memcache
|
||||
|
||||
# This 'class' is taken from http://stackoverflow.com/a/22520633/238308
|
||||
# Don't inherit client from threading.local so that we can reuse
|
||||
# clients in different threads
|
||||
MemcacheClient = type('_MemcacheClient', (object,),
|
||||
dict(memcache.Client.__dict__))
|
||||
|
||||
self._memcache_client_class = MemcacheClient
|
||||
|
||||
def _create_connection(self):
|
||||
return self._memcache_client_class(self._urls, **self._arguments)
|
||||
|
||||
def _destroy_connection(self, conn):
|
||||
conn.disconnect_all()
|
||||
|
||||
def _get(self):
|
||||
conn = ConnectionPool._get(self)
|
||||
try:
|
||||
# Propagate host state known to us to this client's list
|
||||
now = time.time()
|
||||
for deaduntil, host in zip(self._hosts_deaduntil, conn.servers):
|
||||
if deaduntil > now and host.deaduntil <= now:
|
||||
host.mark_dead('propagating death mark from the pool')
|
||||
host.deaduntil = deaduntil
|
||||
except Exception:
|
||||
# We need to be sure that connection doesn't leak from the pool.
|
||||
# This code runs before we enter context manager's try-finally
|
||||
# block, so we need to explicitly release it here
|
||||
ConnectionPool._put(self, conn)
|
||||
raise
|
||||
return conn
|
||||
|
||||
def _put(self, conn):
|
||||
try:
|
||||
# If this client found that one of the hosts is dead, mark it as
|
||||
# such in our internal list
|
||||
now = time.time()
|
||||
for i, deaduntil, host in zip(itertools.count(),
|
||||
self._hosts_deaduntil,
|
||||
conn.servers):
|
||||
# Do nothing if we already know this host is dead
|
||||
if deaduntil <= now:
|
||||
if host.deaduntil > now:
|
||||
self._hosts_deaduntil[i] = host.deaduntil
|
||||
else:
|
||||
self._hosts_deaduntil[i] = 0
|
||||
# If all hosts are dead we should forget that they're dead. This
|
||||
# way we won't get completely shut off until dead_retry seconds
|
||||
# pass, but will be checking servers as frequent as we can (over
|
||||
# way smaller socket_timeout)
|
||||
if all(deaduntil > now for deaduntil in self._hosts_deaduntil):
|
||||
self._hosts_deaduntil[:] = [0] * len(self._hosts_deaduntil)
|
||||
finally:
|
||||
ConnectionPool._put(self, conn)
|
|
@ -322,6 +322,31 @@ _OPTS = [
|
|||
secret=True,
|
||||
help='(optional, mandatory if memcache_security_strategy is'
|
||||
' defined) this string is used for key derivation.'),
|
||||
cfg.IntOpt('memcache_pool_dead_retry',
|
||||
default=5 * 60,
|
||||
help='(optional) number of seconds memcached server is'
|
||||
' considered dead before it is tried again.'),
|
||||
cfg.IntOpt('memcache_pool_maxsize',
|
||||
default=10,
|
||||
help='(optional) max total number of open connections to'
|
||||
' every memcached server.'),
|
||||
cfg.IntOpt('memcache_pool_socket_timeout',
|
||||
default=3,
|
||||
help='(optional) socket timeout in seconds for communicating '
|
||||
'with a memcache server.'),
|
||||
cfg.IntOpt('memcache_pool_unused_timeout',
|
||||
default=60,
|
||||
help='(optional) number of seconds a connection to memcached'
|
||||
' is held unused in the pool before it is closed.'),
|
||||
cfg.IntOpt('memcache_pool_conn_get_timeout',
|
||||
default=10,
|
||||
help='(optional) number of seconds that an operation will wait '
|
||||
'to get a memcache client connection from the pool.'),
|
||||
cfg.BoolOpt('memcache_use_advanced_pool',
|
||||
default=False,
|
||||
help='(optional) use the advanced (eventlet safe) memcache '
|
||||
'client pool. The advanced pool will only work under '
|
||||
'python 2.x.'),
|
||||
cfg.BoolOpt('include_service_catalog',
|
||||
default=True,
|
||||
help='(optional) indicate whether to set the X-Service-Catalog'
|
||||
|
@ -1245,7 +1270,17 @@ class AuthProtocol(object):
|
|||
env_cache_name=self._conf_get('cache'),
|
||||
memcached_servers=self._conf_get('memcached_servers'),
|
||||
memcache_security_strategy=self._memcache_security_strategy,
|
||||
memcache_secret_key=self._conf_get('memcache_secret_key'))
|
||||
memcache_secret_key=self._conf_get('memcache_secret_key'),
|
||||
use_advanced_pool=self._conf_get('memcache_use_advanced_pool'),
|
||||
memcache_pool_dead_retry=self._conf_get(
|
||||
'memcache_pool_dead_retry'),
|
||||
memcache_pool_maxsize=self._conf_get('memcache_pool_maxsize'),
|
||||
memcache_pool_unused_timeout=self._conf_get(
|
||||
'memcache_pool_unused_timeout'),
|
||||
memcache_pool_conn_get_timeout=self._conf_get(
|
||||
'memcache_pool_conn_get_timeout'),
|
||||
memcache_pool_socket_timeout=self._conf_get(
|
||||
'memcache_pool_socket_timeout'))
|
||||
return token_cache
|
||||
|
||||
|
||||
|
@ -1276,6 +1311,34 @@ class _CachePool(list):
|
|||
self.append(c)
|
||||
|
||||
|
||||
class _MemcacheClientPool(object):
|
||||
"""An advanced memcached client pool that is eventlet safe."""
|
||||
def __init__(self, memcache_servers, memcache_dead_retry=None,
|
||||
memcache_pool_maxsize=None, memcache_pool_unused_timeout=None,
|
||||
memcache_pool_conn_get_timeout=None,
|
||||
memcache_pool_socket_timeout=None):
|
||||
# NOTE(morganfainberg): import here to avoid hard dependency on
|
||||
# python-memcache library.
|
||||
global _memcache_pool
|
||||
from keystonemiddleware import _memcache_pool
|
||||
|
||||
self._pool = _memcache_pool.MemcacheClientPool(
|
||||
memcache_servers,
|
||||
arguments={
|
||||
'dead_retry': memcache_dead_retry,
|
||||
'socket_timeout': memcache_pool_socket_timeout,
|
||||
},
|
||||
maxsize=memcache_pool_maxsize,
|
||||
unused_timeout=memcache_pool_unused_timeout,
|
||||
conn_get_timeout=memcache_pool_conn_get_timeout,
|
||||
)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def reserve(self):
|
||||
with self._pool.get() as client:
|
||||
yield client
|
||||
|
||||
|
||||
class _IdentityServer(object):
|
||||
"""Operations on the Identity API server.
|
||||
|
||||
|
@ -1542,12 +1605,22 @@ class _TokenCache(object):
|
|||
|
||||
def __init__(self, log, cache_time=None, hash_algorithms=None,
|
||||
env_cache_name=None, memcached_servers=None,
|
||||
memcache_security_strategy=None, memcache_secret_key=None):
|
||||
memcache_security_strategy=None, memcache_secret_key=None,
|
||||
use_advanced_pool=False, memcache_pool_dead_retry=None,
|
||||
memcache_pool_maxsize=None, memcache_pool_unused_timeout=None,
|
||||
memcache_pool_conn_get_timeout=None,
|
||||
memcache_pool_socket_timeout=None):
|
||||
self._LOG = log
|
||||
self._cache_time = cache_time
|
||||
self._hash_algorithms = hash_algorithms
|
||||
self._env_cache_name = env_cache_name
|
||||
self._memcached_servers = memcached_servers
|
||||
self._use_advanced_pool = use_advanced_pool
|
||||
self._memcache_pool_dead_retry = memcache_pool_dead_retry,
|
||||
self._memcache_pool_maxsize = memcache_pool_maxsize,
|
||||
self._memcache_pool_unused_timeout = memcache_pool_unused_timeout
|
||||
self._memcache_pool_conn_get_timeout = memcache_pool_conn_get_timeout
|
||||
self._memcache_pool_socket_timeout = memcache_pool_socket_timeout
|
||||
|
||||
# memcache value treatment, ENCRYPT or MAC
|
||||
self._memcache_security_strategy = memcache_security_strategy
|
||||
|
@ -1561,12 +1634,36 @@ class _TokenCache(object):
|
|||
|
||||
self._assert_valid_memcache_protection_config()
|
||||
|
||||
def _get_cache_pool(self, cache, memcache_servers, use_advanced_pool=False,
|
||||
memcache_dead_retry=None, memcache_pool_maxsize=None,
|
||||
memcache_pool_unused_timeout=None,
|
||||
memcache_pool_conn_get_timeout=None,
|
||||
memcache_pool_socket_timeout=None):
|
||||
if use_advanced_pool is True and memcache_servers and cache is None:
|
||||
return _MemcacheClientPool(
|
||||
memcache_servers,
|
||||
memcache_dead_retry=memcache_dead_retry,
|
||||
memcache_pool_maxsize=memcache_pool_maxsize,
|
||||
memcache_pool_unused_timeout=memcache_pool_unused_timeout,
|
||||
memcache_pool_conn_get_timeout=memcache_pool_conn_get_timeout,
|
||||
memcache_pool_socket_timeout=memcache_pool_socket_timeout)
|
||||
else:
|
||||
return _CachePool(cache, memcache_servers)
|
||||
|
||||
def initialize(self, env):
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
self._cache_pool = _CachePool(env.get(self._env_cache_name),
|
||||
self._memcached_servers)
|
||||
self._cache_pool = self._get_cache_pool(
|
||||
env.get(self._env_cache_name),
|
||||
self._memcached_servers,
|
||||
use_advanced_pool=self._use_advanced_pool,
|
||||
memcache_dead_retry=self._memcache_pool_dead_retry,
|
||||
memcache_pool_maxsize=self._memcache_pool_maxsize,
|
||||
memcache_pool_unused_timeout=self._memcache_pool_unused_timeout,
|
||||
memcache_pool_conn_get_timeout=self._memcache_pool_conn_get_timeout
|
||||
)
|
||||
|
||||
self._initialized = True
|
||||
|
||||
def get(self, user_token):
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
import mock
|
||||
from six.moves import queue
|
||||
import testtools
|
||||
from testtools import matchers
|
||||
|
||||
from keystonemiddleware import _memcache_pool
|
||||
from keystonemiddleware.tests import utils
|
||||
|
||||
|
||||
class _TestConnectionPool(_memcache_pool.ConnectionPool):
|
||||
destroyed_value = 'destroyed'
|
||||
|
||||
def _create_connection(self):
|
||||
return mock.MagicMock()
|
||||
|
||||
def _destroy_connection(self, conn):
|
||||
conn(self.destroyed_value)
|
||||
|
||||
|
||||
class TestConnectionPool(utils.TestCase):
|
||||
def setUp(self):
|
||||
super(TestConnectionPool, self).setUp()
|
||||
self.unused_timeout = 10
|
||||
self.maxsize = 2
|
||||
self.connection_pool = _TestConnectionPool(
|
||||
maxsize=self.maxsize,
|
||||
unused_timeout=self.unused_timeout)
|
||||
|
||||
def test_get_context_manager(self):
|
||||
self.assertThat(self.connection_pool.queue, matchers.HasLength(0))
|
||||
with self.connection_pool.acquire() as conn:
|
||||
self.assertEqual(1, self.connection_pool._acquired)
|
||||
self.assertEqual(0, self.connection_pool._acquired)
|
||||
self.assertThat(self.connection_pool.queue, matchers.HasLength(1))
|
||||
self.assertEqual(conn, self.connection_pool.queue[0].connection)
|
||||
|
||||
def test_cleanup_pool(self):
|
||||
self.test_get_context_manager()
|
||||
newtime = time.time() + self.unused_timeout * 2
|
||||
non_expired_connection = _memcache_pool._PoolItem(
|
||||
ttl=(newtime * 2),
|
||||
connection=mock.MagicMock())
|
||||
self.connection_pool.queue.append(non_expired_connection)
|
||||
self.assertThat(self.connection_pool.queue, matchers.HasLength(2))
|
||||
with mock.patch.object(time, 'time', return_value=newtime):
|
||||
conn = self.connection_pool.queue[0].connection
|
||||
with self.connection_pool.acquire():
|
||||
pass
|
||||
conn.assert_has_calls(
|
||||
[mock.call(self.connection_pool.destroyed_value)])
|
||||
self.assertThat(self.connection_pool.queue, matchers.HasLength(1))
|
||||
self.assertEqual(0, non_expired_connection.connection.call_count)
|
||||
|
||||
def test_acquire_conn_exception_returns_acquired_count(self):
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
with mock.patch.object(_TestConnectionPool, '_create_connection',
|
||||
side_effect=TestException):
|
||||
with testtools.ExpectedException(TestException):
|
||||
with self.connection_pool.acquire():
|
||||
pass
|
||||
self.assertThat(self.connection_pool.queue,
|
||||
matchers.HasLength(0))
|
||||
self.assertEqual(0, self.connection_pool._acquired)
|
||||
|
||||
def test_connection_pool_limits_maximum_connections(self):
|
||||
# NOTE(morganfainberg): To ensure we don't lockup tests until the
|
||||
# job limit, explicitly call .get_nowait() and .put_nowait() in this
|
||||
# case.
|
||||
conn1 = self.connection_pool.get_nowait()
|
||||
conn2 = self.connection_pool.get_nowait()
|
||||
|
||||
# Use a nowait version to raise an Empty exception indicating we would
|
||||
# not get another connection until one is placed back into the queue.
|
||||
self.assertRaises(queue.Empty, self.connection_pool.get_nowait)
|
||||
|
||||
# Place the connections back into the pool.
|
||||
self.connection_pool.put_nowait(conn1)
|
||||
self.connection_pool.put_nowait(conn2)
|
||||
|
||||
# Make sure we can get a connection out of the pool again.
|
||||
self.connection_pool.get_nowait()
|
||||
|
||||
def test_connection_pool_maximum_connection_get_timeout(self):
|
||||
connection_pool = _TestConnectionPool(
|
||||
maxsize=1,
|
||||
unused_timeout=self.unused_timeout,
|
||||
conn_get_timeout=0)
|
||||
|
||||
def _acquire_connection():
|
||||
with connection_pool.acquire():
|
||||
pass
|
||||
|
||||
# Make sure we've consumed the only available connection from the pool
|
||||
conn = connection_pool.get_nowait()
|
||||
|
||||
self.assertRaises(_memcache_pool.ConnectionGetTimeoutException,
|
||||
_acquire_connection)
|
||||
|
||||
# Put the connection back and ensure we can acquire the connection
|
||||
# after it is available.
|
||||
connection_pool.put_nowait(conn)
|
||||
_acquire_connection()
|
|
@ -53,6 +53,12 @@ class OptsTestCase(utils.TestCase):
|
|||
'revocation_cache_time',
|
||||
'memcache_security_strategy',
|
||||
'memcache_secret_key',
|
||||
'memcache_use_advanced_pool',
|
||||
'memcache_pool_dead_retry',
|
||||
'memcache_pool_maxsize',
|
||||
'memcache_pool_unused_timeout',
|
||||
'memcache_pool_conn_get_timeout',
|
||||
'memcache_pool_socket_timeout',
|
||||
'include_service_catalog',
|
||||
'enforce_token_bind',
|
||||
'check_revocations_for_cached',
|
||||
|
|
Loading…
Reference in New Issue