Add a simple processing framework
This commit is contained in:
parent
34bd35285f
commit
4a02032618
|
@ -20,6 +20,7 @@ from kazoo import exceptions
|
|||
from kazoo.protocol import paths
|
||||
|
||||
from delimiter import engine
|
||||
from delimiter import processors
|
||||
|
||||
|
||||
class ZookeeperQuotaEngine(engine.QuotaEngine):
|
||||
|
@ -30,12 +31,22 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
|
|||
and limits on some set of resources.
|
||||
"""
|
||||
|
||||
processors = {
|
||||
'upper_bound': processors.UpperBoundProcessor(),
|
||||
}
|
||||
|
||||
def __init__(self, uri):
|
||||
super(ZookeeperQuotaEngine, self).__init__(uri)
|
||||
if not self.uri.path or self.uri.path == "/":
|
||||
raise ValueError("A non-empty url path is required")
|
||||
self.client = None
|
||||
|
||||
@property
|
||||
def started(self):
|
||||
if self.client is None:
|
||||
return False
|
||||
return self.client.connected
|
||||
|
||||
def start(self):
|
||||
self.client = client.KazooClient(hosts=self.uri.netloc)
|
||||
self.client.start()
|
||||
|
@ -56,25 +67,55 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
|
|||
except exceptions.NoNodeError:
|
||||
pass
|
||||
else:
|
||||
limits.append((resource, json.loads(blob)))
|
||||
stored = json.loads(blob)
|
||||
kind = stored['kind']
|
||||
processor = self.processors.get(kind)
|
||||
if not processor:
|
||||
raise ValueError("Read unsupported kind '%s'" % kind)
|
||||
limits.append((resource,
|
||||
processor.decode(stored['details'])))
|
||||
return limits
|
||||
|
||||
def create_or_update_limit(self, for_who, resource, limit):
|
||||
def create_or_update_limit(self, for_who, resource,
|
||||
limit, kind='upper_bound'):
|
||||
processor = self.processors.get(kind)
|
||||
if not processor:
|
||||
raise ValueError("Unsupported kind '%s'" % kind)
|
||||
who_path = paths.join(self.uri.path, for_who)
|
||||
self.client.ensure_path(who_path)
|
||||
resource_path = paths.join(who_path, resource)
|
||||
try:
|
||||
self.client.create(resource_path, json.dumps(limit))
|
||||
self.client.create(resource_path, json.dumps({
|
||||
'kind': kind,
|
||||
'details': processor.create(limit),
|
||||
}))
|
||||
except exceptions.NodeExistsError:
|
||||
blob, znode = self.client.get(resource_path)
|
||||
cur_limit = json.loads(blob)
|
||||
cur_limit.update(limit)
|
||||
# Ensure we pass in the version that we read this on so
|
||||
# that if it was changed by some other actor that we can
|
||||
# avoid overwriting that value (and retry, or handle in some
|
||||
# other manner).
|
||||
self.client.set(resource_path, json.dumps(cur_limit),
|
||||
version=znode.version)
|
||||
stored = json.loads(blob)
|
||||
if stored['kind'] != kind:
|
||||
raise ValueError("Can only update limits of the same"
|
||||
" kind, %s != %s" % (kind, stored['kind']))
|
||||
else:
|
||||
stored['details'] = processor.update(stored['details'], limit)
|
||||
# Ensure we pass in the version that we read this on so
|
||||
# that if it was changed by some other actor that we can
|
||||
# avoid overwriting that value (and retry, or handle in some
|
||||
# other manner).
|
||||
self.client.set(resource_path, json.dumps(stored),
|
||||
version=znode.version)
|
||||
|
||||
def _try_consume(self, for_who, resource, stored, amount):
|
||||
kind = stored['kind']
|
||||
processor = self.processors.get(kind)
|
||||
if not processor:
|
||||
raise ValueError("Unsupported kind '%s' encountered"
|
||||
" for resource '%s' owned by '%s'"
|
||||
% (kind, resource, for_who))
|
||||
if not processor.process(stored['details'], amount):
|
||||
raise ValueError("Limit reached, '%s' can not"
|
||||
" consume '%s' of '%s'" % (for_who,
|
||||
resource, amount))
|
||||
return stored
|
||||
|
||||
def consume_many(self, for_who, resources, amounts):
|
||||
who_path = paths.join(self.uri.path, for_who)
|
||||
|
@ -82,20 +123,11 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
|
|||
for resource, amount in zip(resources, amounts):
|
||||
resource_path = paths.join(who_path, resource)
|
||||
blob, znode = self.client.get(resource_path)
|
||||
cur_limit = json.loads(blob)
|
||||
try:
|
||||
cur_consumed = cur_limit['consumed']
|
||||
except KeyError:
|
||||
cur_consumed = 0
|
||||
max_resource = cur_limit['max']
|
||||
if cur_consumed + amount > max_resource:
|
||||
raise ValueError("Limit reached, can not"
|
||||
" consume %s of %s" % (resource, amount))
|
||||
else:
|
||||
cur_limit['consumed'] = cur_consumed + amount
|
||||
values_to_save.append((resource_path,
|
||||
json.dumps(cur_limit),
|
||||
znode.version))
|
||||
new_stored = self._try_consume(for_who, resource,
|
||||
json.loads(blob), amount)
|
||||
values_to_save.append((resource_path,
|
||||
json.dumps(new_stored),
|
||||
znode.version))
|
||||
# Commit all changes at once, so that we can ensure that all the
|
||||
# changes will happen, or none will...
|
||||
if values_to_save:
|
||||
|
@ -107,23 +139,14 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
|
|||
who_path = paths.join(self.uri.path, for_who)
|
||||
resource_path = paths.join(who_path, resource)
|
||||
blob, znode = self.client.get(resource_path)
|
||||
cur_limit = json.loads(blob)
|
||||
try:
|
||||
cur_consumed = cur_limit['consumed']
|
||||
except KeyError:
|
||||
cur_consumed = 0
|
||||
max_resource = cur_limit['max']
|
||||
if cur_consumed + amount > max_resource:
|
||||
raise ValueError("Limit reached, can not"
|
||||
" consume %s of %s" % (resource, amount))
|
||||
else:
|
||||
cur_limit['consumed'] = cur_consumed + amount
|
||||
# Ensure we pass in the version that we read this on so
|
||||
# that if it was changed by some other actor that we can
|
||||
# avoid overwriting that value (and retry, or handle in some
|
||||
# other manner).
|
||||
self.client.set(resource_path, json.dumps(cur_limit),
|
||||
version=znode.version)
|
||||
new_stored = self._try_consume(for_who, resource,
|
||||
json.loads(blob), amount)
|
||||
# Ensure we pass in the version that we read this on so
|
||||
# that if it was changed by some other actor that we can
|
||||
# avoid overwriting that value (and retry, or handle in some
|
||||
# other manner).
|
||||
self.client.set(resource_path, json.dumps(new_stored),
|
||||
version=znode.version)
|
||||
|
||||
def close(self):
|
||||
if self.client is not None:
|
||||
|
|
|
@ -25,6 +25,10 @@ class QuotaEngine(object):
|
|||
def __init__(self, uri):
|
||||
self.uri = uri
|
||||
|
||||
@abc.abstractproperty
|
||||
def started(self):
|
||||
"""Whether the current engine has started (or not)."""
|
||||
|
||||
def start(self):
|
||||
"""Performs any engine startup (connection setup, validation...)"""
|
||||
|
||||
|
@ -36,7 +40,8 @@ class QuotaEngine(object):
|
|||
"""Reads the limits of some entity."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def create_or_update_limit(self, for_who, resource, limit):
|
||||
def create_or_update_limit(self, for_who, resource,
|
||||
limit, kind='upper_bound'):
|
||||
"""Updates or creates a resource limit for some entity.
|
||||
|
||||
Must operate transactionally; either created/updated or not.
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class DelimiterException(Exception):
|
||||
"""Base class for *most* exceptions emitted from this library."""
|
|
@ -0,0 +1,41 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class UpperBoundProcessor(object):
|
||||
"""Processes a limit given some upper bound."""
|
||||
|
||||
def create(self, bound):
|
||||
return {
|
||||
'consumed': 0,
|
||||
'bound': bound,
|
||||
}
|
||||
|
||||
def decode(self, details):
|
||||
return details
|
||||
|
||||
def update(self, details, bound):
|
||||
details = details.copy()
|
||||
details['bound'] = bound
|
||||
return details
|
||||
|
||||
def process(self, details, amount):
|
||||
consumed = details['consumed']
|
||||
if consumed + amount > details['bound']:
|
||||
return False
|
||||
else:
|
||||
details = details.copy()
|
||||
details['consumed'] = consumed + amount
|
||||
return True
|
Loading…
Reference in New Issue