feat(benchmarking) : Producer-Consumer scenario

This patch adds the ability to benchmark Marconi. The benchmark tool
is a console script, and can be triggered using

$ marconi-bench-pc

The Benchmark tool fires up both a Producer Process and a Consumer
Process, while accepting CLI parameters for the number of processes,
number of workers and duration of test.

The Producer Process publishes messages to a given queue, while the
Consumer consumes the messages by claiming and deleting them.

Setup:

Benchmark dependencies need to be pip installed:

 pip install -r bench-requirements.txt

Export an environment variable called MESSAGES_PATH and set it to the
path of messages.json in marconi/bench

Note: This allows benchmarking with different set of messages rather
than those specified in messages.json

Usage:

$ marconi-bench-pc -p {No. Processes} -w {No. Workers} -t {No. Seconds}

Example:

$ marconi-bench-pc -p 2 -w 2 -t 4

Partially Implements: blueprint basic-benchmarking
Change-Id: I57ebe853554199490adba8b2a091423f399b0565
This commit is contained in:
Sriram Madapusi Vasudevan 2014-06-09 16:11:49 -04:00
parent 57689bba73
commit 4c950f99e4
9 changed files with 487 additions and 0 deletions

5
bench-requirements.txt Normal file
View File

@ -0,0 +1,5 @@
argparse>=1.2.1
gevent>=1.0.1
marktime>=0.2.0
psutil>=2.1.1
python-marconiclient>=0.0.2

36
marconi/bench/README.rst Normal file
View File

@ -0,0 +1,36 @@
Marconi Benchmarking
====================
Structure
---------
The Benchmark tool fires up both a Producer Process and a Consumer Process, while
accepting CLI parameters for the number of processes, number of workers and duration of test.
The Producer Process publishes messages to a given queue, while the Consumer consumes the messages
claiming and deleting them.
Need of the Benchmark
---------------------
Marconi is a performance oriented API. Any changes made need to performance tested, and this tool
helps by a being quick way to test that.
Setup
-----
Benchmark dependencies need to be pip installed::
pip install -r bench-requirements.txt
Make sure you have a running instance of Marconi after following `README`_ for
setting up Marconi running at port 8888::
Export an environment variable called MESSAGES_PATH and set it to the path of messages.json
in marconi/bench
Note: This allows benchmarking with different set of messages rather than those specified in
messages.json
$ marconi-bench-pc -p {Number of Processes} -w {Number of Workers} -t {Duration in Seconds}
.. _`README` : https://github.com/openstack/marconi/blob/master/README.rst

View File

View File

@ -0,0 +1,33 @@
# Copyright (c) 2014 Rackspace, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
import psutil
conf = cfg.CONF
_CLI_OPTIONS = (
cfg.IntOpt(
'processes',
short='p',
default=psutil.NUM_CPUS,
help='Number of Processes'),
cfg.IntOpt(
'workers',
short='w',
default=psutil.NUM_CPUS * 2,
help='Number of Workers'),
cfg.IntOpt('time', short='t', default=3, help="time in seconds"),
)
conf.register_cli_opts(_CLI_OPTIONS)
conf(project='marconi', prog='marconi-queues')

View File

@ -0,0 +1,29 @@
# Copyright (c) 2014 Rackspace, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import multiprocessing as mp
from marconi.bench import consumer
from marconi.bench import producer
def main():
procs = [mp.Process(target=worker.run)
for worker in [producer, consumer]]
for each_proc in procs:
each_proc.start()
for each_proc in procs:
each_proc.join()

142
marconi/bench/consumer.py Normal file
View File

