initial import of server-core's ldappool

This commit is contained in:
Tarek Ziadé 2011-10-28 17:10:59 +02:00
parent 0ea5374a5c
commit 1e5494ac3f
6 changed files with 780 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
*.egg-info
bin
lib
include
*.pyc

341
ldappool/__init__.py Normal file
View File

@ -0,0 +1,341 @@
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is Sync Server
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2010
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Tarek Ziade (tarek@mozilla.com)
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
""" LDAP Connection Pool.
"""
import time
from contextlib import contextmanager
from threading import RLock
from ldap.ldapobject import ReconnectLDAPObject
import ldap
class MaxConnectionReachedError(Exception):
pass
class BackendError(Exception):
def __init__(self, msg, backend):
self.bacend = backend
Exception.__init__(self, msg)
class StateConnector(ReconnectLDAPObject):
"""Just remembers who is connected, and if connected"""
def __init__(self, *args, **kw):
ReconnectLDAPObject.__init__(self, *args, **kw)
self.connected = False
self.who = ''
self.cred = ''
self._connection_time = None
def get_lifetime(self):
"""Returns the lifetime of the connection on the server in seconds."""
if self._connection_time is None:
return 0
return time.time() - self._connection_time
def simple_bind_s(self, who='', cred='', serverctrls=None,
clientctrls=None):
res = ReconnectLDAPObject.simple_bind_s(self, who, cred, serverctrls,
clientctrls)
self.connected = True
self.who = who
self.cred = cred
if self._connection_time is None:
self._connection_time = time.time()
return res
def unbind_ext_s(self, serverctrls=None, clientctrls=None):
try:
return ReconnectLDAPObject.unbind_ext_s(self, serverctrls,
clientctrls)
finally:
self.connected = False
self.who = None
self.cred = None
def add_s(self, *args, **kwargs):
return self._apply_method_s(ReconnectLDAPObject.add_s, *args,
**kwargs)
def modify_s(self, *args, **kwargs):
return self._apply_method_s(ReconnectLDAPObject.modify_s, *args,
**kwargs)
def __str__(self):
res = 'LDAP Connector'
if self.connected:
res += ' (connected)'
else:
res += ' (disconnected)'
if self.who != '':
res += ' - who: %r' % self.who
if self._uri != '':
res += ' - uri: %r' % self._uri
return res
class ConnectionManager(object):
"""LDAP Connection Manager.
Provides a context manager for LDAP connectors.
"""
def __init__(self, uri, bind=None, passwd=None, size=10, retry_max=3,
retry_delay=.1, use_tls=False, single_box=False, timeout=-1,
connector_cls=StateConnector, use_pool=False,
max_lifetime=600):
self._pool = []
self.size = size
self.retry_max = retry_max
self.retry_delay = retry_delay
self.uri = uri
self.bind = bind
self.passwd = passwd
self._pool_lock = RLock()
self.use_tls = False
self.timeout = timeout
self.connector_cls = connector_cls
self.use_pool = use_pool
self.max_lifetime = max_lifetime
def __len__(self):
return len(self._pool)
def _match(self, bind, passwd):
passwd = passwd.encode('utf8')
with self._pool_lock:
inactives = []
for conn in reversed(self._pool):
# already in usage
if conn.active:
continue
# let's check the lifetime
if conn.get_lifetime() > self.max_lifetime:
# this connector has lived for too long,
# we want to unbind it and remove it from the pool
try:
conn.unbind_s()
except Exception:
pass # XXX we will see later
self._pool.remove(conn)
continue
# we found a connector for this bind
if conn.who == bind and conn.cred == passwd:
conn.active = True
return conn
inactives.append(conn)
# no connector was available, let's rebind the latest inactive one
if len(inactives) > 0:
for conn in inactives:
try:
self._bind(conn, bind, passwd)
return conn
except:
self._pool.remove(conn)
return None
# There are no connector that match
return None
def _bind(self, conn, bind, passwd):
# let's bind
if self.use_tls:
conn.start_tls_s()
if bind is not None:
conn.simple_bind_s(bind, passwd)
conn.active = True
def _create_connector(self, bind, passwd):
"""Creates a connector, binds it, and returns it
Args:
- bind: login
- passwd: password
"""
tries = 0
connected = False
passwd = passwd.encode('utf8')
exc = None
# trying retry_max times in a row with a fresh connector
while tries < self.retry_max and not connected:
try:
conn = self.connector_cls(self.uri, retry_max=self.retry_max,
retry_delay=self.retry_delay)
conn.timeout = self.timeout
self._bind(conn, bind, passwd)
connected = True
except ldap.LDAPError, exc:
time.sleep(self.retry_delay)
tries += 1
if not connected:
if isinstance(exc, (ldap.NO_SUCH_OBJECT,
ldap.INVALID_CREDENTIALS)):
raise exc
# that's something else
raise BackendError(str(exc), backend=conn)
return conn
def _get_connection(self, bind=None, passwd=None):
if bind is None:
bind = self.bind
if passwd is None:
passwd = self.passwd
if self.use_pool:
# let's try to recycle an existing one
conn = self._match(bind, passwd)
if conn is not None:
return conn
# the pool is full
if len(self._pool) >= self.size:
raise MaxConnectionReachedError(self.uri)
# we need to create a new connector
conn = self._create_connector(bind, passwd)
# adding it to the pool
if self.use_pool:
with self._pool_lock:
self._pool.append(conn)
else:
# with no pool, the connector is always active
conn.active = True
return conn
def _release_connection(self, connection):
if self.use_pool:
with self._pool_lock:
if not connection.connected:
# unconnected connector, let's drop it
self._pool.remove(connection)
else:
# can be reused - let's mark is as not active
connection.active = False
# done.
return
else:
connection.active = False
# let's try to unbind it
try:
connection.unbind_ext_s()
except ldap.LDAPError:
# avoid error on invalid state
pass
@contextmanager
def connection(self, bind=None, passwd=None):
"""Creates a context'ed connector, binds it, and returns it
Args:
- bind: login
- passwd: password
"""
tries = 0
conn = None
while tries < self.retry_max:
try:
conn = self._get_connection(bind, passwd)
except MaxConnectionReachedError:
tries += 1
time.sleep(0.1)
# removing the first inactive connector going backward
with self._pool_lock:
reversed_list = reversed(list(enumerate(self._pool)))
for index, conn_ in reversed_list:
if not conn_.active:
self._pool.pop(index)
break
else:
break
if conn is None:
raise MaxConnectionReachedError(self.uri)
try:
yield conn
finally:
self._release_connection(conn)
def purge(self, bind, passwd=None):
"""Purge a connector
Args:
- bind: login
- passwd: password
"""
if self.use_pool:
return
if passwd is not None:
passwd = passwd.encode('utf8')
with self._pool_lock:
for conn in list(self._pool):
if conn.who != bind:
continue
if passwd is not None and conn.cred == passwd:
continue
# let's drop it
try:
conn.unbind_ext_s()
except ldap.LDAPError:
# invalid state
pass
self._pool.remove(conn)

