Add a pool of memcached clients
This patchset adds a pool of memcache clients. This pool allows for reuse of a client object, prevents too many client object from being instantiated, and maintains proper tracking of dead servers so as to limit delays when a server (or all servers) become unavailable. The new memcache pool backend is available either by being set as the memcache backend or by using keystone.token.persistence.backends.memcache_pool.Token for the Token memcache persistence driver. [memcache] servers = 127.0.0.1:11211 dead_retry = 300 socket_timeout = 3 pool_maxsize = 10 pool_unused_timeout = 60 Where: - servers - comma-separated list of host:port pairs (was already there); - dead_retry - number of seconds memcached server is considered dead before it is tried again; - socket_timeout - timeout in seconds for every call to a server; - pool_maxsize - max total number of open connections in the pool; - pool_unused_timeout - number of seconds a connection is held unused in the pool before it is closed; The new memcache pool backend can be used as the driver for the Keystone caching layer. To use it as caching driver, set 'keystone.cache.memcache_pool' as the value of the [cache]\backend option, the other options are the same as above, but with 'memcache_' prefix: [cache] backend = keystone.cache.memcache_pool memcache_servers = 127.0.0.1:11211 memcache_dead_retry = 300 memcache_socket_timeout = 3 memcache_pool_maxsize = 10 memcache_pool_unused_timeout = 60 Co-Authored-By: Morgan Fainberg <morgan.fainberg@gmail.com> Closes-bug: #1332058 Closes-bug: #1360446 Change-Id: I3544894482b30a47fcd4fac8948d03136fd83f14
This commit is contained in:
parent
ee4ee3b7f5
commit
0010803288
|
@ -296,6 +296,40 @@ disables external authentication. For more details, refer to :doc:`External
|
||||||
Authentication <external-auth>`.
|
Authentication <external-auth>`.
|
||||||
|
|
||||||
|
|
||||||
|
Token Persistence Driver
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
Keystone supports customizable token persistence drivers. These can be specified
|
||||||
|
in the ``[token]`` section of the configuration file. Keystone provides three
|
||||||
|
non-test persistence backends. These can be set with the ``[token]\driver``
|
||||||
|
configuration option.
|
||||||
|
|
||||||
|
The drivers Keystone provides are:
|
||||||
|
|
||||||
|
* ``keystone.token.persistence.backends.sql.Token`` - The SQL-based (default)
|
||||||
|
token persistence engine. This backend stores all token data in the same SQL
|
||||||
|
store that is used for Identity/Assignment/etc.
|
||||||
|
|
||||||
|
* ``keystone.token.persistence.backends.memcache.Token`` - The memcached based
|
||||||
|
token persistence backend. This backend relies on ``dogpile.cache`` and stores
|
||||||
|
the token data in a set of memcached servers. The servers urls are specified
|
||||||
|
in the ``[memcache]\servers`` configuration option in the Keystone config.
|
||||||
|
|
||||||
|
* ``keystone.token.persistence.backends.memcache_pool.Token`` - The pooled memcached
|
||||||
|
token persistence engine. This backend supports the concept of pooled memcache
|
||||||
|
client object (allowing for the re-use of the client objects). This backend has
|
||||||
|
a number of extra tunable options in the ``[memcache]`` section of the config.
|
||||||
|
|
||||||
|
|
||||||
|
.. WARNING::
|
||||||
|
It is recommended you use the ``keystone.token.persistence.backend.memcache_pool.Token``
|
||||||
|
backend instead of ``keystone.token.persistence.backend.memcache.Token`` as the token
|
||||||
|
persistence driver if you are deploying Keystone under eventlet instead of
|
||||||
|
Apache + mod_wsgi. This recommendation are due to known issues with the use of
|
||||||
|
``thread.local`` under eventlet that can allow the leaking of memcache client objects
|
||||||
|
and consumption of extra sockets.
|
||||||
|
|
||||||
|
|
||||||
Token Provider
|
Token Provider
|
||||||
--------------
|
--------------
|
||||||
|
|
||||||
|
@ -372,6 +406,8 @@ behavior is that subsystem caching is enabled, but the global toggle is set to d
|
||||||
* ``dogpile.cache.dbm`` - local DBM file backend
|
* ``dogpile.cache.dbm`` - local DBM file backend
|
||||||
* ``dogpile.cache.memory`` - in-memory cache
|
* ``dogpile.cache.memory`` - in-memory cache
|
||||||
* ``keystone.cache.mongo`` - MongoDB as caching backend
|
* ``keystone.cache.mongo`` - MongoDB as caching backend
|
||||||
|
* ``keystone.cache.memcache_pool`` - An eventlet safe implementation of ``dogpile.cache.memcached``.
|
||||||
|
This implementation also provides client connection re-use.
|
||||||
|
|
||||||
.. WARNING::
|
.. WARNING::
|
||||||
``dogpile.cache.memory`` is not suitable for use outside of unit testing
|
``dogpile.cache.memory`` is not suitable for use outside of unit testing
|
||||||
|
@ -383,6 +419,12 @@ behavior is that subsystem caching is enabled, but the global toggle is set to d
|
||||||
when using ``Keystone`` and the ``dogpile.cache.memory`` backend under
|
when using ``Keystone`` and the ``dogpile.cache.memory`` backend under
|
||||||
any real workload.
|
any real workload.
|
||||||
|
|
||||||
|
.. WARNING::
|
||||||
|
Do not use ``dogpile.cache.memcached`` backend if you are deploying
|
||||||
|
Keystone under eventlet. There are known issues with the use of ``thread.local``
|
||||||
|
under eventlet that can allow the leaking of memcache client objects and
|
||||||
|
consumption of extra sockets.
|
||||||
|
|
||||||
* ``expiration_time`` - int, the default length of time to cache a specific value. A value of ``0``
|
* ``expiration_time`` - int, the default length of time to cache a specific value. A value of ``0``
|
||||||
indicates to not cache anything. It is recommended that the ``enabled`` option be used to disable
|
indicates to not cache anything. It is recommended that the ``enabled`` option be used to disable
|
||||||
cache instead of setting this to ``0``.
|
cache instead of setting this to ``0``.
|
||||||
|
|
|
@ -507,10 +507,10 @@
|
||||||
#expiration_time=600
|
#expiration_time=600
|
||||||
|
|
||||||
# Dogpile.cache backend module. It is recommended that
|
# Dogpile.cache backend module. It is recommended that
|
||||||
# Memcache (dogpile.cache.memcached) or Redis
|
# Memcache with pooling (keystone.cache.memcache_pool) or
|
||||||
# (dogpile.cache.redis) be used in production deployments.
|
# Redis (dogpile.cache.redis) be used in production
|
||||||
# Small workloads (single process) like devstack can use the
|
# deployments. Small workloads (single process) like devstack
|
||||||
# dogpile.cache.memory backend. (string value)
|
# can use the dogpile.cache.memory backend. (string value)
|
||||||
#backend=keystone.common.cache.noop
|
#backend=keystone.common.cache.noop
|
||||||
|
|
||||||
# Arguments supplied to the backend module. Specify this
|
# Arguments supplied to the backend module. Specify this
|
||||||
|
@ -534,6 +534,35 @@
|
||||||
# false. (boolean value)
|
# false. (boolean value)
|
||||||
#debug_cache_backend=false
|
#debug_cache_backend=false
|
||||||
|
|
||||||
|
# Memcache servers in the format of "host:port".
|
||||||
|
# (dogpile.cache.memcache and keystone.cache.memcache_pool
|
||||||
|
# backends only) (list value)
|
||||||
|
#memcache_servers=localhost:11211
|
||||||
|
|
||||||
|
# Number of seconds memcached server is considered dead before
|
||||||
|
# it is tried again. (dogpile.cache.memcache and
|
||||||
|
# keystone.cache.memcache_pool backends only) (integer value)
|
||||||
|
#memcache_dead_retry=300
|
||||||
|
|
||||||
|
# Timeout in seconds for every call to a server.
|
||||||
|
# (dogpile.cache.memcache and keystone.cache.memcache_pool
|
||||||
|
# backends only) (integer value)
|
||||||
|
#memcache_socket_timeout=3
|
||||||
|
|
||||||
|
# Max total number of open connections to every memcached
|
||||||
|
# server. (keystone.cache.memcache_pool backend only) (integer
|
||||||
|
# value)
|
||||||
|
#memcache_pool_maxsize=10
|
||||||
|
|
||||||
|
# Number of seconds a connection to memcached is held unused
|
||||||
|
# in the pool before it is closed.
|
||||||
|
# (keystone.cache.memcache_pool backend only) (integer value)
|
||||||
|
#memcache_pool_unused_timeout=60
|
||||||
|
|
||||||
|
# Number of seconds that an operation will wait to get a
|
||||||
|
# memcache client connection. (integer value)
|
||||||
|
#memcache_pool_connection_get_timeout=10
|
||||||
|
|
||||||
|
|
||||||
[catalog]
|
[catalog]
|
||||||
|
|
||||||
|
@ -1212,10 +1241,33 @@
|
||||||
# Memcache servers in the format of "host:port". (list value)
|
# Memcache servers in the format of "host:port". (list value)
|
||||||
#servers=localhost:11211
|
#servers=localhost:11211
|
||||||
|
|
||||||
# Number of compare-and-set attempts to make when using
|
# Number of seconds memcached server is considered dead before
|
||||||
# compare-and-set in the token memcache back end. (integer
|
# it is tried again. This is used by the key value store
|
||||||
# value)
|
# system (e.g. token pooled memcached persistence backend).
|
||||||
#max_compare_and_set_retry=16
|
# (integer value)
|
||||||
|
#dead_retry=300
|
||||||
|
|
||||||
|
# Timeout in seconds for every call to a server. This is used
|
||||||
|
# by the key value store system (e.g. token pooled memcached
|
||||||
|
# persistence backend). (integer value)
|
||||||
|
#socket_timeout=3
|
||||||
|
|
||||||
|
# Max total number of open connections to every memcached
|
||||||
|
# server. This is used by the key value store system (e.g.
|
||||||
|
# token pooled memcached persistence backend). (integer value)
|
||||||
|
#pool_maxsize=10
|
||||||
|
|
||||||
|
# Number of seconds a connection to memcached is held unused
|
||||||
|
# in the pool before it is closed. This is used by the key
|
||||||
|
# value store system (e.g. token pooled memcached persistence
|
||||||
|
# backend). (integer value)
|
||||||
|
#pool_unused_timeout=60
|
||||||
|
|
||||||
|
# Number of seconds that an operation will wait to get a
|
||||||
|
# memcache client connection. This is used by the key value
|
||||||
|
# store system (e.g. token pooled memcached persistence
|
||||||
|
# backend). (integer value)
|
||||||
|
#pool_connection_get_timeout=10
|
||||||
|
|
||||||
|
|
||||||
[oauth1]
|
[oauth1]
|
||||||
|
|
|
@ -0,0 +1,196 @@
|
||||||
|
# Copyright 2014 Mirantis Inc
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""Thread-safe connection pool for python-memcached."""
|
||||||
|
|
||||||
|
# NOTE(yorik-sar): this file is copied between keystone and keystonemiddleware
|
||||||
|
# and should be kept in sync until we can use external library for this.
|
||||||
|
|
||||||
|
import collections
|
||||||
|
import contextlib
|
||||||
|
import itertools
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
import memcache
|
||||||
|
from six.moves import queue
|
||||||
|
|
||||||
|
from keystone import exception
|
||||||
|
from keystone.i18n import _
|
||||||
|
from keystone.openstack.common import log
|
||||||
|
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
# NOTE(morganfainberg): This is used as the maximum number of seconds a get
|
||||||
|
# of a new connection will wait for before raising an exception indicating
|
||||||
|
# a serious / most likely non-recoverable delay has occurred.
|
||||||
|
CONNECTION_GET_TIMEOUT = 120
|
||||||
|
|
||||||
|
# This 'class' is taken from http://stackoverflow.com/a/22520633/238308
|
||||||
|
# Don't inherit client from threading.local so that we can reuse clients in
|
||||||
|
# different threads
|
||||||
|
_MemcacheClient = type('_MemcacheClient', (object,),
|
||||||
|
dict(memcache.Client.__dict__))
|
||||||
|
|
||||||
|
_PoolItem = collections.namedtuple('_PoolItem', ['ttl', 'connection'])
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectionPool(queue.Queue):
|
||||||
|
"""Base connection pool class
|
||||||
|
|
||||||
|
This class implements the basic connection pool logic as an abstract base
|
||||||
|
class.
|
||||||
|
"""
|
||||||
|
def __init__(self, maxsize, unused_timeout, conn_get_timeout=None):
|
||||||
|
"""Initialize the connection pool.
|
||||||
|
|
||||||
|
:param maxsize: maximum number of client connections for the pool
|
||||||
|
:type maxsize: int
|
||||||
|
:param unused_timeout: idle time to live for unused clients (in
|
||||||
|
seconds). If a client connection object has been
|
||||||
|
in the pool and idle for longer than the
|
||||||
|
unused_timeout, it will be reaped. This is to
|
||||||
|
ensure resources are released as utilization
|
||||||
|
goes down.
|
||||||
|
:type unused_timeout: int
|
||||||
|
:param conn_get_timeout: maximum time in seconds to wait for a
|
||||||
|
connection. If set to `None` timeout is
|
||||||
|
indefinite.
|
||||||
|
:type conn_get_timeout: int
|
||||||
|
"""
|
||||||
|
queue.Queue.__init__(self, maxsize)
|
||||||
|
self._unused_timeout = unused_timeout
|
||||||
|
self._connection_get_timeout = conn_get_timeout
|
||||||
|
self._acquired = 0
|
||||||
|
|
||||||
|
def _create_connection(self):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def _destroy_connection(self, conn):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def _debug_logger(self, msg, *args, **kwargs):
|
||||||
|
if LOG.isEnabledFor(logging.DEBUG):
|
||||||
|
thread_id = threading.current_thread().ident
|
||||||
|
args = (id(self), thread_id) + args
|
||||||
|
prefix = 'Memcached pool %s, thread %s: '
|
||||||
|
LOG.debug(prefix + msg, *args, **kwargs)
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def acquire(self):
|
||||||
|
self._debug_logger('Acquiring connection')
|
||||||
|
try:
|
||||||
|
conn = self.get(timeout=self._connection_get_timeout)
|
||||||
|
except queue.Empty:
|
||||||
|
raise exception.UnexpectedError(
|
||||||
|
_('Unable to get a connection from pool id %(id)s after '
|
||||||
|
'%(seconds)s seconds.') %
|
||||||
|
{'id': id(self), 'seconds': self._connection_get_timeout})
|
||||||
|
self._debug_logger('Acquired connection %s', id(conn))
|
||||||
|
try:
|
||||||
|
yield conn
|
||||||
|
finally:
|
||||||
|
self._debug_logger('Releasing connection %s', id(conn))
|
||||||
|
self.put(conn)
|
||||||
|
|
||||||
|
def _qsize(self):
|
||||||
|
return self.maxsize - self._acquired
|
||||||
|
|
||||||
|
if not hasattr(queue.Queue, '_qsize'):
|
||||||
|
qsize = _qsize
|
||||||
|
|
||||||
|
def _get(self):
|
||||||
|
if self.queue:
|
||||||
|
conn = self.queue.pop().connection
|
||||||
|
else:
|
||||||
|
conn = self._create_connection()
|
||||||
|
self._acquired += 1
|
||||||
|
return conn
|
||||||
|
|
||||||
|
def _put(self, conn):
|
||||||
|
self.queue.append(_PoolItem(
|
||||||
|
ttl=time.time() + self._unused_timeout,
|
||||||
|
connection=conn,
|
||||||
|
))
|
||||||
|
self._acquired -= 1
|
||||||
|
# Drop all expired connections from the right end of the queue
|
||||||
|
now = time.time()
|
||||||
|
while self.queue and self.queue[0].ttl < now:
|
||||||
|
conn = self.queue.popleft().connection
|
||||||
|
self._debug_logger('Reaping connection %s', id(conn))
|
||||||
|
self._destroy_connection(conn)
|
||||||
|
|
||||||
|
|
||||||
|
class MemcacheClientPool(ConnectionPool):
|
||||||
|
def __init__(self, urls, arguments, **kwargs):
|
||||||
|
ConnectionPool.__init__(self, **kwargs)
|
||||||
|
self.urls = urls
|
||||||
|
self._arguments = arguments
|
||||||
|
# NOTE(morganfainberg): The host objects expect an int for the
|
||||||
|
# deaduntil value. Initialize this at 0 for each host with 0 indicating
|
||||||
|
# the host is not dead.
|
||||||
|
self._hosts_deaduntil = [0] * len(urls)
|
||||||
|
|
||||||
|
def _create_connection(self):
|
||||||
|
return _MemcacheClient(self.urls, **self._arguments)
|
||||||
|
|
||||||
|
def _destroy_connection(self, conn):
|
||||||
|
conn.disconnect_all()
|
||||||
|
|
||||||
|
def _get(self):
|
||||||
|
conn = ConnectionPool._get(self)
|
||||||
|
try:
|
||||||
|
# Propagate host state known to us to this client's list
|
||||||
|
now = time.time()
|
||||||
|
for deaduntil, host in zip(self._hosts_deaduntil, conn.servers):
|
||||||
|
if deaduntil > now and host.deaduntil <= now:
|
||||||
|
host.mark_dead('propagating death mark from the pool')
|
||||||
|
host.deaduntil = deaduntil
|
||||||
|
except Exception:
|
||||||
|
# We need to be sure that connection doesn't leak from the pool.
|
||||||
|
# This code runs before we enter context manager's try-finally
|
||||||
|
# block, so we need to explicitly release it here
|
||||||
|
ConnectionPool._put(self, conn)
|
||||||
|
raise
|
||||||
|
return conn
|
||||||
|
|
||||||
|
def _put(self, conn):
|
||||||
|
try:
|
||||||
|
# If this client found that one of the hosts is dead, mark it as
|
||||||
|
# such in our internal list
|
||||||
|
now = time.time()
|
||||||
|
for i, deaduntil, host in zip(itertools.count(),
|
||||||
|
self._hosts_deaduntil,
|
||||||
|
conn.servers):
|
||||||
|
# Do nothing if we already know this host is dead
|
||||||
|
if deaduntil <= now:
|
||||||
|
if host.deaduntil > now:
|
||||||
|
self._hosts_deaduntil[i] = host.deaduntil
|
||||||
|
self._debug_logger(
|
||||||
|
'Marked host %s dead until %s',
|
||||||
|
self.urls[i], host.deaduntil)
|
||||||
|
else:
|
||||||
|
self._hosts_deaduntil[i] = 0
|
||||||
|
# If all hosts are dead we should forget that they're dead. This
|
||||||
|
# way we won't get completely shut off until dead_retry seconds
|
||||||
|
# pass, but will be checking servers as frequent as we can (over
|
||||||
|
# way smaller socket_timeout)
|
||||||
|
if all(deaduntil > now for deaduntil in self._hosts_deaduntil):
|
||||||
|
self._debug_logger('All hosts are dead. Marking them as live.')
|
||||||
|
self._hosts_deaduntil[:] = [0] * len(self._hosts_deaduntil)
|
||||||
|
finally:
|
||||||
|
ConnectionPool._put(self, conn)
|
|
@ -0,0 +1,61 @@
|
||||||
|
# Copyright 2014 Mirantis Inc
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""dogpile.cache backend that uses Memcached connection pool"""
|
||||||
|
|
||||||
|
import functools
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from dogpile.cache.backends import memcached as memcached_backend
|
||||||
|
|
||||||
|
from keystone.common.cache import _memcache_pool
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# Helper to ease backend refactoring
|
||||||
|
class ClientProxy(object):
|
||||||
|
def __init__(self, client_pool):
|
||||||
|
self.client_pool = client_pool
|
||||||
|
|
||||||
|
def _run_method(self, __name, *args, **kwargs):
|
||||||
|
with self.client_pool.acquire() as client:
|
||||||
|
return getattr(client, __name)(*args, **kwargs)
|
||||||
|
|
||||||
|
def __getattr__(self, name):
|
||||||
|
return functools.partial(self._run_method, name)
|
||||||
|
|
||||||
|
|
||||||
|
class PooledMemcachedBackend(memcached_backend.MemcachedBackend):
|
||||||
|
# Composed from GenericMemcachedBackend's and MemcacheArgs's __init__
|
||||||
|
def __init__(self, arguments):
|
||||||
|
super(PooledMemcachedBackend, self).__init__(arguments)
|
||||||
|
self.client_pool = _memcache_pool.MemcacheClientPool(
|
||||||
|
self.url,
|
||||||
|
arguments={
|
||||||
|
'dead_retry': arguments.get('dead_retry', 5 * 60),
|
||||||
|
'socket_timeout': arguments.get('socket_timeout', 3),
|
||||||
|
},
|
||||||
|
maxsize=arguments.get('pool_maxsize', 10),
|
||||||
|
unused_timeout=arguments.get('pool_unused_timeout', 60),
|
||||||
|
conn_get_timeout=arguments.get('pool_connection_get_timeout', 10),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Since all methods in backend just call one of methods of client, this
|
||||||
|
# lets us avoid need to hack it too much
|
||||||
|
@property
|
||||||
|
def client(self):
|
||||||
|
return ClientProxy(self.client_pool)
|
|
@ -40,6 +40,11 @@ dogpile.cache.register_backend(
|
||||||
'keystone.common.cache.backends.mongo',
|
'keystone.common.cache.backends.mongo',
|
||||||
'MongoCacheBackend')
|
'MongoCacheBackend')
|
||||||
|
|
||||||
|
dogpile.cache.register_backend(
|
||||||
|
'keystone.cache.memcache_pool',
|
||||||
|
'keystone.common.cache.backends.memcache_pool',
|
||||||
|
'PooledMemcachedBackend')
|
||||||
|
|
||||||
|
|
||||||
class DebugProxy(proxy.ProxyBackend):
|
class DebugProxy(proxy.ProxyBackend):
|
||||||
"""Extra Logging ProxyBackend."""
|
"""Extra Logging ProxyBackend."""
|
||||||
|
@ -102,6 +107,15 @@ def build_cache_config():
|
||||||
conf_dict[arg_key] = argvalue
|
conf_dict[arg_key] = argvalue
|
||||||
|
|
||||||
LOG.debug('Keystone Cache Config: %s', conf_dict)
|
LOG.debug('Keystone Cache Config: %s', conf_dict)
|
||||||
|
# NOTE(yorik-sar): these arguments will be used for memcache-related
|
||||||
|
# backends. Use setdefault for url to support old-style setting through
|
||||||
|
# backend_argument=url:127.0.0.1:11211
|
||||||
|
conf_dict.setdefault('%s.arguments.url' % prefix,
|
||||||
|
CONF.cache.memcache_servers)
|
||||||
|
for arg in ('dead_retry', 'socket_timeout', 'pool_maxsize',
|
||||||
|
'pool_unused_timeout', 'pool_connection_get_timeout'):
|
||||||
|
value = getattr(CONF.cache, 'memcache_' + arg)
|
||||||
|
conf_dict['%s.arguments.%s' % (prefix, arg)] = value
|
||||||
|
|
||||||
return conf_dict
|
return conf_dict
|
||||||
|
|
||||||
|
|
|
@ -307,7 +307,8 @@ FILE_OPTIONS = {
|
||||||
# backend.
|
# backend.
|
||||||
cfg.StrOpt('backend', default='keystone.common.cache.noop',
|
cfg.StrOpt('backend', default='keystone.common.cache.noop',
|
||||||
help='Dogpile.cache backend module. It is recommended '
|
help='Dogpile.cache backend module. It is recommended '
|
||||||
'that Memcache (dogpile.cache.memcached) or Redis '
|
'that Memcache with pooling '
|
||||||
|
'(keystone.cache.memcache_pool) or Redis '
|
||||||
'(dogpile.cache.redis) be used in production '
|
'(dogpile.cache.redis) be used in production '
|
||||||
'deployments. Small workloads (single process) '
|
'deployments. Small workloads (single process) '
|
||||||
'like devstack can use the dogpile.cache.memory '
|
'like devstack can use the dogpile.cache.memory '
|
||||||
|
@ -332,6 +333,34 @@ FILE_OPTIONS = {
|
||||||
'cache-backend get/set/delete calls with the '
|
'cache-backend get/set/delete calls with the '
|
||||||
'keys/values. Typically this should be left set '
|
'keys/values. Typically this should be left set '
|
||||||
'to false.'),
|
'to false.'),
|
||||||
|
cfg.ListOpt('memcache_servers', default=['localhost:11211'],
|
||||||
|
help='Memcache servers in the format of "host:port".'
|
||||||
|
' (dogpile.cache.memcache and keystone.cache.memcache_pool'
|
||||||
|
' backends only)'),
|
||||||
|
cfg.IntOpt('memcache_dead_retry',
|
||||||
|
default=5 * 60,
|
||||||
|
help='Number of seconds memcached server is considered dead'
|
||||||
|
' before it is tried again. (dogpile.cache.memcache and'
|
||||||
|
' keystone.cache.memcache_pool backends only)'),
|
||||||
|
cfg.IntOpt('memcache_socket_timeout',
|
||||||
|
default=3,
|
||||||
|
help='Timeout in seconds for every call to a server.'
|
||||||
|
' (dogpile.cache.memcache and keystone.cache.memcache_pool'
|
||||||
|
' backends only)'),
|
||||||
|
cfg.IntOpt('memcache_pool_maxsize',
|
||||||
|
default=10,
|
||||||
|
help='Max total number of open connections to every'
|
||||||
|
' memcached server. (keystone.cache.memcache_pool backend'
|
||||||
|
' only)'),
|
||||||
|
cfg.IntOpt('memcache_pool_unused_timeout',
|
||||||
|
default=60,
|
||||||
|
help='Number of seconds a connection to memcached is held'
|
||||||
|
' unused in the pool before it is closed.'
|
||||||
|
' (keystone.cache.memcache_pool backend only)'),
|
||||||
|
cfg.IntOpt('memcache_pool_connection_get_timeout',
|
||||||
|
default=10,
|
||||||
|
help='Number of seconds that an operation will wait to get '
|
||||||
|
'a memcache client connection.'),
|
||||||
],
|
],
|
||||||
'ssl': [
|
'ssl': [
|
||||||
cfg.BoolOpt('enable', default=False,
|
cfg.BoolOpt('enable', default=False,
|
||||||
|
@ -771,10 +800,35 @@ FILE_OPTIONS = {
|
||||||
'memcache': [
|
'memcache': [
|
||||||
cfg.ListOpt('servers', default=['localhost:11211'],
|
cfg.ListOpt('servers', default=['localhost:11211'],
|
||||||
help='Memcache servers in the format of "host:port".'),
|
help='Memcache servers in the format of "host:port".'),
|
||||||
cfg.IntOpt('max_compare_and_set_retry', default=16,
|
cfg.IntOpt('dead_retry',
|
||||||
help='Number of compare-and-set attempts to make when '
|
default=5 * 60,
|
||||||
'using compare-and-set in the token memcache back '
|
help='Number of seconds memcached server is considered dead'
|
||||||
'end.'),
|
' before it is tried again. This is used by the key '
|
||||||
|
'value store system (e.g. token '
|
||||||
|
'pooled memcached persistence backend).'),
|
||||||
|
cfg.IntOpt('socket_timeout',
|
||||||
|
default=3,
|
||||||
|
help='Timeout in seconds for every call to a server. This '
|
||||||
|
'is used by the key value store system (e.g. token '
|
||||||
|
'pooled memcached persistence backend).'),
|
||||||
|
cfg.IntOpt('pool_maxsize',
|
||||||
|
default=10,
|
||||||
|
help='Max total number of open connections to every'
|
||||||
|
' memcached server. This is used by the key value '
|
||||||
|
'store system (e.g. token pooled memcached '
|
||||||
|
'persistence backend).'),
|
||||||
|
cfg.IntOpt('pool_unused_timeout',
|
||||||
|
default=60,
|
||||||
|
help='Number of seconds a connection to memcached is held'
|
||||||
|
' unused in the pool before it is closed. This is used'
|
||||||
|
' by the key value store system (e.g. token pooled '
|
||||||
|
'memcached persistence backend).'),
|
||||||
|
cfg.IntOpt('pool_connection_get_timeout',
|
||||||
|
default=10,
|
||||||
|
help='Number of seconds that an operation will wait to get '
|
||||||
|
'a memcache client connection. This is used by the '
|
||||||
|
'key value store system (e.g. token pooled memcached '
|
||||||
|
'persistence backend).'),
|
||||||
],
|
],
|
||||||
'catalog': [
|
'catalog': [
|
||||||
cfg.StrOpt('template_file',
|
cfg.StrOpt('template_file',
|
||||||
|
|
|
@ -22,6 +22,7 @@ import time
|
||||||
from dogpile.cache import api
|
from dogpile.cache import api
|
||||||
from dogpile.cache.backends import memcached
|
from dogpile.cache.backends import memcached
|
||||||
|
|
||||||
|
from keystone.common.cache.backends import memcache_pool
|
||||||
from keystone.common import manager
|
from keystone.common import manager
|
||||||
from keystone import config
|
from keystone import config
|
||||||
from keystone import exception
|
from keystone import exception
|
||||||
|
@ -34,9 +35,11 @@ LOG = log.getLogger(__name__)
|
||||||
NO_VALUE = api.NO_VALUE
|
NO_VALUE = api.NO_VALUE
|
||||||
|
|
||||||
|
|
||||||
VALID_DOGPILE_BACKENDS = dict(pylibmc=memcached.PylibmcBackend,
|
VALID_DOGPILE_BACKENDS = dict(
|
||||||
bmemcached=memcached.BMemcachedBackend,
|
pylibmc=memcached.PylibmcBackend,
|
||||||
memcached=memcached.MemcachedBackend)
|
bmemcached=memcached.BMemcachedBackend,
|
||||||
|
memcached=memcached.MemcachedBackend,
|
||||||
|
pooled_memcached=memcache_pool.PooledMemcachedBackend)
|
||||||
|
|
||||||
|
|
||||||
class MemcachedLock(object):
|
class MemcachedLock(object):
|
||||||
|
|
|
@ -395,6 +395,7 @@ class TestCase(BaseTestCase):
|
||||||
'routes.middleware=INFO',
|
'routes.middleware=INFO',
|
||||||
'stevedore.extension=INFO',
|
'stevedore.extension=INFO',
|
||||||
'keystone.notifications=INFO',
|
'keystone.notifications=INFO',
|
||||||
|
'keystone.common._memcache_pool=INFO',
|
||||||
])
|
])
|
||||||
self.auth_plugin_config_override()
|
self.auth_plugin_config_override()
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,119 @@
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from six.moves import queue
|
||||||
|
import testtools
|
||||||
|
from testtools import matchers
|
||||||
|
|
||||||
|
from keystone.common.cache import _memcache_pool
|
||||||
|
from keystone import exception
|
||||||
|
from keystone.tests import core
|
||||||
|
|
||||||
|
|
||||||
|
class _TestConnectionPool(_memcache_pool.ConnectionPool):
|
||||||
|
destroyed_value = 'destroyed'
|
||||||
|
|
||||||
|
def _create_connection(self):
|
||||||
|
return mock.MagicMock()
|
||||||
|
|
||||||
|
def _destroy_connection(self, conn):
|
||||||
|
conn(self.destroyed_value)
|
||||||
|
|
||||||
|
|
||||||
|
class TestConnectionPool(core.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestConnectionPool, self).setUp()
|
||||||
|
self.unused_timeout = 10
|
||||||
|
self.maxsize = 2
|
||||||
|
self.connection_pool = _TestConnectionPool(
|
||||||
|
maxsize=self.maxsize,
|
||||||
|
unused_timeout=self.unused_timeout)
|
||||||
|
self.addCleanup(self.cleanup_instance('connection_pool'))
|
||||||
|
|
||||||
|
def test_get_context_manager(self):
|
||||||
|
self.assertThat(self.connection_pool.queue, matchers.HasLength(0))
|
||||||
|
with self.connection_pool.acquire() as conn:
|
||||||
|
self.assertEqual(1, self.connection_pool._acquired)
|
||||||
|
self.assertEqual(0, self.connection_pool._acquired)
|
||||||
|
self.assertThat(self.connection_pool.queue, matchers.HasLength(1))
|
||||||
|
self.assertEqual(conn, self.connection_pool.queue[0].connection)
|
||||||
|
|
||||||
|
def test_cleanup_pool(self):
|
||||||
|
self.test_get_context_manager()
|
||||||
|
newtime = time.time() + self.unused_timeout * 2
|
||||||
|
non_expired_connection = _memcache_pool._PoolItem(
|
||||||
|
ttl=(newtime * 2),
|
||||||
|
connection=mock.MagicMock())
|
||||||
|
self.connection_pool.queue.append(non_expired_connection)
|
||||||
|
self.assertThat(self.connection_pool.queue, matchers.HasLength(2))
|
||||||
|
with mock.patch.object(time, 'time', return_value=newtime):
|
||||||
|
conn = self.connection_pool.queue[0].connection
|
||||||
|
with self.connection_pool.acquire():
|
||||||
|
pass
|
||||||
|
conn.assert_has_calls(
|
||||||
|
[mock.call(self.connection_pool.destroyed_value)])
|
||||||
|
self.assertThat(self.connection_pool.queue, matchers.HasLength(1))
|
||||||
|
self.assertEqual(0, non_expired_connection.connection.call_count)
|
||||||
|
|
||||||
|
def test_acquire_conn_exception_returns_acquired_count(self):
|
||||||
|
class TestException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
with mock.patch.object(_TestConnectionPool, '_create_connection',
|
||||||
|
side_effect=TestException):
|
||||||
|
with testtools.ExpectedException(TestException):
|
||||||
|
with self.connection_pool.acquire():
|
||||||
|
pass
|
||||||
|
self.assertThat(self.connection_pool.queue,
|
||||||
|
matchers.HasLength(0))
|
||||||
|
self.assertEqual(0, self.connection_pool._acquired)
|
||||||
|
|
||||||
|
def test_connection_pool_limits_maximum_connections(self):
|
||||||
|
# NOTE(morganfainberg): To ensure we don't lockup tests until the
|
||||||
|
# job limit, explicitly call .get_nowait() and .put_nowait() in this
|
||||||
|
# case.
|
||||||
|
conn1 = self.connection_pool.get_nowait()
|
||||||
|
conn2 = self.connection_pool.get_nowait()
|
||||||
|
|
||||||
|
# Use a nowait version to raise an Empty exception indicating we would
|
||||||
|
# not get another connection until one is placed back into the queue.
|
||||||
|
self.assertRaises(queue.Empty, self.connection_pool.get_nowait)
|
||||||
|
|
||||||
|
# Place the connections back into the pool.
|
||||||
|
self.connection_pool.put_nowait(conn1)
|
||||||
|
self.connection_pool.put_nowait(conn2)
|
||||||
|
|
||||||
|
# Make sure we can get a connection out of the pool again.
|
||||||
|
self.connection_pool.get_nowait()
|
||||||
|
|
||||||
|
def test_connection_pool_maximum_connection_get_timeout(self):
|
||||||
|
connection_pool = _TestConnectionPool(
|
||||||
|
maxsize=1,
|
||||||
|
unused_timeout=self.unused_timeout,
|
||||||
|
conn_get_timeout=0)
|
||||||
|
|
||||||
|
def _acquire_connection():
|
||||||
|
with connection_pool.acquire():
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Make sure we've consumed the only available connection from the pool
|
||||||
|
conn = connection_pool.get_nowait()
|
||||||
|
|
||||||
|
self.assertRaises(exception.UnexpectedError, _acquire_connection)
|
||||||
|
|
||||||
|
# Put the connection back and ensure we can acquire the connection
|
||||||
|
# after it is available.
|
||||||
|
connection_pool.put_nowait(conn)
|
||||||
|
_acquire_connection()
|
|
@ -22,8 +22,10 @@ CONF = config.CONF
|
||||||
|
|
||||||
class Token(kvs.Token):
|
class Token(kvs.Token):
|
||||||
kvs_backend = 'openstack.kvs.Memcached'
|
kvs_backend = 'openstack.kvs.Memcached'
|
||||||
|
memcached_backend = 'memcached'
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
kwargs['memcached_backend'] = self.memcached_backend
|
||||||
kwargs['no_expiry_keys'] = [self.revocation_key]
|
kwargs['no_expiry_keys'] = [self.revocation_key]
|
||||||
kwargs['memcached_expire_time'] = CONF.token.expiration
|
kwargs['memcached_expire_time'] = CONF.token.expiration
|
||||||
kwargs['url'] = CONF.memcache.servers
|
kwargs['url'] = CONF.memcache.servers
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from keystone.common import config
|
||||||
|
from keystone.token.persistence.backends import memcache
|
||||||
|
|
||||||
|
|
||||||
|
CONF = config.CONF
|
||||||
|
|
||||||
|
|
||||||
|
class Token(memcache.Token):
|
||||||
|
memcached_backend = 'pooled_memcached'
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
for arg in ('dead_retry', 'socket_timeout', 'pool_maxsize',
|
||||||
|
'pool_unused_timeout', 'pool_connection_get_timeout'):
|
||||||
|
kwargs[arg] = getattr(CONF.memcache, arg)
|
||||||
|
super(Token, self).__init__(*args, **kwargs)
|
Loading…
Reference in New Issue