@ -0,0 +1,142 @@
# Copyright (c) 2014 Rackspace, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import multiprocessing as mp
import time
from gevent import monkey as curious_george
curious_george.patch_all(thread=False, select=False)
import gevent
from marconiclient.queues.v1 import client
from marconiclient.transport.errors import TransportError
import marktime
from marconi.bench.cli_config import conf
URL = 'http://localhost:8888'
QUEUE_PREFIX = 'ogre-test-queue-'
def claim_delete(stats, test_duration, ttl, grace, limit):
"""Consumer Worker
The Consumer Worker continuously claims and deletes messages
for the specified duration. The time taken for each claim and
delete is recorded for calculating throughput and latency.
"""
cli = client.Client(URL)
queue = cli.queue(QUEUE_PREFIX + '1')
end = time.time() + test_duration
total_elapsed = 0
total_requests = 0
claim_total_requests = 0
delete_total_requests = 0
while time.time() < end:
marktime.start('claim_message')
try:
claim = queue.claim(ttl=ttl, grace=grace, limit=limit)
except TransportError as ex:
print ("Could not claim messages : {0}".format(ex))
else:
total_elapsed += marktime.stop('claim_message').seconds
total_requests += 1
claim_total_requests += 1
try:
marktime.start('delete_message')
for msg in claim:
# TODO(TheSriram): Simulate actual work before deletion
msg.delete()
total_elapsed += marktime.stop('delete_message').seconds
delete_total_requests += 1
total_requests += 1
stats.put({'total_requests': total_requests,
'claim_total_requests': claim_total_requests,
'delete_total_requests': delete_total_requests,
'total_elapsed': total_elapsed})
except TransportError as ex:
print ("Could not claim and delete : {0}".format(ex))
def load_generator(stats, num_workers, test_duration, url, ttl, grace, limit):
gevent.joinall([
gevent.spawn(claim_delete, stats, test_duration, ttl,
grace, limit)
for _ in range(num_workers)
])
def crunch(stats):
total_requests = 0
total_latency = 0.0
claim_total_requests = 0
delete_total_requests = 0
while not stats.empty():
entry = stats.get_nowait()
total_requests += entry['total_requests']
total_latency += entry['total_elapsed']
claim_total_requests += entry['claim_total_requests']
delete_total_requests += entry['delete_total_requests']
return (total_requests, total_latency, claim_total_requests,
delete_total_requests)
def run():
num_procs = conf.processes
num_workers = conf.workers
test_duration = conf.time
stats = mp.Queue()
# TODO(TheSriram) : Make ttl,grace and limit configurable
args = (stats, num_workers, test_duration, URL, 300, 200, 1)
procs = [mp.Process(target=load_generator, args=args)
for _ in range(num_procs)]
print ("\nStarting Consumer...")
start = time.time()
for each_proc in procs:
each_proc.start()
for each_proc in procs:
each_proc.join()
(total_requests, total_latency, claim_total_requests,
delete_total_requests) = crunch(stats)
duration = time.time() - start
throughput = total_requests / duration
latency = 1000 * total_latency / total_requests
print('Duration: {0:.1f} sec'.format(duration))
print('Total Requests: {0}'.format(total_requests))
print('Throughput: {0:.0f} req/sec'.format(throughput))
print('Latency: {0:.1f} ms/req'.format(latency))
print('') # Blank line
def main():
run()

View File

@ -0,0 +1,72 @@
[
{
"weight": 0.8,
"doc": {
"ttl": 60,
"body": {
"id": "7FA23C90-62F7-40D2-9360-FBD5D7D61CD1",
"evt": "Wakeup"
}
}
},
{
"weight": 0.1,
"doc": {
"ttl": 3600,
"body": {
"ResultSet": {
"totalResultsAvailable": 1827221,
"totalResultsReturned": 2,
"firstResultPosition": 1,
"Result": [
{
"Title": "potato jpg",
"Summary": "Kentang Si bungsu dari keluarga Solanum tuberosum L ini ternyata memiliki khasiat untuk mengurangi kerutan jerawat bintik hitam dan kemerahan pada kulit Gunakan seminggu sekali sebagai",
"Url": "http://www.mediaindonesia.com/spaw/uploads/images/potato.jpg",
"ClickUrl": "http://www.mediaindonesia.com/spaw/uploads/images/potato.jpg",
"RefererUrl": "http://www.mediaindonesia.com/mediaperempuan/index.php?ar_id=Nzkw",
"FileSize": 22630,
"FileFormat": "jpeg",
"Height": 362,
"Width": 532,
"Thumbnail": {
"Url": "http://thm-a01.yimg.com/nimage/557094559c18f16a",
"Height": 98,
"Width": 145
}
},
{
"Title": "potato jpg",
"Summary": "Introduction of puneri aloo This is a traditional potato preparation flavoured with curry leaves and peanuts and can be eaten on fasting day Preparation time 10 min",
"Url": "http://www.infovisual.info/01/photo/potato.jpg",
"ClickUrl": "http://www.infovisual.info/01/photo/potato.jpg",
"RefererUrl": "http://sundayfood.com/puneri-aloo-indian-%20recipe",
"FileSize": 119398,
"FileFormat": "jpeg",
"Height": 685,
"Width": 1024,
"Thumbnail": {
"Url": "http://thm-a01.yimg.com/nimage/7fa23212efe84b64",
"Height": 107,
"Width": 160
}
}
]
}
}
}
},
{
"weight": 0.1,
"doc": {
"ttl": 360,
"body": {
"id": "7FA23C90-62F7-40D2-9360-FBD5D7D61CD1",
"evt": "StartBackup",
"files": [
"/foo/bar/stuff/thing.dat"
]
}
}
}
]