View File

View File

@ -0,0 +1,139 @@
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is Sync Server
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2011
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Tarek Ziade (tarek@mozilla.com)
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
import unittest
import ldap
from ldappool import (ConnectionManager, StateConnector, BackendError,
MaxConnectionReachedError)
def _bind(self, who='', cred='', **kw):
self.connected = True
self.who = who
self.cred = cred
return 1
def _bind_fails(self, who='', cred='', **kw):
raise ldap.LDAPError('LDAP connection invalid')
def _bind_fails2(self, who='', cred='', **kw):
raise ldap.SERVER_DOWN('LDAP connection invalid')
class TestLDAPConnection(unittest.TestCase):
def setUp(self):
self.old = StateConnector.simple_bind_s
StateConnector.simple_bind_s = _bind
def tearDown(self):
StateConnector.simple_bind_s = self.old
def test_connection(self):
uri = ''
dn = 'uid=adminuser,ou=logins,dc=mozilla'
passwd = 'adminuser'
cm = ConnectionManager(uri, dn, passwd, use_pool=True, size=2)
self.assertEqual(len(cm), 0)
with cm.connection('dn', 'pass'):
self.assertEqual(len(cm), 1)
# if we ask a new one the pool will grow
with cm.connection('dn', 'pass'):
self.assertEqual(len(cm), 2)
# every connector is marked active
self.assertTrue(cm._pool[0].active)
self.assertTrue(cm._pool[1].active)
# if we ask a new one the pool is full
try:
with cm.connection('dn', 'pass'):
pass
except MaxConnectionReachedError:
pass
else:
raise AssertionError()
# down to one active
self.assertFalse(cm._pool[1].active)
self.assertTrue(cm._pool[0].active)
# if we ask a new one the pool is full
# but we get the inactive one
with cm.connection('dn', 'pass'):
self.assertEqual(len(cm), 2)
self.assertFalse(cm._pool[1].active)
self.assertTrue(cm._pool[0].active)
# if we ask a new one the pool is full
# but we get the inactive one, and rebind it
with cm.connection('dn2', 'pass'):
self.assertEqual(len(cm), 2)
# the pool is still 2
self.assertEqual(len(cm), 2)
# every connector is marked inactive
self.assertFalse(cm._pool[0].active)
self.assertFalse(cm._pool[1].active)
def test_simple_bind_fails(self):
unbinds = []
def _unbind(self):
unbinds.append(1)
# the binding fails with an LDAPError
StateConnector.simple_bind_s = _bind_fails2
StateConnector.unbind_s = _unbind
uri = ''
dn = 'uid=adminuser,ou=logins,dc=mozilla'
passwd = 'adminuser'
cm = ConnectionManager(uri, dn, passwd, use_pool=True, size=2)
self.assertEqual(len(cm), 0)
try:
with cm.connection('dn', 'pass'):
pass
except BackendError:
pass
else:
raise AssertionError()

