Initial commit.

Threads, no logging, XML config.
This commit is contained in:
François Rossigneux 2012-11-07 14:38:58 +01:00
commit e970a0e102
11 changed files with 414 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.pyc

3
README Normal file
View File

@ -0,0 +1,3 @@
Energy Efficiency Architecture for XLcloud project.
License: Apache.

40
api.py Executable file
View File

@ -0,0 +1,40 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import Flask, redirect, url_for, jsonify, abort
import collector
app = Flask(__name__)
@app.route('/')
def index():
return redirect(url_for('v1'))
@app.route('/v1/')
def v1():
return 'Welcome to Kwapi!'
@app.route('/v1/probes/')
def v1_probe_list():
return jsonify(probes=database.keys())
@app.route('/v1/probes/<probe>/')
def v1_probe_info(probe):
if not probe in database:
abort(404)
result = {probe: database[probe]}
return jsonify(result)
@app.route('/v1/probes/<probe>/<meter>/')
def v1_probe_value(probe, meter):
if not probe in database or not meter in database[probe]._asdict():
abort(404)
result = {meter: database[probe]._asdict()[meter]}
return jsonify(result)
if __name__ == '__main__':
collector = collector.Collector()
collector.clean(3600*24, True)
collector.start_listen()
database = collector.database
app.run(debug=True)

77
collector.py Executable file
View File

@ -0,0 +1,77 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import socket
import os, os.path
import threading
from recordtype import recordtype
class Collector:
def __init__(self):
self.Record = recordtype('Record', 'timestamp, kwh, w')
self.database = {}
def add(self, probe, watts):
if probe in self.database:
currentTime = time.time()
record = self.database[probe]
record.kwh += (currentTime - record.timestamp) / 3600.0 * (watts / 1000.0)
record.w = watts
record.timestamp = currentTime
else:
record = self.Record(timestamp=time.time(), kwh=0.0, w=watts)
self.database[probe] = record
def remove(self, probe):
if probe in self.database:
del self.database[probe]
return True
else:
return False
def clean(self, timeout, periodic):
# Cleaning
for probe in self.database.keys():
if time.time() - self.database[probe].timestamp > timeout:
self.remove(probe)
# Cancel next execution of this function
try:
self.timer.cancel()
except AttributeError:
pass
# Schedule periodic execution of this function
if periodic:
self.timer = threading.Timer(timeout, self.clean, [timeout])
self.timer.start()
def listen(self, socket_name='/tmp/kwapi-collector'):
if os.path.exists(socket_name):
os.remove(socket_name)
server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
server.bind(socket_name)
while True:
datagram = server.recv(1024)
if not datagram:
print 'Error: not datagram'
break
else:
data = datagram.split(':')
if len(data) == 2:
try:
self.add(data[0], float(data[1]))
except:
print 'Format error!'
else:
print 'Malformed datagram!'
server.close()
os.remove(socket_name)
def start_listen(self):
thread = threading.Thread(target=self.listen)
thread.start()

36
config.xml Normal file
View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="config.xsd">
<collector socket="/tmp/kwapi-collector"/>
<driver class="Wattsup">
<probe id="A">
<arg name="device" value="/dev/ttyUSB0"/>
</probe>
<!-- <probe id="B">-->
<!-- <arg name="device" value="/dev/ttyUSB1"/>-->
<!-- </probe>-->
</driver>
<!-- <driver class="Omega">-->
<!-- <probe id="C">-->
<!-- <arg name="IP" value="192.168.0.10"/>-->
<!-- <arg name="port" value="2020"/>-->
<!-- <arg name="output" value="1"/>-->
<!-- </probe>-->
<!-- <probe id="D">-->
<!-- <arg name="IP" value="192.168.0.10"/>-->
<!-- <arg name="port" value="2020"/>-->
<!-- <arg name="output" value="1"/>-->
<!-- </probe>-->
<!-- </driver>-->
<driver class="Dummy">
<probe id="X"/>
<probe id="Y">
<arg name="min_value" value="10"/>
<arg name="max_value" value="20"/>
</probe>
</driver>
</config>

45
config.xsd Normal file
View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" elementFormDefault="qualified" attributeFormDefault="unqualified">
<xs:element name="config" type="config">
<xs:unique name="driverClass">
<xs:selector xpath="./driver"/>
<xs:field xpath="@class"/>
</xs:unique>
<xs:unique name="probeID">
<xs:selector xpath="./driver/probe"/>
<xs:field xpath="@id"/>
</xs:unique>
</xs:element>
<xs:complexType name="config">
<xs:sequence>
<xs:element name="collector" type="collector"/>
<xs:element name="driver" type="driver" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="collector">
<xs:attribute name="socket" type="xs:string"/>
</xs:complexType>
<xs:complexType name="driver">
<xs:sequence>
<xs:element name="probe" type="probe" maxOccurs="unbounded"/>
</xs:sequence>
<xs:attribute name="class" type="xs:NCName"/>
</xs:complexType>
<xs:complexType name="probe">
<xs:sequence>
<xs:element name="arg" type="arg" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
<xs:attribute name="id" type="xs:NCName"/>
</xs:complexType>
<xs:complexType name="arg">
<xs:attribute name="name" type="xs:string"/>
<xs:attribute name="value" type="xs:string"/>
</xs:complexType>
</xs:schema>

