commit e970a0e102462962532c25f798c431be216c133b Author: François Rossigneux Date: Wed Nov 7 14:38:58 2012 +0100 Initial commit. Threads, no logging, XML config. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0d20b64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/README b/README new file mode 100644 index 0000000..8eecb4e --- /dev/null +++ b/README @@ -0,0 +1,3 @@ +Energy Efficiency Architecture for XLcloud project. + +License: Apache. diff --git a/api.py b/api.py new file mode 100755 index 0000000..4c78929 --- /dev/null +++ b/api.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from flask import Flask, redirect, url_for, jsonify, abort +import collector + +app = Flask(__name__) + +@app.route('/') +def index(): + return redirect(url_for('v1')) + +@app.route('/v1/') +def v1(): + return 'Welcome to Kwapi!' + +@app.route('/v1/probes/') +def v1_probe_list(): + return jsonify(probes=database.keys()) + +@app.route('/v1/probes//') +def v1_probe_info(probe): + if not probe in database: + abort(404) + result = {probe: database[probe]} + return jsonify(result) + +@app.route('/v1/probes///') +def v1_probe_value(probe, meter): + if not probe in database or not meter in database[probe]._asdict(): + abort(404) + result = {meter: database[probe]._asdict()[meter]} + return jsonify(result) + +if __name__ == '__main__': + collector = collector.Collector() + collector.clean(3600*24, True) + collector.start_listen() + database = collector.database + app.run(debug=True) diff --git a/collector.py b/collector.py new file mode 100755 index 0000000..32b75c1 --- /dev/null +++ b/collector.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import time +import socket +import os, os.path +import threading +from recordtype import recordtype + +class Collector: + + def __init__(self): + self.Record = recordtype('Record', 'timestamp, kwh, w') + self.database = {} + + def add(self, probe, watts): + if probe in self.database: + currentTime = time.time() + record = self.database[probe] + record.kwh += (currentTime - record.timestamp) / 3600.0 * (watts / 1000.0) + record.w = watts + record.timestamp = currentTime + else: + record = self.Record(timestamp=time.time(), kwh=0.0, w=watts) + self.database[probe] = record + + def remove(self, probe): + if probe in self.database: + del self.database[probe] + return True + else: + return False + + def clean(self, timeout, periodic): + # Cleaning + for probe in self.database.keys(): + if time.time() - self.database[probe].timestamp > timeout: + self.remove(probe) + + # Cancel next execution of this function + try: + self.timer.cancel() + except AttributeError: + pass + + # Schedule periodic execution of this function + if periodic: + self.timer = threading.Timer(timeout, self.clean, [timeout]) + self.timer.start() + + def listen(self, socket_name='/tmp/kwapi-collector'): + if os.path.exists(socket_name): + os.remove(socket_name) + + server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + server.bind(socket_name) + + while True: + datagram = server.recv(1024) + if not datagram: + print 'Error: not datagram' + break + else: + data = datagram.split(':') + if len(data) == 2: + try: + self.add(data[0], float(data[1])) + except: + print 'Format error!' + else: + print 'Malformed datagram!' + server.close() + os.remove(socket_name) + + def start_listen(self): + thread = threading.Thread(target=self.listen) + thread.start() diff --git a/config.xml b/config.xml new file mode 100644 index 0000000..f2114ce --- /dev/null +++ b/config.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/config.xsd b/config.xsd new file mode 100644 index 0000000..67e2753 --- /dev/null +++ b/config.xsd @@ -0,0 +1,45 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/drivers/__init__.py b/drivers/__init__.py new file mode 100644 index 0000000..00f3b7a --- /dev/null +++ b/drivers/__init__.py @@ -0,0 +1,3 @@ +from driver import Driver +from wattsup import Wattsup +from dummy import Dummy diff --git a/drivers/driver.py b/drivers/driver.py new file mode 100644 index 0000000..3d6a043 --- /dev/null +++ b/drivers/driver.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from threading import Thread + +class Driver(Thread): + + def __init__(self, probe_id): + Thread.__init__(self) + self.name = probe_id + self.probe_observers = [] + self.terminate = False + + def run(self): + raise NotImplementedError + + def update_value(self, value): + for notify_new_value in self.probe_observers : + notify_new_value(self.name, value) + + def subscribe(self, observer): + self.probe_observers.append(observer) + + def stop(self): + self.terminate = True diff --git a/drivers/dummy.py b/drivers/dummy.py new file mode 100755 index 0000000..2dcba69 --- /dev/null +++ b/drivers/dummy.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- + +from driver import Driver +from random import randrange +import time + +class Dummy(Driver): + + def __init__(self, probe_id, **kwargs): + Driver.__init__(self, probe_id) + self.min_value = int(kwargs.get('min_value', 75)) + self.max_value = int(kwargs.get('max_value', 100)) + + def run(self): + while not self.terminate: + value = randrange(self.min_value, self.max_value) + self.update_value(value) + time.sleep(1) diff --git a/drivers/wattsup.py b/drivers/wattsup.py new file mode 100755 index 0000000..decb5b7 --- /dev/null +++ b/drivers/wattsup.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from driver import Driver +import serial + +class Wattsup(Driver): + + def __init__(self, probe_id, **kwargs): + Driver.__init__(self, probe_id) + + # Configure serial port + self.serial = serial.Serial( + port=kwargs.get('device', '/dev/ttyUSB0'), + baudrate=115200, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + bytesize=serial.EIGHTBITS, + timeout=2, + ) + + # Clear memory + self.serial.write('#R,W,0;') + self.serial.read(256) + + # Start external logging with interval = 1 + self.serial.write('#L,W,3,E,1,1;') + self.serial.read(256) + + def run(self): + while not self.terminate: + packet = self.get_packet() + value = self.extract_watts(packet) + self.update_value(value) + + def get_packet(self): + packet = '' + while True: + char = self.serial.read(1) + if len(char) == 0: + raise ValueError('Invalid packet') + packet += char + if char == ';': + return packet + + def extract_watts(self, packet): + value = float(packet.split(',')[3])/10.0 + return value diff --git a/probe_manager.py b/probe_manager.py new file mode 100755 index 0000000..6c995f1 --- /dev/null +++ b/probe_manager.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from lxml import etree +from subprocess import call +import drivers +import sys +import os +import socket +import thread +import threading +import signal + +threads = {} +socket_name = '' + +def get_root(schema, xml): + # Validating XML schema + xsd = etree.parse(schema) + schema = etree.XMLSchema(xsd) + parser = etree.XMLParser(schema = schema) + try: + tree = etree.parse(xml, parser) + except etree.XMLSyntaxError: + return None + return tree.getroot() + +def send_value(probe_id, value): + if os.path.exists(socket_name): + client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + client.connect(socket_name) + client.send(probe_id + ':' + str(value)) + client.close() + +def signal_handler(signum, frame): + if signum is signal.SIGTERM: + terminate() + +def check_probes_alive(interval=60): + # TODO : default value because main exit before this thread... + print 'Check probes every', interval, 'seconds' + for thread in threads.keys(): + if not threads[thread].is_alive(): + print threads[thread], ' crashed!' + load_probe_from_xml(thread) + timer = threading.Timer(interval, check_probes_alive, [interval]) + timer.daemon = True + timer.start() + +def terminate(): + check_probes_alive() + for thread in threads.values(): + thread.stop() + for thread in threads.values(): + thread.join() + +def load_probe(class_name, probe_id, kwargs): + try: + probeClass = getattr(sys.modules['drivers'], class_name) + except AttributeError: + raise NameError("%s doesn't exist." % class_name) + try: + probeObject = probeClass(probe_id, **kwargs) + except Exception as exception: + print 'Probe "' + probe_id + '" error: %s' % exception + return + probeObject.subscribe(send_value) + probeObject.start() + threads[probe_id] = probeObject + +def load_probe_from_xml(probe_id): + print 'load_probe ', probe_id + + root = get_root('config.xsd', 'config.xml') + if root is None: + print "Configuration file isn't valid!" + sys.exit(1) + + probe = root.find("./driver/probe[@id='%s']" % probe_id) + class_name = probe.getparent().attrib['class'] + kwargs = {} + for argument in probe: + kwargs[argument.attrib['name']] = argument.attrib['value'] + load_probe(class_name, probe_id, kwargs) + +if __name__ == "__main__": + # Load and validate XML + root = get_root('config.xsd', 'config.xml') + if root is None: + print "Configuration file isn't valid!" + sys.exit(1) + + # Get socket path + socket_name = root.find('collector').attrib['socket'] + + # Load probes + probe_ids = root.findall("./driver/probe") + for probe_id in probe_ids: + load_probe_from_xml(probe_id.attrib['id']) + + # Load probes +# for driver in root.findall('driver'): +# for probe in driver.findall('probe'): +# class_name = driver.attrib['class'] +# probe_id = probe.attrib['id'] +# kwargs = {} +# for argument in probe.findall('arg'): +# kwargs[argument.attrib['name']] = argument.attrib['value'] +# thread.start_new_thread(load_probe, (class_name, probe_id, kwargs)) + + # Check probe crashes + check_probes_alive(60) + + signal.signal(signal.SIGTERM, signal_handler) + try: + signal.pause() + except KeyboardInterrupt: + terminate()