View File

@ -0,0 +1,250 @@
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is Sync Server
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2010
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Tarek Ziade (tarek@mozilla.com)
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
import unittest
import threading
import time
import ldap
from ldappool import (ConnectionManager, StateConnector,
MaxConnectionReachedError)
# patching StateConnector
StateConnector.users = {'uid=tarek,ou=users,dc=mozilla':
{'uidNumber': ['1'],
'account-enabled': ['Yes'],
'mail': ['tarek@mozilla.com'],
'cn': ['tarek']},
'cn=admin,dc=mozilla': {'cn': ['admin'],
'mail': ['admin'],
'uidNumber': ['100']}}
def _simple_bind(self, who='', cred='', *args):
self.connected = True
self.who = who
self.cred = cred
StateConnector.simple_bind_s = _simple_bind
def _search(self, dn, *args, **kw):
if dn in self.users:
return [(dn, self.users[dn])]
elif dn == 'ou=users,dc=mozilla':
uid = kw['filterstr'].split('=')[-1][:-1]
for dn_, value in self.users.items():
if value['uidNumber'][0] != uid:
continue
return [(dn_, value)]
raise ldap.NO_SUCH_OBJECT
StateConnector.search_s = _search
def _add(self, dn, user):
self.users[dn] = {}
for key, value in user:
if not isinstance(value, list):
value = [value]
self.users[dn][key] = value
return ldap.RES_ADD, ''
StateConnector.add_s = _add
def _modify(self, dn, user):
if dn in self.users:
for type_, key, value in user:
if not isinstance(value, list):
value = [value]
self.users[dn][key] = value
return ldap.RES_MODIFY, ''
StateConnector.modify_s = _modify
def _delete(self, dn):
if dn in self.users:
del self.users[dn]
return ldap.RES_DELETE, ''
StateConnector.delete_s = _delete
class LDAPWorker(threading.Thread):
def __init__(self, pool):
threading.Thread.__init__(self)
self.pool = pool
self.results = []
def run(self):
dn = 'cn=admin,dc=mozilla'
for i in range(10):
with self.pool.connection() as conn:
res = conn.search_s(dn, ldap.SCOPE_BASE,
attrlist=['cn'])
self.results.append(res)
class TestLDAPSQLAuth(unittest.TestCase):
def test_pool(self):
dn = 'uid=adminuser,ou=logins,dc=mozilla'
passwd = 'adminuser'
pool = ConnectionManager('ldap://localhost', dn, passwd)
workers = [LDAPWorker(pool) for i in range(10)]
for worker in workers:
worker.start()
for worker in workers:
worker.join()
self.assertEquals(len(worker.results), 10)
cn = worker.results[0][0][1]['cn']
self.assertEquals(cn, ['admin'])
def test_pool_full(self):
dn = 'uid=adminuser,ou=logins,dc=mozilla'
passwd = 'adminuser'
pool = ConnectionManager('ldap://localhost', dn, passwd, size=1,
retry_delay=1., retry_max=5,
use_pool=True)
class Worker(threading.Thread):
def __init__(self, pool, duration):
threading.Thread.__init__(self)
self.pool = pool
self.duration = duration
def run(self):
with self.pool.connection() as conn: # NOQA
time.sleep(self.duration)
def tryit():
with pool.connection() as conn: # NOQA
pass
# an attempt on a full pool should eventually work
# because the connector is reused
for i in range(10):
tryit()
# we have 1 non-active connector now
self.assertEqual(len(pool), 1)
# an attempt with a full pool should succeed if a
# slot gets freed in less than one second.
worker1 = Worker(pool, .4)
worker1.start()
try:
tryit()
finally:
worker1.join()
# an attempt with a full pool should fail
# if no slot gets freed in less than one second.
worker1 = Worker(pool, 1.1)
worker1.start()
try:
self.assertRaises(MaxConnectionReachedError, tryit)
finally:
worker1.join()
# we still have one active connector
self.assertEqual(len(pool), 1)
def test_pool_cleanup(self):
dn = 'uid=adminuser,ou=logins,dc=mozilla'
passwd = 'adminuser'
pool = ConnectionManager('ldap://localhost', dn, passwd, size=1,
use_pool=True)
with pool.connection('bind1') as conn: # NOQA
pass
with pool.connection('bind2') as conn: # NOQA
pass
# the second call should have removed the first conn
self.assertEqual(len(pool), 1)
def test_pool_reuse(self):
dn = 'uid=adminuser,ou=logins,dc=mozilla'
passwd = 'adminuser'
pool = ConnectionManager('ldap://localhost', dn, passwd,
use_pool=True)
with pool.connection() as conn:
self.assertTrue(conn.active)
self.assertFalse(conn.active)
self.assertTrue(conn.connected)
with pool.connection() as conn2:
pass
self.assertTrue(conn is conn2)
with pool.connection() as conn:
conn.connected = False
with pool.connection() as conn2:
pass
self.assertTrue(conn is not conn2)
# same bind and password: reuse
with pool.connection('bind', 'passwd') as conn:
self.assertTrue(conn.active)
self.assertFalse(conn.active)
self.assertTrue(conn.connected)
with pool.connection('bind', 'passwd') as conn2:
pass
self.assertTrue(conn is conn2)
# same bind different password: rebind !
with pool.connection('bind', 'passwd') as conn:
self.assertTrue(conn.active)
self.assertFalse(conn.active)
self.assertTrue(conn.connected)
with pool.connection('bind', 'passwd2') as conn2:
pass
self.assertTrue(conn is conn2)

45
setup.py Normal file
View File

@ -0,0 +1,45 @@
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is Sync Server
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2010
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Tarek Ziade (tarek@mozilla.com)
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
from setuptools import setup, find_packages
install_requires = ['python-ldap']
setup(name='ldappool', version='0.9', packages=find_packages(),
author='Mozilla Services', author_email='services-dev@mozilla.org',
url='https://github.com/mozilla-services/ldappool',
install_requires=install_requires)