Merge "benchmark: add support for measures add"
This commit is contained in:
commit
3d103abfbc
|
@ -12,7 +12,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import argparse
|
||||
import datetime
|
||||
import functools
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
|
||||
from cliff import show
|
||||
|
@ -25,6 +28,13 @@ from gnocchiclient.v1 import metric_cli
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def grouper(iterable, n, fillvalue=None):
|
||||
"Collect data into fixed-length chunks or blocks"
|
||||
# grouper('ABCDEFG', 3, 'x') --> ABC DEF
|
||||
args = [iter(iterable)] * n
|
||||
return six.moves.zip(*args)
|
||||
|
||||
|
||||
def _positive_non_zero_int(argument_value):
|
||||
if argument_value is None:
|
||||
return None
|
||||
|
@ -80,12 +90,12 @@ class BenchmarkPool(futurist.ThreadPoolExecutor):
|
|||
results.append(f.result())
|
||||
except Exception as e:
|
||||
LOG.error("Error with %s metric: %s" % (verb, e))
|
||||
return results, {
|
||||
return results, runtime, {
|
||||
'client workers': self._max_workers,
|
||||
verb + ' runtime': "%.2f seconds" % runtime,
|
||||
verb + ' executed': self.statistics.executed,
|
||||
verb + ' speed': (
|
||||
"%.2f metric/s" % (self.statistics.executed / runtime)
|
||||
"%.2f %s/s" % (self.statistics.executed / runtime, verb)
|
||||
),
|
||||
verb + ' failures': self.statistics.failures,
|
||||
verb + ' failures rate': (
|
||||
|
@ -126,7 +136,7 @@ class CliBenchmarkMetricShow(CliBenchmarkBase,
|
|||
futures = pool.map_job(self.app.client.metric.get,
|
||||
parsed_args.metric * parsed_args.count,
|
||||
resource_id=parsed_args.resource_id)
|
||||
result, stats = pool.wait_job("show", futures)
|
||||
result, runtime, stats = pool.wait_job("show", futures)
|
||||
return self.dict2columns(stats)
|
||||
|
||||
|
||||
|
@ -150,14 +160,86 @@ class CliBenchmarkMetricCreate(CliBenchmarkBase,
|
|||
futures = pool.submit_job(parsed_args.count,
|
||||
self.app.client.metric.create,
|
||||
metric, refetch_metric=False)
|
||||
created_metrics, stats = pool.wait_job("create", futures)
|
||||
created_metrics, runtime, stats = pool.wait_job("create", futures)
|
||||
|
||||
if not parsed_args.keep:
|
||||
LOG.info("Deleting metrics")
|
||||
pool = BenchmarkPool(parsed_args.workers)
|
||||
futures = pool.map_job(self.app.client.metric.delete,
|
||||
[m['id'] for m in created_metrics])
|
||||
_, dstats = pool.wait_job("delete", futures)
|
||||
_, runtime, dstats = pool.wait_job("delete", futures)
|
||||
stats.update(dstats)
|
||||
|
||||
return self.dict2columns(stats)
|
||||
|
||||
|
||||
class CliBenchmarkMeasuresAdd(CliBenchmarkBase,
|
||||
metric_cli.CliMeasuresAddBase):
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(CliBenchmarkMeasuresAdd, self).get_parser(prog_name)
|
||||
parser.add_argument("--count", "-n",
|
||||
required=True,
|
||||
type=_positive_non_zero_int,
|
||||
help="Number of total measures to send")
|
||||
parser.add_argument("--batch", "-b",
|
||||
default=1,
|
||||
type=_positive_non_zero_int,
|
||||
help="Number of measures to send in each batch")
|
||||
parser.add_argument("--timestamp-start", "-s",
|
||||
default=(
|
||||
timeutils.utcnow(True)
|
||||
- datetime.timedelta(days=365)),
|
||||
type=timeutils.parse_isotime,
|
||||
help="First timestamp to use")
|
||||
parser.add_argument("--timestamp-end", "-e",
|
||||
default=timeutils.utcnow(True),
|
||||
type=timeutils.parse_isotime,
|
||||
help="Last timestamp to use")
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
pool = BenchmarkPool(parsed_args.workers)
|
||||
LOG.info("Sending measures")
|
||||
|
||||
if parsed_args.timestamp_end <= parsed_args.timestamp_start:
|
||||
raise ValueError("End timestamp must be after start timestamp")
|
||||
|
||||
# If batch size is bigger than the number of measures to send, we
|
||||
# reduce it to make sure we send something.
|
||||
if parsed_args.batch > parsed_args.count:
|
||||
parsed_args.batch = parsed_args.count
|
||||
|
||||
start = int(parsed_args.timestamp_start.strftime("%s"))
|
||||
end = int(parsed_args.timestamp_end.strftime("%s"))
|
||||
count = parsed_args.count
|
||||
|
||||
if (end - start) < count:
|
||||
raise ValueError(
|
||||
"The specified time range is not large enough "
|
||||
"for the number of points")
|
||||
|
||||
random_values = (random.randint(- 2 ** 32, 2 ** 32)
|
||||
for _ in six.moves.range(count))
|
||||
all_measures = ({"timestamp": ts, "value": v}
|
||||
for ts, v
|
||||
in six.moves.zip(
|
||||
six.moves.range(start,
|
||||
end,
|
||||
(end - start) // count),
|
||||
random_values))
|
||||
|
||||
measures = grouper(all_measures, parsed_args.batch)
|
||||
|
||||
futures = pool.map_job(functools.partial(
|
||||
self.app.client.metric.add_measures,
|
||||
parsed_args.metric), measures, resource_id=parsed_args.resource_id)
|
||||
_, runtime, stats = pool.wait_job("push", futures)
|
||||
|
||||
stats['measures per request'] = parsed_args.batch
|
||||
stats['measures push speed'] = (
|
||||
"%.2f push/s" % (
|
||||
parsed_args.batch * pool.statistics.executed / runtime
|
||||
)
|
||||
)
|
||||
|
||||
return self.dict2columns(stats)
|
||||
|
|
|
@ -64,6 +64,7 @@ class GnocchiCommandManager(commandmanager.CommandManager):
|
|||
"capabilities list": capabilities_cli.CliCapabilitiesList,
|
||||
"benchmark metric create": benchmark.CliBenchmarkMetricCreate,
|
||||
"benchmark metric show": benchmark.CliBenchmarkMetricShow,
|
||||
"benchmark measures add": benchmark.CliBenchmarkMeasuresAdd,
|
||||
}
|
||||
|
||||
def load_commands(self, namespace):
|
||||
|
|
|
@ -57,3 +57,27 @@ class BenchmarkMetricTest(base.ClientTestBase):
|
|||
result = self.details_multiple(result)[0]
|
||||
self.assertEqual(10, int(result['show executed']))
|
||||
self.assertLessEqual(int(result['show failures']), 10)
|
||||
|
||||
def test_benchmark_measures_add(self):
|
||||
apname = str(uuid.uuid4())
|
||||
# PREPARE AN ACHIVE POLICY
|
||||
self.gnocchi("archive-policy", params="create %s "
|
||||
"--back-window 0 -d granularity:1s,points:86400" % apname)
|
||||
|
||||
result = self.gnocchi(
|
||||
u'metric', params=u"create -a %s" % apname)
|
||||
metric = self.details_multiple(result)[0]
|
||||
|
||||
result = self.gnocchi(
|
||||
u'benchmark', params=u"measures add -n 10 -b 4 %s" % metric['id'])
|
||||
result = self.details_multiple(result)[0]
|
||||
self.assertEqual(2, int(result['push executed']))
|
||||
self.assertLessEqual(int(result['push failures']), 2)
|
||||
|
||||
result = self.gnocchi(
|
||||
u'benchmark',
|
||||
params=u"measures add -s 2010-01-01 -n 10 -b 4 %s"
|
||||
% metric['id'])
|
||||
result = self.details_multiple(result)[0]
|
||||
self.assertEqual(2, int(result['push executed']))
|
||||
self.assertLessEqual(int(result['push failures']), 2)
|
||||
|
|
|
@ -125,15 +125,20 @@ class CliMeasuresGet(CliMetricWithResourceID, lister.Lister):
|
|||
return self.COLS, measures
|
||||
|
||||
|
||||
class CliMeasuresAdd(CliMetricWithResourceID):
|
||||
class CliMeasuresAddBase(CliMetricWithResourceID):
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(CliMeasuresAddBase, self).get_parser(prog_name)
|
||||
parser.add_argument("metric", help="ID or name of the metric")
|
||||
return parser
|
||||
|
||||
|
||||
class CliMeasuresAdd(CliMeasuresAddBase):
|
||||
def measure(self, measure):
|
||||
timestamp, __, value = measure.rpartition("@")
|
||||
return {'timestamp': timestamp, 'value': float(value)}
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(CliMeasuresAdd, self).get_parser(prog_name)
|
||||
parser.add_argument("metric",
|
||||
help="ID or name of the metric")
|
||||
parser.add_argument("-m", "--measure", action='append',
|
||||
required=True, type=self.measure,
|
||||
help=("timestamp and value of a measure "
|
||||
|
|
Loading…
Reference in New Issue