3
drivers/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from driver import Driver
from wattsup import Wattsup
from dummy import Dummy

25
drivers/driver.py Normal file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from threading import Thread
class Driver(Thread):
def __init__(self, probe_id):
Thread.__init__(self)
self.name = probe_id
self.probe_observers = []
self.terminate = False
def run(self):
raise NotImplementedError
def update_value(self, value):
for notify_new_value in self.probe_observers :
notify_new_value(self.name, value)
def subscribe(self, observer):
self.probe_observers.append(observer)
def stop(self):
self.terminate = True

18
drivers/dummy.py Executable file
View File

@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
from driver import Driver
from random import randrange
import time
class Dummy(Driver):
def __init__(self, probe_id, **kwargs):
Driver.__init__(self, probe_id)
self.min_value = int(kwargs.get('min_value', 75))
self.max_value = int(kwargs.get('max_value', 100))
def run(self):
while not self.terminate:
value = randrange(self.min_value, self.max_value)
self.update_value(value)
time.sleep(1)

48
drivers/wattsup.py Executable file
View File

@ -0,0 +1,48 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from driver import Driver
import serial
class Wattsup(Driver):
def __init__(self, probe_id, **kwargs):
Driver.__init__(self, probe_id)
# Configure serial port
self.serial = serial.Serial(
port=kwargs.get('device', '/dev/ttyUSB0'),
baudrate=115200,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=2,
)
# Clear memory
self.serial.write('#R,W,0;')
self.serial.read(256)
# Start external logging with interval = 1
self.serial.write('#L,W,3,E,1,1;')
self.serial.read(256)
def run(self):
while not self.terminate:
packet = self.get_packet()
value = self.extract_watts(packet)
self.update_value(value)
def get_packet(self):
packet = ''
while True:
char = self.serial.read(1)
if len(char) == 0:
raise ValueError('Invalid packet')
packet += char
if char == ';':
return packet
def extract_watts(self, packet):
value = float(packet.split(',')[3])/10.0
return value

118
probe_manager.py Executable file
View File

@ -0,0 +1,118 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from lxml import etree
from subprocess import call
import drivers
import sys
import os
import socket
import thread
import threading
import signal
threads = {}
socket_name = ''
def get_root(schema, xml):
# Validating XML schema
xsd = etree.parse(schema)
schema = etree.XMLSchema(xsd)
parser = etree.XMLParser(schema = schema)
try:
tree = etree.parse(xml, parser)
except etree.XMLSyntaxError:
return None
return tree.getroot()
def send_value(probe_id, value):
if os.path.exists(socket_name):
client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
client.connect(socket_name)
client.send(probe_id + ':' + str(value))
client.close()
def signal_handler(signum, frame):
if signum is signal.SIGTERM:
terminate()
def check_probes_alive(interval=60):
# TODO : default value because main exit before this thread...
print 'Check probes every', interval, 'seconds'
for thread in threads.keys():
if not threads[thread].is_alive():
print threads[thread], ' crashed!'
load_probe_from_xml(thread)
timer = threading.Timer(interval, check_probes_alive, [interval])
timer.daemon = True
timer.start()
def terminate():
check_probes_alive()
for thread in threads.values():
thread.stop()
for thread in threads.values():
thread.join()
def load_probe(class_name, probe_id, kwargs):
try:
probeClass = getattr(sys.modules['drivers'], class_name)
except AttributeError:
raise NameError("%s doesn't exist." % class_name)
try:
probeObject = probeClass(probe_id, **kwargs)
except Exception as exception:
print 'Probe "' + probe_id + '" error: %s' % exception
return
probeObject.subscribe(send_value)
probeObject.start()
threads[probe_id] = probeObject
def load_probe_from_xml(probe_id):
print 'load_probe ', probe_id
root = get_root('config.xsd', 'config.xml')
if root is None:
print "Configuration file isn't valid!"
sys.exit(1)
probe = root.find("./driver/probe[@id='%s']" % probe_id)
class_name = probe.getparent().attrib['class']
kwargs = {}
for argument in probe:
kwargs[argument.attrib['name']] = argument.attrib['value']
load_probe(class_name, probe_id, kwargs)
if __name__ == "__main__":
# Load and validate XML
root = get_root('config.xsd', 'config.xml')
if root is None:
print "Configuration file isn't valid!"
sys.exit(1)
# Get socket path
socket_name = root.find('collector').attrib['socket']
# Load probes
probe_ids = root.findall("./driver/probe")
for probe_id in probe_ids:
load_probe_from_xml(probe_id.attrib['id'])
# Load probes
# for driver in root.findall('driver'):
# for probe in driver.findall('probe'):
# class_name = driver.attrib['class']
# probe_id = probe.attrib['id']
# kwargs = {}
# for argument in probe.findall('arg'):
# kwargs[argument.attrib['name']] = argument.attrib['value']
# thread.start_new_thread(load_probe, (class_name, probe_id, kwargs))
# Check probe crashes
check_probes_alive(60)
signal.signal(signal.SIGTERM, signal_handler)
try:
signal.pause()
except KeyboardInterrupt:
terminate()