Port away from some eventlet infrastructure

Add a simple object pool implementation for our connection pool, in
place of eventlet.pools.Pool.

Also use threading.Lock in place of eventlet.Semaphore.

There are still some eventlet modules imported by the code, but we can
avoid using them at runtime and clean things up later. We can't remove
them now or it'll cause pep8 failures.

Change-Id: I380408d1321802de813de541cd0a2d4305c3627c
This commit is contained in:
Mark McLoughlin 2013-07-23 17:28:42 +01:00
parent 37bd6923dc
commit e3c5b99959
2 changed files with 101 additions and 12 deletions

View File

@ -28,15 +28,15 @@ AMQP, but is deprecated and predates this code.
import collections
import inspect
import sys
import threading
import uuid
from eventlet import greenpool
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
from oslo.config import cfg
from oslo.messaging._drivers import common as rpc_common
from oslo.messaging._drivers import pool
from oslo.messaging.openstack.common import excutils
from oslo.messaging.openstack.common.gettextutils import _ # noqa
from oslo.messaging.openstack.common import local
@ -58,14 +58,12 @@ UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
class Pool(pools.Pool):
class ConnectionPool(pool.Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, conf, connection_cls, *args, **kwargs):
def __init__(self, conf, connection_cls):
self.connection_cls = connection_cls
self.conf = conf
kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
super(ConnectionPool, self).__init__(self.conf.rpc_conn_pool_size)
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
@ -74,8 +72,8 @@ class Pool(pools.Pool):
return self.connection_cls(self.conf)
def empty(self):
while self.free_items:
self.get().close()
for item in self.iter_free:
item.close()
# Force a new connection pool to be created.
# Note that this was added due to failing unit test cases. The issue
# is the above "while loop" gets all the cached connections from the
@ -88,14 +86,14 @@ class Pool(pools.Pool):
self.connection_cls.pool = None
_pool_create_sem = semaphore.Semaphore()
_pool_create_sem = threading.Lock()
def get_connection_pool(conf, connection_cls):
with _pool_create_sem:
# Make sure only one thread tries to create the connection pool.
if not connection_cls.pool:
connection_cls.pool = Pool(conf, connection_cls)
connection_cls.pool = ConnectionPool(conf, connection_cls)
return connection_cls.pool
@ -517,7 +515,7 @@ def create_connection(conf, new, connection_pool):
return ConnectionContext(conf, connection_pool, pooled=not new)
_reply_proxy_create_sem = semaphore.Semaphore()
_reply_proxy_create_sem = threading.Lock()
def multicall(conf, context, topic, msg, timeout, connection_pool):

View File

@ -0,0 +1,91 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
import threading
class Pool(object):
"""A thread-safe object pool.
Modelled after the eventlet.pools.Pool interface, but designed to be safe
when using native threads without the GIL.
Resizing is not supported.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, max_size=4):
super(Pool, self).__init__()
self._max_size = max_size
self._current_size = 0
self._cond = threading.Condition()
self._items = collections.deque()
def put(self, item):
"""Return an item to the pool."""
with self._cond:
self._items.appendleft(item)
self._cond.notify()
def get(self, only_free=False):
"""Return an item from the pool, when one is available.
This may cause the calling thread to block.
:param only_free: if True, return None if no free item available
:type only_free: bool
"""
with self._cond:
while True:
try:
return self._items.popleft()
except IndexError:
if only_free:
return None
if self._current_size < self._max_size:
self._current_size += 1
break
# FIXME(markmc): timeout needed to allow keyboard interrupt
# http://bugs.python.org/issue8844
self._cond.wait(timeout=1)
# We've grabbed a slot and dropped the lock, now do the creation
try:
return self.create()
except Exception:
with self._cond:
self._current_size -= 1
raise
def iter_free(self):
"""Iterate over free items."""
with self._cond:
while True:
try:
yield self._items.popleft()
except IndexError:
break
@abc.abstractmethod
def create(self):
"""Construct a new item."""