Add a simple processing framework
This commit is contained in:
parent
34bd35285f
commit
4a02032618
|
@ -20,6 +20,7 @@ from kazoo import exceptions
|
||||||
from kazoo.protocol import paths
|
from kazoo.protocol import paths
|
||||||
|
|
||||||
from delimiter import engine
|
from delimiter import engine
|
||||||
|
from delimiter import processors
|
||||||
|
|
||||||
|
|
||||||
class ZookeeperQuotaEngine(engine.QuotaEngine):
|
class ZookeeperQuotaEngine(engine.QuotaEngine):
|
||||||
|
@ -30,12 +31,22 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
|
||||||
and limits on some set of resources.
|
and limits on some set of resources.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
processors = {
|
||||||
|
'upper_bound': processors.UpperBoundProcessor(),
|
||||||
|
}
|
||||||
|
|
||||||
def __init__(self, uri):
|
def __init__(self, uri):
|
||||||
super(ZookeeperQuotaEngine, self).__init__(uri)
|
super(ZookeeperQuotaEngine, self).__init__(uri)
|
||||||
if not self.uri.path or self.uri.path == "/":
|
if not self.uri.path or self.uri.path == "/":
|
||||||
raise ValueError("A non-empty url path is required")
|
raise ValueError("A non-empty url path is required")
|
||||||
self.client = None
|
self.client = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def started(self):
|
||||||
|
if self.client is None:
|
||||||
|
return False
|
||||||
|
return self.client.connected
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
self.client = client.KazooClient(hosts=self.uri.netloc)
|
self.client = client.KazooClient(hosts=self.uri.netloc)
|
||||||
self.client.start()
|
self.client.start()
|
||||||
|
@ -56,25 +67,55 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
|
||||||
except exceptions.NoNodeError:
|
except exceptions.NoNodeError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
limits.append((resource, json.loads(blob)))
|
stored = json.loads(blob)
|
||||||
|
kind = stored['kind']
|
||||||
|
processor = self.processors.get(kind)
|
||||||
|
if not processor:
|
||||||
|
raise ValueError("Read unsupported kind '%s'" % kind)
|
||||||
|
limits.append((resource,
|
||||||
|
processor.decode(stored['details'])))
|
||||||
return limits
|
return limits
|
||||||
|
|
||||||
def create_or_update_limit(self, for_who, resource, limit):
|
def create_or_update_limit(self, for_who, resource,
|
||||||
|
limit, kind='upper_bound'):
|
||||||
|
processor = self.processors.get(kind)
|
||||||
|
if not processor:
|
||||||
|
raise ValueError("Unsupported kind '%s'" % kind)
|
||||||
who_path = paths.join(self.uri.path, for_who)
|
who_path = paths.join(self.uri.path, for_who)
|
||||||
self.client.ensure_path(who_path)
|
self.client.ensure_path(who_path)
|
||||||
resource_path = paths.join(who_path, resource)
|
resource_path = paths.join(who_path, resource)
|
||||||
try:
|
try:
|
||||||
self.client.create(resource_path, json.dumps(limit))
|
self.client.create(resource_path, json.dumps({
|
||||||
|
'kind': kind,
|
||||||
|
'details': processor.create(limit),
|
||||||
|
}))
|
||||||
except exceptions.NodeExistsError:
|
except exceptions.NodeExistsError:
|
||||||
blob, znode = self.client.get(resource_path)
|
blob, znode = self.client.get(resource_path)
|
||||||
cur_limit = json.loads(blob)
|
stored = json.loads(blob)
|
||||||
cur_limit.update(limit)
|
if stored['kind'] != kind:
|
||||||
# Ensure we pass in the version that we read this on so
|
raise ValueError("Can only update limits of the same"
|
||||||
# that if it was changed by some other actor that we can
|
" kind, %s != %s" % (kind, stored['kind']))
|
||||||
# avoid overwriting that value (and retry, or handle in some
|
else:
|
||||||
# other manner).
|
stored['details'] = processor.update(stored['details'], limit)
|
||||||
self.client.set(resource_path, json.dumps(cur_limit),
|
# Ensure we pass in the version that we read this on so
|
||||||
version=znode.version)
|
# that if it was changed by some other actor that we can
|
||||||
|
# avoid overwriting that value (and retry, or handle in some
|
||||||
|
# other manner).
|
||||||
|
self.client.set(resource_path, json.dumps(stored),
|
||||||
|
version=znode.version)
|
||||||
|
|
||||||
|
def _try_consume(self, for_who, resource, stored, amount):
|
||||||
|
kind = stored['kind']
|
||||||
|
processor = self.processors.get(kind)
|
||||||
|
if not processor:
|
||||||
|
raise ValueError("Unsupported kind '%s' encountered"
|
||||||
|
" for resource '%s' owned by '%s'"
|
||||||
|
% (kind, resource, for_who))
|
||||||
|
if not processor.process(stored['details'], amount):
|
||||||
|
raise ValueError("Limit reached, '%s' can not"
|
||||||
|
" consume '%s' of '%s'" % (for_who,
|
||||||
|
resource, amount))
|
||||||
|
return stored
|
||||||
|
|
||||||
def consume_many(self, for_who, resources, amounts):
|
def consume_many(self, for_who, resources, amounts):
|
||||||
who_path = paths.join(self.uri.path, for_who)
|
who_path = paths.join(self.uri.path, for_who)
|
||||||
|
@ -82,20 +123,11 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
|
||||||
for resource, amount in zip(resources, amounts):
|
for resource, amount in zip(resources, amounts):
|
||||||
resource_path = paths.join(who_path, resource)
|
resource_path = paths.join(who_path, resource)
|
||||||
blob, znode = self.client.get(resource_path)
|
blob, znode = self.client.get(resource_path)
|
||||||
cur_limit = json.loads(blob)
|
new_stored = self._try_consume(for_who, resource,
|
||||||
try:
|
json.loads(blob), amount)
|
||||||
cur_consumed = cur_limit['consumed']
|
values_to_save.append((resource_path,
|
||||||
except KeyError:
|
json.dumps(new_stored),
|
||||||
cur_consumed = 0
|
znode.version))
|
||||||
max_resource = cur_limit['max']
|
|
||||||
if cur_consumed + amount > max_resource:
|
|
||||||
raise ValueError("Limit reached, can not"
|
|
||||||
" consume %s of %s" % (resource, amount))
|
|
||||||
else:
|
|
||||||
cur_limit['consumed'] = cur_consumed + amount
|
|
||||||
values_to_save.append((resource_path,
|
|
||||||
json.dumps(cur_limit),
|
|
||||||
znode.version))
|
|
||||||
# Commit all changes at once, so that we can ensure that all the
|
# Commit all changes at once, so that we can ensure that all the
|
||||||
# changes will happen, or none will...
|
# changes will happen, or none will...
|
||||||
if values_to_save:
|
if values_to_save:
|
||||||
|
@ -107,23 +139,14 @@ class ZookeeperQuotaEngine(engine.QuotaEngine):
|
||||||
who_path = paths.join(self.uri.path, for_who)
|
who_path = paths.join(self.uri.path, for_who)
|
||||||
resource_path = paths.join(who_path, resource)
|
resource_path = paths.join(who_path, resource)
|
||||||
blob, znode = self.client.get(resource_path)
|
blob, znode = self.client.get(resource_path)
|
||||||
cur_limit = json.loads(blob)
|
new_stored = self._try_consume(for_who, resource,
|
||||||
try:
|
json.loads(blob), amount)
|
||||||
cur_consumed = cur_limit['consumed']
|
# Ensure we pass in the version that we read this on so
|
||||||
except KeyError:
|
# that if it was changed by some other actor that we can
|
||||||
cur_consumed = 0
|
# avoid overwriting that value (and retry, or handle in some
|
||||||
max_resource = cur_limit['max']
|
# other manner).
|
||||||
if cur_consumed + amount > max_resource:
|
self.client.set(resource_path, json.dumps(new_stored),
|
||||||
raise ValueError("Limit reached, can not"
|
version=znode.version)
|
||||||
" consume %s of %s" % (resource, amount))
|
|
||||||
else:
|
|
||||||
cur_limit['consumed'] = cur_consumed + amount
|
|
||||||
# Ensure we pass in the version that we read this on so
|
|
||||||
# that if it was changed by some other actor that we can
|
|
||||||
# avoid overwriting that value (and retry, or handle in some
|
|
||||||
# other manner).
|
|
||||||
self.client.set(resource_path, json.dumps(cur_limit),
|
|
||||||
version=znode.version)
|
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if self.client is not None:
|
if self.client is not None:
|
||||||
|
|
|
@ -25,6 +25,10 @@ class QuotaEngine(object):
|
||||||
def __init__(self, uri):
|
def __init__(self, uri):
|
||||||
self.uri = uri
|
self.uri = uri
|
||||||
|
|
||||||
|
@abc.abstractproperty
|
||||||
|
def started(self):
|
||||||
|
"""Whether the current engine has started (or not)."""
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
"""Performs any engine startup (connection setup, validation...)"""
|
"""Performs any engine startup (connection setup, validation...)"""
|
||||||
|
|
||||||
|
@ -36,7 +40,8 @@ class QuotaEngine(object):
|
||||||
"""Reads the limits of some entity."""
|
"""Reads the limits of some entity."""
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def create_or_update_limit(self, for_who, resource, limit):
|
def create_or_update_limit(self, for_who, resource,
|
||||||
|
limit, kind='upper_bound'):
|
||||||
"""Updates or creates a resource limit for some entity.
|
"""Updates or creates a resource limit for some entity.
|
||||||
|
|
||||||
Must operate transactionally; either created/updated or not.
|
Must operate transactionally; either created/updated or not.
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
|
class DelimiterException(Exception):
|
||||||
|
"""Base class for *most* exceptions emitted from this library."""
|
|
@ -0,0 +1,41 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
|
class UpperBoundProcessor(object):
|
||||||
|
"""Processes a limit given some upper bound."""
|
||||||
|
|
||||||
|
def create(self, bound):
|
||||||
|
return {
|
||||||
|
'consumed': 0,
|
||||||
|
'bound': bound,
|
||||||
|
}
|
||||||
|
|
||||||
|
def decode(self, details):
|
||||||
|
return details
|
||||||
|
|
||||||
|
def update(self, details, bound):
|
||||||
|
details = details.copy()
|
||||||
|
details['bound'] = bound
|
||||||
|
return details
|
||||||
|
|
||||||
|
def process(self, details, amount):
|
||||||
|
consumed = details['consumed']
|
||||||
|
if consumed + amount > details['bound']:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
details = details.copy()
|
||||||
|
details['consumed'] = consumed + amount
|
||||||
|
return True
|
Loading…
Reference in New Issue