Introduce an API service to consolidate all database activity

This commit is contained in:
Christian Berendt 2015-03-05 00:46:23 +01:00
parent 29b519fb8c
commit d99c696e28
19 changed files with 271 additions and 117 deletions
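
For context, a minimal sketch of the consolidated flow this commit introduces: services no longer write to the database themselves but POST JSON to the new API service. The /v1/fractals endpoint and the payload shape are taken from the api.py added below; the host name api and port 5000 come from the Vagrant setup and are assumptions for this standalone example.

import json
import uuid

import requests

# Hypothetical standalone task; the real producer builds this dict in generate_task().
task = {
    'uuid': str(uuid.uuid4()),
    'dimension': {'width': 1024, 'height': 768},
    'parameter': {'xa': -1.0, 'xb': 1.0, 'ya': -2.0, 'yb': 2.0, 'iterations': 512},
}

# Only oat-api talks to the database; every other service goes through HTTP.
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
response = requests.post("http://api:5000/v1/fractals",
                         json.dumps(task), headers=headers)
print(response.status_code)  # 201 if the task was stored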

View File

@ -1,4 +1,4 @@
Openstack tutorial application
OpenStack tutorial application
==============================
Workflow
@ -6,6 +6,8 @@ Workflow
.. image:: images/diagram.png
FIXME(berendt): Add new API service and webinterface to the workflow description.
* The producer generates a random number of tasks with random parameters and a UUID as identifier.
* The producer pushes the generated tasks into the exchange :code:`tasks`.
* The producer inserts a new record for each task into the database (including all parameters and the UUID).
@ -21,9 +23,11 @@ Workflow
Frameworks
----------
* http://flask.pocoo.org/
* http://python-requests.org
* http://www.sqlalchemy.org/
* https://github.com/celery/kombu
* https://pillow.readthedocs.org/
* http://www.sqlalchemy.org/
Example image
-------------
@ -57,6 +61,7 @@ The RabbitMQ server and the MySQL server are running on the machine :code:`servi
There is a machine for each service of the tutorial application:
* :code:`api` - :code:`vagrant ssh api` - :code:`sh run_api.sh`
* :code:`producer` - :code:`vagrant ssh producer` - :code:`sh run_producer.sh`
* :code:`tracker` - :code:`vagrant ssh tracker` - :code:`sh run_tracker.sh`
* :code:`worker` - :code:`vagrant ssh worker` - :code:`sh run_worker.sh`
@ -86,11 +91,12 @@ the application itself.
$ pip install -r requirements.txt
$ python setup.py install
Now open a new screen or tmux session. Afterwards run the worker, producer, and
Now open a new screen or tmux session. Afterwards run the api, worker, producer, and
tracker services in the foreground, each service in a separate window.
.. code::
$ source .venv/bin/activate; oat-api
$ source .venv/bin/activate; oat-worker
$ source .venv/bin/activate; oat-tracker
$ source .venv/bin/activate; oat-producer
@ -98,6 +104,11 @@ tracker services in the foreground, each service in a separate window.
Example outputs
---------------
API Service
~~~~~~~~~~~
FIXME(berendt): add output of the API service
Producer service
~~~~~~~~~~~~~~~~

9
Vagrantfile vendored
View File

@ -60,4 +60,13 @@ Vagrant.configure(2) do |config|
ansible.tags = "producer"
end
end
config.vm.define "api" do |api|
api.vm.hostname = "api"
api.vm.network :private_network, ip: '10.15.15.50'
api.vm.provision "ansible" do |ansible|
ansible.playbook = "ansible/playbook.yaml"
ansible.tags = "api"
end
api.vm.network "forwarded_port", guest: 5000, host: 5000
end
end

4
ansible/files/run_api.sh Normal file
View File

@ -0,0 +1,4 @@
#!/bin/sh
oat-api \
--database-url mysql://tutorial:secretsecret@service:3306/tutorial
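
Once oat-api is up, the index route added in api.py returns an empty JSON object, which doubles as a quick liveness check. A sketch, assuming the Vagrant setup above: use api:5000 from the other VMs, or localhost:5000 from the host via the forwarded port.

import requests

# index() in api.py returns flask.jsonify({}), so an empty dict means the service is alive.
print(requests.get("http://api:5000/").json())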

View File

@ -2,4 +2,4 @@
oat-producer \
--amqp-url amqp://tutorial:secretsecret@service:5672/ \
--database-url mysql://tutorial:secretsecret@service:3306/tutorial
--api-url http://api:5000

View File

@ -2,4 +2,4 @@
oat-tracker \
--amqp-url amqp://tutorial:secretsecret@service:5672/ \
--database-url mysql://tutorial:secretsecret@service:3306/tutorial
--api-url http://api:5000

View File

@ -5,6 +5,7 @@
- apt: update-cache=yes upgrade=yes
- include: tasks/database.yaml tags=database
- include: tasks/messaging.yaml tags=messaging
- include: tasks/api.yaml tags=api
- apt: name={{ item }} state=latest
with_items:
@ -13,14 +14,15 @@
- python-daemon
- python-dev
- python-kombu
- python-pillow
- python-sqlalchemy
- python-virtualenv
- python-pbr
- python-pillow
- python-requests
- python-virtualenv
tags:
- worker
- producer
- tracker
- api
- command: python setup.py install
args:
@ -29,6 +31,7 @@
- worker
- producer
- tracker
- api
- copy: src=files/run_worker.sh dest=/home/vagrant/run_worker.sh mode=0755
tags: worker
@ -36,3 +39,5 @@
tags: tracker
- copy: src=files/run_producer.sh dest=/home/vagrant/run_producer.sh mode=0755
tags: producer
- copy: src=files/run_api.sh dest=/home/vagrant/run_api.sh mode=0755
tags: api

5
ansible/tasks/api.yaml Normal file
View File

@ -0,0 +1,5 @@
---
- apt: name={{ item }} state=latest
with_items:
- python-flask
- python-flask-sqlalchemy

View File

@ -20,3 +20,4 @@
- producer
- tracker
- worker
- api

View File

@ -1,11 +1,15 @@
digraph {
node [shape=box]; Producer; Database;
Producer -> Database [color=orange];
digraph {
API -> Database [color=green];
API -> Database [color=orange];
Database -> API [color=red];
API -> Webinterface [color=red];
Producer -> API [color=orange];
Producer -> "Queue Service" [color=orange];
Tracker -> Database [color=green];
Tracker -> API [color=green];
"Queue Service" -> Tracker [color=green];
"Queue Service" -> Worker [color=orange];
Worker -> "Image File" [color=blue];
Worker -> "Queue Service" [color=green];
"Image File" -> "Storage Backend" [color=blue];
"Storage Backend" -> Webinterface [color=red];
}

images/diagram.png: binary file not shown (regenerated, 18 KiB → 39 KiB)

3
images/dot2png.sh Executable file
View File

@ -0,0 +1,3 @@
#!/bin/sh
dot -T png -o diagram.png diagram.dot

View File

@ -0,0 +1,158 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import json
import logging
import flask
from flask.ext.sqlalchemy import SQLAlchemy
def initialize_logging(filename):
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO,
filename=filename)
def parse_command_line_arguments():
"""Parse the command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument(
"--database-url", type=str, help="database connection URL",
default="sqlite:////tmp/oat.db")
parser.add_argument(
"--log-file", type=str, help="write logs to this file", default=None)
parser.add_argument(
"--daemonize", action="store_true", help="run in background")
return parser.parse_args()
app = flask.Flask(__name__)
args = parse_command_line_arguments()
initialize_logging(args.log_file)
app.config['SQLALCHEMY_DATABASE_URI'] = args.database_url
db = SQLAlchemy(app)
class Fractal(db.Model):
__tablename__ = 'fractals'
fractal_id = db.Column(db.String(36), primary_key=True)
data = db.Column(db.Text())
def __repr__(self):
return "<Fractal(uuid='%s')>" % self.uuid
@app.route("/")
def index():
return flask.jsonify({})
def get_fractal_from_database(fractal_id):
try:
return Fractal.query.get(fractal_id)
except Exception:
return None
def write_fractal_to_database(fractal_id, data):
fractal = Fractal(
fractal_id=fractal_id,
data=json.dumps(data)
)
db.session.add(fractal)
db.session.commit()
@app.route('/v1/fractals/<string:fractal_id>/result', methods=['POST'])
def publish_result(fractal_id):
if (not flask.request.json or
not flask.request.json.viewkeys() >= {
'duration', 'checksum'}):
logging.error("wrong request: %s" % flask.request.json)
flask.abort(400)
fractal = get_fractal_from_database(fractal_id)
if not fractal:
flask.abort(400)
data = json.loads(fractal.data)
data['checksum'] = str(flask.request.json['checksum'])
data['duration'] = float(flask.request.json['duration'])
fractal.data = json.dumps(data)
db.session.commit()
data['uuid'] = fractal_id
return flask.jsonify(data), 201
@app.route('/v1/fractals', methods=['POST'])
def create_fractal():
if (not flask.request.json or
not flask.request.json.viewkeys() >= {
'uuid', 'parameter', 'dimension'} or
not flask.request.json['parameter'].viewkeys() >= {
'xa', 'xb', 'ya', 'yb', 'iterations'} or
not flask.request.json['dimension'].viewkeys() >= {
'width', 'height'}):
logging.error("wrong request: %s" % flask.request.json)
flask.abort(400)
try:
fractal_id = str(flask.request.json['uuid'])
xa = float(flask.request.json['parameter']['xa'])
xb = float(flask.request.json['parameter']['xb'])
ya = float(flask.request.json['parameter']['ya'])
yb = float(flask.request.json['parameter']['yb'])
iterations = int(flask.request.json['parameter']['iterations'])
width = int(flask.request.json['dimension']['width'])
height = int(flask.request.json['dimension']['height'])
fractal = {
'checksum': '',
'duration': 0.0,
'parameter': {
'xa': xa,
'xb': xb,
'ya': ya,
'yb': yb,
'iterations': iterations
},
'dimension': {
'width': width,
'height': height
}
}
except Exception:
flask.abort(400)
write_fractal_to_database(fractal_id, fractal)
fractal['uuid'] = fractal_id
return flask.jsonify(fractal), 201
@app.errorhandler(404)
def not_found(error):
return flask.make_response(flask.jsonify({'error': 'Not found'}), 404)
def main():
db.create_all()
app.run(host="0.0.0.0", debug=True)
if __name__ == "__main__":
main()
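
And the second endpoint from a client's point of view, roughly what the reworked tracker below does. A sketch only: the fractal_id is a placeholder and must refer to a task that was already created, and the duration/checksum values are made up.

import json

import requests

fractal_id = "00000000-0000-0000-0000-000000000000"  # placeholder UUID of an existing task
result = {'duration': 12.3, 'checksum': 'example-checksum'}  # example values

headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
response = requests.post("http://api:5000/v1/fractals/%s/result" % fractal_id,
                         json.dumps(result), headers=headers)
print(response.status_code)  # 201 when the result was recorded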

View File

@ -1,34 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float
Base = declarative_base()
class Fractal(Base):
__tablename__ = 'fractals'
uuid = Column(String(36), primary_key=True)
checksum = Column(String(256))
duration = Column(Float)
width = Column(Integer)
height = Column(Integer)
iterations = Column(Integer)
xa = Column(Float)
xb = Column(Float)
ya = Column(Float)
yb = Column(Float)
def __repr__(self):
return "<Fractal(uuid='%s')>" % self.uuid

View File

@ -13,6 +13,7 @@
# under the License.
import argparse
import json
import logging
import random
import sys
@ -22,10 +23,8 @@ import uuid
import daemon
import kombu
from kombu.pools import producers
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import requests
from openstack_application_tutorial import models
from openstack_application_tutorial import queues
@ -41,8 +40,8 @@ def parse_command_line_arguments():
"--amqp-url", type=str, help="AMQP connection URL",
default="amqp://tutorial:secretsecret@localhost:5672/")
parser.add_argument(
"--database-url", type=str, help="database connection URL",
default="mysql://tutorial:secretsecret@localhost:3306/tutorial")
"--api-url", type=str, help="API connection URL",
default="http://localhost:5000")
parser.add_argument(
"--log-file", type=str, help="write logs to this file", default=None)
parser.add_argument(
@ -99,38 +98,38 @@ def generate_task(args):
ya = random.uniform(args.min_ya, args.max_ya)
yb = random.uniform(args.min_yb, args.max_yb)
return {
'uuid': uuid.uuid4(),
'width': width,
'height': height,
'iterations': iterations,
'xa': xa,
'xb': xb,
'ya': ya,
'yb': yb
task = {
'uuid': str(uuid.uuid4()),
'dimension': {
'width': width,
'height': height,
},
'parameter': {
'iterations': iterations,
'xa': xa,
'xb': xb,
'ya': ya,
'yb': yb
}
}
return task
def run(args, connection, session):
def run(args, messaging, api_url):
while True:
random.seed()
number = random.randint(args.min_tasks, args.max_tasks)
logging.info("generating %d task(s)" % number)
for i in xrange(0, number):
task = generate_task(args)
fractal = models.Fractal(
uuid=task['uuid'],
width=task['width'],
height=task['height'],
xa=task['xa'],
xb=task['xb'],
ya=task['ya'],
yb=task['yb'],
iterations=task['iterations'])
session.add(fractal)
session.commit()
# NOTE(berendt): only necessary when using requests < 2.4.2
headers = {'Content-type': 'application/json',
'Accept': 'text/plain'}
requests.post("%s/v1/fractals" % api_url, json.dumps(task),
headers=headers)
logging.info("generated task: %s" % task)
with producers[connection].acquire(block=True) as producer:
with producers[messaging].acquire(block=True) as producer:
producer.publish(
task,
serializer='pickle',
@ -149,22 +148,14 @@ def run(args, connection, session):
def main():
args = parse_command_line_arguments()
initialize_logging(args.log_file)
connection = kombu.Connection(args.amqp_url)
engine = create_engine(args.database_url)
models.Base.metadata.bind = engine
models.Base.metadata.create_all(engine)
maker = sessionmaker(bind=engine)
session = maker()
messaging = kombu.Connection(args.amqp_url)
if args.daemonize:
with daemon.DaemonContext():
run(args, connection, session)
run(args, messaging, args.api_url)
else:
try:
run(args, connection, session)
run(args, messaging, args.api_url)
except KeyboardInterrupt:
return 0

View File

@ -15,28 +15,23 @@
# based on http://code.activestate.com/recipes/577120-julia-fractals/
import argparse
import json
import logging
import sys
import daemon
import kombu
from kombu.mixins import ConsumerMixin
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import requests
from openstack_application_tutorial import models
from openstack_application_tutorial import queues
class Tracker(ConsumerMixin):
def __init__(self, amqp_url, database_url):
def __init__(self, amqp_url, api_url):
self.connection = kombu.Connection(amqp_url)
engine = create_engine(database_url)
models.Base.metadata.bind = engine
models.Base.metadata.create_all(engine)
maker = sessionmaker(bind=engine)
self.session = maker()
self.api_url = api_url
def get_consumers(self, Consumer, channel):
return [Consumer(queues=queues.result_queues,
@ -47,14 +42,16 @@ class Tracker(ConsumerMixin):
logging.info("processing result %s" % body['uuid'])
logging.info("elapsed time %f seconds" % body['duration'])
logging.info("checksum %s" % body['checksum'])
try:
fractal = self.session.query(models.Fractal).filter(
models.Fractal.uuid == str(body['uuid'])).one()
fractal.duration = body['duration']
fractal.checksum = body['checksum']
self.session.commit()
except Exception:
pass
result = {
'duration': float(body['duration']),
'checksum': str(body['checksum'])
}
# NOTE(berendt): only necessary when using requests < 2.4.2
headers = {'Content-type': 'application/json',
'Accept': 'text/plain'}
requests.post("%s/v1/fractals/%s/result" %
(self.api_url, str(body['uuid'])),
json.dumps(result), headers=headers)
message.ack()
@ -70,8 +67,8 @@ def parse_command_line_arguments():
"--amqp-url", type=str, help="AMQP connection URL",
default="amqp://tutorial:secretsecret@localhost:5672/")
parser.add_argument(
"--database-url", type=str, help="database connection URL",
default="mysql://tutorial:secretsecret@localhost:3306/tutorial")
"--api-url", type=str, help="API connection URL",
default="http://localhost:5000")
parser.add_argument(
"--log-file", type=str, help="write logs to this file", default=None)
parser.add_argument(
@ -82,7 +79,7 @@ def parse_command_line_arguments():
def main():
args = parse_command_line_arguments()
initialize_logging(args.log_file)
tracker = Tracker(args.amqp_url, args.database_url)
tracker = Tracker(args.amqp_url, args.api_url)
if args.daemonize:
with daemon.DaemonContext():

View File

@ -92,10 +92,15 @@ class Worker(ConsumerMixin):
def process_task(self, body, message):
logging.info("processing task %s" % body['uuid'])
logging.debug(body)
start_time = time.time()
juliaset = JuliaSet(body['width'], body['height'], body['xa'],
body['xb'], body['ya'], body['yb'],
body['iterations'])
juliaset = JuliaSet(body['dimension']['width'],
body['dimension']['height'],
body['parameter']['xa'],
body['parameter']['xb'],
body['parameter']['ya'],
body['parameter']['yb'],
body['parameter']['iterations'])
filename = os.path.join(self.target, "%s.png" % body['uuid'])
elapsed_time = time.time() - start_time
logging.info("task %s processed in %f seconds" %

View File

@ -6,4 +6,6 @@ kombu
mysql
pillow
python-daemon
sqlalchemy
requests
flask
flask-sqlalchemy

View File

@ -29,14 +29,7 @@ console_scripts =
oat-producer = openstack_application_tutorial.producer:main
oat-tracker = openstack_application_tutorial.tracker:main
oat-worker = openstack_application_tutorial.worker:main
#[build_sphinx]
#source-dir = doc/source
#build-dir = doc/build
#all_files = 1
#[upload_sphinx]
#upload-dir = doc/build/html
oat-api = openstack_application_tutorial.api:main
[wheel]
universal = 1

View File

@ -18,4 +18,4 @@ commands = flake8
[flake8]
show-source = True
exclude=.venv,.git,.tox,*egg*
exclude=.venv,.git,.tox,*egg*,build