diff --git a/tooz/coordination.py b/tooz/coordination.py index e500dc09..1664daf2 100644 --- a/tooz/coordination.py +++ b/tooz/coordination.py @@ -292,6 +292,19 @@ class CoordinationDriver(object): """ raise tooz.NotImplemented + @staticmethod + def get_member_info(group_id, member_id): + """Return the statistics and capabilities of a member asynchronously. + + :param group_id: the id of the group of the member + :type group_id: str + :param member_id: the id of the member + :type member_id: str + :returns: capabilities and statistics of a member + :rtype: CoordAsyncResult + """ + raise tooz.NotImplemented + @staticmethod def update_capabilities(group_id, capabilities): """Update capabilities of the caller in the specified group diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index 87de2c11..b753ca33 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -278,6 +278,39 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): timeout_exception=self._timeout_exception, group_id=group_id, member_id=self._member_id) + @classmethod + def _get_member_info_handler(cls, async_result, timeout, + timeout_exception, group_id, + member_id): + try: + capabilities, znode_stats = async_result.get(block=True, + timeout=timeout) + except timeout_exception as e: + coordination.raise_with_cause(coordination.OperationTimedOut, + utils.exception_message(e), + cause=e) + except exceptions.NoNodeError: + raise coordination.MemberNotJoined(group_id, member_id) + except exceptions.ZookeeperError as e: + coordination.raise_with_cause(coordination.ToozError, + utils.exception_message(e), + cause=e) + else: + member_info = { + 'capabilities': cls._loads(capabilities), + 'created_at': utils.millis_to_datetime(znode_stats.ctime), + 'updated_at': utils.millis_to_datetime(znode_stats.mtime) + } + return member_info + + def get_member_info(self, group_id, member_id): + member_path = self._path_member(group_id, member_id) + async_result = self._coord.get_async(member_path) + return ZooAsyncResult(async_result, + self._get_member_info_handler, + timeout_exception=self._timeout_exception, + group_id=group_id, member_id=self._member_id) + @staticmethod def _get_groups_handler(async_result, timeout, timeout_exception): try: diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index eba28247..a7362a60 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -226,6 +226,7 @@ class TestAPI(testscenarios.TestWithScenarios, capa = self._coord.get_member_capabilities(self.group_id, self.member_id).get() self.assertEqual(capa, caps) + self.assertEqual(capa['type'], caps['type']) def test_get_member_capabilities_nonexistent_group(self): capa = self._coord.get_member_capabilities(self.group_id, @@ -242,6 +243,44 @@ class TestAPI(testscenarios.TestWithScenarios, self.assertRaises(tooz.coordination.MemberNotJoined, capa.get) + def test_get_member_info(self): + self._coord.create_group(self.group_id).get() + self._coord.join_group(self.group_id, b"test_capabilities") + + member_info = self._coord.get_member_info(self.group_id, + self.member_id).get() + self.assertEqual(member_info['capabilities'], b"test_capabilities") + + def test_get_member_info_complex(self): + self._coord.create_group(self.group_id).get() + caps = { + 'type': 'warrior', + 'abilities': ['fight', 'flight', 'double-hit-damage'], + } + member_info = {'capabilities': 'caps', + 'created_at': '0', + 'updated_at': '0'} + self._coord.join_group(self.group_id, caps) + member_info = self._coord.get_member_info(self.group_id, + self.member_id).get() + self.assertEqual(member_info['capabilities'], caps) + + def test_get_member_info_nonexistent_group(self): + member_info = self._coord.get_member_info(self.group_id, + self.member_id) + # Drivers raise one of those depending on their capability + self.assertRaisesAny([tooz.coordination.MemberNotJoined, + tooz.coordination.GroupNotCreated], + member_info.get) + + def test_get_member_info_nonjoined_member(self): + self._coord.create_group(self.group_id).get() + member_id = self._get_random_uuid() + member_info = self._coord.get_member_info(self.group_id, + member_id) + self.assertRaises(tooz.coordination.MemberNotJoined, + member_info.get) + def test_update_capabilities(self): self._coord.create_group(self.group_id).get() self._coord.join_group(self.group_id, b"test_capabilities1").get() diff --git a/tooz/utils.py b/tooz/utils.py index be0c4697..ac723ea2 100644 --- a/tooz/utils.py +++ b/tooz/utils.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import errno import os @@ -175,3 +176,8 @@ def loads(blob, excp_cls=coordination.SerializationError): except (msgpack.UnpackException, ValueError) as e: coordination.raise_with_cause(excp_cls, exception_message(e), cause=e) + + +def millis_to_datetime(milliseconds): + """Converts number of milliseconds (from epoch) into a datetime object.""" + return datetime.datetime.fromtimestamp(float(milliseconds) / 1000)