Introduction of oslo.messaging

* replaces kombu with oslo.messaging

* deprecates and removes the faafo-tracker service (merged into the
  faafo-producer service)

* moves all CLI options into configuration files

* updates log example outputs in the documentation

* updates the workflow diagram and removes the workflow text description
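
The switch from direct kombu publishing to oslo.messaging RPC can be illustrated with a small stdlib stand-in (no broker needed; the class names here are hypothetical, only the call pattern mirrors the new producer/worker code in this commit):

```python
import uuid

# Stand-in for the pattern this commit adopts: instead of publishing a
# kombu message to the 'tasks' exchange and waiting for a separate
# tracker to consume 'results', the producer issues a blocking call()
# on the 'tasks' topic and receives the worker's return value directly.
class WorkerEndpoint:
    """Mimics the shape of faafo.worker.WorkerEndpoint.process()."""
    def process(self, ctxt, task):
        # A real worker renders the fractal; here we only echo back the
        # result shape the producer expects ('uuid', 'duration', 'checksum').
        return {'uuid': task['uuid'], 'duration': 0.0, 'checksum': 'stub'}

class InProcessRpcClient:
    """Toy replacement for oslo_messaging.RPCClient: dispatches a named
    method on a single registered endpoint and returns its result."""
    def __init__(self, endpoint):
        self.endpoint = endpoint

    def call(self, ctxt, method, **kwargs):
        return getattr(self.endpoint, method)(ctxt, **kwargs)

client = InProcessRpcClient(WorkerEndpoint())
task = {'uuid': str(uuid.uuid4())}
result = client.call({}, 'process', task=task)
print(result['uuid'] == task['uuid'])  # True: the reply carries the task id
```

With the real library, the transport comes from `transport_url` in the configuration file and the worker side runs an RPC server on the same topic, which is why the tracker service becomes redundant.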

Change-Id: I4b79ed28a8f5b1fe1b33dfdbfe9cc8c958f80b1b
Christian Berendt 2015-03-25 10:12:29 +01:00
parent 32ca4b408d
commit ec790a3bed
22 changed files with 110 additions and 279 deletions

ansible/files/api.conf (new file)

@@ -0,0 +1,4 @@
[DEFAULT]
database_url = mysql://faafo:secretsecret@127.0.0.1:3306/faafo
verbose = True
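
These `[DEFAULT]` files replace the old CLI flags. Reading such a file can be sketched with the stdlib `configparser` (the real services use oslo.config; this only illustrates the format):

```python
import configparser

# api.conf is plain INI: one [DEFAULT] section holding the options that
# were previously passed as --database-url and --verbose CLI flags.
conf_text = """
[DEFAULT]
database_url = mysql://faafo:secretsecret@127.0.0.1:3306/faafo
verbose = True
"""

parser = configparser.ConfigParser()
parser.read_string(conf_text)
print(parser['DEFAULT']['database_url'])
print(parser.getboolean('DEFAULT', 'verbose'))
```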


@@ -0,0 +1,5 @@
[DEFAULT]
endpoint_url = http://127.0.0.1:5000
transport_url = rabbit://faafo:secretsecret@localhost:5672/
verbose = True


@@ -1,5 +1,3 @@
#!/bin/sh
faafo-api \
--database-url mysql://faafo:secretsecret@127.0.0.1:3306/faafo \
--debug --verbose
faafo-api --config-file api.conf


@@ -1,6 +1,3 @@
#!/bin/sh
faafo-producer \
--amqp-url amqp://faafo:secretsecret@127.0.0.1:5672/ \
--api-url http://127.0.0.1:5000 \
--debug --verbose
faafo-producer --config-file producer.conf


@@ -1,6 +0,0 @@
#!/bin/sh
faafo-tracker \
--amqp-url amqp://faafo:secretsecret@127.0.0.1:5672/ \
--api-url http://127.0.0.1:5000 \
--debug --verbose


@@ -1,6 +1,3 @@
#!/bin/sh
faafo-worker \
--amqp-url amqp://faafo:secretsecret@127.0.0.1:5672/ \
--target /home/vagrant \
--debug --verbose
faafo-worker --config-file worker.conf


@@ -0,0 +1,5 @@
[DEFAULT]
filesystem_store_datadir = /home/vagrant
transport_url = rabbit://faafo:secretsecret@localhost:5672/
verbose = True


@@ -20,7 +20,11 @@
args:
chdir: /vagrant
- copy: src=files/run_api.sh dest=/home/vagrant/run_api.sh mode=0755 owner=vagrant group=vagrant
- copy: src=files/run_producer.sh dest=/home/vagrant/run_producer.sh mode=0755 owner=vagrant group=vagrant
- copy: src=files/run_tracker.sh dest=/home/vagrant/run_tracker.sh mode=0755 owner=vagrant group=vagrant
- copy: src=files/run_worker.sh dest=/home/vagrant/run_worker.sh mode=0755 owner=vagrant group=vagrant
- copy: src=files/{{ item }} dest=/home/vagrant/{{ item }} mode=0755 owner=vagrant group=vagrant
with_items:
- api.conf
- producer.conf
- run_api.sh
- run_producer.sh
- run_worker.sh
- worker.conf


@@ -27,12 +27,11 @@ Now it is possible to login with SSH.
$ vagrant ssh
Open a new screen or tmux session. Afterwards run the api, worker, producer, and
tracker services in the foreground, each service in a separate window.
Open a new screen or tmux session. Afterwards run the api, worker, and producer
services in the foreground, each service in a separate window.
* :code:`sh run_api.sh`
* :code:`sh run_producer.sh`
* :code:`sh run_tracker.sh`
* :code:`sh run_worker.sh`
RabbitMQ server
@@ -60,12 +59,11 @@ the application itself.
$ pip install -r requirements.txt
$ python setup.py install
Open a new screen or tmux session. Afterwards run the api, worker, producer, and
tracker services in the foreground, each service in a separate window.
Open a new screen or tmux session. Afterwards run the api, worker, and producer
services in the foreground, each service in a separate window.
.. code::
$ source .venv/bin/activate; faafo-api
$ source .venv/bin/activate; faafo-worker
$ source .venv/bin/activate; faafo-tracker
$ source .venv/bin/activate; faafo-producer


@@ -4,12 +4,12 @@ digraph {
Database -> API [color=red];
API -> Webinterface [color=red];
Producer -> API [color=orange];
Producer -> API [color=green];
Producer -> "Queue Service" [color=orange];
Tracker -> API [color=green];
"Queue Service" -> Tracker [color=green];
"Queue Service" -> Worker [color=orange];
Worker -> "Image File" [color=blue];
Worker -> "Queue Service" [color=green];
"Queue Service" -> Producer [color=green];
"Image File" -> "Storage Backend" [color=blue];
"Storage Backend" -> Webinterface [color=red];
}

Binary image file changed, not shown (39 KiB before, 35 KiB after).


@@ -10,42 +10,28 @@ Example image
Example outputs
---------------
API Service
~~~~~~~~~~~
FIXME(berendt): add output of the API service
Producer service
~~~~~~~~~~~~~~~~
FIXME(berendt): update output (introduction of oslo.logging)
.. code::
2015-02-12 22:21:42,870 generating 2 task(s)
2015-02-12 22:21:42,876 generated task: {'width': 728, 'yb': 2.6351683415972076, 'uuid': UUID('66d5f67e-d26d-42fb-9d88-3c3830b4187a'), 'iterations': 395, 'xb': 1.6486035545865234, 'xa': -1.2576814065507933, 'ya': -2.8587178863035616, 'height': 876}
2015-02-12 22:21:42,897 generated task: {'width': 944, 'yb': 2.981696583462036, 'uuid': UUID('6f873111-8bc2-4d73-9a36-ed49915699c8'), 'iterations': 201, 'xb': 3.530775320058914, 'xa': -3.3511031734533794, 'ya': -0.921920674639712, 'height': 962}
2015-02-12 22:21:42,927 sleeping for 2.680171 seconds
Tracker service
~~~~~~~~~~~~~~~
FIXME(berendt): update output (introduction of oslo.logging)
.. code::
2015-02-12 22:20:26,630 processing result be42a131-e4aa-4db5-80d1-1956784f4b81
2015-02-12 22:20:26,630 elapsed time 5.749099 seconds
2015-02-12 22:20:26,631 checksum 7ba5bf955a94f1aa02e5f442869b8db88a5915b7c2fb91ffba74708b8d799c2a
2015-03-25 23:01:29.308 22526 INFO faafo.producer [-] generating 1 task(s)
2015-03-25 23:01:29.344 22526 INFO faafo.producer [-] generated task: {'width': 510, 'yb': 2.478654026560605, 'uuid': '212e8c23-e67f-4bd3-86e1-5a5e811ee2f4', 'iterations': 281, 'xb': 1.1428457603077387, 'xa': -3.3528957195683087, 'ya': -2.1341119130263717, 'height': 278}
2015-03-25 23:01:30.295 22526 INFO faafo.producer [-] task 212e8c23-e67f-4bd3-86e1-5a5e811ee2f4 processed: {u'duration': 0.8725259304046631, u'checksum': u'b22d975c4f9dc77df5db96ce6264a4990d865dd8f800aba2ac03a065c2f09b1e', u'uuid': u'212e8c23-e67f-4bd3-86e1-5a5e811ee2f4'}
Worker service
~~~~~~~~~~~~~~
FIXME(berendt): update output (introduction of oslo.logging)
.. code::
2015-02-12 22:20:59,258 processing task 20a00e9e-baec-4045-bc57-2cb9d8d1aa61
2015-02-12 22:21:01,506 task 20a00e9e-baec-4045-bc57-2cb9d8d1aa61 processed in 2.246601 seconds
2015-02-12 22:21:01,553 saved result of task 20a00e9e-baec-4045-bc57-2cb9d8d1aa61 to file /home/vagrant/20a00e9e-baec-4045-bc57-2cb9d8d1aa61.png
2015-02-12 22:21:01,554 pushed result: {'duration': 2.246600866317749, 'checksum': 'faa0f00a72fac53e02c3eb392c5da8365139e509899e269227e5c27047af6c1f', 'uuid': UUID('20a00e9e-baec-4045-bc57-2cb9d8d1aa61')}
2015-03-25 23:01:29.378 22518 INFO faafo.worker [-] processing task 212e8c23-e67f-4bd3-86e1-5a5e811ee2f4
2015-03-25 23:01:30.251 22518 INFO faafo.worker [-] task 212e8c23-e67f-4bd3-86e1-5a5e811ee2f4 processed in 0.872526 seconds
2015-03-25 23:01:30.268 22518 INFO faafo.worker [-] saved result of task 212e8c23-e67f-4bd3-86e1-5a5e811ee2f4 to file /home/vagrant/212e8c23-e67f-4bd3-86e1-5a5e811ee2f4.png
API Service
~~~~~~~~~~~
.. code::
2015-03-25 23:01:29.342 22511 INFO werkzeug [-] 127.0.0.1 - - [25/Mar/2015 23:01:29] "POST /api/fractal HTTP/1.1" 201 -
2015-03-25 23:01:30.317 22511 INFO werkzeug [-] 127.0.0.1 - - [25/Mar/2015 23:01:30] "PUT /api/fractal/212e8c23-e67f-4bd3-86e1-5a5e811ee2f4 HTTP/1.1" 200 -


@@ -2,17 +2,3 @@ Workflow
--------
.. image:: images/diagram.png
FIXME(berendt): Add new API service and webinterface to the workflow description.
* The producer generates a random number of tasks with random parameters and a UUID as identifier.
* The producer pushes the generated tasks into the exchange :code:`tasks`.
* The producer inserts a new record for each task into the database (including all parameters and the UUID).
* The producer sleeps for a random number of seconds and will generate more tasks after awakening.
* All messages in the :code:`tasks` exchange will be routed into the :code:`tasks` queue.
* The worker waits for new messages in the :code:`tasks` queue.
* After receiving a message the worker generates an image based on the received parameters and writes the result into a local file (identified by the UUID).
* After writing an image the worker pushes the result (the checksum of the generated image and the duration identified by the UUID) into the exchange :code:`results`.
* All messages in the :code:`results` exchange will be routed into the :code:`results` queue.
* The tracker waits for new messages in the :code:`results` queue.
* After receiving a message the tracker updates the duration and checksum value of the corresponding database record (identified by the UUID).

etc/faafo/producer.conf (new file)

@@ -0,0 +1 @@
TODO(berendt): generate example configuration file with http://docs.openstack.org/developer/oslo.config/generator.html

etc/faafo/worker.conf (new file)

@@ -0,0 +1 @@
TODO(berendt): generate example configuration file with http://docs.openstack.org/developer/oslo.config/generator.html


@@ -18,9 +18,9 @@ from oslo_log import log
from faafo import version
LOG = log.getLogger(__name__)
LOG = log.getLogger('faafo.api')
cli_opts = [
api_opts = [
cfg.StrOpt('listen-address',
default='0.0.0.0',
help='Listen address.'),
@@ -32,7 +32,7 @@ cli_opts = [
help='Database connection URL.')
]
cfg.CONF.register_cli_opts(cli_opts)
cfg.CONF.register_opts(api_opts)
log.register_options(cfg.CONF)
log.set_defaults()
@@ -43,7 +43,7 @@ cfg.CONF(project='api', prog='faafo-api',
log.setup(cfg.CONF, 'api',
version=version.version_info.version_string())
app = flask.Flask(__name__)
app = flask.Flask('faafo.api')
app.config['DEBUG'] = cfg.CONF.debug
app.config['SQLALCHEMY_DATABASE_URI'] = cfg.CONF.database_url
db = flask.ext.sqlalchemy.SQLAlchemy(app)


@@ -16,27 +16,19 @@ import json
import random
import uuid
import kombu
from kombu.pools import producers
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as messaging
import requests
from faafo.openstack.common import periodic_task
from faafo.openstack.common import service
from faafo import queues
from faafo import version
LOG = log.getLogger('faafo.producer')
cli_opts = [
cfg.StrOpt('amqp-url',
default='amqp://faafo:secretsecret@localhost:5672/',
help='AMQP connection URL'),
cfg.StrOpt('api-url',
default='http://localhost:5000',
help='API connection URL')
]
producer_opts = [
@@ -74,22 +66,27 @@ producer_opts = [
help="The minimum number of generated tasks."),
cfg.IntOpt("max-tasks", default=10,
help="The maximum number of generated tasks."),
cfg.IntOpt("interval", default=10, help="Interval in seconds.")
cfg.IntOpt("interval", default=10, help="Interval in seconds."),
cfg.StrOpt('endpoint-url',
default='http://localhost:5000',
help='API connection URL')
]
cfg.CONF.register_cli_opts(cli_opts)
cfg.CONF.register_cli_opts(producer_opts)
cfg.CONF.register_opts(producer_opts)
class ProducerService(service.Service, periodic_task.PeriodicTasks):
def __init__(self):
super(ProducerService, self).__init__()
self.messaging = kombu.Connection(cfg.CONF.amqp_url)
self._periodic_last_run = {}
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='tasks')
self._client = messaging.RPCClient(transport, target)
@periodic_task.periodic_task(spacing=cfg.CONF.interval,
run_immediately=True)
def generate_task(self, context):
run_immediately=False)
def generate_task(self, ctxt):
ctxt = {}
random.seed()
number = random.randint(cfg.CONF.min_tasks, cfg.CONF.max_tasks)
LOG.info("generating %d task(s)" % number)
@@ -98,16 +95,14 @@ class ProducerService(service.Service, periodic_task.PeriodicTasks):
# NOTE(berendt): only necessary when using requests < 2.4.2
headers = {'Content-type': 'application/json',
'Accept': 'text/plain'}
requests.post("%s/api/fractal" % cfg.CONF.api_url,
requests.post("%s/api/fractal" % cfg.CONF.endpoint_url,
json.dumps(task), headers=headers)
LOG.info("generated task: %s" % task)
with producers[self.messaging].acquire(block=True) as producer:
producer.publish(
task,
serializer='pickle',
exchange=queues.task_exchange,
declare=[queues.task_exchange],
routing_key='tasks')
result = self._client.call(ctxt, 'process', task=task)
LOG.info("task %s processed: %s" % (task['uuid'], result))
requests.put("%s/api/fractal/%s" %
(cfg.CONF.endpoint_url, str(task['uuid'])),
json.dumps(result), headers=headers)
self.add_periodic_task(generate_task)
self.tg.add_dynamic_timer(self.periodic_tasks)


@@ -1,20 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kombu import Exchange
from kombu import Queue
task_exchange = Exchange('tasks', type='direct')
task_queues = [Queue('tasks', task_exchange, routing_key='tasks')]
result_exchange = Exchange('results', type='direct')
result_queues = [Queue('results', result_exchange, routing_key='results')]


@@ -1,98 +0,0 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# based on http://code.activestate.com/recipes/577120-julia-fractals/
import json
import sys
import daemon
import kombu
from kombu.mixins import ConsumerMixin
from oslo_config import cfg
from oslo_log import log
import requests
from faafo import queues
from faafo import version
LOG = log.getLogger(__name__)
cli_opts = [
cfg.BoolOpt('daemonize',
default=False,
help='Run in background.'),
cfg.StrOpt('amqp-url',
default='amqp://faafo:secretsecret@localhost:5672/',
help='AMQP connection URL'),
cfg.StrOpt('api-url',
default='http://localhost:5000',
help='API connection URL')
]
cfg.CONF.register_cli_opts(cli_opts)
class Tracker(ConsumerMixin):
def __init__(self, amqp_url, api_url):
self.connection = kombu.Connection(amqp_url)
self.api_url = api_url
def get_consumers(self, Consumer, channel):
return [Consumer(queues=queues.result_queues,
accept=['pickle', 'json'],
callbacks=[self.process_result])]
def process_result(self, body, message):
LOG.info("processing result %s" % body['uuid'])
LOG.info("elapsed time %f seconds" % body['duration'])
LOG.info("checksum %s" % body['checksum'])
result = {
'duration': float(body['duration']),
'checksum': str(body['checksum'])
}
# NOTE(berendt): only necessary when using requests < 2.4.2
headers = {'Content-type': 'application/json',
'Accept': 'text/plain'}
requests.put("%s/api/fractal/%s" %
(self.api_url, str(body['uuid'])),
json.dumps(result), headers=headers)
message.ack()
def main():
log.register_options(cfg.CONF)
log.set_defaults()
cfg.CONF(project='tracker', prog='faafo-tracker',
version=version.version_info.version_string())
log.setup(cfg.CONF, 'tracker',
version=version.version_info.version_string())
tracker = Tracker(cfg.CONF.amqp_url, cfg.CONF.api_url)
if cfg.CONF.daemonize:
with daemon.DaemonContext():
tracker.run()
else:
try:
tracker.run()
except Exception as e:
sys.exit("ERROR: %s" % e)
if __name__ == '__main__':
main()


@@ -14,39 +14,33 @@
# based on http://code.activestate.com/recipes/577120-julia-fractals/
import eventlet
eventlet.monkey_patch()
import hashlib
import os
from PIL import Image
import random
import sys
import socket
import time
import daemon
import kombu
from kombu.mixins import ConsumerMixin
from kombu.pools import producers
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as messaging
from faafo import queues
from faafo import version
LOG = log.getLogger(__name__)
LOG = log.getLogger('faafo.worker')
cli_opts = [
cfg.BoolOpt('daemonize',
default=False,
help='Run in background.'),
cfg.StrOpt('target',
worker_opts = [
cfg.StrOpt('filesystem_store_datadir',
default='/tmp',
help='Target directory for fractal image files.'),
cfg.StrOpt('amqp-url',
default='amqp://faafo:secretsecret@localhost:5672/',
help='AMQP connection URL')
help='Directory that the filesystem backend store writes '
'fractal image files to.'),
]
cfg.CONF.register_cli_opts(cli_opts)
cfg.CONF.register_opts(worker_opts)
class JuliaSet(object):
@@ -97,49 +91,34 @@ class JuliaSet(object):
return (c, z)
class Worker(ConsumerMixin):
class WorkerEndpoint(object):
def __init__(self, url, target):
self.connection = kombu.Connection(url)
self.target = target
def get_consumers(self, Consumer, channel):
return [Consumer(queues=queues.task_queues,
accept=['pickle', 'json'],
callbacks=[self.process_task])]
def process_task(self, body, message):
LOG.info("processing task %s" % body['uuid'])
LOG.debug(body)
def process(self, ctxt, task):
LOG.info("processing task %s" % task['uuid'])
LOG.debug(task)
start_time = time.time()
juliaset = JuliaSet(body['width'],
body['height'],
body['xa'],
body['xb'],
body['ya'],
body['yb'],
body['iterations'])
filename = os.path.join(self.target, "%s.png" % body['uuid'])
juliaset = JuliaSet(task['width'],
task['height'],
task['xa'],
task['xb'],
task['ya'],
task['yb'],
task['iterations'])
filename = os.path.join(cfg.CONF.filesystem_store_datadir,
"%s.png" % task['uuid'])
elapsed_time = time.time() - start_time
LOG.info("task %s processed in %f seconds" %
(body['uuid'], elapsed_time))
(task['uuid'], elapsed_time))
juliaset.save(filename)
LOG.info("saved result of task %s to file %s" %
(body['uuid'], filename))
(task['uuid'], filename))
checksum = hashlib.sha256(open(filename, 'rb').read()).hexdigest()
result = {
'uuid': body['uuid'],
'uuid': task['uuid'],
'duration': elapsed_time,
'checksum': checksum
}
LOG.info("pushed result: %s" % result)
with producers[self.connection].acquire(block=True) as producer:
producer.publish(result, serializer='pickle',
exchange=queues.result_exchange,
declare=[queues.result_exchange],
routing_key='results')
message.ack()
return result
def main():
@@ -152,16 +131,18 @@ def main():
log.setup(cfg.CONF, 'worker',
version=version.version_info.version_string())
worker = Worker(cfg.CONF.amqp_url, cfg.CONF.target)
if cfg.CONF.daemonize:
with daemon.DaemonContext():
worker.run()
else:
try:
worker.run()
except Exception as e:
sys.exit("ERROR: %s" % e)
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='tasks', server=socket.gethostname())
endpoints = [
WorkerEndpoint()
]
server = messaging.get_rpc_server(transport, target, endpoints,
executor='eventlet')
server.start()
try:
server.wait()
except KeyboardInterrupt:
LOG.info("Caught keyboard interrupt. Exiting.")
if __name__ == '__main__':


@@ -1,14 +1,12 @@
pbr>=0.6,!=0.7,<1.0
anyjson>=0.3.3
amqp
eventlet>=0.16.1,!=0.17.0
kombu>=2.5.0
PyMySQL>=0.6.2 # MIT License
Pillow==2.4.0 # MIT
python-daemon
requests>=2.2.0,!=2.4.0
Flask>=0.10,<1.0
flask-sqlalchemy
flask-restless
oslo.config>=1.9.3,<1.10.0 # Apache-2.0
oslo.log>=1.0.0,<1.1.0 # Apache-2.0
oslo.messaging>=1.8.0,<1.9.0 # Apache-2.0


@@ -27,7 +27,6 @@ setup-hooks =
[entry_points]
console_scripts =
faafo-producer = faafo.producer:main
faafo-tracker = faafo.tracker:main
faafo-worker = faafo.worker:main
faafo-api = faafo.api:main