Merge pull request #446 from python-zk/feat/issue-445

feat: pep8 all the things
Ben Bangert, 2017-06-01 23:00:28 -07:00 (committed via GitHub)
commit bd1ee41fd1
19 changed files with 160 additions and 87 deletions

.hound.yml (new file, 3 additions)
View File

@ -0,0 +1,3 @@
fail_on_violations: true
python:
enabled: true

View File

@ -1 +1 @@
from .version import __version__
from .version import __version__ # noqa

View File

@ -1384,7 +1384,8 @@ class KazooClient(object):
joining=joining, leaving=None, new_members=None)
# wait and then remove it (just by using its id) (incremental)
data, _ = zk.reconfig(joining=None, leaving='100', new_members=None)
data, _ = zk.reconfig(joining=None, leaving='100',
new_members=None)
# now do a full change of the cluster (non-incremental)
new = [
@ -1418,7 +1419,8 @@ class KazooClient(object):
returns a non-zero error code.
"""
result = self.reconfig_async(joining, leaving, new_members, from_config)
result = self.reconfig_async(joining, leaving, new_members,
from_config)
return result.get()
def reconfig_async(self, joining, leaving, new_members, from_config):

View File

@ -149,8 +149,11 @@ class SequentialThreadingHandler(object):
end = (time.time() + timeout) if timeout else None
while end is None or time.time() < end:
if end is not None:
args = list(args) # make a list, since tuples aren't mutable
args[3] = end - time.time() # set the timeout to the remaining time
# make a list, since tuples aren't mutable
args = list(args)
# set the timeout to the remaining time
args[3] = end - time.time()
try:
return select.select(*args, **kwargs)
except select.error as ex:
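For reference, the pattern this hunk reflows (recomputing the remaining timeout before each retried select() call so interruptions do not extend the overall wait) can be sketched standalone. This is only an illustration with made-up names, not kazoo's handler code:

    import select
    import time

    def select_with_deadline(rlist, wlist, xlist, timeout=None):
        # Retry select() until the original deadline passes; on each retry
        # the timeout argument is shrunk to the time remaining.
        end = (time.time() + timeout) if timeout else None
        while end is None or time.time() < end:
            remaining = None if end is None else max(0.0, end - time.time())
            try:
                return select.select(rlist, wlist, xlist, remaining)
            except select.error:
                continue  # interrupted; retry with the remaining time
        return [], [], []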

View File

@ -491,7 +491,8 @@ class ConnectionHandler(object):
host_ports = []
for host, port in self.client.hosts:
try:
for rhost in socket.getaddrinfo(host.strip(), port, 0, 0, socket.IPPROTO_TCP):
for rhost in socket.getaddrinfo(host.strip(), port, 0, 0,
socket.IPPROTO_TCP):
host_ports.append((rhost[4][0], rhost[4][1]))
except socket.gaierror as e:
# Skip hosts that don't resolve
@ -618,7 +619,8 @@ class ConnectionHandler(object):
client._session_id or 0, client._session_passwd,
client.read_only)
# save the client's last_zxid before it gets overwritten by the server's.
# save the client's last_zxid before it gets overwritten by the
# server's.
# we'll need this to reset watches via SetWatches further below.
last_zxid = client.last_zxid
@ -667,7 +669,8 @@ class ConnectionHandler(object):
client._data_watchers.keys(),
client._data_watchers.keys(),
client._child_watchers.keys())
zxid = self._invoke(connect_timeout / 1000.0, sw, xid=SET_WATCHES_XID)
zxid = self._invoke(connect_timeout / 1000.0, sw,
xid=SET_WATCHES_XID)
if zxid:
client.last_zxid = zxid

View File

@ -359,7 +359,8 @@ class Transaction(namedtuple('Transaction', 'operations')):
return resp
class Reconfig(namedtuple('Reconfig', 'joining leaving new_members config_id')):
class Reconfig(namedtuple('Reconfig',
'joining leaving new_members config_id')):
type = 16
def serialize(self):

View File

@ -105,7 +105,7 @@ class EventType(object):
NONE = 'NONE'
EVENT_TYPE_MAP = {
-1:EventType.NONE,
-1: EventType.NONE,
1: EventType.CREATED,
2: EventType.DELETED,
3: EventType.CHANGED,

View File

@ -15,24 +15,28 @@ from kazoo.exceptions import CancelledError
class NonBlockingLease(object):
"""Exclusive lease that does not block.
An exclusive lease ensures that only one client at a time owns the lease. The client may
renew the lease without losing it by obtaining a new lease with the same path and same
identity. The lease object evaluates to True if the lease was obtained.
An exclusive lease ensures that only one client at a time owns the lease.
The client may renew the lease without losing it by obtaining a new lease
with the same path and same identity. The lease object evaluates to True
if the lease was obtained.
A common use case is a situation where a task should only run on a single host. In this
case, the clients that did not obtain the lease should exit without performing the protected
task.
A common use case is a situation where a task should only run on a single
host. In this case, the clients that did not obtain the lease should exit
without performing the protected task.
The lease stores time stamps using client clocks, and will therefore only work if client clocks
are roughly synchronised. It uses UTC, and works across time zones and daylight savings.
The lease stores time stamps using client clocks, and will therefore only
work if client clocks are roughly synchronised. It uses UTC, and works
across time zones and daylight savings.
Example usage: with a :class:`~kazoo.client.KazooClient` instance::
zk = KazooClient()
zk.start()
# Hold lease over an hour in order to keep job on same machine, with failover if it dies.
lease = zk.NonBlockingLease("/db_leases/hourly_cleanup", datetime.timedelta(minutes = 70),
identifier = "DB hourly cleanup on " + socket.gethostname())
# Hold lease over an hour in order to keep job on same machine,
# with failover if it dies.
lease = zk.NonBlockingLease(
"/db_leases/hourly_cleanup", datetime.timedelta(minutes = 70),
identifier = "DB hourly cleanup on " + socket.gethostname())
if lease:
do_hourly_database_cleanup()
"""
@ -42,15 +46,20 @@ class NonBlockingLease(object):
_date_format = "%Y-%m-%dT%H:%M:%S"
_byte_encoding = 'utf-8'
def __init__(self, client, path, duration, identifier=None, utcnow=datetime.datetime.utcnow):
def __init__(self, client, path, duration, identifier=None,
utcnow=datetime.datetime.utcnow):
"""Create a non-blocking lease.
:param client: A :class:`~kazoo.client.KazooClient` instance.
:param path: The lease path to use.
:param duration: Duration during which the lease is reserved. A :class:`~datetime.timedelta` instance.
:param identifier: Unique name to use for this lease holder. Reuse in order to renew the lease.
Defaults do :meth:`socket.gethostname()`.
:param utcnow: Clock function, by default returning :meth:`datetime.datetime.utcnow()`. Used for testing.
:param duration: Duration during which the lease is reserved. A
:class:`~datetime.timedelta` instance.
:param identifier: Unique name to use for this lease holder. Reuse in
order to renew the lease. Defaults to
:meth:`socket.gethostname()`.
:param utcnow: Clock function, by default returning
:meth:`datetime.datetime.utcnow()`. Used for testing.
"""
ident = identifier or socket.gethostname()
self.obtained = False
@ -69,13 +78,15 @@ class NonBlockingLease(object):
if data["version"] != self._version:
# We need an upgrade, let someone else take the lease
return
current_end = datetime.datetime.strptime(data['end'], self._date_format)
current_end = datetime.datetime.strptime(data['end'],
self._date_format)
if data['holder'] != ident and now < current_end:
# Another client is still holding the lease
return
client.delete(holder_path)
end_lease = (now + duration).strftime(self._date_format)
new_data = {'version': self._version, 'holder': ident, 'end': end_lease}
new_data = {'version': self._version, 'holder': ident,
'end': end_lease}
client.create(holder_path, self._encode(new_data))
self.obtained = True
@ -100,16 +111,21 @@ class NonBlockingLease(object):
class MultiNonBlockingLease(object):
"""Exclusive lease for multiple clients.
This type of lease is useful when a limited set of hosts should run a particular task.
It will attempt to obtain leases trying a sequence of ZooKeeper lease paths.
This type of lease is useful when a limited set of hosts should run a
particular task. It will attempt to obtain leases trying a sequence of
ZooKeeper lease paths.
:param client: A :class:`~kazoo.client.KazooClient` instance.
:param count: Number of host leases allowed.
:param path: ZooKeeper path under which lease files are stored.
:param duration: Duration during which the lease is reserved. A :class:`~datetime.timedelta` instance.
:param identifier: Unique name to use for this lease holder. Reuse in order to renew the lease.
:param duration: Duration during which the lease is reserved. A
:class:`~datetime.timedelta` instance.
:param identifier: Unique name to use for this lease holder. Reuse in order
to renew the lease.
Defaults do :meth:`socket.gethostname()`.
:param utcnow: Clock function, by default returning :meth:`datetime.datetime.utcnow()`. Used for testing.
:param utcnow: Clock function, by default returning
:meth:`datetime.datetime.utcnow()`. Used for testing.
"""
def __init__(self, client, count, path, duration, identifier=None,
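As the reflowed docstrings spell out, renewal is simply re-acquisition with the same path and identifier. A hedged usage sketch along the lines of the docstring example above (it assumes a reachable ZooKeeper ensemble; the task body is a placeholder):

    import datetime
    import socket

    from kazoo.client import KazooClient

    zk = KazooClient()
    zk.start()

    ident = "DB hourly cleanup on " + socket.gethostname()
    lease = zk.NonBlockingLease("/db_leases/hourly_cleanup",
                                datetime.timedelta(minutes=70),
                                identifier=ident)
    if lease:
        pass  # run the protected task here

    # Obtaining the lease again with the same path and identifier before it
    # expires renews it; a different identifier is rejected until expiry.
    renewed = zk.NonBlockingLease("/db_leases/hourly_cleanup",
                                  datetime.timedelta(minutes=70),
                                  identifier=ident)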

View File

@ -68,8 +68,8 @@ class Lock(object):
Note: This lock is not *re-entrant*. Repeated calls after already
acquired will block.
This is an exclusive lock. For a read/write lock, see :class:`WriteLock` and
:class:`ReadLock`.
This is an exclusive lock. For a read/write lock, see :class:`WriteLock`
and :class:`ReadLock`.
"""
@ -267,8 +267,8 @@ class Lock(object):
# Node names are prefixed by a type: strip the prefix first, which may
# be one of multiple values in case of a read-write lock, and return
# only the sequence number (as a string since it is padded and will sort
# correctly anyway).
# only the sequence number (as a string since it is padded and will
# sort correctly anyway).
#
# In some cases, the lock path may contain nodes with other prefixes
# (eg. in case of a lease), just sort them last ('~' sorts after all

View File

@ -100,7 +100,7 @@ class Queue(BaseQueue):
# get another one
self._children = []
raise ForceRetryError()
self._children.pop(0)
return data
@ -269,8 +269,9 @@ class LockingQueue(BaseQueue):
:returns: True if the lock was removed successfully, False otherwise.
:rtype: bool
"""
if not self.processing_element is None and self.holds_lock:
if self.processing_element is not None and self.holds_lock:
id_, value = self.processing_element
with self.client.transaction() as transaction:
transaction.delete("{path}/{id}".format(
@ -281,7 +282,6 @@ class LockingQueue(BaseQueue):
else:
return False
def _inner_get(self, timeout):
flag = self.client.handler.event_object()
lock = self.client.handler.lock_object()
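One hunk above goes slightly beyond line wrapping: `if not self.processing_element is None` becomes `if self.processing_element is not None`. The two forms are equivalent, but PEP 8 prefers the latter and pycodestyle flags the former (as E714, if I recall the code correctly). A tiny illustration:

    x = object()

    if not x is None:       # equivalent, but flagged by the style checker
        print("set")

    if x is not None:       # preferred PEP 8 spelling
        print("set")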

View File

@ -268,7 +268,8 @@ class ZookeeperCluster(object):
peer_type = 'observer'
else:
peer_type = 'participant'
info = ServerInfo(server_id, port, port + 1, port + 2, port + 3, peer_type)
info = ServerInfo(server_id, port, port + 1, port + 2, port + 3,
peer_type)
peers.append(info)
port += 10

View File

@ -8,7 +8,7 @@ import unittest
from kazoo import python2atexit as atexit
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException, NotEmptyError
from kazoo.exceptions import KazooException
from kazoo.protocol.states import (
KazooState
)
@ -27,7 +27,8 @@ def get_global_cluster():
ZK_CLASSPATH = os.environ.get("ZOOKEEPER_CLASSPATH")
ZK_PORT_OFFSET = int(os.environ.get("ZOOKEEPER_PORT_OFFSET", 20000))
ZK_CLUSTER_SIZE = int(os.environ.get("ZOOKEEPER_CLUSTER_SIZE", 3))
ZK_OBSERVER_START_ID = int(os.environ.get("ZOOKEEPER_OBSERVER_START_ID", -1))
ZK_OBSERVER_START_ID = int(
os.environ.get("ZOOKEEPER_OBSERVER_START_ID", -1))
assert ZK_HOME or ZK_CLASSPATH, (
"Either ZOOKEEPER_PATH or ZOOKEEPER_CLASSPATH environment "
@ -96,11 +97,13 @@ class KazooTestHarness(unittest.TestCase):
def lose_connection(self, event_factory):
"""Force client to lose connection with server"""
self.__break_connection(_CONNECTION_DROP, KazooState.SUSPENDED, event_factory)
self.__break_connection(_CONNECTION_DROP, KazooState.SUSPENDED,
event_factory)
def expire_session(self, event_factory):
"""Force ZK to expire a client session"""
self.__break_connection(_SESSION_EXPIRED, KazooState.LOST, event_factory)
self.__break_connection(_SESSION_EXPIRED, KazooState.LOST,
event_factory)
def setup_zookeeper(self, **client_options):
"""Create a ZK cluster and chrooted :class:`KazooClient`

View File

@ -426,7 +426,6 @@ class TestConnection(KazooTestCase):
client.get("/test/", watch=test_watch)
self.expire_session(self.make_event)
cv.wait(3)
assert cv.is_set()
@ -980,6 +979,7 @@ class TestClient(KazooTestCase):
# force a reconnect
states = []
rc = client.handler.event_object()
@client.add_listener
def listener(state):
if state == KazooState.CONNECTED:

View File

@ -39,9 +39,11 @@ class KazooLeaseTests(KazooTestCase):
class NonBlockingLeaseTests(KazooLeaseTests):
def test_renew(self):
# Use client convenience method here to test it at least once. Use class directly in
# Use client convenience method here to test it at least once. Use
# class directly in
# other tests in order to get better IDE support.
lease = self.client.NonBlockingLease(self.path, datetime.timedelta(seconds=3),
lease = self.client.NonBlockingLease(self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
self.assertTrue(lease.obtained)
@ -52,7 +54,8 @@ class NonBlockingLeaseTests(KazooLeaseTests):
self.assertTrue(renewed_lease)
def test_busy(self):
lease = NonBlockingLease(self.client, self.path, datetime.timedelta(seconds=3),
lease = NonBlockingLease(self.client, self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
@ -64,7 +67,8 @@ class NonBlockingLeaseTests(KazooLeaseTests):
self.assertFalse(foreigner_lease.obtained)
def test_overtake(self):
lease = NonBlockingLease(self.client, self.path, datetime.timedelta(seconds=3),
lease = NonBlockingLease(self.client, self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
@ -75,7 +79,8 @@ class NonBlockingLeaseTests(KazooLeaseTests):
self.assertTrue(foreigner_lease)
def test_renew_no_overtake(self):
lease = self.client.NonBlockingLease(self.path, datetime.timedelta(seconds=3),
lease = self.client.NonBlockingLease(self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
self.assertTrue(lease.obtained)
@ -92,7 +97,8 @@ class NonBlockingLeaseTests(KazooLeaseTests):
self.assertFalse(foreigner_lease)
def test_overtaker_renews(self):
lease = NonBlockingLease(self.client, self.path, datetime.timedelta(seconds=3),
lease = NonBlockingLease(self.client, self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
@ -109,7 +115,8 @@ class NonBlockingLeaseTests(KazooLeaseTests):
self.assertTrue(foreigner_renew)
def test_overtake_refuse_first(self):
lease = NonBlockingLease(self.client, self.path, datetime.timedelta(seconds=3),
lease = NonBlockingLease(self.client, self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
@ -121,13 +128,15 @@ class NonBlockingLeaseTests(KazooLeaseTests):
self.clock.forward(2)
first_again_lease = NonBlockingLease(
self.client, self.path, datetime.timedelta(seconds=3), utcnow=self.clock)
self.client, self.path, datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertFalse(first_again_lease)
def test_old_version(self):
# Skip to a future version
NonBlockingLease._version += 1
lease = NonBlockingLease(self.client, self.path, datetime.timedelta(seconds=3),
lease = NonBlockingLease(self.client, self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
@ -143,56 +152,82 @@ class NonBlockingLeaseTests(KazooLeaseTests):
class MultiNonBlockingLeaseTest(KazooLeaseTests):
def test_1_renew(self):
ls = self.client.MultiNonBlockingLease(1, self.path, datetime.timedelta(seconds=4), utcnow=self.clock)
ls = self.client.MultiNonBlockingLease(1, self.path,
datetime.timedelta(seconds=4),
utcnow=self.clock)
self.assertTrue(ls)
self.clock.forward(2)
ls2 = MultiNonBlockingLease(self.client, 1, self.path, datetime.timedelta(seconds=4), utcnow=self.clock)
ls2 = MultiNonBlockingLease(self.client, 1, self.path,
datetime.timedelta(seconds=4),
utcnow=self.clock)
self.assertTrue(ls2)
def test_1_reject(self):
ls = MultiNonBlockingLease(self.client, 1, self.path, datetime.timedelta(seconds=4), utcnow=self.clock)
ls = MultiNonBlockingLease(self.client, 1, self.path,
datetime.timedelta(seconds=4),
utcnow=self.clock)
self.assertTrue(ls)
self.clock.forward(2)
ls2 = MultiNonBlockingLease(self.client2, 1, self.path, datetime.timedelta(seconds=4),
identifier="some.other.host", utcnow=self.clock)
ls2 = MultiNonBlockingLease(self.client2, 1, self.path,
datetime.timedelta(seconds=4),
identifier="some.other.host",
utcnow=self.clock)
self.assertFalse(ls2)
def test_2_renew(self):
ls = MultiNonBlockingLease(self.client, 2, self.path, datetime.timedelta(seconds=7), utcnow=self.clock)
ls = MultiNonBlockingLease(self.client, 2, self.path,
datetime.timedelta(seconds=7),
utcnow=self.clock)
self.assertTrue(ls)
self.clock.forward(2)
ls2 = MultiNonBlockingLease(self.client2, 2, self.path, datetime.timedelta(seconds=7), identifier="host2", utcnow=self.clock)
ls2 = MultiNonBlockingLease(self.client2, 2, self.path,
datetime.timedelta(seconds=7),
identifier="host2", utcnow=self.clock)
self.assertTrue(ls2)
self.clock.forward(2)
ls3 = MultiNonBlockingLease(self.client, 2, self.path, datetime.timedelta(seconds=7), utcnow=self.clock)
ls3 = MultiNonBlockingLease(self.client, 2, self.path,
datetime.timedelta(seconds=7),
utcnow=self.clock)
self.assertTrue(ls3)
self.clock.forward(2)
ls4 = MultiNonBlockingLease(self.client2, 2, self.path, datetime.timedelta(seconds=7), identifier="host2", utcnow=self.clock)
ls4 = MultiNonBlockingLease(self.client2, 2, self.path,
datetime.timedelta(seconds=7),
identifier="host2", utcnow=self.clock)
self.assertTrue(ls4)
def test_2_reject(self):
ls = MultiNonBlockingLease(self.client, 2, self.path, datetime.timedelta(seconds=7), utcnow=self.clock)
ls = MultiNonBlockingLease(self.client, 2, self.path,
datetime.timedelta(seconds=7),
utcnow=self.clock)
self.assertTrue(ls)
self.clock.forward(2)
ls2 = MultiNonBlockingLease(self.client2, 2, self.path, datetime.timedelta(seconds=7),
identifier="host2", utcnow=self.clock)
ls2 = MultiNonBlockingLease(self.client2, 2, self.path,
datetime.timedelta(seconds=7),
identifier="host2", utcnow=self.clock)
self.assertTrue(ls2)
self.clock.forward(2)
ls3 = MultiNonBlockingLease(self.client3, 2, self.path, datetime.timedelta(seconds=7),
identifier="host3", utcnow=self.clock)
ls3 = MultiNonBlockingLease(self.client3, 2, self.path,
datetime.timedelta(seconds=7),
identifier="host3", utcnow=self.clock)
self.assertFalse(ls3)
def test_2_handover(self):
ls = MultiNonBlockingLease(self.client, 2, self.path, datetime.timedelta(seconds=4), utcnow=self.clock)
ls = MultiNonBlockingLease(self.client, 2, self.path,
datetime.timedelta(seconds=4),
utcnow=self.clock)
self.assertTrue(ls)
self.clock.forward(2)
ls2 = MultiNonBlockingLease(self.client2, 2, self.path, datetime.timedelta(seconds=4),
identifier="host2", utcnow=self.clock)
ls2 = MultiNonBlockingLease(self.client2, 2, self.path,
datetime.timedelta(seconds=4),
identifier="host2", utcnow=self.clock)
self.assertTrue(ls2)
self.clock.forward(3)
ls3 = MultiNonBlockingLease(self.client3, 2, self.path, datetime.timedelta(seconds=4),
identifier="host3", utcnow=self.clock)
ls3 = MultiNonBlockingLease(self.client3, 2, self.path,
datetime.timedelta(seconds=4),
identifier="host3", utcnow=self.clock)
self.assertTrue(ls3)
self.clock.forward(2)
ls4 = MultiNonBlockingLease(self.client, 2, self.path, datetime.timedelta(seconds=4), utcnow=self.clock)
ls4 = MultiNonBlockingLease(self.client, 2, self.path,
datetime.timedelta(seconds=4),
utcnow=self.clock)
self.assertTrue(ls4)

View File

@ -1,6 +1,5 @@
import collections
import threading
import time
import uuid
from nose.tools import eq_, ok_
@ -319,7 +318,6 @@ class KazooLockTests(KazooTestCase):
sleep_func = self.client.handler.sleep_func
lock = self.client.Lock(self.lockpath, "one")
acquires = collections.deque()
chain = collections.deque()
differences = collections.deque()
barrier = SleepBarrier(self.thread_count, sleep_func)

View File

@ -147,12 +147,14 @@ class KazooPartitionerTests(KazooTestCase):
def test_race_condition_new_partitioner_during_the_lock(self):
locks = {}
def get_lock(path):
lock = locks.setdefault(path, self.client.handler.lock_object())
return SlowLockMock(self.client, lock)
with mock.patch.object(self.client, "Lock", side_effect=get_lock):
# Create first partitioner. It will start to acquire the set members.
# Create first partitioner. It will start to acquire the set
# members.
self.__create_partitioner(identifier="0", size=2)
# Wait until the first partitioner has acquired first lock and
@ -189,7 +191,8 @@ class KazooPartitionerTests(KazooTestCase):
return SlowLockMock(self.client, lock, delay_time=delay_time)
with mock.patch.object(self.client, "Lock", side_effect=get_lock):
# Create first partitioner. It will start to acquire the set members.
# Create first partitioner. It will start to acquire the set
# members.
self.__create_partitioner(identifier="0", size=2)
# Wait until the first partitioner has acquired first lock and
@ -212,7 +215,8 @@ class KazooPartitionerTests(KazooTestCase):
def __create_partitioner(self, size, identifier=None):
partitioner = self.client.SetPartitioner(
self.path, set=range(size), time_boundary=0.2, identifier=identifier)
self.path, set=range(size), time_boundary=0.2,
identifier=identifier)
self.__partitioners.append(partitioner)
return partitioner
@ -229,7 +233,8 @@ class KazooPartitionerTests(KazooTestCase):
def __assert_partitions(self, *partitions):
eq_(len(partitions), len(self.__partitioners))
for partitioner, own_partitions in zip(self.__partitioners, partitions):
for partitioner, own_partitions in zip(self.__partitioners,
partitions):
eq_(list(partitioner), own_partitions)
def __wait(self):
@ -244,4 +249,4 @@ class KazooPartitionerTests(KazooTestCase):
def __finish(self):
for partitioner in self.__partitioners:
partitioner.finish()
partitioner.finish()

View File

@ -48,7 +48,9 @@ class TestRetrySleeper(unittest.TestCase):
eq_(type(retry._cur_delay), float)
def test_copy(self):
_sleep = lambda t: None
def _sleep(t):
return None
retry = self._makeOne(sleep_func=_sleep)
rcopy = retry.copy()
self.assertTrue(rcopy.sleep_func is _sleep)
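The `_sleep` rewrite above applies the PEP 8 rule against binding a lambda to a name (reported by flake8 as E731): a def behaves the same but gives the function a real name in tracebacks and repr(). Side by side:

    _sleep = lambda t: None       # discouraged: named lambda

    def _sleep(t):                # preferred: same behavior, proper name
        return None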

View File

@ -9,8 +9,8 @@ def test(arg):
def main(args):
if not args:
print("Run as bin/python run_failure.py <test>, for example: \n"
"bin/python run_failure.py "
"kazoo.tests.test_watchers:KazooChildrenWatcherTests")
"bin/python run_failure.py "
"kazoo.tests.test_watchers:KazooChildrenWatcherTests")
return
arg = args[0]
i = 0

View File

@ -1,7 +1,8 @@
[tox]
minversion = 1.6
skipsdist = True
envlist = pep8,
envlist =
pep8,
py27,
py27-gevent,
py27-eventlet,
@ -34,4 +35,4 @@ deps = {[testenv]deps}
[flake8]
builtins = _
exclude = .venv,.tox,dist,doc,*egg,.git,build,tools
exclude = .venv,.tox,dist,doc,*egg,.git,build,tools,sw,local,docs,zookeeper
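With `pep8` added to the tox envlist and Hound set to fail on violations, the same gate can presumably be exercised locally with `tox -e pep8`, or approximated directly with flake8. A rough sketch (the exact commands of the pep8 environment are not shown in this diff; it assumes flake8 is installed and the kazoo checkout is the working directory):

    # Approximate the CI style check locally; not the exact tox command.
    import subprocess

    subprocess.call(["flake8", "kazoo"])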