diff --git a/docs/api.rst b/docs/api.rst index ff921c0..744b012 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -16,7 +16,6 @@ organized alphabetically by module name. api/interfaces api/protocol/states api/recipe/barrier - api/recipe/cache api/recipe/counter api/recipe/election api/recipe/lease diff --git a/docs/api/recipe/cache.rst b/docs/api/recipe/cache.rst deleted file mode 100644 index fb7f372..0000000 --- a/docs/api/recipe/cache.rst +++ /dev/null @@ -1,26 +0,0 @@ -.. _cache_module: - -:mod:`kazoo.recipe.cache` ----------------------------- - -.. automodule:: kazoo.recipe.cache - -Public API -++++++++++ - - .. autoclass:: TreeCache - - .. automethod:: start - .. automethod:: close - .. automethod:: listen - .. automethod:: listen_fault - .. automethod:: get_data - .. automethod:: get_children - - .. autoclass:: TreeEvent - :members: - :show-inheritance: - - .. autoclass:: NodeData - :members: - :show-inheritance: diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py deleted file mode 100644 index 1e59f28..0000000 --- a/kazoo/recipe/cache.py +++ /dev/null @@ -1,389 +0,0 @@ -"""TreeCache - -:Maintainer: Jiangge Zhang -:Maintainer: Haochuan Guo -:Maintainer: Tianwen Zhang -:Status: Alpha - -A port of the Apache Curator's TreeCache recipe. It builds an in-memory cache -of a subtree in ZooKeeper and keeps it up-to-date. - -See also: http://curator.apache.org/curator-recipes/tree-cache.html -""" - -from __future__ import absolute_import - -import os -import logging -import contextlib -import functools -import operator - -from kazoo.exceptions import NoNodeError, KazooException -from kazoo.protocol.states import KazooState, EventType - - -logger = logging.getLogger(__name__) - - -class TreeCache(object): - """The cache of a ZooKeeper subtree. - - :param client: A :class:`~kazoo.client.KazooClient` instance. - :param path: The root path of subtree. - """ - - STATE_LATENT = 0 - STATE_STARTED = 1 - STATE_CLOSED = 2 - - def __init__(self, client, path): - self._client = client - self._root = TreeNode.make_root(self, path) - self._state = self.STATE_LATENT - self._outstanding_ops = 0 - self._is_initialized = False - self._error_listeners = [] - self._event_listeners = [] - - def start(self): - """Starts the cache. - - The cache is not started automatically. You must call this method. - - After a cache started, all changes of subtree will be synchronized - from the ZooKeeper server. Events will be fired for those activity. - - See also :meth:`~TreeCache.listen`. - - .. note:: - - This method is not thread safe. - """ - if self._state == self.STATE_LATENT: - self._state = self.STATE_STARTED - elif self._state == self.STATE_CLOSED: - raise KazooException('already closed') - else: - raise KazooException('already started') - - self._client.add_listener(self._session_watcher) - self._client.ensure_path(self._root._path) - - if self._client.connected: - self._root.on_created() - - def close(self): - """Closes the cache. - - A closed cache was detached from ZooKeeper's changes. And all nodes - will be invalidated. - - Once a tree cache was closed, it could not be started again. You should - only close a tree cache while you want to recycle it. - - .. note:: - - This method is not thread safe. - """ - if self._state == self.STATE_STARTED: - self._state = self.STATE_CLOSED - self._client.remove_listener(self._session_watcher) - with handle_exception(self._error_listeners): - self._root.on_deleted() - - def listen(self, listener): - """Registers a function to listen the cache events. - - The cache events are changes of local data. They are delivered from - watching notifications in ZooKeeper session. - - This method can be use as a decorator. - - :param listener: A callable object which accepting a - :class:`~kazoo.recipe.cache.TreeEvent` instance as - its argument. - """ - self._event_listeners.append(listener) - return listener - - def listen_fault(self, listener): - """Registers a function to listen the exceptions. - - It is possible to meet some exceptions during the cache running. You - could specific handlers for them. - - This method can be use as a decorator. - - :param listener: A callable object which accepting an exception as its - argument. - """ - self._error_listeners.append(listener) - return listener - - def get_data(self, path, default=None): - """Gets data of a node from cache. - - :param path: The absolute path string. - :param default: The default value which will be returned if the node - does not exist. - :raises ValueError: If the path is outside of this subtree. - :returns: A :class:`~kazoo.recipe.cache.NodeData` instance. - """ - node = self._find_node(path) - return default if node is None else node._data - - def get_children(self, path, default=None): - """Gets node children list from in-memory snapshot. - - :param path: The absolute path string. - :param default: The default value which will be returned if the node - does not exist. - :raises ValueError: If the path is outside of this subtree. - :returns: The :class:`frozenset` which including children names. - """ - node = self._find_node(path) - return default if node is None else frozenset(node._children) - - def _find_node(self, path): - if not path.startswith(self._root._path): - raise ValueError('outside of tree') - striped_path = path[len(self._root._path):].strip('/') - splited_path = [p for p in striped_path.split('/') if p] - current_node = self._root - for node_name in splited_path: - if node_name not in current_node._children: - return - current_node = current_node._children[node_name] - return current_node - - def _publish_event(self, event_type, event_data=None): - event = TreeEvent.make(event_type, event_data) - if self._state != self.STATE_CLOSED: - logger.debug('public event: %r', event) - self._in_background(self._do_publish_event, event) - - def _do_publish_event(self, event): - for listener in self._event_listeners: - with handle_exception(self._error_listeners): - listener(event) - - def _in_background(self, func, *args, **kwargs): - self._client.handler.callback_queue.put(lambda: func(*args, **kwargs)) - - def _session_watcher(self, state): - if state == KazooState.SUSPENDED: - self._publish_event(TreeEvent.CONNECTION_SUSPENDED) - elif state == KazooState.CONNECTED: - with handle_exception(self._error_listeners): - self._root.on_reconnected() - self._publish_event(TreeEvent.CONNECTION_RECONNECTED) - elif state == KazooState.LOST: - self._is_initialized = False - self._publish_event(TreeEvent.CONNECTION_LOST) - - -class TreeNode(object): - """The tree node record. - - :param tree: A :class:`~kazoo.recipe.cache.TreeCache` instance. - :param path: The path of current node. - :param parent: The parent node reference. ``None`` for root node. - """ - - __slots__ = ('_tree', '_path', '_parent', '_depth', '_children', '_state', - '_data') - - STATE_PENDING = 0 - STATE_LIVE = 1 - STATE_DEAD = 2 - - def __init__(self, tree, path, parent): - self._tree = tree - self._path = path - self._parent = parent - self._depth = parent._depth + 1 if parent else 0 - self._children = {} - self._state = self.STATE_PENDING - self._data = None - - @classmethod - def make_root(cls, tree, path): - return cls(tree, path, None) - - def on_reconnected(self): - self._refresh() - for child in self._children.values(): - child.on_reconnected() - - def on_created(self): - self._refresh() - - def on_deleted(self): - old_children, self._children = self._children, {} - old_data, self._data = self._data, None - - for old_child in old_children.values(): - old_child.on_deleted() - - if self._tree._state == self._tree.STATE_CLOSED: - return - - old_state, self._state = self._state, self.STATE_DEAD - if old_state == self.STATE_LIVE: - self._publish_event(TreeEvent.NODE_REMOVED, old_data) - - if self._parent is None: - self._call_client('exists', self._path) # root node - else: - child = self._path[len(self._parent._path) + 1:] - if self._parent._children.get(child) is self: - del self._parent._children[child] - - def _publish_event(self, *args, **kwargs): - return self._tree._publish_event(*args, **kwargs) - - def _refresh(self): - self._refresh_data() - self._refresh_children() - - def _refresh_data(self): - self._call_client('get', self._path) - - def _refresh_children(self): - # TODO max-depth checking support - self._call_client('get_children', self._path) - - def _call_client(self, method_name, path, *args): - self._tree._outstanding_ops += 1 - callback = functools.partial( - self._tree._in_background, self._process_result, - method_name, path) - kwargs = {'watch': self._process_watch} - method = getattr(self._tree._client, method_name + '_async') - method(path, *args, **kwargs).rawlink(callback) - - def _process_watch(self, watched_event): - logger.debug('process_watch: %r', watched_event) - with handle_exception(self._tree._error_listeners): - if watched_event.type == EventType.CREATED: - assert self._parent is None, 'unexpected CREATED on non-root' - self.on_created() - elif watched_event.type == EventType.DELETED: - self.on_deleted() - elif watched_event.type == EventType.CHANGED: - self._refresh_data() - elif watched_event.type == EventType.CHILD: - self._refresh_children() - - def _process_result(self, method_name, path, result): - logger.debug('process_result: %s %s', method_name, path) - if method_name == 'exists': - assert self._parent is None, 'unexpected EXISTS on non-root' - # the value of result will be set with `None` if node not exists. - if result.get() is not None: - if self._state == self.STATE_DEAD: - self._state = self.STATE_PENDING - self.on_created() - elif method_name == 'get_children': - try: - children = result.get() - except NoNodeError: - self.on_deleted() - else: - for child in sorted(children): - full_path = os.path.join(path, child) - if child not in self._children: - node = TreeNode(self._tree, full_path, self) - self._children[child] = node - node.on_created() - elif method_name == 'get': - try: - data, stat = result.get() - except NoNodeError: - self.on_deleted() - else: - old_data, self._data = ( - self._data, NodeData.make(path, data, stat)) - - old_state, self._state = self._state, self.STATE_LIVE - if old_state == self.STATE_LIVE: - if old_data is None or old_data.stat.mzxid != stat.mzxid: - self._publish_event(TreeEvent.NODE_UPDATED, self._data) - else: - self._publish_event(TreeEvent.NODE_ADDED, self._data) - else: # pragma: no cover - logger.warning('unknown operation %s', method_name) - self._tree._outstanding_ops -= 1 - return - - self._tree._outstanding_ops -= 1 - if self._tree._outstanding_ops == 0 and not self._tree._is_initialized: - self._tree._is_initialized = True - self._publish_event(TreeEvent.INITIALIZED) - - -class TreeEvent(tuple): - """The immutable event tuple of cache.""" - - NODE_ADDED = 0 - NODE_UPDATED = 1 - NODE_REMOVED = 2 - CONNECTION_SUSPENDED = 3 - CONNECTION_RECONNECTED = 4 - CONNECTION_LOST = 5 - INITIALIZED = 6 - - #: An enumerate integer to indicate event type. - event_type = property(operator.itemgetter(0)) - - #: A :class:`~kazoo.recipe.cache.NodeData` instance. - event_data = property(operator.itemgetter(1)) - - @classmethod - def make(cls, event_type, event_data): - """Creates a new TreeEvent tuple. - - :returns: A :class:`~kazoo.recipe.cache.TreeEvent` instance. - """ - assert event_type in ( - cls.NODE_ADDED, cls.NODE_UPDATED, cls.NODE_REMOVED, - cls.CONNECTION_SUSPENDED, cls.CONNECTION_RECONNECTED, - cls.CONNECTION_LOST, cls.INITIALIZED) - return cls((event_type, event_data)) - - -class NodeData(tuple): - """The immutable node data tuple of cache.""" - - #: The absolute path string of current node. - path = property(operator.itemgetter(0)) - - #: The bytes data of current node. - data = property(operator.itemgetter(1)) - - #: The stat information of current node. - stat = property(operator.itemgetter(2)) - - @classmethod - def make(cls, path, data, stat): - """Creates a new NodeData tuple. - - :returns: A :class:`~kazoo.recipe.cache.NodeData` instance. - """ - return cls((path, data, stat)) - - -@contextlib.contextmanager -def handle_exception(listeners): - try: - yield - except Exception as e: - logger.debug('processing error: %r', e) - for listener in listeners: - try: - listener(e) - except: # pragma: no cover - logger.exception('Exception handling exception') # oops - else: - logger.exception('No listener to process %r', e) diff --git a/kazoo/tests/test_cache.py b/kazoo/tests/test_cache.py deleted file mode 100644 index 1aa03fb..0000000 --- a/kazoo/tests/test_cache.py +++ /dev/null @@ -1,278 +0,0 @@ -import uuid - -from mock import patch, call, Mock -from nose.tools import eq_, ok_, assert_not_equal, raises - -from kazoo.testing import KazooTestCase -from kazoo.exceptions import KazooException -from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent - - -class KazooTreeCacheTests(KazooTestCase): - - def setUp(self): - super(KazooTreeCacheTests, self).setUp() - self._event_queue = self.client.handler.queue_impl() - self._error_queue = self.client.handler.queue_impl() - self.path = None - self.cache = None - - def tearDown(self): - super(KazooTreeCacheTests, self).tearDown() - if not self._error_queue.empty(): - try: - raise self._error_queue.get() - except FakeException: - pass - - def make_cache(self): - if self.cache is None: - self.path = '/' + uuid.uuid4().hex - self.cache = TreeCache(self.client, self.path) - self.cache.listen(lambda event: self._event_queue.put(event)) - self.cache.listen_fault(lambda error: self._error_queue.put(error)) - self.cache.start() - return self.cache - - def wait_cache(self, expect=None, since=None, timeout=10): - started = since is None - while True: - event = self._event_queue.get(timeout=timeout) - if started: - if expect is not None: - eq_(event.event_type, expect) - return event - if event.event_type == since: - started = True - if expect is None: - return - - def spy_client(self, method_name): - method = getattr(self.client, method_name) - return patch.object(self.client, method_name, wraps=method) - - def test_start(self): - self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - - stat = self.client.exists(self.path) - eq_(stat.version, 0) - - eq_(self.cache._state, TreeCache.STATE_STARTED) - eq_(self.cache._root._state, TreeNode.STATE_LIVE) - - @raises(KazooException) - def test_start_started(self): - self.make_cache() - self.cache.start() - - @raises(KazooException) - def test_start_closed(self): - self.make_cache() - self.cache.start() - self.cache.close() - self.cache.start() - - def test_close(self): - self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - self.client.create(self.path + '/foo/bar/baz', makepath=True) - for _ in range(3): - self.wait_cache(TreeEvent.NODE_ADDED) - - self.cache.close() - - # nothing should be published since tree closed - ok_(self._event_queue.empty()) - - # tree should be empty - eq_(self.cache._root._children, {}) - eq_(self.cache._root._data, None) - eq_(self.cache._state, TreeCache.STATE_CLOSED) - - # node state should not be changed - assert_not_equal(self.cache._root._state, TreeNode.STATE_DEAD) - - def test_children_operation(self): - self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - - self.client.create(self.path + '/test_children', b'test_children_1') - event = self.wait_cache(TreeEvent.NODE_ADDED) - eq_(event.event_type, TreeEvent.NODE_ADDED) - eq_(event.event_data.path, self.path + '/test_children') - eq_(event.event_data.data, b'test_children_1') - eq_(event.event_data.stat.version, 0) - - self.client.set(self.path + '/test_children', b'test_children_2') - event = self.wait_cache(TreeEvent.NODE_UPDATED) - eq_(event.event_type, TreeEvent.NODE_UPDATED) - eq_(event.event_data.path, self.path + '/test_children') - eq_(event.event_data.data, b'test_children_2') - eq_(event.event_data.stat.version, 1) - - self.client.delete(self.path + '/test_children') - event = self.wait_cache(TreeEvent.NODE_REMOVED) - eq_(event.event_type, TreeEvent.NODE_REMOVED) - eq_(event.event_data.path, self.path + '/test_children') - eq_(event.event_data.data, b'test_children_2') - eq_(event.event_data.stat.version, 1) - - def test_subtree_operation(self): - self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - - self.client.create(self.path + '/foo/bar/baz', makepath=True) - for relative_path in ('/foo', '/foo/bar', '/foo/bar/baz'): - event = self.wait_cache(TreeEvent.NODE_ADDED) - eq_(event.event_type, TreeEvent.NODE_ADDED) - eq_(event.event_data.path, self.path + relative_path) - eq_(event.event_data.data, b'') - eq_(event.event_data.stat.version, 0) - - self.client.delete(self.path + '/foo', recursive=True) - for relative_path in ('/foo/bar/baz', '/foo/bar', '/foo'): - event = self.wait_cache(TreeEvent.NODE_REMOVED) - eq_(event.event_type, TreeEvent.NODE_REMOVED) - eq_(event.event_data.path, self.path + relative_path) - - def test_get_data(self): - cache = self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True) - self.wait_cache(TreeEvent.NODE_ADDED) - self.wait_cache(TreeEvent.NODE_ADDED) - self.wait_cache(TreeEvent.NODE_ADDED) - - with patch.object(cache, '_client'): # disable any remote operation - eq_(cache.get_data(self.path).data, b'') - eq_(cache.get_data(self.path).stat.version, 0) - - eq_(cache.get_data(self.path + '/foo').data, b'') - eq_(cache.get_data(self.path + '/foo').stat.version, 0) - - eq_(cache.get_data(self.path + '/foo/bar').data, b'') - eq_(cache.get_data(self.path + '/foo/bar').stat.version, 0) - - eq_(cache.get_data(self.path + '/foo/bar/baz').data, b'@') - eq_(cache.get_data(self.path + '/foo/bar/baz').stat.version, 0) - - def test_get_children(self): - cache = self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True) - self.wait_cache(TreeEvent.NODE_ADDED) - self.wait_cache(TreeEvent.NODE_ADDED) - self.wait_cache(TreeEvent.NODE_ADDED) - - with patch.object(cache, '_client'): # disable any remote operation - eq_(cache.get_children(self.path + '/foo/bar/baz'), frozenset()) - eq_(cache.get_children(self.path + '/foo/bar'), frozenset(['baz'])) - eq_(cache.get_children(self.path + '/foo'), frozenset(['bar'])) - eq_(cache.get_children(self.path), frozenset(['foo'])) - - @raises(ValueError) - def test_get_data_out_of_tree(self): - self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - self.cache.get_data('/out_of_tree') - - @raises(ValueError) - def test_get_children_out_of_tree(self): - self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - self.cache.get_children('/out_of_tree') - - def test_get_data_no_node(self): - cache = self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - - with patch.object(cache, '_client'): # disable any remote operation - eq_(cache.get_data(self.path + '/non_exists'), None) - - def test_get_children_no_node(self): - cache = self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - - with patch.object(cache, '_client'): # disable any remote operation - eq_(cache.get_children(self.path + '/non_exists'), None) - - def test_session_reconnected(self): - self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - - self.client.create(self.path + '/foo') - event = self.wait_cache(TreeEvent.NODE_ADDED) - eq_(event.event_data.path, self.path + '/foo') - - with self.spy_client('get_async') as get_data: - with self.spy_client('get_children_async') as get_children: - # session suspended - self.lose_connection(self.client.handler.event_object) - self.wait_cache(TreeEvent.CONNECTION_SUSPENDED) - - # There are a serial refreshing operation here. But NODE_ADDED - # events will not be raised because the zxid of nodes are the - # same during reconnecting. - - # connection restore - self.wait_cache(TreeEvent.CONNECTION_RECONNECTED) - - # wait for outstanding operations - while self.cache._outstanding_ops > 0: - self.client.handler.sleep_func(0.1) - - # inspect in-memory nodes - _node_root = self.cache._root - _node_foo = self.cache._root._children['foo'] - - # make sure that all nodes are refreshed - get_data.assert_has_calls([ - call(self.path, watch=_node_root._process_watch), - call(self.path + '/foo', watch=_node_foo._process_watch), - ], any_order=True) - get_children.assert_has_calls([ - call(self.path, watch=_node_root._process_watch), - call(self.path + '/foo', watch=_node_foo._process_watch), - ], any_order=True) - - def test_root_recreated(self): - self.make_cache() - self.wait_cache(since=TreeEvent.INITIALIZED) - - # remove root node - self.client.delete(self.path) - event = self.wait_cache(TreeEvent.NODE_REMOVED) - eq_(event.event_type, TreeEvent.NODE_REMOVED) - eq_(event.event_data.data, b'') - eq_(event.event_data.path, self.path) - eq_(event.event_data.stat.version, 0) - - # re-create root node - self.client.ensure_path(self.path) - event = self.wait_cache(TreeEvent.NODE_ADDED) - eq_(event.event_type, TreeEvent.NODE_ADDED) - eq_(event.event_data.data, b'') - eq_(event.event_data.path, self.path) - eq_(event.event_data.stat.version, 0) - - self.assertTrue( - self.cache._outstanding_ops >= 0, - 'unexpected outstanding ops %r' % self.cache._outstanding_ops) - - def test_exception_handler(self): - error_value = FakeException() - error_handler = Mock() - - with patch.object(TreeNode, 'on_deleted') as on_deleted: - on_deleted.side_effect = [error_value] - - self.make_cache() - self.cache.listen_fault(error_handler) - - self.cache.close() - error_handler.assert_called_once_with(error_value) - - -class FakeException(Exception): - pass