169
marconi/bench/producer.py Normal file
View File

@ -0,0 +1,169 @@
# Copyright (c) 2014 Rackspace, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import json
import multiprocessing as mp
import os
import random
import sys
import time
from gevent import monkey as curious_george
curious_george.patch_all(thread=False, select=False)
import gevent
from marconiclient.queues.v1 import client
from marconiclient.transport.errors import TransportError
import marktime
from marconi.bench.cli_config import conf
# TODO(TheSriram): Make configurable
URL = 'http://localhost:8888'
QUEUE_PREFIX = 'ogre-test-queue-'
# TODO(TheSriram) : Migrate from env variable to config
if os.environ.get('MESSAGES_PATH'):
with open(os.environ.get('MESSAGES_PATH')) as f:
message_pool = json.loads(f.read())
else:
print("Error : $MESSAGES_PATH needs to be set")
sys.exit(1)
message_pool.sort(key=lambda msg: msg['weight'])
def choose_message():
"""Choose a message from our pool of possibilities."""
# Assume message_pool is sorted by weight, ascending
position = random.random()
accumulator = 0.00
for each_message in message_pool:
accumulator += each_message['weight']
if position < accumulator:
return each_message['doc']
assert False
def producer(stats, test_duration):
"""Producer Worker
The Producer Worker continuously post messages for
the specified duration. The time taken for each post
is recorded for calculating throughput and latency.
"""
cli = client.Client(URL)
queue = cli.queue(QUEUE_PREFIX + '1')
total_requests = 0
total_elapsed = 0
end = time.time() + test_duration
while time.time() < end:
marktime.start('post message')
# TODO(TheSriram): Track/report errors
try:
queue.post(choose_message())
except TransportError as ex:
print("Could not post a message : {0}".format(ex))
else:
total_elapsed += marktime.stop('post message').seconds
total_requests += 1
stats.put({
'total_requests': total_requests,
'total_elapsed': total_elapsed
})
# TODO(TheSriram): make distributed across multiple machines
# TODO(TheSriram): post across several queues (which workers to which queues?
# weight them, so can have some busy queues, some not.)
def load_generator(stats, num_workers, test_duration):
# TODO(TheSriram): Have some way to get all of the workers to line up and
# start at the same time (is this really useful?)
gevent.joinall([
gevent.spawn(producer, stats, test_duration)
for _ in range(num_workers)
])
def crunch(stats):
total_requests = 0
total_latency = 0.0
while not stats.empty():
entry = stats.get_nowait()
total_requests += entry['total_requests']
total_latency += entry['total_elapsed']
return total_requests, total_latency
def run():
num_procs = conf.processes
num_workers = conf.workers
test_duration = conf.time
stats = mp.Queue()
args = (stats, num_workers, test_duration)
# TODO(TheSriram): Multiple test runs, vary num workers and drain/delete
# queues in between each run. Plot these on a graph, with
# concurrency as the X axis.
procs = [
mp.Process(target=load_generator, args=args)
for _ in range(num_procs)
]
print('\nStarting Producer...')
start = time.time()
for each_proc in procs:
each_proc.start()
for each_proc in procs:
each_proc.join()
total_requests, total_latency = crunch(stats)
# TODO(TheSriram): Add one more stat: "attempted req/sec" so can
# graph that on the x axis vs. achieved throughput and
# latency.
duration = time.time() - start
throughput = total_requests / duration
latency = 1000 * total_latency / total_requests
print('Duration: {0:.1f} sec'.format(duration))
print('Total Requests: {0}'.format(total_requests))
print('Throughput: {0:.0f} req/sec'.format(throughput))
print('Latency: {0:.1f} ms/req'.format(latency))
print('') # Blank line
def main():
run()

View File

@ -32,6 +32,7 @@ source-dir = doc/source
[entry_points]
console_scripts =
marconi-bench-pc = marconi.bench.conductor:main
marconi-server = marconi.cmd.server:run
marconi.queues.data.storage =