diff --git a/delimiter/drivers/zookeeper.py b/delimiter/drivers/zookeeper.py index 567d1ae..0e35af0 100644 --- a/delimiter/drivers/zookeeper.py +++ b/delimiter/drivers/zookeeper.py @@ -20,6 +20,7 @@ from kazoo import exceptions from kazoo.protocol import paths from delimiter import engine +from delimiter import processors class ZookeeperQuotaEngine(engine.QuotaEngine): @@ -30,12 +31,22 @@ class ZookeeperQuotaEngine(engine.QuotaEngine): and limits on some set of resources. """ + processors = { + 'upper_bound': processors.UpperBoundProcessor(), + } + def __init__(self, uri): super(ZookeeperQuotaEngine, self).__init__(uri) if not self.uri.path or self.uri.path == "/": raise ValueError("A non-empty url path is required") self.client = None + @property + def started(self): + if self.client is None: + return False + return self.client.connected + def start(self): self.client = client.KazooClient(hosts=self.uri.netloc) self.client.start() @@ -56,25 +67,55 @@ class ZookeeperQuotaEngine(engine.QuotaEngine): except exceptions.NoNodeError: pass else: - limits.append((resource, json.loads(blob))) + stored = json.loads(blob) + kind = stored['kind'] + processor = self.processors.get(kind) + if not processor: + raise ValueError("Read unsupported kind '%s'" % kind) + limits.append((resource, + processor.decode(stored['details']))) return limits - def create_or_update_limit(self, for_who, resource, limit): + def create_or_update_limit(self, for_who, resource, + limit, kind='upper_bound'): + processor = self.processors.get(kind) + if not processor: + raise ValueError("Unsupported kind '%s'" % kind) who_path = paths.join(self.uri.path, for_who) self.client.ensure_path(who_path) resource_path = paths.join(who_path, resource) try: - self.client.create(resource_path, json.dumps(limit)) + self.client.create(resource_path, json.dumps({ + 'kind': kind, + 'details': processor.create(limit), + })) except exceptions.NodeExistsError: blob, znode = self.client.get(resource_path) - cur_limit = json.loads(blob) - cur_limit.update(limit) - # Ensure we pass in the version that we read this on so - # that if it was changed by some other actor that we can - # avoid overwriting that value (and retry, or handle in some - # other manner). - self.client.set(resource_path, json.dumps(cur_limit), - version=znode.version) + stored = json.loads(blob) + if stored['kind'] != kind: + raise ValueError("Can only update limits of the same" + " kind, %s != %s" % (kind, stored['kind'])) + else: + stored['details'] = processor.update(stored['details'], limit) + # Ensure we pass in the version that we read this on so + # that if it was changed by some other actor that we can + # avoid overwriting that value (and retry, or handle in some + # other manner). + self.client.set(resource_path, json.dumps(stored), + version=znode.version) + + def _try_consume(self, for_who, resource, stored, amount): + kind = stored['kind'] + processor = self.processors.get(kind) + if not processor: + raise ValueError("Unsupported kind '%s' encountered" + " for resource '%s' owned by '%s'" + % (kind, resource, for_who)) + if not processor.process(stored['details'], amount): + raise ValueError("Limit reached, '%s' can not" + " consume '%s' of '%s'" % (for_who, + resource, amount)) + return stored def consume_many(self, for_who, resources, amounts): who_path = paths.join(self.uri.path, for_who) @@ -82,20 +123,11 @@ class ZookeeperQuotaEngine(engine.QuotaEngine): for resource, amount in zip(resources, amounts): resource_path = paths.join(who_path, resource) blob, znode = self.client.get(resource_path) - cur_limit = json.loads(blob) - try: - cur_consumed = cur_limit['consumed'] - except KeyError: - cur_consumed = 0 - max_resource = cur_limit['max'] - if cur_consumed + amount > max_resource: - raise ValueError("Limit reached, can not" - " consume %s of %s" % (resource, amount)) - else: - cur_limit['consumed'] = cur_consumed + amount - values_to_save.append((resource_path, - json.dumps(cur_limit), - znode.version)) + new_stored = self._try_consume(for_who, resource, + json.loads(blob), amount) + values_to_save.append((resource_path, + json.dumps(new_stored), + znode.version)) # Commit all changes at once, so that we can ensure that all the # changes will happen, or none will... if values_to_save: @@ -107,23 +139,14 @@ class ZookeeperQuotaEngine(engine.QuotaEngine): who_path = paths.join(self.uri.path, for_who) resource_path = paths.join(who_path, resource) blob, znode = self.client.get(resource_path) - cur_limit = json.loads(blob) - try: - cur_consumed = cur_limit['consumed'] - except KeyError: - cur_consumed = 0 - max_resource = cur_limit['max'] - if cur_consumed + amount > max_resource: - raise ValueError("Limit reached, can not" - " consume %s of %s" % (resource, amount)) - else: - cur_limit['consumed'] = cur_consumed + amount - # Ensure we pass in the version that we read this on so - # that if it was changed by some other actor that we can - # avoid overwriting that value (and retry, or handle in some - # other manner). - self.client.set(resource_path, json.dumps(cur_limit), - version=znode.version) + new_stored = self._try_consume(for_who, resource, + json.loads(blob), amount) + # Ensure we pass in the version that we read this on so + # that if it was changed by some other actor that we can + # avoid overwriting that value (and retry, or handle in some + # other manner). + self.client.set(resource_path, json.dumps(new_stored), + version=znode.version) def close(self): if self.client is not None: diff --git a/delimiter/engine.py b/delimiter/engine.py index caf61e6..7552792 100644 --- a/delimiter/engine.py +++ b/delimiter/engine.py @@ -25,6 +25,10 @@ class QuotaEngine(object): def __init__(self, uri): self.uri = uri + @abc.abstractproperty + def started(self): + """Whether the current engine has started (or not).""" + def start(self): """Performs any engine startup (connection setup, validation...)""" @@ -36,7 +40,8 @@ class QuotaEngine(object): """Reads the limits of some entity.""" @abc.abstractmethod - def create_or_update_limit(self, for_who, resource, limit): + def create_or_update_limit(self, for_who, resource, + limit, kind='upper_bound'): """Updates or creates a resource limit for some entity. Must operate transactionally; either created/updated or not. diff --git a/delimiter/exceptions.py b/delimiter/exceptions.py new file mode 100644 index 0000000..f1ae3bd --- /dev/null +++ b/delimiter/exceptions.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- + +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class DelimiterException(Exception): + """Base class for *most* exceptions emitted from this library.""" diff --git a/delimiter/processors.py b/delimiter/processors.py new file mode 100644 index 0000000..bf913e6 --- /dev/null +++ b/delimiter/processors.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- + +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class UpperBoundProcessor(object): + """Processes a limit given some upper bound.""" + + def create(self, bound): + return { + 'consumed': 0, + 'bound': bound, + } + + def decode(self, details): + return details + + def update(self, details, bound): + details = details.copy() + details['bound'] = bound + return details + + def process(self, details, amount): + consumed = details['consumed'] + if consumed + amount > details['bound']: + return False + else: + details = details.copy() + details['consumed'] = consumed + amount + return True