Remove old dse reference in code

Change-Id: I49fed8868678c50030584567f984d97f43a7ede5
This commit is contained in:
Anusha Ramineni 2016-09-09 15:51:30 +05:30
parent 5aa0d1d800
commit fbcd42e0ea
11 changed files with 4 additions and 1714 deletions


@@ -1,128 +0,0 @@
Data Services Engine
====================
The DSE is a lightweight variation of a Data Stream Management System. The
purpose of the DSE is to retrieve, or receive, data from external sources, then
format and present the data on a message bus.
Overview
--------
The DSE consists of a Python "cage" (see d6cage.py) which contains one or more
module instances. These are instances of an eventlet subclass called "deepsix"
(see deepsix.py). Each eventlet has an "inbox" queue. All eventlets share an
"outbox" queue called the "datapath".
A lightweight AMQP router in the cage (see amqprouter.py) routes messages from
the datapath to the appropriate eventlet inbox. In this way, the deepsix
instances are able to communicate with each other.
A deepsix instance may listen to multiple AMQP addresses. However, every
deepsix instance must have at least one unique non-wildcard AMQP address.
Subsequent addresses do not have to be unique. AMQP wildcards are supported
for these additional addresses.
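The wildcard semantics can be illustrated with a minimal, self-contained matcher. This is only a sketch for exposition; the real implementation is the trie in amqprouter.py. Here "*" matches exactly one dot-separated word and "#" matches zero or more:

```python
def amqp_match(pattern, key):
    """Return True if dotted KEY matches PATTERN.

    '*' matches exactly one word; '#' matches zero or more words.
    """
    pwords = pattern.split('.')
    kwords = key.split('.')

    def match(p, k):
        if not p:
            return not k
        if p[0] == '#':
            # '#' absorbs zero or more of the remaining words
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if p[0] == '*' or p[0] == k[0]:
            return match(p[1:], k[1:])
        return False

    return match(pwords, kwords)
```

So a deepsix instance listening on "vSphere.*.list" would receive messages keyed "vSphere.Boston.list" but not "vSphere.Boston.node.list".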
Deepsix
-------
Publisher
~~~~~~~~~
A publishing deepsix instance will either pull data from an external source, or
have data pushed to it. The nature of how this is achieved is dependent on the
external data source and the libraries used to access it. For example, a
deepsix module might use the pyVmomi library to periodically poll a VMware vSphere
instance to retrieve meta-data for all VM instances on a particular host.
A developer using deepsix will write code to periodically poll vSphere, extract
the data from the pyVmomi response object, and format it into a JSON data
structure. Next, the "self.publish" method provided by deepsix will be used to
publish the data on the DSE message bus.
Invoking "self.publish" results in calls to the "prepush_processor" and "push"
methods. For example, if a list of VMs on a host is retrieved from a vSphere
instance, the list is formatted as JSON and the result is stored locally in
the instance. Before any updates are sent out, the "prepush_processor" method
is called to groom the data. Using this method, a delta of the data can be
sent to known subscribers instead of the full data set every time it is
retrieved from vSphere. Finally, the "push" method is called, which iterates
over the list of known subscribers and sends the update to each.
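The kind of grooming a prepush hook enables can be sketched with a hypothetical delta helper (the real hook is the "prepush_processor" method on deepsix; the function name here is illustrative only):

```python
def prepush_delta(old, new):
    """Sketch of prepush-style grooming: instead of re-sending the full
    VM list on every poll, send only what changed since last time.
    """
    old, new = set(old), set(new)
    delta = {"added": sorted(new - old), "removed": sorted(old - new)}
    # Returning None signals the push machinery that there is nothing to send.
    return delta if (delta["added"] or delta["removed"]) else None
```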
Incoming subscription requests are processed by the "self.insub" method within
deepsix.py.
Published data is stored in a dictionary called "pubData".
Subscriber
~~~~~~~~~~
A subscribing deepsix instance will use the "self.subscribe" method to announce
its interest in data being published on the DSE bus. This announcement is
transmitted periodically, at an interval specified by the developer. When
"self.subscribe" is called, a callback is provided by the developer as an
argument. When new data is received by the subscriber, the callback is invoked
with the published data message passed as an argument.
A subscriber may need data from multiple sources. There are two ways this can
happen: (1) Multiple invocations of "self.subscribe" to publishers of
different types of data, or (2) A single invocation of "self.subscribe" which
is received by multiple publishers listening to the same AMQP address.
In the former case a unique UUID, used as a subscription ID, is generated for
each call to "self.subscribe". This UUID is used internally by deepsix to
differentiate between subscriptions. A unique callback can be provided for
each subscription.
If a UUID is not provided, one is automatically generated. This UUID is sent
to the publisher within the periodic "subscribe" message. When the publisher
sends an update, the subscription UUID is included with the update.
Let's consider the case of multiple publishers listening to the same AMQP
address for subscriptions. For example, you may have two vSphere deepsix
instances: "vSphere.Boston" and "vSphere.Chicago". Those are the unique names
for those instances, however, both of those instances may also be listening to
the address "vSphere.node.list".
A subscribing instance might send a subscription announcement to
"vSphere.node.list". In this case, both "vSphere.Boston" and "vSphere.Chicago"
will receive this subscription request and start publishing data back to the
subscriber. The subscriber maintains a nested dictionary, "subData", indexed
by subscription ID. Each subscription ID maps, in turn, to a dictionary
indexed by the unique AMQP addresses of the publishers providing that data.
Incoming published data is processed by the "self.inpubrep" method within
deepsix.py. It is from this method that the developer provided callback is
invoked.
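The nested bookkeeping described above can be sketched in a few lines (hypothetical names for illustration; the real structure lives in the subData class in dataobj.py):

```python
# One subscription ID maps to the latest data from each publisher that
# answered it, keyed by the publisher's unique AMQP address.
sub_data = {}

def record_update(sub_id, publisher, data):
    """Store the latest data for one subscription, keyed by publisher."""
    sub_data.setdefault(sub_id, {})[publisher] = data

# One subscription, answered by two publishers on the same AMQP address:
record_update("sub-1", "vSphere.Boston", ["vm1", "vm2"])
record_update("sub-1", "vSphere.Chicago", ["vm9"])
```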
Request/Reply
~~~~~~~~~~~~~
Another way to retrieve data is with "self.request". This is a one-off
asynchronous request for data.
d6cage
------
The d6cage is itself a deepsix instance. It listens to the AMQP addresses
"local.d6cage" and "local.router". When a deepsix instance within d6cage is
created, it registers its AMQP addresses by invoking "self.subscribe" and
sending the subscription to "local.router". The d6cage then adds each AMQP
address to its AMQP route table with the instance inbox thread as the
destination.
Miscellaneous/TO-DO
-------------------
Need to modify d6cage.py/deepsix.py to support dynamic
loading/reloading/stopping of modules.
Need to write a module to proxy external mq bus. For instance, there may be
multiple OpenStack instances. If a developer wants to receive updates from
Nova on "compute.instance.update", then they will need to disambiguate between
instances of Nova. A proxy module would be loaded for each OpenStack instance.
Subscriptions would be sent to "openstack1.compute.instance.update" and/or
"openstack2.compute.instance.update".


@@ -1,152 +0,0 @@
# Copyright 2014 Plexxi, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import six
class Node(object):
    def __init__(self, rPath=None, results=None):
        self.destinations = set()
        # Avoid mutable default arguments: a shared default set() would be
        # reused across every routeTable instance.
        self.results = set() if results is None else results
        self.children = {}
        self.rPath = [] if rPath is None else rPath

    def _remove(self, patternList, destination):
        word = patternList[0]

        if word in self.children:
            if len(patternList) == 1:
                if destination in self.children[word].destinations:
                    self.children[word].destinations.remove(destination)

                if (len(self.children[word].destinations) == 0 and
                        len(self.children[word].children) == 0):
                    del self.children[word]
            else:
                self.children[word]._remove(patternList[1:], destination)

                if (len(self.children[word].destinations) == 0 and
                        len(self.children[word].children) == 0):
                    del self.children[word]

    def _add(self, patternList, destination):
        word = patternList[0]

        if word not in self.children:
            if word == "#":
                self.children['#'] = hashNode(
                    rPath=self.rPath + ['#'],
                    results=self.results)
            else:
                self.children[word] = Node(
                    rPath=self.rPath + [word],
                    results=self.results)

        if len(patternList) == 1:
            self.children[word].destinations.add(destination)
        else:
            self.children[word]._add(patternList[1:], destination)

    def update_results(self):
        if '#' in self.children:
            self.children['#'].update_results()
        self.results.update(self.destinations)

    def _lookup(self, keyList):
        word = keyList[0]

        if len(keyList) == 1:
            if word in self.children:
                self.children[word].update_results()
            if '*' in self.children:
                if word:
                    self.children['*'].update_results()
        else:
            if word in self.children:
                self.children[word]._lookup(keyList[1:])
            if '*' in self.children:
                if word:
                    self.children['*']._lookup(keyList[1:])

        if '#' in self.children:
            self.children['#']._lookup(keyList[:])
            self.children['#'].update_results()


class hashNode(Node):
    def _lookup(self, keyList):
        for i in range(len(keyList)):
            if keyList[i] in self.children:
                self.children[keyList[i]]._lookup(keyList[i:])
            if '*' in self.children:
                if keyList[i]:
                    self.children['*']._lookup(keyList[i:])
            if '#' in self.children:
                self.children['#']._lookup(keyList[i:])

        if keyList[-1] in self.children:
            self.children[keyList[-1]].update_results()
        if '*' in self.children:
            if keyList[-1]:
                self.children['*'].update_results()


class routeTable(Node):
    def add(self, pattern, destination):
        if type(pattern) == list:
            for p in pattern:
                wordList = p.split('.')
                self._add(wordList, destination)
        elif isinstance(pattern, six.string_types):
            wordList = pattern.split('.')
            self._add(wordList, destination)

    def remove(self, pattern, destination):
        if type(pattern) == list:
            for p in pattern:
                wordList = p.split('.')
                self._remove(wordList, destination)
        elif isinstance(pattern, six.string_types):
            wordList = pattern.split('.')
            self._remove(wordList, destination)

    def lookup(self, key):
        self.results.clear()
        wordList = key.split('.')
        self._lookup(wordList)
        return self.results


@@ -1,419 +0,0 @@
# Copyright 2014 Plexxi, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Main entrypoint for the DSE
#
# Configuration in d6cage.ini
#
# Prerequisites:
# - Plexxi API libraries (there is an RPM)
# - Python dependencies (see readme elsewhere, or capture RPM)
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import imp
import sys
import traceback
import eventlet
eventlet.monkey_patch()
from oslo_log import log as logging
from oslo_utils import importutils
from oslo_utils import strutils
from congress.dse import amqprouter
from congress.dse import d6message
from congress.dse import deepsix
LOG = logging.getLogger(__name__)
class DataServiceError (Exception):
    pass


# This holds the cage instance singleton
instances = {}


def singleton(class_):
    global instances

    def getinstance(*args, **kwargs):
        if class_ not in instances:
            instances[class_] = class_(*args, **kwargs)
        return instances[class_]
    return getinstance


def delete_cage():
    global instances
    instances = {}


@singleton
class d6Cage(deepsix.deepSix):
    def __init__(self):
        self.config = {}
        self.config['modules'] = {}
        self.config['services'] = {}

        # Dictionary mapping service name to a dict of arguments.
        # Those arguments are only passed to d6service by createservice if
        # they are not already present in the ARGS argument given to
        # createservice.
        self.default_service_args = {}

        cageKeys = ['python.d6cage']
        cageDesc = 'deepsix python cage'
        name = "d6cage"
        deepsix.deepSix.__init__(self, name, cageKeys)

        self.inbox = eventlet.Queue()
        self.dataPath = eventlet.Queue()

        self.table = amqprouter.routeTable()
        self.table.add("local.router", self.inbox)
        self.table.add(self.name, self.inbox)
        self.table.add("router", self.inbox)

        localname = "local." + self.name
        self.table.add(localname, self.inbox)

        self.modules = {}
        self.services = {}
        self.greenThreads = []
        self.unloadingServices = {}
        self.reloadingServices = set()

        self.services[self.name] = {}
        self.services[self.name]['service'] = self
        self.services[self.name]['name'] = self.name
        self.services[self.name]['description'] = cageDesc
        self.services[self.name]['inbox'] = self.inbox
        self.services[self.name]['keys'] = self.keys
        self.services[self.name]['type'] = None
        self.services[self.name]['id'] = None

        self.subscribe(
            "local.d6cage",
            "routeKeys",
            callback=self.updateRoutes,
            interval=5)

        self.router_greenthread = eventlet.spawn(self.router_loop)

    def __del__(self):
        # This function gets called when the interpreter deletes the object
        # by the automatic garbage cleanup
        for gt in self.greenThreads:
            eventlet.kill(gt)
        eventlet.kill(self.router_greenthread)
        eventlet.kill(self)

    def newConfig(self, msg):
        newConfig = msg.body.data
        if type(newConfig) == dict and newConfig:
            if "modules" in newConfig:
                for module in newConfig["modules"]:
                    if module not in sys.modules:
                        self.loadModule(
                            module,
                            newConfig['modules'][module]['filename'])

            if "services" in newConfig:
                for service in newConfig['services']:
                    if service not in self.services:
                        self.createservice(
                            service,
                            **newConfig['services'][service])

            self.config = newConfig

    def reloadStoppedService(self, service):
        moduleName = self.config['services'][service]['moduleName']
        try:
            reload(sys.modules[moduleName])
        except Exception as errmsg:
            self.log_error(
                "Unable to reload module '%s': %s", moduleName, errmsg)
            return
        self.createservice(service, **self.config['services'][service])

    def waitForServiceToStop(
            self,
            service,
            attemptsLeft=20,
            callback=None,
            cbkwargs=None):
        cbkwargs = {} if cbkwargs is None else cbkwargs
        if attemptsLeft > 0:
            if self.services[service]['object'].isActive():
                self.timerThreads.append(
                    eventlet.spawn_after(10,
                                         self.waitForServiceToStop,
                                         service,
                                         attemptsLeft - 1))
            else:
                del self.services[service]
                if callback:
                    callback(**cbkwargs)
        else:
            self.log_error("Unable to stop service %s", service)

    def loadModule(self, name, filename):
        if name in sys.modules:
            # self.log_error(
            #     "error loading module '%s': module already exists", name)
            return
        try:
            self.log_info("loading module: %s", name)
            imp.load_source(name, filename)
        except Exception:
            raise DataServiceError(
                "error loading module '%s' from '%s': %s" %
                (name, filename, traceback.format_exc()))

    def load_modules_from_config(self):
        for section in self.config['modules'].keys():
            filename = self.config['modules'][section]["filename"]
            self.loadModule(section, filename)

    def deleteservice(self, name):
        self.log_info("deleting service: %s", name)
        obj = self.services[name]['object']
        if hasattr(obj, "cleanup"):
            obj.cleanup()
        eventlet.greenthread.kill(obj)
        self.greenThreads.remove(obj)
        self.table.remove(name, self.services[name]['inbox'])
        self.table.remove("local." + name, self.services[name]['inbox'])
        self.unsubscribe(name, 'routeKeys')
        del self.services[name]
        self.log_info("finished deleting service: %s", name)

    def createservice(
            self,
            name="",
            keys="",
            description="",
            moduleName="",
            args=None,
            module_driver=False,
            type_=None,
            id_=None):
        args = {} if args is None else args
        self.log_info("creating service %s with module %s and args %s",
                      name, moduleName, strutils.mask_password(args, "****"))

        # FIXME(arosen) This will be refactored out in the next patchset
        # this is only done because existing imports from d6service
        # instead of the module.
        if module_driver:
            congress_expected_module_path = ""
            for entry in range(len(moduleName.split(".")) - 1):
                congress_expected_module_path += (
                    moduleName.split(".")[entry] + ".")
            congress_expected_module_path = congress_expected_module_path[:-1]
            module = importutils.import_module(congress_expected_module_path)

        if not module_driver and moduleName not in sys.modules:
            self.log_error(
                "error loading service %s: module %s does not exist",
                name,
                moduleName)
            raise DataServiceError(
                "error loading service %s: module %s does not exist" %
                (name, moduleName))

        if not module_driver and name in self.services:
            self.log_error("error loading service '%s': name already in use",
                           name)
            raise DataServiceError(
                "error loading service '%s': name already in use"
                % name)

        inbox = eventlet.Queue()
        if not module_driver:
            module = sys.modules[moduleName]

        # set args to default values, as necessary
        if name in self.default_service_args:
            global_args = self.default_service_args[name]
            for key, value in global_args.items():
                if key not in args:
                    args[key] = value

        try:
            svcObject = module.d6service(name, keys, inbox, self.dataPath,
                                         args)
            self.greenThreads.append(svcObject)
        except Exception:
            self.log_error("Error loading service '%s' of module '%s':: \n%s",
                           name, module, traceback.format_exc())
            raise DataServiceError(
                "Error loading service '%s' of module '%s':: \n%s"
                % (name, module, traceback.format_exc()))

        self.log_info("created service: %s", name)
        self.services[name] = {}
        self.services[name]['name'] = name
        self.services[name]['description'] = description
        self.services[name]['moduleName'] = moduleName
        self.services[name]['keys'] = keys
        self.services[name]['args'] = args
        self.services[name]['object'] = svcObject
        self.services[name]['inbox'] = inbox
        self.services[name]['type'] = type_
        self.services[name]['id'] = id_

        try:
            self.table.add(name, inbox)
            localname = "local." + name
            self.table.add(localname, inbox)
            self.subscribe(
                name,
                'routeKeys',
                callback=self.updateRoutes,
                interval=5)
            self.publish('services', self.services)
        except Exception as errmsg:
            del self.services[name]
            self.log_error("error starting service '%s': %s", name, errmsg)
            raise DataServiceError(
                "error starting service '%s': %s" % (name, errmsg))

    def getservices(self):
        return self.services

    def getservice(self, id_=None, type_=None, name=None):
        # Returns the first service that matches all non-None parameters.
        for name_, service in self.services.items():
            if (id_ and (not service.get('id', None) or
                         id_ != service['id'])):
                continue
            if type_ and type_ != service['type']:
                continue
            if name and name_ != name:
                continue
            return service
        return None

    def service_object(self, name):
        if name in self.services:
            return self.services[name]['object']
        else:
            return None

    def updateRoutes(self, msg):
        keyData = self.getSubData(msg.correlationId, sender=msg.replyTo)
        currentKeys = set(keyData.data)
        # self.log_debug("updateRoutes msgbody: %s", msg.body.data)
        pubKeys = set(msg.body.data['keys'])

        if currentKeys != pubKeys:
            newKeys = pubKeys - currentKeys
            if newKeys:
                self.table.add(
                    list(newKeys), self.services[msg.replyTo]['inbox'])

            oldKeys = currentKeys - pubKeys
            if oldKeys:
                self.table.remove(
                    list(oldKeys), self.services[msg.replyTo]['inbox'])

        return msg.body

    def load_services_from_config(self):
        for section in self.config['services'].keys():
            self.createservice(section, **self.config['services'][section])

    def routemsg(self, msg):
        # LOG.debug(
        #     "Message lookup %s from %s", msg.key, msg.replyTo)
        destinations = self.table.lookup(msg.key)
        # self.log_debug("Destinations %s for key %s for msg %s",
        #                destinations, msg.key, msg)
        if destinations:
            for destination in destinations:
                destination.put_nowait(msg)
                # self.log_debug("Message sent to %s from %s: %s",
                #                msg.key, msg.replyTo, msg)

    def d6reload(self, msg):
        inargs = msg.body.data
        service = inargs['service']
        newmsg = d6message.d6msg(key=service, replyTo=self.name, type="shut")
        self.send(newmsg)
        cbkwargs = {}
        cbkwargs['service'] = service
        self.waitForServiceToStop(
            service,
            callback=self.reloadStoppedService,
            cbkwargs=cbkwargs)

    def cmdhandler(self, msg):
        command = msg.header['dataindex']
        if command == "reload":
            self.d6reload(msg)

    def router_loop(self):
        while self._running:
            msg = self.dataPath.get()
            self.routemsg(msg)
            self.dataPath.task_done()


if __name__ == '__main__':
    # d6Cage is wrapped by @singleton, so calling it returns the cached
    # cage instance (the bare name would be the factory function).
    main = d6Cage()
    try:
        main.wait()
        main.d6stop()
    except KeyboardInterrupt:
        main.d6stop()
        sys.exit(0)
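The @singleton decorator above caches one instance per class in a module-level dict, so every call to the decorated class returns the same object. A self-contained sketch of the same pattern (Cage is a stand-in class, not the real d6Cage):

```python
instances = {}

def singleton(class_):
    # Same pattern as d6cage's singleton: cache one instance per class
    # and hand the cached one back on every subsequent call.
    def getinstance(*args, **kwargs):
        if class_ not in instances:
            instances[class_] = class_(*args, **kwargs)
        return instances[class_]
    return getinstance

@singleton
class Cage(object):
    def __init__(self):
        self.services = {}

a = Cage()
b = Cage()
```

Note one consequence of the pattern: after decoration the name refers to the factory function, which is why instantiation must go through a call rather than using the bare class name.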


@@ -1,56 +0,0 @@
# Copyright 2014 Plexxi, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import uuid
class d6msg(object):
    def __init__(self,
                 key="",
                 replyTo="",
                 correlationId="",
                 type="",
                 dataindex="",
                 body=None,
                 srcmsg=None):
        # Avoid mutable default arguments: a shared default dict would be
        # reused across every message.
        self.header = {}
        self.body = {} if body is None else body
        self.replyTo = replyTo
        self.type = type

        if srcmsg:
            self.key = srcmsg.replyTo
            self.correlationId = srcmsg.correlationId
            self.header['dataindex'] = srcmsg.header['dataindex']
        else:
            self.key = key
            self.header['dataindex'] = dataindex

            if correlationId:
                self.correlationId = correlationId
            else:
                newuuid = uuid.uuid4()
                self.correlationId = str(newuuid)

    def __str__(self):
        return ("<to:{}, from:{}, corrId:{}, type:{}, dataindex:{}, "
                "body:{}>").format(
            self.key, self.replyTo, self.correlationId, self.type,
            self.header['dataindex'], str(self.body))
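The srcmsg contract is the interesting part of this class: a reply built from a source message is automatically addressed back to the requester and reuses its correlation ID. A trimmed, self-contained stand-in (Msg is illustrative, not the real d6msg) makes the contract explicit:

```python
import uuid

class Msg(object):
    """Trimmed stand-in for d6msg, enough to show the srcmsg contract."""
    def __init__(self, key="", replyTo="", correlationId="",
                 dataindex="", srcmsg=None):
        if srcmsg:
            # A reply goes back to the requester and reuses its
            # correlationId, so the requester can match it up.
            self.key = srcmsg.replyTo
            self.correlationId = srcmsg.correlationId
            self.dataindex = srcmsg.dataindex
        else:
            self.key = key
            self.correlationId = correlationId or str(uuid.uuid4())
            self.dataindex = dataindex
        self.replyTo = replyTo

req = Msg(key="vSphere.Boston", replyTo="policy", dataindex="node.list")
rep = Msg(replyTo="vSphere.Boston", srcmsg=req)
```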

View File

@@ -1,127 +0,0 @@
# Copyright 2014 Plexxi, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class dataObject(object):
    def __init__(self, data=None, version=0):
        if data is None:
            self.data = {}
        else:
            self.data = data
        if version:
            self.version = version
        else:
            self.version = int(bool(data))

    def __str__(self):
        return str(self.data)


class subData(object):
    """A piece of data that a data service is subscribed to.

    Each data service in the cage can have its own instance of
    this data; keep track of who published which instance.
    """
    def __init__(self, key, dataindex, corrId, callback):
        self.key = key
        self.dataindex = dataindex
        self.corrId = corrId
        self.callback = callback
        self.dataObjects = {}
        # LOG.info(
        #     "*****New subdata: %s, %s, %s",
        #     key, dataindex, id(self.dataObjects))

    def getSources(self):
        return self.dataObjects.keys()

    def update(self, sender, newdata):
        self.dataObjects[sender] = newdata

    def version(self, sender):
        version = 0
        if sender in self.dataObjects:
            version = self.dataObjects[sender].version
        return version

    def getData(self, sender):
        result = dataObject()
        if sender in self.dataObjects:
            LOG.info("subdata object: %s", self.dataObjects[sender])
            result = self.dataObjects[sender]
        return result

    def getAllData(self):
        result = {}
        for sender in self.dataObjects:
            result[sender] = self.dataObjects[sender]
        return result


class pubData(object):
    """A piece of data that a data service is publishing.

    Keep track of those data services that are subscribed.
    """
    def __init__(self, dataindex, args=None):
        self.dataindex = dataindex
        self.dataObject = dataObject()
        self.subscribers = {}
        self.requesters = {}
        # Avoid a mutable default argument for args.
        self.args = {} if args is None else args

    def update(self, newdata):
        version = self.dataObject.version + 1
        self.dataObject = dataObject(newdata, version)

    def get(self):
        return self.dataObject

    def version(self):
        return self.dataObject.version

    def addsubscriber(self, sender, type, corrId):
        if sender not in self.subscribers:
            self.subscribers[sender] = {}
            self.subscribers[sender]['type'] = type
            self.subscribers[sender]['correlationId'] = corrId

    def removesubscriber(self, sender):
        if sender in self.subscribers:
            del self.subscribers[sender]

    def getsubscribers(self, sender=""):
        if sender:
            if sender in self.subscribers:
                return self.subscribers[sender]
            else:
                return []
        else:
            return self.subscribers
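The versioning scheme above is simple but load-bearing: every update to a published piece of data bumps a monotonically increasing version, which lets subscribers detect staleness. A trimmed stand-in (Data/Pub are illustrative names, not the real dataObject/pubData) shows the behavior:

```python
class Data(object):
    """Trimmed stand-in for dataobj.dataObject: payload plus a version."""
    def __init__(self, data=None, version=0):
        self.data = {} if data is None else data
        # A non-empty initial payload starts at version 1, empty at 0.
        self.version = version if version else int(bool(data))

class Pub(object):
    """Trimmed stand-in for dataobj.pubData: each update bumps the version."""
    def __init__(self):
        self.dataObject = Data()

    def update(self, newdata):
        self.dataObject = Data(newdata, self.dataObject.version + 1)

p = Pub()
p.update(["vm1"])
p.update(["vm1", "vm2"])
```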


@@ -1,573 +0,0 @@
# Copyright 2014 Plexxi, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import eventlet
from eventlet import greenthread
from eventlet import hubs
eventlet.monkey_patch()
from oslo_log import log as logging
from oslo_utils import strutils
from congress.dse import d6message
from congress.dse import dataobj
LOG = logging.getLogger(__name__)
class deepSix(greenthread.GreenThread):
    def __init__(self, name, keys, inbox=None, dataPath=None):
        hub = hubs.get_hub()
        greenthread.GreenThread.__init__(self, hub.greenlet)
        g = self

        self.name = name

        keyList = []
        for k in keys:
            keyList.append(k)
            localk = "local." + k
            keyList.append(localk)
        keyList.append("allservices")
        keyList.append("local.allservices")
        self.keys = keyList

        self._running = True

        self.pubdata = {}
        self.subdata = {}
        self.subscriberCorrelationUuids = set()
        self.scheduuids = set()
        self.timerThreads = []

        # Necessary for deepSix objects that don't get initialized with an
        # inbox
        self.inbox = None

        if inbox:
            self.inbox = inbox
            self.dataPath = dataPath
            hub.schedule_call_global(0, g.switch, g._loop, [], {})

            keyargs = {}
            keyargs['keys'] = self.keys
            self.publish("routeKeys", keyargs)

    def send(self, msg):
        # TODO(thinrichs): reduce how often sub messages
        # get sent so we can re-enable this
        # if msg.type == 'sub':
        #     self.log_info("sending SUB msg %s", msg)
        # else:
        #     self.log_debug("sending msg %s", msg)
        self.dataPath.put_nowait(msg)

    def schedule(self, msg, scheduuid, interval):
        if scheduuid in self.scheduuids:
            if msg.type == 'pub':
                msg.updatebody(self.pubdata[msg.dataindex].get())
            self.send(msg)
            ev = eventlet.spawn_after(interval,
                                      self.schedule,
                                      msg,
                                      scheduuid,
                                      interval)
            self.timerThreads.append(ev)
        else:
            self.log_debug("stop scheduling a message: %s", msg)

    def getSubData(self, corrId, sender=""):
        if corrId in self.subdata:
            if sender:
                return self.subdata[corrId].getData(sender)
            else:
                return self.subdata[corrId].getAllData()

    def reqtimeout(self, corrId):
        if corrId in self.subdata:
            del self.subdata[corrId]

    def inreq(self, msg):
        corruuid = msg.correlationId
        dataindex = msg.header['dataindex']

        if dataindex == "pubdata":
            newmsg = d6message.d6msg(key=msg.replyTo,
                                     replyTo=self.name,
                                     correlationId=msg.correlationId,
                                     type="rep",
                                     dataindex=dataindex,
                                     body=dataobj.dataObject(self.pubdata))
            self.send(newmsg)
        elif dataindex == "subdata":
            newmsg = d6message.d6msg(key=msg.replyTo,
                                     replyTo=self.name,
                                     correlationId=msg.correlationId,
                                     type="rep",
                                     dataindex=dataindex,
                                     body=dataobj.dataObject(self.subdata))
            self.send(newmsg)
        elif hasattr(self, 'reqhandler'):
            self.pubdata[dataindex] = dataobj.pubData(dataindex, msg.body)
            self.pubdata[dataindex].requesters[msg.replyTo] = corruuid
            self.reqhandler(msg)
        else:
            self.log_exception("Received a request but have no handler: %s",
                               msg)

    def inpull(self, msg):
        # self.log_debug("received PULL msg: %s", msg)
        dataindex = msg.header['dataindex']

        if dataindex in self.pubdata:
            reply = d6message.d6msg(replyTo=self.name,
                                    type="rep",
                                    body=self.pubdata[dataindex].get(),
                                    srcmsg=msg)
            self.send(reply)
        else:
            self.pubdata[dataindex] = dataobj.pubData(dataindex, msg.body)
            self.subhandler(msg)

        self.pubdata[dataindex].addsubscriber(
            msg.replyTo, "pull", msg.correlationId)

    def incmd(self, msg):
        # self.log_debug("received CMD msg: %s", msg)
        corruuid = msg.correlationId
        dataindex = msg.header['dataindex']

        if corruuid not in self.pubdata:
            self.pubdata[corruuid] = dataobj.pubData(dataindex, msg.body)
            self.pubdata[corruuid].requesters[msg.replyTo] = corruuid
            self.cmdhandler(msg)

    def insub(self, msg):
        # self.log_info("received SUB msg: %s", msg)
        corruuid = msg.correlationId
        dataindex = msg.header['dataindex']
        sender = msg.replyTo

        if corruuid not in self.subscriberCorrelationUuids:
            if dataindex not in self.pubdata:
                self.pubdata[dataindex] = dataobj.pubData(dataindex,
                                                          msg.body)
            # always call subhandler so subclass has a chance to know more
            # about the subscription
            if hasattr(self, "subhandler"):
                self.subhandler(msg)

            self.pubdata[dataindex].addsubscriber(sender, "push", corruuid)
            self.subscriberCorrelationUuids.add(corruuid)
            self.push(dataindex, sender, type='sub')

    def inunsub(self, msg):
        # self.log_info("received UNSUB msg: %s", msg)
        dataindex = msg.header['dataindex']

        if hasattr(self, 'unsubhandler'):
            if self.unsubhandler(msg):
                if dataindex in self.pubdata:
                    self.pubdata[dataindex].removesubscriber(msg.replyTo)
        else:
            if dataindex in self.pubdata:
                self.pubdata[dataindex].removesubscriber(msg.replyTo)

        # release resources if there are no more subscribers for this
        # dataindex; note dict has no discard(), so use pop() with a
        # default instead
        if (dataindex in self.pubdata and
                not self.pubdata[dataindex].getsubscribers()):
            self.pubdata.pop(dataindex, None)

    def inshut(self, msg):
        """Shut down this data service."""
        # self.log_warning("received SHUT msg: %s", msg)
        # iterate over a copy since unsubscribe mutates self.subdata
        for corruuid in list(self.subdata):
            self.unsubscribe(corrId=corruuid)

        for ev in self.timerThreads:
            try:
                ev.kill()
            except Exception as errmsg:
                self.log_error("error stopping timer thread: %s", errmsg)

        self._running = False
        self.keys = {}
        keydata = {}
        keydata['keys'] = {}
        self.publish("routeKeys", keydata)

    def inpubrep(self, msg):
        # self.log_debug("received PUBREP msg: %s", msg)
        corruuid = msg.correlationId
        sender = msg.replyTo

        if corruuid in self.scheduuids:
            self.scheduuids.remove(corruuid)

        if corruuid in self.subdata:
            callback = self.subdata[corruuid].callback
            if msg.type in ['pub', 'rep']:
                if callback:
                    scrubbed = callback(msg)
                    if scrubbed:
                        self.subdata[corruuid].update(
                            sender, dataobj.dataObject(scrubbed))
        else:
            self.unsubscribe(corrId=corruuid)

    def request(self,
                key,
                dataindex,
                corrId="",
                callback=None,
                interval=0,
                timer=30,
                args=None):
        args = {} if args is None else args
        msg = d6message.d6msg(key=key,
                              replyTo=self.name,
                              correlationId=corrId,
                              type="req",
                              dataindex=dataindex,
                              body=args)

        corruuid = msg.correlationId

        self.subdata[corruuid] = dataobj.subData(key, dataindex,
                                                 corruuid, callback)

        if interval:
            self.scheduuids.add(corruuid)
            self.schedule(msg, corruuid, interval)
        else:
            self.send(msg)

        if timer:
            self.timerThreads.append(
                eventlet.spawn_after(timer,
                                     self.reqtimeout,
                                     corruuid))

    def reply(self, dataindex, newdata="", delete=True):
        for requester in self.pubdata[dataindex].requesters:
            msg = d6message.d6msg(key=requester,
                                  replyTo=self.name,
                                  correlationId=self.pubdata[dataindex]
                                  .requesters[requester],
                                  type="rep",
                                  dataindex=self.pubdata[dataindex].dataindex)
            if newdata:
                msg.body = dataobj.dataObject(newdata)
            else:
                msg.body = self.pubdata[dataindex].get()
            # self.log_debug("REPLY body: %s", msg.body)
            self.send(msg)

        if delete:
            del self.pubdata[dataindex]

    def prepush_processor(self, data, dataindex, type=None):
        """Pre-processing of data before publish.

        Given the DATA to be published, returns the data actually put
        on the wire.  Can be overloaded.
        """
        return data

    def reserved_dataindex(self, dataindex):
        """Returns True if DATAINDEX is one of those reserved by deepsix."""
        return dataindex in ('routeKeys', 'pubdata', 'subdata')

    def push(self, dataindex, key="", type=None):
        """Send data for DATAINDEX and KEY to subscribers/requesters."""
        self.log_debug("pushing dataindex %s to subscribers %s "
                       "and requesters %s ", dataindex,
                       self.pubdata[dataindex].subscribers,
                       self.pubdata[dataindex].requesters)

        # bail out if there are no requesters/subscribers
        if (len(self.pubdata[dataindex].requesters) == 0 and
                len(self.pubdata[dataindex].subscribers) == 0):
            self.log_debug("no requesters/subscribers; not sending")
            return

        # give the prepush hook a chance to morph the data
        if self.reserved_dataindex(dataindex):
            data = self.pubdata[dataindex].get()
            # bail out if there is no data to send
            if data is None:
                return
        else:
            # .get() returns a dataObject
            data = self.prepush_processor(self.pubdata[dataindex].get().data,
                                          dataindex,
                                          type=type)
            # bail out if the prepush hook said there is no data
            if data is None:
                return
            data = dataobj.dataObject(data)

        # send to subscribers/requesters
        if self.pubdata[dataindex].subscribers:
            if key:
                msg = d6message.d6msg(key=key,
                                      replyTo=self.name,
                                      correlationId=self.pubdata[dataindex]
                                      .subscribers[key]['correlationId'],
                                      type="pub",
                                      dataindex=dataindex,
                                      body=data)
                self.send(msg)
            else:
                subscribers = self.pubdata[dataindex].getsubscribers()
                for subscriber in subscribers:
                    if subscribers[subscriber]['type'] == "push":
                        corId = subscribers[subscriber]['correlationId']
                        msg = d6message.d6msg(key=subscriber,
                                              replyTo=self.name,
                                              correlationId=corId,
                                              type="pub",
                                              dataindex=dataindex,
                                              body=data)
                        self.send(msg)

        if self.pubdata[dataindex].requesters:
            if key:
                msg = d6message.d6msg(key=key,
                                      replyTo=self.name,
                                      correlationId=self.pubdata[dataindex].
                                      requesters[key],
                                      type="rep",
                                      dataindex=dataindex,
                                      body=self.pubdata[dataindex].get())
                self.send(msg)
                del self.pubdata[dataindex].requesters[key]
            else:
                # iterate over a copy since entries are deleted in the loop
                for requester in list(self.pubdata[dataindex].requesters):
                    corId = self.pubdata[dataindex].requesters[requester]
                    msg = d6message.d6msg(key=requester,
                                          replyTo=self.name,
                                          correlationId=corId,
                                          type="rep",
                                          dataindex=dataindex,
                                          body=self.pubdata[dataindex].get())
                    self.send(msg)
                    del self.pubdata[dataindex].requesters[requester]

    def subscribe(self,
                  key,
                  dataindex,
                  corrId="",
                  callback=None,
                  pull=False,
                  interval=30,
                  args=None):
        """Subscribe to a DATAINDEX for a given KEY."""
        args = {} if args is None else args
        self.log_debug("subscribed to %s with dataindex %s", key, dataindex)
        msg = d6message.d6msg(key=key,
                              replyTo=self.name,
                              correlationId=corrId,
                              dataindex=dataindex,
                              body=args)
        if pull:
            msg.type = 'pull'
        else:
            msg.type = 'sub'

        corruuid = msg.correlationId

        self.subdata[corruuid] = dataobj.subData(key, dataindex,
                                                 corruuid, callback)
        self.scheduuids.add(corruuid)
        self.schedule(msg, corruuid, interval)

        return corruuid
def unsubscribe(self, key="", dataindex="", corrId=""):
"""Unsubscribe self from DATAINDEX for KEY."""
self.log_debug("unsubscribed to %s with dataindex %s", key, dataindex)
if corrId:
if corrId in self.scheduuids:
self.scheduuids.remove(corrId)
if corrId in self.subdata:
key = self.subdata[corrId].key
dataindex = self.subdata[corrId].dataindex
del self.subdata[corrId]
msg = d6message.d6msg(key=key,
replyTo=self.name,
correlationId=corrId,
type='unsub',
dataindex=dataindex)
self.send(msg)
elif key and dataindex:
for corruuid in self.subdata.copy().keys():
# iterate over a copy to avoid mutating the dict during iteration
if (key == self.subdata[corruuid].key and
dataindex == self.subdata[corruuid].dataindex):
if corruuid in self.scheduuids:
self.scheduuids.remove(corruuid)
del self.subdata[corruuid]
msg = d6message.d6msg(key=key,
replyTo=self.name,
correlationId=corruuid,
type='unsub',
dataindex=dataindex)
self.send(msg)
return
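A minimal sketch of the local bookkeeping that subscribe() and unsubscribe() above maintain. The `SubData` and `Node` classes here are simplified stand-ins for `dataobj.subData` and deepsix itself; the d6msg traffic and scheduling are omitted, and only the `subdata`/`scheduuids` state transitions are modelled:

```python
# Illustrative model only: tracks the correlation-id bookkeeping,
# not the real message bus or d6message objects.
import uuid


class SubData(object):
    def __init__(self, key, dataindex):
        self.key = key
        self.dataindex = dataindex


class Node(object):
    def __init__(self):
        self.subdata = {}        # correlationId -> SubData
        self.scheduuids = set()  # correlationIds with scheduled resends

    def subscribe(self, key, dataindex):
        corruuid = str(uuid.uuid4())
        self.subdata[corruuid] = SubData(key, dataindex)
        self.scheduuids.add(corruuid)
        return corruuid

    def unsubscribe(self, key, dataindex):
        # iterate over a copy: the dict shrinks while we iterate
        for corruuid in list(self.subdata):
            sub = self.subdata[corruuid]
            if sub.key == key and sub.dataindex == dataindex:
                self.scheduuids.discard(corruuid)
                del self.subdata[corruuid]


node = Node()
cid = node.subscribe('test2', 'p')
assert cid in node.subdata and cid in node.scheduuids
node.unsubscribe('test2', 'p')
assert not node.subdata and not node.scheduuids
```

As in the original, unsubscribing by (key, dataindex) must scan a copy of the subscription dict, since entries are deleted mid-iteration.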
def command(
self,
key,
command,
corrId="",
callback=None,
timer=30,
args={}):
msg = d6message.d6msg(key=key,
replyTo=self.name,
type="cmd",
correlationId=corrId,
dataindex=command,
body=args)
corruuid = msg.correlationId
self.subdata[corruuid] = dataobj.subData(key, command,
corruuid, callback)
self.send(msg)
if timer:
self.timerThreads.append(
eventlet.spawn_after(timer,
self.reqtimeout,
corruuid))
def publish(self, dataindex, newdata, key='', use_snapshot=False):
# Note(ekcs): use_snapshot param is ignored.
# Accepted here on temporary basis for dse1+2 compatibility.
self.log_debug("publishing to dataindex %s with data %s",
dataindex, strutils.mask_password(newdata, "****"))
if dataindex not in self.pubdata:
self.pubdata[dataindex] = dataobj.pubData(dataindex)
self.pubdata[dataindex].update(newdata)
self.push(dataindex, type='pub')
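The publish/push path above can be modelled in a few lines. This is an illustrative sketch only: `PubData`/`Publisher` stand in for `dataobj.pubData` and deepsix, and delivery to the bus is replaced by a plain list; it shows that publish() stores data unconditionally while push() bails out when nobody is listening:

```python
# Simplified model of publish -> push; no real messaging involved.
class PubData(object):
    def __init__(self, dataindex):
        self.dataindex = dataindex
        self.data = None
        self.subscribers = {}

    def update(self, newdata):
        self.data = newdata


class Publisher(object):
    def __init__(self):
        self.pubdata = {}
        self.sent = []

    def publish(self, dataindex, newdata):
        if dataindex not in self.pubdata:
            self.pubdata[dataindex] = PubData(dataindex)
        self.pubdata[dataindex].update(newdata)
        self.push(dataindex)

    def push(self, dataindex):
        entry = self.pubdata[dataindex]
        if not entry.subscribers:
            return  # no subscribers; data is stored but nothing is sent
        for subscriber in entry.subscribers:
            self.sent.append((subscriber, entry.data))


pub = Publisher()
pub.publish('p', 42)      # no subscribers yet: stored, not delivered
assert pub.sent == []
pub.pubdata['p'].subscribers['test1'] = {'type': 'push'}
pub.publish('p', 43)
assert pub.sent == [('test1', 43)]
```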
def receive(self, msg):
if msg.type == 'sub':
self.insub(msg)
elif msg.type == 'unsub':
self.inunsub(msg)
elif msg.type == 'pub':
self.inpubrep(msg)
elif msg.type == 'req':
self.inreq(msg)
elif msg.type == 'rep':
self.inpubrep(msg)
elif msg.type == 'pull':
self.inpull(msg)
elif msg.type == 'shut':
self.inshut(msg)
elif msg.type == 'cmd':
if hasattr(self, 'cmdhandler'):
self.incmd(msg)
else:
assert False, "{} received message of unknown type {}: {}".format(
self.name, msg.type, str(msg))
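The if/elif chain in receive() can equivalently be written as a dispatch table; a sketch under that assumption (handler names follow the original, but their bodies here just record what was dispatched, and the 'cmd' case, which is conditional on a `cmdhandler` attribute, is left out):

```python
# Dispatch-table variant of receive(); handlers are stubs.
class Msg(object):
    def __init__(self, type):
        self.type = type


class Receiver(object):
    def __init__(self):
        self.handled = []
        self._dispatch = {
            'sub': self.insub, 'unsub': self.inunsub,
            'pub': self.inpubrep, 'rep': self.inpubrep,
            'req': self.inreq, 'pull': self.inpull,
            'shut': self.inshut,
        }

    def receive(self, msg):
        handler = self._dispatch.get(msg.type)
        if handler is None:
            raise ValueError("unknown message type %r" % msg.type)
        handler(msg)

    def insub(self, msg): self.handled.append('sub')
    def inunsub(self, msg): self.handled.append('unsub')
    def inpubrep(self, msg): self.handled.append('pubrep')
    def inreq(self, msg): self.handled.append('req')
    def inpull(self, msg): self.handled.append('pull')
    def inshut(self, msg): self.handled.append('shut')


r = Receiver()
r.receive(Msg('pub'))
r.receive(Msg('rep'))
assert r.handled == ['pubrep', 'pubrep']
```

Note that 'pub' and 'rep' deliberately share a handler, mirroring the original chain where both message types route to inpubrep.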
def _loop(self):
# self._running will be set to False when processing a shutdown
# message
while self._running:
if self.inbox:
msg = self.inbox.get()
self.receive(msg)
self.inbox.task_done()
else:
# some deepSix instances are initialized without an inbox in
# test cases; sleeping here prevents a busy-wait
eventlet.sleep(1)
def subscription_list(self):
"""Return a list version of subscriptions."""
return [(x.key, x.dataindex) for x in self.subdata.values()]
def subscriber_list(self):
"""Return a list version of subscribers."""
result = []
for pubdata in self.pubdata.values():
for subscriber in pubdata.subscribers:
result.append((subscriber, pubdata.dataindex))
return result
def log(self, msg, *args):
self.log_debug(msg, *args)
def log_debug(self, msg, *args):
msg = "%s:: %s" % (self.name, msg)
LOG.debug(msg, *args)
def log_info(self, msg, *args):
msg = "%s:: %s" % (self.name, msg)
LOG.info(msg, *args)
def log_warning(self, msg, *args):
msg = "%s:: %s" % (self.name, msg)
LOG.warning(msg, *args)
def log_error(self, msg, *args):
msg = "%s:: %s" % (self.name, msg)
LOG.error(msg, *args)
def log_exception(self, msg, *args):
msg = "%s:: %s" % (self.name, msg)
LOG.exception(msg, *args)


@@ -33,7 +33,6 @@ from congress.db import api as db_api
# Import all data models
from congress.db.migration.models import head # noqa
from congress.db import model_base
from congress.dse import d6cage
from congress.tests import helper
from congress.tests import policy_fixture
@@ -85,9 +84,6 @@ class TestCase(testtools.TestCase):
self.log_fixture = self.useFixture(fixtures.FakeLogger())
self.policy = self.useFixture(policy_fixture.PolicyFixture())
# cage is a singleton so we delete it here and
# recreate it after each test
self.addCleanup(d6cage.delete_cage)
def setup_config(self):
"""Tests that need a non-default config can override this method."""


@@ -1,70 +0,0 @@
# Copyright (c) 2013 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from oslo_log import log as logging
from congress.datasources import datasource_driver
LOG = logging.getLogger(__name__)
def d6service(name, keys, inbox, datapath, args):
"""Create dataservice instance.
This method is called by d6cage to create a dataservice
instance. There are a couple of parameters we found useful
to add to that call, so we included them here instead of
modifying d6cage (and all the d6cage.createservice calls).
"""
return TestDriver(name, keys, inbox, datapath, args)
class TestDriver(datasource_driver.PollingDataSourceDriver):
def __init__(self, name='', keys='', inbox=None, datapath=None, args=None):
if args is None:
args = self._empty_openstack_credentials()
super(TestDriver, self).__init__(name, keys, inbox, datapath, args)
self.msg = None
self.state = {}
self._init_end_start_poll()
def receive_msg(self, msg):
LOG.info("TestDriver: received msg %s", msg)
self.msg = msg
def get_msg_data(self):
msgstr = ""
if self.msg is None:
return msgstr
# only list and set types are supported for now
if isinstance(self.msg.body.data, (list, set)):
for di in self.msg.body.data:
msgstr += str(di)
else:
msgstr = str(self.msg.body.data)
LOG.info("TestDriver: current received msg: %s", msgstr)
return msgstr
def update_from_datasource(self):
pass
def prepush_processor(self, data, dataindex, type=None):
# don't change data before transfer
return data


@@ -1,180 +0,0 @@
# Copyright (c) 2013 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from congress.datalog import compile
import congress.dse.d6cage
from congress.tests import base
import congress.tests.helper as helper
class TestDSE(base.TestCase):
def test_cage(self):
"""Test basic DSE functionality."""
cage = congress.dse.d6cage.d6Cage()
cage.loadModule("TestDriver",
helper.data_module_path(
"../tests/datasources/test_driver.py"))
args = helper.datasource_openstack_args()
args['poll_time'] = 0
cage.createservice(name="test1", moduleName="TestDriver", args=args)
cage.createservice(name="test2", moduleName="TestDriver", args=args)
test1 = cage.service_object('test1')
test2 = cage.service_object('test2')
test1.subscribe('test2', 'p', callback=test1.receive_msg)
test2.publish('p', 42)
helper.retry_check_for_message_to_arrive(test1)
self.assertEqual(42, test1.msg.body.data)
def test_policy(self):
"""Test basic DSE functionality with policy engine."""
cage = congress.dse.d6cage.d6Cage()
cage.loadModule("TestDriver",
helper.data_module_path(
"../tests/datasources/test_driver.py"))
cage.loadModule("TestPolicy", helper.policy_module_path())
cage.createservice(name="data", moduleName="TestDriver",
args=helper.datasource_openstack_args())
cage.createservice(name="policy", moduleName="TestPolicy",
args={'d6cage': cage, 'rootdir': '',
'log_actions_only': True})
data = cage.services['data']['object']
policy = cage.services['policy']['object']
policy.subscribe('data', 'p', callback=policy.receive_msg)
data.publish('p', 42)
helper.retry_check_for_message_to_arrive(policy)
self.assertEqual(42, policy.msg.body.data)
def test_policy_data(self):
"""Test policy properly inserts data and processes it normally."""
cage = congress.dse.d6cage.d6Cage()
cage.loadModule("TestDriver",
helper.data_module_path(
"../tests/datasources/test_driver.py"))
cage.loadModule("TestPolicy", helper.policy_module_path())
cage.createservice(name="data", moduleName="TestDriver",
args=helper.datasource_openstack_args())
cage.createservice(name="policy", moduleName="TestPolicy",
args={'d6cage': cage, 'rootdir': '',
'log_actions_only': True})
data = cage.services['data']['object']
policy = cage.services['policy']['object']
# turn off module-schema syntax checking
policy.create_policy('data')
policy.set_schema('data', compile.Schema({'p': ((1, None),)}))
policy.subscribe('data', 'p', callback=policy.receive_data)
formula = policy.parse1('p(1)')
# sending a single Insert. (Default for Event is Insert.)
data.publish('p', [compile.Event(formula)])
helper.retry_check_db_equal(policy, 'data:p(x)', 'data:p(1)')
def test_policy_tables(self):
"""Test basic DSE functionality with policy engine and the API."""
cage = congress.dse.d6cage.d6Cage()
cage.loadModule("TestDriver",
helper.data_module_path(
"../tests/datasources/test_driver.py"))
cage.loadModule("TestPolicy", helper.policy_module_path())
cage.createservice(name="data", moduleName="TestDriver",
args=helper.datasource_openstack_args())
# using regular testdriver as API for now
cage.createservice(name="api", moduleName="TestDriver",
args=helper.datasource_openstack_args())
cage.createservice(name="policy", moduleName="TestPolicy",
args={'d6cage': cage, 'rootdir': '',
'log_actions_only': True})
data = cage.services['data']['object']
api = cage.services['api']['object']
policy = cage.services['policy']['object']
policy.create_policy('data')
policy.set_schema('data', compile.Schema({'q': (1,)}))
policy.subscribe('api', 'policy-update',
callback=policy.receive_policy_update)
# simulate API call for insertion of policy statements
formula = policy.parse1('p(x) :- data:q(x)')
api.publish('policy-update', [compile.Event(formula)])
helper.retry_check_nonempty_last_policy_change(policy)
# simulate data source publishing to q
formula = policy.parse1('q(1)')
data.publish('q', [compile.Event(formula)])
helper.retry_check_db_equal(policy, 'data:q(x)', 'data:q(1)')
# check that policy did the right thing with data
e = helper.db_equal(policy.select('p(x)'), 'p(1)')
self.assertTrue(e, 'Policy insert')
# check that publishing into 'p' does not work
formula = policy.parse1('p(3)')
data.publish('p', [compile.Event(formula)])
# can't actually check that the update for p does not arrive
# so instead wait a bit and check
helper.pause()
e = helper.db_equal(policy.select('p(x)'), 'p(1)')
self.assertTrue(e, 'Policy non-insert')
def test_policy_table_publish(self):
"""Policy table result publish
Test basic DSE functionality with policy engine and table result
publish.
"""
cage = congress.dse.d6cage.d6Cage()
cage.loadModule("TestDriver",
helper.data_module_path(
"../tests/datasources/test_driver.py"))
cage.loadModule("TestPolicy", helper.policy_module_path())
cage.createservice(name="data", moduleName="TestDriver",
args=helper.datasource_openstack_args())
cage.createservice(name="policy", moduleName="TestPolicy",
args={'d6cage': cage, 'rootdir': '',
'log_actions_only': True})
data = cage.services['data']['object']
policy = cage.services['policy']['object']
policy.create_policy('data')
policy.create_policy('classification')
policy.set_schema('data', compile.Schema({'q': (1,)}))
policy.insert('p(x):-data:q(x),gt(x,2)', target='classification')
data.subscribe('policy', 'classification:p', callback=data.receive_msg)
helper.retry_check_subscribers(policy, [('data', 'classification:p')])
self.assertEqual(list(policy.policySubData.keys()),
[('p', 'classification', None)])
policy.insert('q(1)', target='data')
# no entry here
self.assertEqual(data.get_msg_data(), '{}')
policy.insert('q(2)', target='data')
policy.insert('q(3)', target='data')
# get an update
helper.retry_check_for_message_data(data, 'insert[p(3)]')
self.assertEqual(data.get_msg_data(), 'insert[p(3)]')
# subscribe again to get a full table
data.subscribe('policy', 'classification:p', callback=data.receive_msg)
helper.retry_check_for_message_data(data, 'p(3)')
self.assertEqual(data.get_msg_data(), 'p(3)')
# get another update
policy.insert('q(4)', target='data')
helper.retry_check_for_message_data(data, 'insert[p(4)]')
self.assertEqual(data.get_msg_data(), 'insert[p(4)]')
# get another update
policy.delete('q(4)', target='data')
helper.retry_check_for_message_data(data, 'delete[p(4)]')
self.assertEqual(data.get_msg_data(), 'delete[p(4)]')
data.unsubscribe('policy', 'classification:p')
# trigger removed
helper.retry_check_no_subscribers(policy,
[('data', 'classification:p')])
self.assertEqual(list(policy.policySubData.keys()), [])


@@ -23,7 +23,6 @@ from mox3 import mox
from six.moves import range
from congress.datalog import compile
from congress.dse import dataobj
from congress import harness
from congress.policy_engines import agnostic
from congress.tests import base
@@ -60,10 +59,10 @@ class BenchmarkDatasource(base.Benchmark):
self.assertEqual(datasource.state, {})
# add a subscriber to ensure the updates end up in datasource.dataPath
pubdata = datasource.pubdata.setdefault(table_name,
dataobj.pubData(table_name))
pubdata.addsubscriber(self.__class__.__name__, "push", "")
self.assertTrue(datasource.pubdata[table_name])
# pubdata = datasource.pubdata.setdefault(table_name,
# dataobj.pubData(table_name))
# pubdata.addsubscriber(self.__class__.__name__, "push", "")
# self.assertTrue(datasource.pubdata[table_name])
self.cage = cage
self.engine = engine