Add a simple processing framework

This commit is contained in:
Joshua Harlow 2016-05-13 16:14:31 -07:00
parent 34bd35285f
commit 4a02032618
4 changed files with 130 additions and 43 deletions

View File

@ -20,6 +20,7 @@ from kazoo import exceptions
from kazoo.protocol import paths
from delimiter import engine
from delimiter import processors
class ZookeeperQuotaEngine(engine.QuotaEngine):
@ -30,12 +31,22 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
and limits on some set of resources.
"""
processors = {
'upper_bound': processors.UpperBoundProcessor(),
}
def __init__(self, uri):
super(ZookeeperQuotaEngine, self).__init__(uri)
if not self.uri.path or self.uri.path == "/":
raise ValueError("A non-empty url path is required")
self.client = None
@property
def started(self):
if self.client is None:
return False
return self.client.connected
def start(self):
self.client = client.KazooClient(hosts=self.uri.netloc)
self.client.start()
@ -56,25 +67,55 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
except exceptions.NoNodeError:
pass
else:
limits.append((resource, json.loads(blob)))
stored = json.loads(blob)
kind = stored['kind']
processor = self.processors.get(kind)
if not processor:
raise ValueError("Read unsupported kind '%s'" % kind)
limits.append((resource,
processor.decode(stored['details'])))
return limits
def create_or_update_limit(self, for_who, resource, limit):
def create_or_update_limit(self, for_who, resource,
limit, kind='upper_bound'):
processor = self.processors.get(kind)
if not processor:
raise ValueError("Unsupported kind '%s'" % kind)
who_path = paths.join(self.uri.path, for_who)
self.client.ensure_path(who_path)
resource_path = paths.join(who_path, resource)
try:
self.client.create(resource_path, json.dumps(limit))
self.client.create(resource_path, json.dumps({
'kind': kind,
'details': processor.create(limit),
}))
except exceptions.NodeExistsError:
blob, znode = self.client.get(resource_path)
cur_limit = json.loads(blob)
cur_limit.update(limit)
# Ensure we pass in the version that we read this on so
# that if it was changed by some other actor that we can
# avoid overwriting that value (and retry, or handle in some
# other manner).
self.client.set(resource_path, json.dumps(cur_limit),
version=znode.version)
stored = json.loads(blob)
if stored['kind'] != kind:
raise ValueError("Can only update limits of the same"
" kind, %s != %s" % (kind, stored['kind']))
else:
stored['details'] = processor.update(stored['details'], limit)
# Ensure we pass in the version that we read this on so
# that if it was changed by some other actor that we can
# avoid overwriting that value (and retry, or handle in some
# other manner).
self.client.set(resource_path, json.dumps(stored),
version=znode.version)
def _try_consume(self, for_who, resource, stored, amount):
kind = stored['kind']
processor = self.processors.get(kind)
if not processor:
raise ValueError("Unsupported kind '%s' encountered"
" for resource '%s' owned by '%s'"
% (kind, resource, for_who))
if not processor.process(stored['details'], amount):
raise ValueError("Limit reached, '%s' can not"
" consume '%s' of '%s'" % (for_who,
resource, amount))
return stored
def consume_many(self, for_who, resources, amounts):
who_path = paths.join(self.uri.path, for_who)
@ -82,20 +123,11 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
for resource, amount in zip(resources, amounts):
resource_path = paths.join(who_path, resource)
blob, znode = self.client.get(resource_path)
cur_limit = json.loads(blob)
try:
cur_consumed = cur_limit['consumed']
except KeyError:
cur_consumed = 0
max_resource = cur_limit['max']
if cur_consumed + amount > max_resource:
raise ValueError("Limit reached, can not"
" consume %s of %s" % (resource, amount))
else:
cur_limit['consumed'] = cur_consumed + amount
values_to_save.append((resource_path,
json.dumps(cur_limit),
znode.version))
new_stored = self._try_consume(for_who, resource,
json.loads(blob), amount)
values_to_save.append((resource_path,
json.dumps(new_stored),
znode.version))
# Commit all changes at once, so that we can ensure that all the
# changes will happen, or none will...
if values_to_save:
@ -107,23 +139,14 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
who_path = paths.join(self.uri.path, for_who)
resource_path = paths.join(who_path, resource)
blob, znode = self.client.get(resource_path)
cur_limit = json.loads(blob)
try:
cur_consumed = cur_limit['consumed']
except KeyError:
cur_consumed = 0
max_resource = cur_limit['max']
if cur_consumed + amount > max_resource:
raise ValueError("Limit reached, can not"
" consume %s of %s" % (resource, amount))
else:
cur_limit['consumed'] = cur_consumed + amount
# Ensure we pass in the version that we read this on so
# that if it was changed by some other actor that we can
# avoid overwriting that value (and retry, or handle in some
# other manner).
self.client.set(resource_path, json.dumps(cur_limit),
version=znode.version)
new_stored = self._try_consume(for_who, resource,
json.loads(blob), amount)
# Ensure we pass in the version that we read this on so
# that if it was changed by some other actor that we can
# avoid overwriting that value (and retry, or handle in some
# other manner).
self.client.set(resource_path, json.dumps(new_stored),
version=znode.version)
def close(self):
if self.client is not None:

View File

@ -25,6 +25,10 @@ class QuotaEngine(object):
def __init__(self, uri):
self.uri = uri
@abc.abstractproperty
def started(self):
"""Whether the current engine has started (or not)."""
def start(self):
"""Performs any engine startup (connection setup, validation...)"""
@ -36,7 +40,8 @@ class QuotaEngine(object):
"""Reads the limits of some entity."""
@abc.abstractmethod
def create_or_update_limit(self, for_who, resource, limit):
def create_or_update_limit(self, for_who, resource,
limit, kind='upper_bound'):
"""Updates or creates a resource limit for some entity.
Must operate transactionally; either created/updated or not.

18
delimiter/exceptions.py Normal file
View File

@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class DelimiterException(Exception):
"""Base class for *most* exceptions emitted from this library."""

41
delimiter/processors.py Normal file
View File

@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class UpperBoundProcessor(object):
"""Processes a limit given some upper bound."""
def create(self, bound):
return {
'consumed': 0,
'bound': bound,
}
def decode(self, details):
return details
def update(self, details, bound):
details = details.copy()
details['bound'] = bound
return details
def process(self, details, amount):
consumed = details['consumed']
if consumed + amount > details['bound']:
return False
else:
details = details.copy()
details['consumed'] = consumed + amount
return True