monasca-transform initial commit

monasca-transform is a new component in Monasca that
aggregates and transforms metrics.

monasca-transform is a Spark-based, data-driven aggregation
engine that collects, groups, and aggregates existing individual
Monasca metrics according to business requirements and publishes
new transformed (derived) metrics to the Monasca Kafka queue.

Because the transformed metrics are published like any other
Monasca metric, alarms can be defined and triggered on them in
the usual way.
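
For illustration, a transformed metric carries the same envelope as any other
Monasca metric. The sketch below mirrors the example embedded in
InsertComponent._prepare_metric later in this diff; the metric name,
dimensions, and values are hypothetical:

```python
# Hypothetical aggregated metric as it would appear on the Monasca
# "metrics" Kafka topic (names and values are illustrative only).
aggregated_metric = {
    "metric": {
        "name": "mem.total_mb_agg",
        "dimensions": {"host": "all", "project_id": "all"},
        "timestamp": 1456858016000,          # milliseconds since epoch
        "value": 16049.0,
        "value_meta": {"record_count": 30,
                       "firstrecord_timestamp": "2016-03-01 18:00:02",
                       "lastrecord_timestamp": "2016-03-01 18:59:58"},
    },
    "meta": {"tenantId": "d2cb21079930415a9f2a33588b9f2bb6",
             "region": "useast"},
    "creation_time": 1456858034,             # seconds since epoch
}
```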

Co-Authored-By: Flint Calvin <flint.calvin@hp.com>
Co-Authored-By: David Charles Kennedy <david.c.kennedy@hpe.com>
Co-Authored-By: Ashwin Agate <ashwin.agate@hp.com>

Implements: blueprint monasca-transform

Change-Id: I0e67ac7a4c9a5627ddaf698855df086d55a52d26
Ashwin Agate 2016-05-11 20:18:49 +00:00 committed by Flint Calvin
parent 52504f802d
commit 8f61dd95a9
129 changed files with 12170 additions and 0 deletions

LICENSE (new file, 175 lines)
@@ -0,0 +1,175 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.

README.md (new file, 5 lines)
@@ -0,0 +1,5 @@
# Monasca Transform
## To set up the development environment
The monasca-transform development environment uses DevStack; see the README in the devstack directory.

devstack/README.md (new file, 103 lines)
@@ -0,0 +1,103 @@
# Monasca-transform DevStack Plugin
The Monasca-transform DevStack plugin is tested only on Ubuntu 14.04 (Trusty).
A shortcut to running monasca-transform in devstack is provided with Vagrant.
## To run monasca-transform using the provided Vagrant environment
### Using any changes made locally to monasca-transform
```
cd tools/vagrant
vagrant up
vagrant ssh
cd devstack
./stack.sh
```
The devstack Vagrant environment is set up to share the monasca-transform
directory with the VM, copy it, and commit any changes in the VM copy. This is
because the devstack deploy process checks out the master branch to
/opt/stack
and deploys using that, so changes made by the user need to be committed in
order to be used in the devstack instance. It is therefore important not to
push from the VM, as that would push the unreviewed work-in-progress commits.
### Using the upstream committed state of monasca-transform
This should operate the same as for any other devstack plugin. However, to use
the plugin from the upstream repo with the Vagrant environment described
above, it is sufficient to do:
```
cd tools/vagrant
vagrant up
vagrant ssh
cd devstack
vi local.conf
```
and change the line
```
enable_plugin monasca-transform /home/vagrant/monasca-transform
```
to
```
enable_plugin monasca-transform https://gitlab.gozer.hpcloud.net/host/capacityplanning.git
```
before running
```
./stack.sh
```
### Connecting to devstack
The host key changes with each ```vagrant destroy```/```vagrant up``` cycle, so
it is necessary to manage host key verification for your workstation:
```
ssh-keygen -R 192.168.12.5
```
The devstack VM ```vagrant up``` process generates a private key which can be
used for passwordless ssh to the host as follows:
```
cd tools/vagrant
ssh -i .vagrant/machines/default/virtualbox/private_key vagrant@192.168.12.5
```
### Running tox on devstack
Once the deploy is up, use the following commands to set up tox:
```
sudo su monasca-transform
cd /opt/stack/monasca-transform
virtualenv .venv
. .venv/bin/activate
pip install tox
tox
```
### Updating the code for dev
To regenerate the environment for development purposes, a script is provided
on the devstack instance at
```
tools/vagrant/refresh_monasca_transform.sh
```
(note: to use/run tox after running this script, the
"Running tox on devstack" steps above have to be re-executed)
This mostly re-does the work of the devstack plugin: it updates the code from
the shared directory and regenerates the venv and the zip that is passed to
Spark during the spark-submit call. The configuration and the contents of the
database are also refreshed, though the start scripts, driver, and service
Python code are left as they are (since little change is expected there).
## WIP
This is a work in progress. A number of improvements are needed to make it
more valuable as a development tool.
### TODO
1. Shorten the initial deploy.
Currently the services deployed are the default set plus all of Monasca. It is
quite possible that not all of this is necessary to develop monasca-transform,
so some services may be dropped in order to shorten the deploy.

@@ -0,0 +1,21 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Activate the monasca-transform virtualenv so its dependencies are importable,
# then hand control to the Spark driver entry point.
activate_this_file = "/opt/monasca/transform/venv/bin/activate_this.py"
execfile(activate_this_file, dict(__file__=activate_this_file))
from monasca_transform.driver.mon_metrics_kafka import invoke
invoke()

@@ -0,0 +1,94 @@
#!/bin/bash
### BEGIN INIT INFO
# Provides: {{ service_name }}
# Required-Start:
# Required-Stop:
# Default-Start: {{ service_start_levels }}
# Default-Stop:
# Short-Description: {{ service_name }}
# Description:
### END INIT INFO
service_is_running()
{
if [ -e {{ service_pid_file }} ]; then
PID=$(cat {{ service_pid_file }})
if ps $PID > /dev/null 2>&1; then
return 0
else
echo "Found obsolete PID file for {{ service_name }}...deleting it"
rm {{ service_pid_file }}
return 1
fi
else
return 1
fi
}
case $1 in
start)
echo "Starting {{ service_name }}..."
if service_is_running; then
echo "{{ service_name }} is already running"
exit 0
fi
echo "
_/_/ _/_/ _/_/_/_/ _/_/ _/ _/_/_/_/ _/_/_/_/ _/_/_/_/ _/_/_/_/
_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/
_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/
_/ _/ _/ _/ _/ _/ _/ _/ _/ _/_/_/_/ _/_/_/_/ _/ _/_/_/_/ _/_/_/_/
_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/
_/ _/_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/
_/ _/ _/ _/_/_/_/ _/ _/_/ _/ _/ _/_/_/_/ _/_/_/_/ _/ _/
_/_/_/_/ _/_/_/ _/_/_/_/ _/_/ _/ _/_/_/_/ _/_/_/_/ _/_/_/_/ _/_/_/ _/_/ _/_/
_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/
_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/
_/ _/ _/ _/_/_/_/ _/ _/ _/ _/_/_/_/ _/_/_/ _/ _/ _/ _/ _/ _/ _/ _/
_/ _/_/_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/_/_/ _/ _/ _/ _/
_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/_/ _/
_/ _/ _/ _/ _/ _/ _/_/ _/_/_/_/ _/ _/_/_/_/ _/ _/ _/ _/ _/
" >> {{ service_log_dir }}/{{ service_name }}.log
nohup sudo -u {{ service_user }} {{ virtualenv_location }}/bin/python \
{{ service_dir }}/{{ service_file }} \
>> {{ service_log_dir }}/{{ service_name }}.log \
2>> {{ service_log_dir }}/{{ service_name }}.log &
PID=$!
if [ -z "$PID" ]; then
echo "{{ service_name }} failed to start"
else
echo $PID > {{ service_pid_file }}
echo "{{ service_name }} is running"
fi
;;
stop)
echo "Stopping {{ service_name }}..."
if service_is_running; then
PID=$(cat {{ service_pid_file }})
sudo kill -- -$(ps -o pgid= $PID | grep -o '[0-9]*')
rm {{ service_pid_file }}
echo "{{ service_name }} is stopped"
else
echo "{{ service_name }} is not running"
exit 0
fi
;;
status)
if service_is_running; then
echo "{{ service_name }} is running"
else
echo "{{ service_name }} is not running"
fi
;;
restart)
$0 stop
$0 start
;;
esac

@@ -0,0 +1,60 @@
[DEFAULTS]
[repositories]
offsets = monasca_transform.mysql_offset_specs:MySQLOffsetSpecs
data_driven_specs = monasca_transform.data_driven_specs.mysql_data_driven_specs_repo:MySQLDataDrivenSpecsRepo
[database]
server_type = mysql
host = localhost
database_name = monasca_transform
username = m-transform
password = password
[messaging]
adapter = monasca_transform.messaging.adapter:KafkaMessageAdapter
topic = metrics
brokers = 10.0.2.15:9092
publish_kafka_tenant_id = d2cb21079930415a9f2a33588b9f2bb6
#
# Configurable values for the monasca-transform service
#
[service]
# The address of the mechanism being used for election coordination
coordinator_address = kazoo://localhost:2181
# The name of the coordination/election group
coordinator_group = monasca-transform
# How long the candidate should sleep between election result
# queries (in seconds)
election_polling_frequency = 15
# The path for the monasca-transform Spark driver
spark_driver = /opt/monasca/transform/lib/driver.py
# The location for the transform-service log
service_log_path = /var/log/monasca/transform/
# The location where Spark event logs should be written
spark_event_logging_dest = /var/log/spark-events
# Whether Spark event logging should be enabled (true/false)
spark_event_logging_enabled = true
# A list of jars which Spark should use
spark_jars_list = /opt/spark/current/lib/spark-streaming-kafka_2.10-1.6.0.jar,/opt/spark/current/lib/scala-library-2.10.1.jar,/opt/spark/current/lib/kafka_2.10-0.8.1.1.jar,/opt/spark/current/lib/metrics-core-2.2.0.jar,/usr/share/java/mysql.jar
# A list of where the Spark master(s) should run
spark_master_list = spark://localhost:7077
# Python files for Spark to use
spark_python_files = /opt/monasca/transform/lib/monasca-transform.zip
# How often the stream should be read (in seconds)
stream_interval = 120
# The working directory for monasca-transform
work_dir = /var/run/monasca/transform
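
These options are consumed through oslo.config (the project's ConfigInitializer
is referenced later in this diff). The snippet below is a minimal sketch, not
the project's actual option registration; option names mirror this file and the
defaults shown are assumptions:

```python
# Minimal oslo.config sketch (illustrative only).
from oslo_config import cfg

service_opts = [
    cfg.StrOpt('coordinator_address', default='kazoo://localhost:2181'),
    cfg.StrOpt('coordinator_group', default='monasca-transform'),
    cfg.IntOpt('election_polling_frequency', default=15),
    cfg.StrOpt('spark_driver', default='/opt/monasca/transform/lib/driver.py'),
    cfg.IntOpt('stream_interval', default=120),
]

cfg.CONF.register_opts(service_opts, group='service')
# Parse the deployed config file so its values override the defaults above.
cfg.CONF(args=[], default_config_files=['/etc/monasca-transform.conf'])

print(cfg.CONF.service.coordinator_group, cfg.CONF.service.stream_interval)
```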

@@ -0,0 +1,10 @@
[Unit]
Description=Monasca Transform Daemon
After=zookeeper.service spark-master.service spark-worker.service
[Service]
Type=simple
LimitNOFILE=32768
ExecStart=/etc/monasca/transform/init/start-monasca-transform.sh
[Install]
WantedBy=multi-user.target

@@ -0,0 +1,28 @@
CREATE DATABASE IF NOT EXISTS `monasca_transform` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE `monasca_transform`;
SET foreign_key_checks = 0;
CREATE TABLE IF NOT EXISTS `kafka_offsets` (
`topic` varchar(128) NOT NULL,
`until_offset` BIGINT NULL,
`from_offset` BIGINT NULL,
`app_name` varchar(128) NOT NULL,
`partition` integer NOT NULL,
PRIMARY KEY (`app_name`, `topic`, `partition`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE IF NOT EXISTS `transform_specs` (
`metric_id` varchar(128) NOT NULL,
`transform_spec` varchar(2048) NOT NULL,
PRIMARY KEY (`metric_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE IF NOT EXISTS `pre_transform_specs` (
`event_type` varchar(128) NOT NULL,
`pre_transform_spec` varchar(2048) NOT NULL,
PRIMARY KEY (`event_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
GRANT ALL ON monasca_transform.* TO 'm-transform'@'%' IDENTIFIED BY 'password';
GRANT ALL ON monasca_transform.* TO 'm-transform'@'localhost' IDENTIFIED BY 'password';
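
The kafka_offsets table above is what the MySQLOffsetSpecs repository
(configured in monasca-transform.conf) uses to track how far each Kafka topic
partition has been processed. As a rough illustration only, assuming SQLAlchemy
and a MySQL driver are available, recording an offset range could look like
this; the app_name and offset values are made up:

```python
# Illustrative sketch only: persist a processed offset range into kafka_offsets.
from sqlalchemy import create_engine, text

engine = create_engine(
    'mysql+pymysql://m-transform:password@localhost/monasca_transform')

# REPLACE keeps one row per (app_name, topic, partition), matching the
# primary key defined in the DDL above.
upsert = text("""
    REPLACE INTO kafka_offsets
        (topic, app_name, `partition`, from_offset, until_offset)
    VALUES (:topic, :app_name, :partition, :from_offset, :until_offset)
""")

with engine.begin() as conn:
    conn.execute(upsert, {"topic": "metrics",
                          "app_name": "mon_metrics_kafka",
                          "partition": 0,
                          "from_offset": 0,
                          "until_offset": 1000})
```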

@@ -0,0 +1,12 @@
description "Monasca Transform"
start on runlevel [2345]
stop on runlevel [!2345]
respawn
limit nofile 32768 32768
expect daemon
exec /etc/monasca/transform/init/start-monasca-transform.sh

@@ -0,0 +1,29 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
activate_this_file = "/opt/monasca/transform/venv/bin/activate_this.py"
execfile(activate_this_file, dict(__file__=activate_this_file))
from monasca_transform.service.transform_service import main_service
def main():
main_service()
if __name__ == "__main__":
main()
sys.exit(0)

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
start-stop-daemon -c monasca-transform:monasca-transform -m\
--pidfile /var/run/monasca/transform/transform.pid \
--start --exec /opt/monasca/transform/venv/bin/python /etc/monasca/transform/init/service_runner.py \
>> /var/log/monasca/transform/monasca-transform.log 2>> /var/log/monasca/transform/monasca-transform.log

@@ -0,0 +1,10 @@
spark.driver.extraClassPath /usr/share/java/mysql.jar
spark.executor.extraClassPath /usr/share/java/mysql.jar
spark.blockManager.port 7100
spark.broadcast.port 7105
spark.driver.port 7110
spark.executor.port 7115
spark.fileserver.port 7120
spark.speculation true
spark.speculation.interval 200

@@ -0,0 +1,18 @@
#!/usr/bin/env bash
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
export SPARK_MASTERS=127.0.0.1:7077
export SPARK_MASTER_WEBUI_PORT=18080
export SPARK_WORKER_PORT=7078
export SPARK_WORKER_WEBUI_PORT=18081
export SPARK_WORKER_DIR=/var/run/spark/work
export SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_CORES=2
export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Dspark.history.fs.logDirectory=file:///var/log/spark-events -Dspark.history.ui.port=18082"
export SPARK_LOG_DIR=/var/log/spark
export SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=127.0.0.1:2181 -Dspark.deploy.zookeeper.dir=/var/run/spark"

@@ -0,0 +1,12 @@
description "Spark Master"
start on runlevel [2345]
stop on runlevel [!2345]
respawn
limit nofile 32768 32768
expect daemon
exec /etc/spark/init/start-spark-master.sh

@@ -0,0 +1,18 @@
#!/usr/bin/env bash
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
export SPARK_MASTERS=127.0.0.1:7077
export SPARK_MASTER_WEBUI_PORT=18080
export SPARK_WORKER_PORT=7078
export SPARK_WORKER_WEBUI_PORT=18081
export SPARK_WORKER_DIR=/var/run/spark/work
export SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_CORES=2
export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Dspark.history.fs.logDirectory=file:///var/log/spark-events -Dspark.history.ui.port=18082"
export SPARK_LOG_DIR=/var/log/spark
export SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=127.0.0.1:2181 -Dspark.deploy.zookeeper.dir=/var/run/spark"

@@ -0,0 +1,12 @@
description "Spark Worker"
start on runlevel [2345]
stop on runlevel [!2345]
respawn
limit nofile 32768 32768
expect daemon
exec /etc/spark/init/start-spark-worker.sh

@@ -0,0 +1,9 @@
#!/usr/bin/env bash
. /opt/spark/current/conf/spark-env.sh
export EXEC_CLASS=org.apache.spark.deploy.master.Master
export INSTANCE_ID=1
export SPARK_CLASSPATH=/opt/spark/current/conf/:/opt/spark/current/lib/spark-assembly-1.6.1-hadoop2.6.0.jar:/opt/spark/current/lib/datanucleus-core-3.2.10.jar:/opt/spark/current/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark/current/lib/datanucleus-api-jdo-3.2.6.jar
export log="$SPARK_LOG_DIR/spark-spark-"$EXEC_CLASS"-"$INSTANCE_ID"-127.0.0.1.out"
export SPARK_HOME=/opt/spark/current
start-stop-daemon -c spark:spark --pidfile /var/run/spark/spark-spark-"$EXEC_CLASS"-"$INSTANCE_ID".pid --name spark-master --start --exec /usr/bin/java -- -cp $SPARK_CLASSPATH $SPARK_DAEMON_JAVA_OPTS -Xms1g -Xmx1g -XX:MaxPermSize=256m "$EXEC_CLASS" --ip "$SPARK_MASTER_IP" --port "$SPARK_MASTER_PORT" --webui-port "$SPARK_MASTER_WEBUI_PORT" >> "$log" 2>&1 < /dev/null

@@ -0,0 +1,9 @@
#!/usr/bin/env bash
. /opt/spark/current/conf/spark-env.sh
export EXEC_CLASS=org.apache.spark.deploy.worker.Worker
export INSTANCE_ID=1
export SPARK_CLASSPATH=/opt/spark/current/conf/:/opt/spark/current/lib/spark-assembly-1.6.1-hadoop2.6.0.jar:/opt/spark/current/lib/datanucleus-core-3.2.10.jar:/opt/spark/current/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark/current/lib/datanucleus-api-jdo-3.2.6.jar
export log="$SPARK_LOG_DIR/spark-spark-"$EXEC_CLASS"-"$INSTANCE_ID"-127.0.0.1.out"
export SPARK_HOME=/opt/spark/current
start-stop-daemon -c spark:spark --pidfile /var/run/spark/spark-spark-"$EXEC_CLASS"-"$INSTANCE_ID".pid --name spark-worker --start --exec /usr/bin/java -- -cp $SPARK_CLASSPATH $SPARK_DAEMON_JAVA_OPTS -Xms1g -Xmx1g -XX:MaxPermSize=256m "$EXEC_CLASS" --webui-port "$SPARK_WORKER_WEBUI_PORT" --port $SPARK_WORKER_PORT $SPARK_MASTERS >> "$log" 2>&1 < /dev/null

devstack/plugin.sh (new executable file, 442 lines)
@@ -0,0 +1,442 @@
#
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# Copyright 2016 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Monasca-transform DevStack plugin
#
# Install and start Monasca-transform service in devstack
#
# To enable Monasca-transform in devstack add an entry to local.conf that
# looks like
#
# [[local|localrc]]
# enable_plugin monasca-transform https://git.openstack.org/openstack/monasca-transform
#
# By default all Monasca services are started (see
# devstack/settings). To disable a specific service use the
# disable_service function. For example to turn off notification:
#
# disable_service monasca-notification
#
# Several variables set in the localrc section adjust common behaviors
# of Monasca (see within for additional settings):
#
# EXAMPLE VARS HERE
# Save trace setting
XTRACE=$(set +o | grep xtrace)
set -o xtrace
ERREXIT=$(set +o | grep errexit)
set -o errexit
# Determine if we are running in devstack-gate or devstack.
if [[ $DEST ]]; then
# We are running in devstack-gate.
export MONASCA_TRANSFORM_BASE=${MONASCA_TRANSFORM_BASE:-"${DEST}"}
else
# We are running in devstack.
export MONASCA_TRANSFORM_BASE=${MONASCA_TRANSFORM_BASE:-"/opt/stack"}
fi
function pre_install_monasca_transform {
:
}
function pre_install_spark {
:
}
function install_mysql_connector {
sudo apt-get -y install libmysql-java
}
function install_java_libs {
pushd /opt/spark/current/lib
#MAVEN_STUB="http://uk.maven.org/maven2"
MAVEN_STUB="https://repo1.maven.org/maven2"
for SPARK_JAVA_LIB in "${SPARK_JAVA_LIBS[@]}"
do
SPARK_LIB_NAME=`echo ${SPARK_JAVA_LIB} | sed 's/.*\///'`
sudo -u spark curl ${MAVEN_STUB}/${SPARK_JAVA_LIB} -o ${SPARK_LIB_NAME}
done
popd
}
function link_spark_streaming_lib {
pushd /opt/spark/current/lib
ln -sf spark-streaming-kafka.jar spark-streaming-kafka_2.10-1.6.0.jar
popd
}
function unstack_monasca_transform {
echo_summary "Unstack Monasca-transform"
sudo service monasca-transform stop || true
delete_monasca_transform_files
sudo rm /etc/init/monasca-transform.conf || true
sudo rm -rf /etc/monasca/transform || true
drop_monasca_transform_database
unstack_spark
}
function delete_monasca_transform_files {
sudo rm -rf /opt/monasca/transform || true
sudo rm /etc/monasca-transform.conf
MONASCA_TRANSFORM_DIRECTORIES=("/var/log/monasca/transform" "/var/run/monasca/transform" "/etc/monasca/transform/init")
for MONASCA_TRANSFORM_DIRECTORY in "${MONASCA_TRANSFORM_DIRECTORIES[@]}"
do
sudo rm -rf ${MONASCA_TRANSFORM_DIRECTORY} || true
done
}
function drop_monasca_transform_database {
# must login as root@localhost
sudo mysql -h "127.0.0.1" -uroot -psecretmysql -e "drop database monasca_transform; drop user 'm-transform'@'%'; drop user 'm-transform'@'localhost';" || echo "Failed to drop database 'monasca_transform' and/or user 'm-transform' from mysql database, you may wish to do this manually."
}
function unstack_spark {
echo_summary "Unstack Spark"
sudo service spark-master stop || true
sudo service spark-worker stop || true
delete_spark_start_scripts
delete_spark_upstart_definitions
unlink_spark_commands
delete_spark_directories
sudo rm -rf `readlink /opt/spark/current` || true
sudo rm /opt/spark/current || true
sudo rm -rf /opt/spark/download || true
sudo userdel spark || true
sudo groupdel spark || true
}
function clean_monasca_transform {
set +o errexit
unstack_monasca_transform
set -o errexit
}
function create_spark_directories {
for SPARK_DIRECTORY in "${SPARK_DIRECTORIES[@]}"
do
sudo mkdir -p ${SPARK_DIRECTORY}
sudo chown spark:spark ${SPARK_DIRECTORY}
sudo chmod 755 ${SPARK_DIRECTORY}
done
sudo mkdir -p /var/log/spark-events
sudo chmod "a+rw" /var/log/spark-events
}
function delete_spark_directories {
for SPARK_DIRECTORY in "${SPARK_DIRECTORIES[@]}"
do
sudo rm -rf ${SPARK_DIRECTORY} || true
done
sudo rm -rf /var/log/spark-events || true
}
function link_spark_commands_to_usr_bin {
SPARK_COMMANDS=("spark-submit" "spark-class" "spark-shell" "spark-sql")
for SPARK_COMMAND in "${SPARK_COMMANDS[@]}"
do
sudo ln -sf /opt/spark/current/bin/${SPARK_COMMAND} /usr/bin/${SPARK_COMMAND}
done
}
function unlink_spark_commands {
SPARK_COMMANDS=("spark-submit" "spark-class" "spark-shell" "spark-sql")
for SPARK_COMMAND in "${SPARK_COMMANDS[@]}"
do
sudo unlink /usr/bin/${SPARK_COMMAND} || true
done
}
function copy_and_link_config {
SPARK_ENV_FILES=("spark-env.sh" "spark-worker-env.sh" "spark-defaults.conf")
for SPARK_ENV_FILE in "${SPARK_ENV_FILES[@]}"
do
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/spark/"${SPARK_ENV_FILE}" /etc/spark/conf/.
sudo ln -sf /etc/spark/conf/"${SPARK_ENV_FILE}" /opt/spark/current/conf/"${SPARK_ENV_FILE}"
done
}
function copy_spark_start_scripts {
SPARK_START_SCRIPTS=("start-spark-master.sh" "start-spark-worker.sh")
for SPARK_START_SCRIPT in "${SPARK_START_SCRIPTS[@]}"
do
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/spark/"${SPARK_START_SCRIPT}" /etc/spark/init/.
sudo chmod 755 /etc/spark/init/"${SPARK_START_SCRIPT}"
done
}
function delete_spark_start_scripts {
SPARK_START_SCRIPTS=("start-spark-master.sh" "start-spark-worker.sh")
for SPARK_START_SCRIPT in "${SPARK_START_SCRIPTS[@]}"
do
sudo rm /etc/spark/init/"${SPARK_START_SCRIPT}" || true
done
}
function copy_spark_upstart_definitions {
SPARK_UPSTART_DEFINITIONS=("spark-master.conf" "spark-worker.conf")
for SPARK_UPSTART_DEFINITION in "${SPARK_UPSTART_DEFINITIONS[@]}"
do
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/spark/"${SPARK_UPSTART_DEFINITION}" /etc/init/.
sudo chmod 644 /etc/init/"${SPARK_UPSTART_DEFINITION}"
done
}
function delete_spark_upstart_definitions {
SPARK_UPSTART_DEFINITIONS=("spark-master.conf" "spark-worker.conf")
for SPARK_UPSTART_DEFINITION in "${SPARK_UPSTART_DEFINITIONS[@]}"
do
sudo rm /etc/init/${SPARK_UPSTART_DEFINITION} || true
done
}
function install_monasca_transform {
echo_summary "Install Monasca-Transform"
sudo groupadd --system monasca-transform || true
sudo useradd --system -g monasca-transform monasca-transform || true
create_monasca_transform_directories
copy_monasca_transform_files
create_monasca_transform_venv
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/monasca-transform/monasca_transform_init.conf /etc/init/monasca-transform.conf
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/monasca-transform/start-monasca-transform.sh /etc/monasca/transform/init/.
sudo chmod +x /etc/monasca/transform/init/start-monasca-transform.sh
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/monasca-transform/service_runner.py /etc/monasca/transform/init/.
create_and_populate_monasca_transform_database
}
function create_monasca_transform_directories {
MONASCA_TRANSFORM_DIRECTORIES=("/var/log/monasca/transform" "/opt/monasca/transform/lib" "/var/run/monasca/transform" "/etc/monasca/transform/init")
for MONASCA_TRANSFORM_DIRECTORY in "${MONASCA_TRANSFORM_DIRECTORIES[@]}"
do
sudo mkdir -p ${MONASCA_TRANSFORM_DIRECTORY}
sudo chown monasca-transform:monasca-transform ${MONASCA_TRANSFORM_DIRECTORY}
sudo chmod 755 ${MONASCA_TRANSFORM_DIRECTORY}
done
}
function copy_monasca_transform_files {
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/monasca-transform/service_runner.py /opt/monasca/transform/lib/.
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/monasca-transform/monasca-transform.conf /etc/.
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/monasca-transform/driver.py /opt/monasca/transform/lib/.
${MONASCA_TRANSFORM_BASE}/monasca-transform/scripts/create_zip.sh
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/scripts/monasca-transform.zip /opt/monasca/transform/lib/.
${MONASCA_TRANSFORM_BASE}/monasca-transform/scripts/generate_ddl_for_devstack.sh
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/monasca-transform/monasca-transform_mysql.sql /opt/monasca/transform/lib/.
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/monasca-transform/transform_specs.sql /opt/monasca/transform/lib/.
sudo cp -f "${MONASCA_TRANSFORM_BASE}"/monasca-transform/devstack/files/monasca-transform/pre_transform_specs.sql /opt/monasca/transform/lib/.
sudo chown -R monasca-transform:monasca-transform /opt/monasca/transform
}
function create_monasca_transform_venv {
sudo chown -R monasca-transform:monasca-transform /opt/stack/monasca-transform
sudo su - monasca-transform -c "
virtualenv /opt/monasca/transform/venv ;
. /opt/monasca/transform/venv/bin/activate ;
pip install -e "${MONASCA_TRANSFORM_BASE}"/monasca-transform/ ;
deactivate"
}
function create_and_populate_monasca_transform_database {
# must login as root@localhost
sudo mysql -h "127.0.0.1" -uroot -psecretmysql < /opt/monasca/transform/lib/monasca-transform_mysql.sql || echo "Did the schema change? This process will fail on schema changes."
sudo mysql -h "127.0.0.1" -um-transform -ppassword < /opt/monasca/transform/lib/pre_transform_specs.sql
sudo mysql -h "127.0.0.1" -um-transform -ppassword < /opt/monasca/transform/lib/transform_specs.sql
}
function install_spark {
echo_summary "Install Spark"
sudo groupadd --system spark || true
sudo useradd --system -g spark spark || true
sudo mkdir -p /opt/spark/download
sudo chown -R spark:spark /opt/spark
if [ ! -f /opt/spark/download/${SPARK_TARBALL_NAME} ]
then
sudo curl -m 300 http://apache.cs.utah.edu/spark/spark-${SPARK_VERSION}/${SPARK_TARBALL_NAME} -o /opt/spark/download/${SPARK_TARBALL_NAME}
fi
sudo chown spark:spark /opt/spark/download/${SPARK_TARBALL_NAME}
sudo -u spark tar -xzf /opt/spark/download/${SPARK_TARBALL_NAME} -C /opt/spark/
sudo -u spark ln -sf /opt/spark/${SPARK_HADOOP_VERSION} /opt/spark/current
install_mysql_connector
install_java_libs
create_spark_directories
link_spark_commands_to_usr_bin
copy_and_link_config
copy_spark_start_scripts
copy_spark_upstart_definitions
}
function extra_spark {
sudo service spark-master start
sleep 10
sudo service spark-worker start
}
function post_config_monasca_transform {
:
}
function extra_monasca_transform {
sudo service monasca-transform start
}
# check for service enabled
echo_summary "Monasca-transform plugin with service enabled = `is_service_enabled monasca-transform`"
if is_service_enabled monasca-transform; then
if [[ "$1" == "stack" && "$2" == "pre-install" ]]; then
# Set up system services
echo_summary "Configuring Spark system services"
pre_install_spark
echo_summary "Configuring Monasca-transform system services"
pre_install_monasca_transform
elif [[ "$1" == "stack" && "$2" == "install" ]]; then
# Perform installation of service source
echo_summary "Installing Spark"
install_spark
echo_summary "Installing Monasca-transform"
install_monasca_transform
elif [[ "$1" == "stack" && "$2" == "post-config" ]]; then
# Configure after the other layer 1 and 2 services have been configured
echo_summary "Configuring Monasca-transform"
post_config_monasca_transform
elif [[ "$1" == "stack" && "$2" == "extra" ]]; then
# Initialize and start the Monasca service
echo_summary "Initializing Spark"
extra_spark
echo_summary "Initializing Monasca-transform"
extra_monasca_transform
fi
if [[ "$1" == "unstack" ]]; then
echo_summary "Unstacking Monasca-transform"
unstack_monasca_transform
fi
if [[ "$1" == "clean" ]]; then
# Remove state and transient data
# Remember clean.sh first calls unstack.sh
echo_summary "Cleaning Monasca-transform"
clean_monasca_transform
fi
else
echo_summary "Monasca-transform not enabled"
fi
#Restore errexit
$ERREXIT
# Restore xtrace
$XTRACE

devstack/settings (new file, 44 lines)
@@ -0,0 +1,44 @@
#
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
enable_service monasca-transform
#
# Monasca infrastructure services
#
# databases
# MySQL is already enabled in devstack
#
# Dependent Software Versions
#
# spark vars
SPARK_DIRECTORIES=("/var/spark" "/var/log/spark" "/var/run/spark/work" "/etc/spark/conf" "/etc/spark/init" )
SPARK_VERSION=${SPARK_VERSION:-1.6.1}
HADOOP_VERSION=${HADOOP_VERSION:-2.6}
SPARK_HADOOP_VERSION=spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION
SPARK_TARBALL_NAME=${SPARK_HADOOP_VERSION}.tgz
# Kafka deb consists of the version of scala plus the version of kafka
BASE_KAFKA_VERSION=${BASE_KAFKA_VERSION:-0.8.1.1}
SCALA_VERSION=${SCALA_VERSION:-2.10}
KAFKA_VERSION=${KAFKA_VERSION:-${SCALA_VERSION}-${BASE_KAFKA_VERSION}}
SPARK_JAVA_LIBS=("org/apache/kafka/kafka_2.10/0.8.1.1/kafka_2.10-0.8.1.1.jar" "org/scala-lang/scala-library/2.10.1/scala-library-2.10.1.jar" "com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar" "org/apache/spark/spark-streaming-kafka_2.10/1.6.0/spark-streaming-kafka_2.10-1.6.0.jar")

devstack/test.sh (new file, 15 lines)
@@ -0,0 +1,15 @@
#!/usr/bin/env bash
MAVEN_STUB="https://repo1.maven.org/maven2"
SPARK_JAVA_LIBS=("org/apache/kafka/kafka_2.10/0.8.1.1/kafka_2.10-0.8.1.1.jar" "org/scala-lang/scala-library/2.10.1/scala-library-2.10.1.jar" "com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar" "org/apache/spark/spark-streaming-kafka_2.10/1.6.0/spark-streaming-kafka_2.10-1.6.0.jar")
for SPARK_JAVA_LIB in "${SPARK_JAVA_LIBS[@]}"
do
echo Would fetch ${MAVEN_STUB}/${SPARK_JAVA_LIB}
done
for SPARK_JAVA_LIB in "${SPARK_JAVA_LIBS[@]}"
do
SPARK_LIB_NAME=`echo ${SPARK_JAVA_LIB} | sed 's/.*\///'`
echo Got lib ${SPARK_LIB_NAME}
done

@@ -0,0 +1,66 @@
[DEFAULTS]
[repositories]
offsets = monasca_transform.mysql_offset_specs:MySQLOffsetSpecs
data_driven_specs = monasca_transform.data_driven_specs.mysql_data_driven_specs_repo:MySQLDataDrivenSpecsRepo
[database]
server_type = mysql
host = localhost
database_name = monasca_transform
username = m-transform
password = password
[messaging]
adapter = monasca_transform.messaging.adapter:KafkaMessageAdapter
topic = metrics
brokers = localhost:9092
publish_kafka_tenant_id = d2cb21079930415a9f2a33588b9f2bb6
#
# Configurable values for the monasca-transform service
#
[service]
# The address of the mechanism being used for election coordination
coordinator_address = kazoo://localhost:2181
# The name of the coordination/election group
coordinator_group = monasca-transform
# How long the candidate should sleep between election result
# queries (in seconds)
election_polling_frequency = 15
# The path for the setup file to be executed
setup_file = /opt/stack/monasca-transform/setup.py
# The target of the setup file
setup_target = bdist_egg
# The path for the monasca-transform Spark driver
spark_driver = /opt/stack/monasca-transform/monasca_transform/driver/mon_metrics_kafka.py
# The location for the transform-service log
service_log_path = /opt/stack/monasca-transform
# The location where Spark event logs should be written
spark_event_logging_dest = /var/log/spark-events
# Whether Spark event logging should be enabled (true/false)
spark_event_logging_enabled = true
# A list of jars which Spark should use
spark_jars_list = /opt/spark/current/lib/spark-streaming-kafka.jar,/opt/spark/current/lib/scala-library-2.10.1.jar,/opt/spark/current/lib/kafka_2.10-0.8.1.1.jar,/opt/spark/current/lib/metrics-core-2.2.0.jar,/usr/share/java/mysql.jar
# A list of where the Spark master(s) should run
spark_master_list = spark://192.168.10.4:7077,192.168.10.5:7077
# Python files for Spark to use
spark_python_files = /opt/stack/monasca-transform/dist/monasca_transform-0.0.1.egg
# How often the stream should be read (in seconds)
stream_interval = 120
# The working directory for monasca-transform
work_dir = /opt/stack/monasca-transform

@@ -0,0 +1,42 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
class Component(object):
SOURCE_COMPONENT_TYPE = "source"
USAGE_COMPONENT_TYPE = "usage"
SETTER_COMPONENT_TYPE = "setter"
INSERT_COMPONENT_TYPE = "insert"
DEFAULT_UNAVAILABLE_VALUE = "NA"
InstanceUsageDataAggParamsBase = namedtuple('InstanceUsageDataAggParams',
['instance_usage_data',
'agg_params'])
class InstanceUsageDataAggParams(InstanceUsageDataAggParamsBase):
"""A tuple which is a wrapper containing the instance usage data
and aggregation params
namedtuple contains:
instance_usage_data - instance usage
agg_params - aggregation params dict
"""

@@ -0,0 +1,50 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
LOG = logging.getLogger(__name__)
class ComponentUtils(object):
@staticmethod
def _get_group_by_period_list(aggregation_period):
"""get a list of columns for an aggregation period."""
group_by_period_list = []
if (aggregation_period == "daily"):
group_by_period_list = ["event_date"]
elif (aggregation_period == "hourly"):
group_by_period_list = ["event_date", "event_hour"]
elif (aggregation_period == "minutely"):
group_by_period_list = ["event_date", "event_hour", "event_minute"]
elif (aggregation_period == "secondly"):
group_by_period_list = ["event_date", "event_hour",
"event_minute", "event_second"]
return group_by_period_list
@staticmethod
def _get_instance_group_by_period_list(aggregation_period):
"""get a list of columns for an aggregation period."""
group_by_period_list = []
if (aggregation_period == "daily"):
group_by_period_list = ["usage_date"]
elif (aggregation_period == "hourly"):
group_by_period_list = ["usage_date", "usage_hour"]
elif (aggregation_period == "minutely"):
group_by_period_list = ["usage_date", "usage_hour", "usage_minute"]
elif (aggregation_period == "secondly"):
group_by_period_list = ["usage_date", "usage_hour",
"usage_minute", "usage_second"]
return group_by_period_list
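
For reference, the group-by columns these helpers return for a couple of
aggregation periods, taken directly from the code above:

```python
from monasca_transform.component.component_utils import ComponentUtils

print(ComponentUtils._get_group_by_period_list("hourly"))
# ['event_date', 'event_hour']
print(ComponentUtils._get_instance_group_by_period_list("minutely"))
# ['usage_date', 'usage_hour', 'usage_minute']
```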

@@ -0,0 +1,146 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import time
from monasca_transform.component import Component
from monasca_transform.messaging.adapter import MessageAdapter
from oslo_config import cfg
class InsertComponent(Component):
@abc.abstractmethod
def insert(transform_context, instance_usage_df):
raise NotImplementedError(
"Class %s doesn't implement setter(instance_usage_df,"
" transform_spec_df)"
% __name__)
@staticmethod
def get_component_type():
return Component.INSERT_COMPONENT_TYPE
@staticmethod
def _prepare_metric(instance_usage_dict, agg_params):
"""transform instance usage rdd to a monasca metric.
example metric:
{"metric":{"name":"host_alive_status",
"dimensions":{"hostname":"mini-mon",
"observer_host":"devstack",
"test_type":"ssh"},
"timestamp":1456858016000,
"value":1.0,
"value_meta":{"error":
"Unable to open socket to host mini-mon"}
},
"meta":{"tenantId":"8eadcf71fc5441d8956cb9cbb691704e",
"region":"useast"},
"creation_time":1456858034
}
"""
current_epoch_seconds = time.time()
current_epoch_milliseconds = current_epoch_seconds * 1000
dimension_list = agg_params["dimension_list"]
# build dimensions dynamically
dimensions_part = {}
for dim in dimension_list:
dimensions_part[dim] = \
instance_usage_dict.get(dim,
Component.DEFAULT_UNAVAILABLE_VALUE)
meta_part = {}
# TODO(someone) determine the appropriate tenant ID to use. For now,
# what works is to use the same tenant ID as other metrics specify in
# their kafka messages (and this appears to change each time mini-mon
# is re-installed). The long term solution is to have HLM provide
# a usable tenant ID to us in a configurable way. BTW, without a
# proper/valid tenant ID, aggregated metrics don't get persisted
# to the Monasca DB.
meta_part["tenantId"] = cfg.CONF.messaging.publish_kafka_tenant_id
meta_part["region"] = "useast"
value_meta_part = {"record_count": instance_usage_dict.get(
"record_count", 0),
"firstrecord_timestamp": instance_usage_dict.get(
"firstrecord_timestamp",
Component.DEFAULT_UNAVAILABLE_VALUE),
"lastrecord_timestamp": instance_usage_dict.get(
"lastrecord_timestamp",
Component.DEFAULT_UNAVAILABLE_VALUE)}
metric_part = {"name": instance_usage_dict.get(
"aggregated_metric_name"),
"dimensions": dimensions_part,
"timestamp": int(current_epoch_milliseconds),
"value": instance_usage_dict.get(
"quantity", 0.0),
"value_meta": value_meta_part}
metric = {"metric": metric_part,
"meta": meta_part,
"creation_time": int(current_epoch_seconds)}
return metric
@staticmethod
def _write_metric(row, agg_params):
"""write data to kafka. extracts and formats
metric data and writes the data to kafka
"""
instance_usage_dict = {"tenant_id": row.tenant_id,
"user_id": row.user_id,
"resource_uuid": row.resource_uuid,
"geolocation": row.geolocation,
"region": row.region,
"zone": row.zone,
"host": row.host,
"project_id": row.project_id,
"aggregated_metric_name":
row.aggregated_metric_name,
"quantity": row.quantity,
"firstrecord_timestamp":
row.firstrecord_timestamp_string,
"lastrecord_timestamp":
row.lastrecord_timestamp_string,
"record_count": row.record_count,
"service_group": row.service_group,
"service_id": row.service_id,
"usage_date": row.usage_date,
"usage_hour": row.usage_hour,
"usage_minute": row.usage_minute,
"aggregation_period":
row.aggregation_period}
metric = InsertComponent._prepare_metric(instance_usage_dict,
agg_params)
MessageAdapter.send_metric(metric)
@staticmethod
def _write_metrics_from_partition(partlistiter):
"""iterate through all rdd elements in partition
and write metrics to kafka
"""
for part in partlistiter:
agg_params = part.agg_params
row = part.instance_usage_data
InsertComponent._write_metric(row, agg_params)
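
Concrete insert components subclass InsertComponent and override insert(); the
real implementations (DummyInsert, KafkaInsert, PrepareData) follow later in
this diff. The class below is a hypothetical no-op variant, shown only to
illustrate the contract:

```python
# Hypothetical example only; the real implementations appear below.
from monasca_transform.component.insert import InsertComponent


class LoggingInsert(InsertComponent):
    """Insert component that only logs each aggregated row."""

    @staticmethod
    def insert(transform_context, instance_usage_df):
        for row in instance_usage_df.collect():
            print("would publish:", row.aggregated_metric_name, row.quantity)
        return instance_usage_df
```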

@@ -0,0 +1,69 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_transform.component.insert import InsertComponent
from oslo_config import cfg
class DummyInsert(InsertComponent):
"""Insert component that writes instance usage data
to kafka queue
"""
@staticmethod
def insert(transform_context, instance_usage_df):
"""write instance usage data to kafka"""
transform_spec_df = transform_context.transform_spec_df_info
agg_params = transform_spec_df.select("aggregation_params_map"
".dimension_list"
).collect()[0].asDict()
cfg.CONF.set_override('adapter',
'tests.unit.messaging.adapter:DummyAdapter',
group='messaging')
# Approach 1
# using foreachPartition to iterate through elements in an
# RDD is the recommended approach so as to not overwhelm kafka with the
# zillion connections (but in our case the MessageAdapter does
# store the adapter_impl so we should not create many producers)
# using foreachpartitions was causing some serialization (cpickle)
# problems where few libs like kafka.SimpleProducer and oslo_config.cfg
# were not available
#
# removing _write_metrics_from_partition for now in favor of
# Approach 2
#
# instance_usage_df_agg_params = instance_usage_df.rdd.map(
# lambda x: InstanceUsageDataAggParams(x,
# agg_params))
# instance_usage_df_agg_params.foreachPartition(
# DummyInsert._write_metrics_from_partition)
#
# Approach # 2
#
# using collect() to fetch all elements of an RDD
# and write to kafka
#
for instance_usage_row in instance_usage_df.collect():
InsertComponent._write_metric(instance_usage_row,
agg_params)
return instance_usage_df

@@ -0,0 +1,64 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_transform.component.insert import InsertComponent
from monasca_transform.config.config_initializer import ConfigInitializer
class KafkaInsert(InsertComponent):
"""Insert component that writes instance usage data
to kafka queue
"""
@staticmethod
def insert(transform_context, instance_usage_df):
"""write instance usage data to kafka"""
# object to init config
ConfigInitializer.basic_config()
transform_spec_df = transform_context.transform_spec_df_info
agg_params = transform_spec_df.select(
"aggregation_params_map.dimension_list").collect()[0].asDict()
# Approach # 1
# using foreachPartition to iterate through elements in an
# RDD is the recommended approach so as to not overwhelm kafka with the
# zillion connections (but in our case the MessageAdapter does
# store the adapter_impl so we should not create many producers)
# using foreachpartitions was causing some serialization/cpickle
# problems where few libs like kafka.SimpleProducer and oslo_config.cfg
# were not available in foreachPartition method
#
# removing _write_metrics_from_partition for now in favor of
# Approach # 2
#
# instance_usage_df_agg_params = instance_usage_df.rdd.map(
# lambda x: InstanceUsageDataAggParams(x,
# agg_params))
# instance_usage_df_agg_params.foreachPartition(
# DummyInsert._write_metrics_from_partition)
# Approach # 2
# using collect() to fetch all elements of an RDD and write to
# kafka
#
for instance_usage_row in instance_usage_df.collect():
InsertComponent._write_metric(instance_usage_row, agg_params)
return instance_usage_df

@@ -0,0 +1,30 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_transform.component.insert import InsertComponent
class PrepareData(InsertComponent):
"""prepare for insert component validates instance usage
data before calling Insert component
"""
@staticmethod
def insert(transform_context, instance_usage_df):
"""write instance usage data to kafka"""
#
# FIXME: add instance usage data validation
#
return instance_usage_df

@@ -0,0 +1,31 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from monasca_transform.component import Component
class SetterComponent(Component):
@abc.abstractmethod
def setter(transform_context, instance_usage_df):
raise NotImplementedError(
"Class %s doesn't implement setter(instance_usage_df,"
" transform_context)"
% __name__)
@staticmethod
def get_component_type():
"""get component type."""
return Component.SETTER_COMPONENT_TYPE

@@ -0,0 +1,201 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark.sql import SQLContext
import datetime
from monasca_transform.component import Component
from monasca_transform.component.component_utils import ComponentUtils
from monasca_transform.component.setter import SetterComponent
from monasca_transform.transform.transform_utils import InstanceUsageUtils
import json
class RollupQuantityException(Exception):
"""Exception thrown when doing quantity rollup
Attributes:
value: string representing the error
"""
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class RollupQuantity(SetterComponent):
@staticmethod
def _supported_rollup_operations():
return ["sum", "max", "min", "avg"]
@staticmethod
def _is_valid_rollup_operation(operation):
if operation in RollupQuantity._supported_rollup_operations():
return True
else:
return False
@staticmethod
def _rollup_quantity(instance_usage_df,
transform_spec_df,
setter_rollup_group_by_list,
setter_rollup_operation):
instance_usage_data_json_list = []
# check if operation is valid
if not RollupQuantity.\
_is_valid_rollup_operation(setter_rollup_operation):
raise RollupQuantityException(
"Operation %s is not supported" % setter_rollup_operation)
# call required operation on grouped data
# e.g. sum, max, min, avg etc
agg_operations_map = {
"quantity": str(setter_rollup_operation),
"firstrecord_timestamp_unix": "min",
"lastrecord_timestamp_unix": "max",
"record_count": "sum"}
# do a group by
grouped_data = instance_usage_df.groupBy(
*setter_rollup_group_by_list)
rollup_df = grouped_data.agg(agg_operations_map)
for row in rollup_df.collect():
# first record timestamp
earliest_record_timestamp_unix = getattr(
row, "min(firstrecord_timestamp_unix)",
Component.DEFAULT_UNAVAILABLE_VALUE)
earliest_record_timestamp_string = \
datetime.datetime.fromtimestamp(
earliest_record_timestamp_unix).strftime(
'%Y-%m-%d %H:%M:%S')
# last record_timestamp
latest_record_timestamp_unix = getattr(
row, "max(lastrecord_timestamp_unix)",
Component.DEFAULT_UNAVAILABLE_VALUE)
latest_record_timestamp_string = \
datetime.datetime.fromtimestamp(
latest_record_timestamp_unix).strftime('%Y-%m-%d %H:%M:%S')
# record count
record_count = getattr(row, "sum(record_count)", 0.0)
# quantity
# get expression that will be used to select quantity
# from rolled up data
select_quant_str = "".join((setter_rollup_operation, "(quantity)"))
quantity = getattr(row, select_quant_str, 0.0)
# create a new instance usage dict
instance_usage_dict = {"tenant_id": getattr(row, "tenant_id",
"all"),
"user_id":
getattr(row, "user_id", "all"),
"resource_uuid":
getattr(row, "resource_uuid", "all"),
"geolocation":
getattr(row, "geolocation", "all"),
"region":
getattr(row, "region", "all"),
"zone":
getattr(row, "zone", "all"),
"host":
getattr(row, "host", "all"),
"project_id":
getattr(row, "tenant_id", "all"),
"aggregated_metric_name":
getattr(row, "aggregated_metric_name",
"all"),
"quantity":
quantity,
"firstrecord_timestamp_unix":
earliest_record_timestamp_unix,
"firstrecord_timestamp_string":
earliest_record_timestamp_string,
"lastrecord_timestamp_unix":
latest_record_timestamp_unix,
"lastrecord_timestamp_string":
latest_record_timestamp_string,
"record_count": record_count,
"service_group":
getattr(row, "service_group", "all"),
"service_id":
getattr(row, "service_id", "all"),
"usage_date":
getattr(row, "usage_date", "all"),
"usage_hour":
getattr(row, "usage_hour", "all"),
"usage_minute":
getattr(row, "usage_minute", "all"),
"aggregation_period":
getattr(row, "aggregation_period",
"all")
}
instance_usage_data_json = json.dumps(instance_usage_dict)
instance_usage_data_json_list.append(instance_usage_data_json)
# convert to rdd
spark_context = instance_usage_df.rdd.context
return spark_context.parallelize(instance_usage_data_json_list)
@staticmethod
def setter(transform_context, instance_usage_df):
transform_spec_df = transform_context.transform_spec_df_info
# get fields we want to group by for a rollup
agg_params = transform_spec_df.select(
"aggregation_params_map.setter_rollup_group_by_list").\
collect()[0].asDict()
setter_rollup_group_by_list = agg_params["setter_rollup_group_by_list"]
# get aggregation period
agg_params = transform_spec_df.select(
"aggregation_params_map.aggregation_period").collect()[0].asDict()
aggregation_period = agg_params["aggregation_period"]
group_by_period_list = \
ComponentUtils._get_instance_group_by_period_list(
aggregation_period)
# group by columns list
group_by_columns_list = group_by_period_list + \
setter_rollup_group_by_list
# get rollup operation (sum, max, avg, min)
agg_params = transform_spec_df.select(
"aggregation_params_map.setter_rollup_operation").\
collect()[0].asDict()
setter_rollup_operation = agg_params["setter_rollup_operation"]
# perform rollup operation
instance_usage_json_rdd = RollupQuantity._rollup_quantity(
instance_usage_df, transform_spec_df,
group_by_columns_list,
str(setter_rollup_operation))
sql_context = SQLContext.getOrCreate(instance_usage_df.rdd.context)
instance_usage_trans_df = InstanceUsageUtils.create_df_from_json_rdd(
sql_context,
instance_usage_json_rdd)
return instance_usage_trans_df

View File

@ -0,0 +1,90 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark.sql import SQLContext
from monasca_transform.component import InstanceUsageDataAggParams
from monasca_transform.component.setter import SetterComponent
from monasca_transform.transform.transform_utils import InstanceUsageUtils
import json
class SetAggregatedMetricName(SetterComponent):
"""setter component that sets final aggregated metric name.
aggregated metric name is available as a parameter 'aggregated_metric_name'
in aggregation_params in metric processing driver table.
"""
@staticmethod
def _set_aggregated_metric_name(instance_usage_agg_params):
row = instance_usage_agg_params.instance_usage_data
agg_params = instance_usage_agg_params.agg_params
instance_usage_dict = {"tenant_id": row.tenant_id,
"user_id": row.user_id,
"resource_uuid": row.resource_uuid,
"geolocation": row.geolocation,
"region": row.region,
"zone": row.zone,
"host": row.host,
"project_id": row.project_id,
"aggregated_metric_name":
agg_params["aggregated_metric_name"],
"quantity": row.quantity,
"firstrecord_timestamp_unix":
row.firstrecord_timestamp_unix,
"firstrecord_timestamp_string":
row.firstrecord_timestamp_string,
"lastrecord_timestamp_unix":
row.lastrecord_timestamp_unix,
"lastrecord_timestamp_string":
row.lastrecord_timestamp_string,
"record_count": row.record_count,
"service_group": row.service_group,
"service_id": row.service_id,
"usage_date": row.usage_date,
"usage_hour": row.usage_hour,
"usage_minute": row.usage_minute,
"aggregation_period": row.aggregation_period}
instance_usage_data_json = json.dumps(instance_usage_dict)
return instance_usage_data_json
@staticmethod
def setter(transform_context, instance_usage_df):
"""set the aggregated metric name field for elements in instance usage
rdd
"""
transform_spec_df = transform_context.transform_spec_df_info
agg_params = transform_spec_df.select(
"aggregation_params_map.aggregated_metric_name").collect()[0].\
asDict()
instance_usage_df_agg_params = instance_usage_df.rdd.map(
lambda x: InstanceUsageDataAggParams(x, agg_params))
instance_usage_json_rdd = instance_usage_df_agg_params.map(
SetAggregatedMetricName._set_aggregated_metric_name)
sql_context = SQLContext.getOrCreate(instance_usage_df.rdd.context)
instance_usage_trans_df = InstanceUsageUtils.create_df_from_json_rdd(
sql_context,
instance_usage_json_rdd)
return instance_usage_trans_df

View File

@ -0,0 +1,90 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark.sql import SQLContext
from monasca_transform.component import InstanceUsageDataAggParams
from monasca_transform.component.setter import SetterComponent
from monasca_transform.transform.transform_utils import InstanceUsageUtils
import json
class SetAggregatedPeriod(SetterComponent):
"""setter component that sets final aggregated metric name.
aggregated metric name is available as a parameter 'aggregated_metric_name'
in aggregation_params in metric processing driver table.
"""
@staticmethod
def _set_aggregated_period(instance_usage_agg_params):
row = instance_usage_agg_params.instance_usage_data
agg_params = instance_usage_agg_params.agg_params
instance_usage_dict = {"tenant_id": row.tenant_id,
"user_id": row.user_id,
"resource_uuid": row.resource_uuid,
"geolocation": row.geolocation,
"region": row.region,
"zone": row.zone,
"host": row.host,
"project_id": row.project_id,
"aggregated_metric_name":
row.aggregated_metric_name,
"quantity": row.quantity,
"firstrecord_timestamp_unix":
row.firstrecord_timestamp_unix,
"firstrecord_timestamp_string":
row.firstrecord_timestamp_string,
"lastrecord_timestamp_unix":
row.lastrecord_timestamp_unix,
"lastrecord_timestamp_string":
row.lastrecord_timestamp_string,
"record_count": row.record_count,
"service_group": row.service_group,
"service_id": row.service_id,
"usage_date": row.usage_date,
"usage_hour": row.usage_hour,
"usage_minute": row.usage_minute,
"aggregation_period":
agg_params["aggregation_period"]}
instance_usage_data_json = json.dumps(instance_usage_dict)
return instance_usage_data_json
@staticmethod
def setter(transform_context, instance_usage_df):
"""set the aggregated metric name field for elements in instance usage
rdd
"""
transform_spec_df = transform_context.transform_spec_df_info
agg_params = transform_spec_df.select(
"aggregation_params_map.aggregation_period").collect()[0].asDict()
instance_usage_df_agg_params = instance_usage_df.rdd.map(
lambda x: InstanceUsageDataAggParams(x, agg_params))
instance_usage_json_rdd = instance_usage_df_agg_params.map(
SetAggregatedPeriod._set_aggregated_period)
sql_context = SQLContext.getOrCreate(instance_usage_df.rdd.context)
instance_usage_trans_df = InstanceUsageUtils.create_df_from_json_rdd(
sql_context,
instance_usage_json_rdd)
return instance_usage_trans_df

View File

@ -0,0 +1,30 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from monasca_transform.component import Component
class UsageComponent(Component):
@abc.abstractmethod
def usage(transform_context, record_store_df):
raise NotImplementedError(
"Class %s doesn't implement setter(instance_usage_df,"
" transform_spec_df)"
% __name__)
@staticmethod
def get_component_type():
return Component.USAGE_COMPONENT_TYPE

View File

@ -0,0 +1,398 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
import datetime
from pyspark.sql import SQLContext
from monasca_transform.component import Component
from monasca_transform.component.component_utils import ComponentUtils
from monasca_transform.component.usage import UsageComponent
from monasca_transform.transform.grouping.group_sort_by_timestamp \
import GroupSortbyTimestamp
from monasca_transform.transform.grouping.group_sort_by_timestamp_partition \
import GroupSortbyTimestampPartition
from monasca_transform.transform.transform_utils import InstanceUsageUtils
import json
class FetchQuantityException(Exception):
"""Exception thrown when fetching quantity
Attributes:
value: string representing the error
"""
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
GroupedDataWithOperation = namedtuple("GroupedDataWithOperation",
["grouped_data",
"usage_fetch_operation"])
class GroupedDataWithOperation(GroupedDataWithOperation):
"""A tuple which is a wrapper containing record store data
and the usage operation
namdetuple contains:
grouped_data - grouped record store data
usage_fetch_operation - operation
"""
class FetchQuantity(UsageComponent):
@staticmethod
def _supported_fetch_operations():
return ["sum", "max", "min", "avg", "latest", "oldest"]
@staticmethod
def _is_valid_fetch_operation(operation):
"""return true if its a valid fetch operation"""
if operation in FetchQuantity._supported_fetch_operations():
return True
else:
return False
@staticmethod
def _get_latest_oldest_quantity(grouping_results_with_operation):
"""get quantity for each group by performing the requested
usage operation and return instance usage data.
"""
# row
grouping_results = grouping_results_with_operation.\
grouped_data
# usage fetch operation
usage_fetch_operation = grouping_results_with_operation.\
usage_fetch_operation
group_by_dict = grouping_results.grouping_key_dict
#
tenant_id = group_by_dict.get("tenant_id",
Component.DEFAULT_UNAVAILABLE_VALUE)
resource_uuid = group_by_dict.get("resource_uuid",
Component.DEFAULT_UNAVAILABLE_VALUE)
user_id = group_by_dict.get("user_id",
Component.DEFAULT_UNAVAILABLE_VALUE)
geolocation = group_by_dict.get("geolocation",
Component.DEFAULT_UNAVAILABLE_VALUE)
region = group_by_dict.get("region",
Component.DEFAULT_UNAVAILABLE_VALUE)
zone = group_by_dict.get("zone", Component.DEFAULT_UNAVAILABLE_VALUE)
host = group_by_dict.get("host", Component.DEFAULT_UNAVAILABLE_VALUE)
usage_date = group_by_dict.get("event_date",
Component.DEFAULT_UNAVAILABLE_VALUE)
usage_hour = group_by_dict.get("event_hour",
Component.DEFAULT_UNAVAILABLE_VALUE)
usage_minute = group_by_dict.get("event_minute",
Component.DEFAULT_UNAVAILABLE_VALUE)
aggregated_metric_name = group_by_dict.get(
"aggregated_metric_name", Component.DEFAULT_UNAVAILABLE_VALUE)
# stats
agg_stats = grouping_results.results
# get quantity for this host
quantity = None
if (usage_fetch_operation == "latest"):
quantity = agg_stats["lastrecord_quantity"]
elif usage_fetch_operation == "oldest":
quantity = agg_stats["firstrecord_quantity"]
firstrecord_timestamp_unix = agg_stats["firstrecord_timestamp_unix"]
firstrecord_timestamp_string = \
agg_stats["firstrecord_timestamp_string"]
lastrecord_timestamp_unix = agg_stats["lastrecord_timestamp_unix"]
lastrecord_timestamp_string = agg_stats["lastrecord_timestamp_string"]
record_count = agg_stats["record_count"]
# service id
service_group = Component.DEFAULT_UNAVAILABLE_VALUE
service_id = Component.DEFAULT_UNAVAILABLE_VALUE
# aggregation period
aggregation_period = Component.DEFAULT_UNAVAILABLE_VALUE
# event type
event_type = group_by_dict.get("event_type",
Component.DEFAULT_UNAVAILABLE_VALUE)
instance_usage_dict = {"tenant_id": tenant_id, "user_id": user_id,
"resource_uuid": resource_uuid,
"geolocation": geolocation, "region": region,
"zone": zone, "host": host,
"aggregated_metric_name":
aggregated_metric_name,
"quantity": quantity,
"firstrecord_timestamp_unix":
firstrecord_timestamp_unix,
"firstrecord_timestamp_string":
firstrecord_timestamp_string,
"lastrecord_timestamp_unix":
lastrecord_timestamp_unix,
"lastrecord_timestamp_string":
lastrecord_timestamp_string,
"record_count": record_count,
"service_group": service_group,
"service_id": service_id,
"usage_date": usage_date,
"usage_hour": usage_hour,
"usage_minute": usage_minute,
"aggregation_period": aggregation_period,
"processing_meta": {"event_type": event_type}
}
instance_usage_data_json = json.dumps(instance_usage_dict)
return instance_usage_data_json
@staticmethod
def _get_quantity(grouped_record_with_operation):
# row
row = grouped_record_with_operation.grouped_data
# usage fetch operation
usage_fetch_operation = grouped_record_with_operation.\
usage_fetch_operation
# first record timestamp # FIXME: beginning of epoch?
earliest_record_timestamp_unix = getattr(
row, "min(event_timestamp_unix_for_min)",
Component.DEFAULT_UNAVAILABLE_VALUE)
earliest_record_timestamp_string = \
datetime.datetime.fromtimestamp(
earliest_record_timestamp_unix).strftime(
'%Y-%m-%d %H:%M:%S')
# last record_timestamp # FIXME: beginning of epoch?
latest_record_timestamp_unix = getattr(
row, "max(event_timestamp_unix_for_max)",
Component.DEFAULT_UNAVAILABLE_VALUE)
latest_record_timestamp_string = \
datetime.datetime.fromtimestamp(
latest_record_timestamp_unix).strftime('%Y-%m-%d %H:%M:%S')
# record count
record_count = getattr(row, "count(event_timestamp_unix)", 0.0)
# quantity
# get expression that will be used to select quantity
# from rolled up data
select_quant_str = "".join((usage_fetch_operation, "(event_quantity)"))
quantity = getattr(row, select_quant_str, 0.0)
# create a new instance usage dict
instance_usage_dict = {"tenant_id": getattr(row, "tenant_id",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"user_id":
getattr(row, "user_id",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"resource_uuid":
getattr(row, "resource_uuid",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"geolocation":
getattr(row, "geolocation",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"region":
getattr(row, "region",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"zone":
getattr(row, "zone",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"host":
getattr(row, "host",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"project_id":
getattr(row, "tenant_id",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"aggregated_metric_name":
getattr(row, "aggregated_metric_name",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"quantity":
quantity,
"firstrecord_timestamp_unix":
earliest_record_timestamp_unix,
"firstrecord_timestamp_string":
earliest_record_timestamp_string,
"lastrecord_timestamp_unix":
latest_record_timestamp_unix,
"lastrecord_timestamp_string":
latest_record_timestamp_string,
"record_count": record_count,
"service_group":
getattr(row, "service_group",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"service_id":
getattr(row, "service_id",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"usage_date":
getattr(row, "usage_date",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"usage_hour":
getattr(row, "usage_hour",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"usage_minute":
getattr(row, "usage_minute",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"aggregation_period":
getattr(row, "aggregation_period",
Component.
DEFAULT_UNAVAILABLE_VALUE),
"processing_meta": {"event_type": getattr(
row, "event_type",
Component.DEFAULT_UNAVAILABLE_VALUE)}
}
instance_usage_data_json = json.dumps(instance_usage_dict)
return instance_usage_data_json
@staticmethod
def usage(transform_context, record_store_df):
"""component which groups together record store records by
provided group by columns list , sorts within the group by event
timestamp field, applies group stats udf and returns the latest
quantity as a instance usage dataframe
"""
transform_spec_df = transform_context.transform_spec_df_info
# get rollup operation (sum, max, avg, min)
agg_params = transform_spec_df.select(
"aggregation_params_map.usage_fetch_operation").\
collect()[0].asDict()
usage_fetch_operation = agg_params["usage_fetch_operation"]
# check if operation is valid
if not FetchQuantity.\
_is_valid_fetch_operation(usage_fetch_operation):
raise FetchQuantityException(
"Operation %s is not supported" % usage_fetch_operation)
# get aggregation period
agg_params = transform_spec_df.select(
"aggregation_params_map.aggregation_period").collect()[0].asDict()
aggregation_period = agg_params["aggregation_period"]
group_by_period_list = ComponentUtils._get_group_by_period_list(
aggregation_period)
# get what we want to group by
agg_params = transform_spec_df.select(
"aggregation_params_map.aggregation_group_by_list").\
collect()[0].asDict()
aggregation_group_by_list = agg_params["aggregation_group_by_list"]
# group by columns list
group_by_columns_list = group_by_period_list + \
aggregation_group_by_list
instance_usage_json_rdd = None
if (usage_fetch_operation == "latest"
or usage_fetch_operation == "oldest"):
grouped_rows_rdd = None
# FIXME:
# select group by method
IS_GROUP_BY_PARTITION = False
if (IS_GROUP_BY_PARTITION):
# GroupSortbyTimestampPartition is more scalable
# since it creates groups using repartitioning and sorting,
# but it is disabled for now: the number of partitions has
# to be set higher than the expected number of groups, which
# is hard to guess, and setting it to a very high number
# adversely affects performance
num_of_groups = 100
grouped_rows_rdd = \
GroupSortbyTimestampPartition.\
fetch_group_latest_oldest_quantity(
record_store_df, transform_spec_df,
group_by_columns_list,
num_of_groups)
else:
# group using key-value pair RDD's groupByKey()
grouped_rows_rdd = \
GroupSortbyTimestamp.\
fetch_group_latest_oldest_quantity(
record_store_df, transform_spec_df,
group_by_columns_list)
grouped_data_rdd_with_operation = grouped_rows_rdd.map(
lambda x:
GroupedDataWithOperation(x,
str(usage_fetch_operation)))
instance_usage_json_rdd = \
grouped_data_rdd_with_operation.map(
FetchQuantity._get_latest_oldest_quantity)
else:
record_store_df_int = \
record_store_df.select(
record_store_df.event_timestamp_unix
.alias("event_timestamp_unix_for_min"),
record_store_df.event_timestamp_unix
.alias("event_timestamp_unix_for_max"),
"*")
# for standard sum, max, min, avg operations on grouped data
agg_operations_map = {
"event_quantity": str(usage_fetch_operation),
"event_timestamp_unix_for_min": "min",
"event_timestamp_unix_for_max": "max",
"event_timestamp_unix": "count"}
# do a group by
grouped_data = record_store_df_int.groupBy(*group_by_columns_list)
grouped_record_store_df = grouped_data.agg(agg_operations_map)
grouped_data_rdd_with_operation = grouped_record_store_df.map(
lambda x:
GroupedDataWithOperation(x,
str(usage_fetch_operation)))
instance_usage_json_rdd = grouped_data_rdd_with_operation.map(
FetchQuantity._get_quantity)
sql_context = SQLContext.getOrCreate(record_store_df.rdd.context)
instance_usage_df = \
InstanceUsageUtils.create_df_from_json_rdd(sql_context,
instance_usage_json_rdd)
return instance_usage_df

View File

@ -0,0 +1,275 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark.sql.functions import ceil
from pyspark.sql.functions import col
from pyspark.sql.functions import when
from pyspark.sql import SQLContext
from monasca_transform.component import Component
from monasca_transform.component.component_utils import ComponentUtils
from monasca_transform.component.usage.fetch_quantity import FetchQuantity
from monasca_transform.component.usage import UsageComponent
from monasca_transform.transform.transform_utils import InstanceUsageUtils
import json
class FetchQuantityUtilException(Exception):
"""Exception thrown when fetching quantity
Attributes:
value: string representing the error
"""
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class FetchQuantityUtil(UsageComponent):
@staticmethod
def _supported_fetch_quantity_util_operations():
# The results of "sum", "max", and "min" don't make sense and/or
# may be misleading (the latter two due to the metrics which are
# used as input to the utilization calculation potentially not
# being from the same time period...e.g., one being from the
# beginning of the streaming interval and the other being from
# the end).
return ["avg", "latest", "oldest"]
@staticmethod
def _is_valid_fetch_quantity_util_operation(operation):
"""return true if its a valid fetch operation"""
if operation in FetchQuantityUtil.\
_supported_fetch_quantity_util_operations():
return True
else:
return False
@staticmethod
def _format_quantity_util(row):
"""calculate the utilized quantity based on idle percentage
quantity and convert to instance usage format
"""
#
tenant_id = getattr(row, "tenant_id", "all")
resource_uuid = getattr(row, "resource_uuid",
Component.DEFAULT_UNAVAILABLE_VALUE)
user_id = getattr(row, "user_id",
Component.DEFAULT_UNAVAILABLE_VALUE)
geolocation = getattr(row, "geolocation",
Component.DEFAULT_UNAVAILABLE_VALUE)
region = getattr(row, "region", Component.DEFAULT_UNAVAILABLE_VALUE)
zone = getattr(row, "zone", Component.DEFAULT_UNAVAILABLE_VALUE)
host = getattr(row, "host", "all")
usage_date = getattr(row, "usage_date",
Component.DEFAULT_UNAVAILABLE_VALUE)
usage_hour = getattr(row, "usage_hour",
Component.DEFAULT_UNAVAILABLE_VALUE)
usage_minute = getattr(row, "usage_minute",
Component.DEFAULT_UNAVAILABLE_VALUE)
aggregated_metric_name = getattr(row, "aggregated_metric_name",
Component.DEFAULT_UNAVAILABLE_VALUE)
# get utilized quantity
quantity = row.utilized_quantity
firstrecord_timestamp_unix = \
getattr(row, "firstrecord_timestamp_unix",
Component.DEFAULT_UNAVAILABLE_VALUE)
firstrecord_timestamp_string = \
getattr(row, "firstrecord_timestamp_string",
Component.DEFAULT_UNAVAILABLE_VALUE)
lastrecord_timestamp_unix = \
getattr(row, "lastrecord_timestamp_unix",
Component.DEFAULT_UNAVAILABLE_VALUE)
lastrecord_timestamp_string = \
getattr(row, "lastrecord_timestamp_string",
Component.DEFAULT_UNAVAILABLE_VALUE)
record_count = getattr(row, "record_count",
Component.DEFAULT_UNAVAILABLE_VALUE)
# service id
service_group = Component.DEFAULT_UNAVAILABLE_VALUE
service_id = Component.DEFAULT_UNAVAILABLE_VALUE
# aggregation period
aggregation_period = Component.DEFAULT_UNAVAILABLE_VALUE
instance_usage_dict = {"tenant_id": tenant_id, "user_id": user_id,
"resource_uuid": resource_uuid,
"geolocation": geolocation, "region": region,
"zone": zone, "host": host,
"aggregated_metric_name":
aggregated_metric_name,
"quantity": quantity,
"firstrecord_timestamp_unix":
firstrecord_timestamp_unix,
"firstrecord_timestamp_string":
firstrecord_timestamp_string,
"lastrecord_timestamp_unix":
lastrecord_timestamp_unix,
"lastrecord_timestamp_string":
lastrecord_timestamp_string,
"record_count": record_count,
"service_group": service_group,
"service_id": service_id,
"usage_date": usage_date,
"usage_hour": usage_hour,
"usage_minute": usage_minute,
"aggregation_period": aggregation_period}
instance_usage_data_json = json.dumps(instance_usage_dict)
return instance_usage_data_json
@staticmethod
def usage(transform_context, record_store_df):
"""component which groups together record store records by
provided group by columns list, sorts within the group by event
timestamp field, applies group stats udf and returns the latest
quantity as a instance usage dataframe
This component does groups records by event_type (a.k.a metric name)
and expects two kinds of records in record_store data
total quantity records - the total available quantity
e.g. cpu.total_logical_cores
idle perc records - percentage that is idle
e.g. cpu.idle_perc
To calculate the utilized quantity this component uses following
formula:
utilized quantity = ceil((100 - idle_perc) * total_quantity / 100)
"""
sql_context = SQLContext.getOrCreate(record_store_df.rdd.context)
transform_spec_df = transform_context.transform_spec_df_info
# get rollup operation (sum, max, avg, min)
agg_params = transform_spec_df.select(
"aggregation_params_map.usage_fetch_operation"). \
collect()[0].asDict()
usage_fetch_operation = agg_params["usage_fetch_operation"]
# check if operation is valid
if not FetchQuantityUtil. \
_is_valid_fetch_quantity_util_operation(usage_fetch_operation):
raise FetchQuantityUtilException(
"Operation %s is not supported" % usage_fetch_operation)
# get the quantities for idle perc and quantity
instance_usage_df = FetchQuantity().usage(
transform_context, record_store_df)
# get aggregation period for instance usage dataframe
agg_params = transform_spec_df.select(
"aggregation_params_map.aggregation_period").collect()[0].asDict()
aggregation_period = agg_params["aggregation_period"]
group_by_period_list = ComponentUtils.\
_get_instance_group_by_period_list(aggregation_period)
# get what we want to group by
agg_params = transform_spec_df.select(
"aggregation_params_map.aggregation_group_by_list").\
collect()[0].asDict()
aggregation_group_by_list = agg_params["aggregation_group_by_list"]
# group by columns list
group_by_columns_list = group_by_period_list + \
aggregation_group_by_list
# get quantity event type
agg_params = transform_spec_df.select(
"aggregation_params_map.usage_fetch_util_quantity_event_type").\
collect()[0].asDict()
usage_fetch_util_quantity_event_type = \
agg_params["usage_fetch_util_quantity_event_type"]
# check if driver parameter is provided
if usage_fetch_util_quantity_event_type is None or \
usage_fetch_util_quantity_event_type == "":
raise FetchQuantityUtilException(
"Driver parameter '%s' is missing"
% "usage_fetch_util_quantity_event_type")
# get idle perc event type
agg_params = transform_spec_df.select(
"aggregation_params_map.usage_fetch_util_idle_perc_event_type").\
collect()[0].asDict()
usage_fetch_util_idle_perc_event_type = \
agg_params["usage_fetch_util_idle_perc_event_type"]
# check if driver parameter is provided
if usage_fetch_util_idle_perc_event_type is None or \
usage_fetch_util_idle_perc_event_type == "":
raise FetchQuantityUtilException(
"Driver parameter '%s' is missing"
% "usage_fetch_util_idle_perc_event_type")
# get quantity records dataframe
event_type_quantity_clause = "processing_meta.event_type='%s'" \
% usage_fetch_util_quantity_event_type
quantity_df = instance_usage_df.select('*').where(
event_type_quantity_clause).alias("quantity_df_alias")
# get idle perc records dataframe
event_type_idle_perc_clause = "processing_meta.event_type='%s'" \
% usage_fetch_util_idle_perc_event_type
idle_perc_df = instance_usage_df.select('*').where(
event_type_idle_perc_clause).alias("idle_perc_df_alias")
# join quantity records with idle perc records
# create a join condition without the event_type
cond = [item for item in group_by_columns_list
if item != 'event_type']
quant_idle_perc_df = quantity_df.join(idle_perc_df, cond, 'left')
#
# Find utilized quantity based on idle percentage
#
# utilized quantity = (100 - idle_perc) * total_quantity / 100
#
quant_idle_perc_calc_df = quant_idle_perc_df.select(
col("quantity_df_alias.*"),
when(col("idle_perc_df_alias.quantity")
!= 0.0,
ceil(((100.0 - col("idle_perc_df_alias.quantity")))
* col("quantity_df_alias.quantity") / 100.0))
.otherwise(col("quantity_df_alias.quantity"))
.alias("utilized_quantity"),
col("quantity_df_alias.quantity")
.alias("total_quantity"),
col("idle_perc_df_alias.quantity")
.alias("idle_perc"))
instance_usage_json_rdd = \
quant_idle_perc_calc_df.rdd.map(
FetchQuantityUtil._format_quantity_util)
instance_usage_df = \
InstanceUsageUtils.create_df_from_json_rdd(sql_context,
instance_usage_json_rdd)
return instance_usage_df

View File

View File

@ -0,0 +1,105 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
class ConfigInitializer(object):
@staticmethod
def basic_config(default_config_files=None):
ConfigInitializer.load_repositories_options()
ConfigInitializer.load_database_options()
ConfigInitializer.load_messaging_options()
ConfigInitializer.load_service_options()
if not default_config_files:
default_config_files = ['/etc/monasca-transform.conf',
'etc/monasca-transform.conf']
cfg.CONF(args=[],
project='monasca_transform',
default_config_files=default_config_files)
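# Illustrative sketch of a minimal monasca-transform.conf that the
# option groups registered below would parse (values are placeholders,
# not recommendations; only the option names come from this file):
#
# [repositories]
# offsets = monasca_transform.offset_specs:JSONOffsetSpecs
# data_driven_specs = monasca_transform.data_driven_specs.json_data_driven_specs_repo:JSONDataDrivenSpecsRepo
#
# [database]
# server_type = mysql
# host = localhost
# database_name = monasca_transform
# username = m-transform
# password = password
#
# [messaging]
# adapter = monasca_transform.messaging.adapter:KafkaMessageAdapter
# topic = metrics
# brokers = 192.168.10.4:9092
# publish_kafka_tenant_id = 111111
#
# [service]
# service_log_path = /var/log/monasca/transform
# stream_interval = 600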
@staticmethod
def load_repositories_options():
repo_opts = [
cfg.StrOpt(
'offsets',
default='monasca_transform.offset_specs:JSONOffsetSpecs',
help='Repository for offset persistence'
),
cfg.StrOpt(
'data_driven_specs',
default='monasca_transform.data_driven_specs.'
'json_data_driven_specs_repo:JSONDataDrivenSpecsRepo',
help='Repository for metric and event data_driven_specs'
)
]
repo_group = cfg.OptGroup(name='repositories', title='repositories')
cfg.CONF.register_group(repo_group)
cfg.CONF.register_opts(repo_opts, group=repo_group)
@staticmethod
def load_database_options():
db_opts = [
cfg.StrOpt('server_type'),
cfg.StrOpt('host'),
cfg.StrOpt('database_name'),
cfg.StrOpt('username'),
cfg.StrOpt('password')
]
mysql_group = cfg.OptGroup(name='database', title='database')
cfg.CONF.register_group(mysql_group)
cfg.CONF.register_opts(db_opts, group=mysql_group)
@staticmethod
def load_messaging_options():
messaging_options = [
cfg.StrOpt('adapter',
default='monasca_transform.messaging.adapter:'
'KafkaMessageAdapter',
help='Message adapter implementation'),
cfg.StrOpt('topic', default='metrics',
help='Messaging topic'),
cfg.StrOpt('brokers',
default='192.168.10.4:9092',
help='Messaging brokers'),
cfg.StrOpt('publish_kafka_tenant_id',
default='111111',
help='publish aggregated metrics tenant')
]
messaging_group = cfg.OptGroup(name='messaging', title='messaging')
cfg.CONF.register_group(messaging_group)
cfg.CONF.register_opts(messaging_options, group=messaging_group)
@staticmethod
def load_service_options():
service_opts = [
cfg.StrOpt('coordinator_address'),
cfg.StrOpt('coordinator_group'),
cfg.FloatOpt('election_polling_frequency'),
cfg.StrOpt('setup_file'),
cfg.StrOpt('setup_target'),
cfg.StrOpt('spark_driver'),
cfg.StrOpt('service_log_path'),
cfg.StrOpt('spark_event_logging_dest'),
cfg.StrOpt('spark_event_logging_enabled'),
cfg.StrOpt('spark_jars_list'),
cfg.StrOpt('spark_master_list'),
cfg.StrOpt('spark_python_files'),
cfg.IntOpt('stream_interval'),
cfg.StrOpt('work_dir')
]
service_group = cfg.OptGroup(name='service', title='service')
cfg.CONF.register_group(service_group)
cfg.CONF.register_opts(service_opts, group=service_group)

View File

@ -0,0 +1,43 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo_config import cfg
import simport
import six
class DataDrivenSpecsRepoFactory(object):
data_driven_specs_repo = None
@staticmethod
def get_data_driven_specs_repo():
if not DataDrivenSpecsRepoFactory.data_driven_specs_repo:
DataDrivenSpecsRepoFactory.data_driven_specs_repo = simport.load(
cfg.CONF.repositories.data_driven_specs)()
return DataDrivenSpecsRepoFactory.data_driven_specs_repo
@six.add_metaclass(abc.ABCMeta)
class DataDrivenSpecsRepo(object):
transform_specs_type = 'transform_specs'
pre_transform_specs_type = 'pre_transform_specs'
@abc.abstractmethod
def get_data_driven_specs(self, sql_context=None,
data_driven_spec_type=None):
raise NotImplementedError(
"Class %s doesn't implement get_data_driven_specs(self, "
"sql_context=None, data_driven_spec_type=None)"
% self.__class__.__name__)

View File

@ -0,0 +1,45 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepo
class JSONDataDrivenSpecsRepo(DataDrivenSpecsRepo):
def __init__(self, common_file_system_stub_path=None):
self._common_file_system_stub_path = common_file_system_stub_path or ''
def get_data_driven_specs(self, sql_context=None,
data_driven_spec_type=None):
path = None
if data_driven_spec_type == self.transform_specs_type:
path = (os.path.join(
self._common_file_system_stub_path,
"monasca_transform/data_driven_specs/"
"transform_specs/transform_specs.json"
))
elif data_driven_spec_type == self.pre_transform_specs_type:
path = (os.path.join(
self._common_file_system_stub_path,
"monasca_transform/data_driven_specs/"
"pre_transform_specs/pre_transform_specs.json"
))
if os.path.exists(path):
# read file to json
return sql_context.read.json(path)
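# Illustrative usage (mirrors how the processing driver in this commit
# obtains and calls the repo; not additional functionality):
#
# repo = DataDrivenSpecsRepoFactory.get_data_driven_specs_repo()
# transform_specs_df = repo.get_data_driven_specs(
#     sql_context=sql_context,
#     data_driven_spec_type=DataDrivenSpecsRepo.transform_specs_type)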

View File

@ -0,0 +1,93 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from oslo_config import cfg
from pyspark.sql import DataFrameReader
from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepo
class MySQLDataDrivenSpecsRepo(DataDrivenSpecsRepo):
transform_specs_data_frame = None
pre_transform_specs_data_frame = None
def __init__(self):
self.database_impl = cfg.CONF.database.server_type
self.database_name = cfg.CONF.database.database_name
self.database_server = cfg.CONF.database.host
self.database_uid = cfg.CONF.database.username
self.database_pwd = cfg.CONF.database.password
def get_connection_string(self):
# FIXME I don't like this, find a better way of managing the conn
return 'jdbc:%s://%s/%s?user=%s&password=%s' % (
self.database_impl,
self.database_server,
self.database_name,
self.database_uid,
self.database_pwd
)
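# Illustrative result with hypothetical settings (server_type=mysql,
# host=localhost, database_name=monasca_transform, username=mt,
# password=secret):
# 'jdbc:mysql://localhost/monasca_transform?user=mt&password=secret'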
def get_data_driven_specs(self, sql_context=None,
data_driven_spec_type=None):
data_driven_spec = None
if self.transform_specs_type == data_driven_spec_type:
if not self.transform_specs_data_frame:
self.generate_transform_specs_data_frame(
spark_context=sql_context._sc,
sql_context=sql_context)
data_driven_spec = self.transform_specs_data_frame
elif self.pre_transform_specs_type == data_driven_spec_type:
if not self.pre_transform_specs_data_frame:
self.generate_pre_transform_specs_data_frame(
spark_context=sql_context._sc,
sql_context=sql_context)
data_driven_spec = self.pre_transform_specs_data_frame
return data_driven_spec
def generate_transform_specs_data_frame(self, spark_context=None,
sql_context=None):
data_frame_reader = DataFrameReader(sql_context)
transform_specs_data_frame = data_frame_reader.jdbc(
self.get_connection_string(),
'transform_specs'
)
data = []
for item in transform_specs_data_frame.collect():
spec = json.loads(item['transform_spec'])
data.append(json.dumps(spec))
data_frame = sql_context.jsonRDD(spark_context.parallelize(data))
self.transform_specs_data_frame = data_frame
def generate_pre_transform_specs_data_frame(self, spark_context=None,
sql_context=None):
data_frame_reader = DataFrameReader(sql_context)
pre_transform_specs_data_frame = data_frame_reader.jdbc(
self.get_connection_string(),
'pre_transform_specs'
)
data = []
for item in pre_transform_specs_data_frame.collect():
spec = json.loads(item['pre_transform_spec'])
data.append(json.dumps(spec))
data_frame = sql_context.jsonRDD(spark_context.parallelize(data))
self.pre_transform_specs_data_frame = data_frame

View File

@ -0,0 +1,9 @@
{"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"mem.total_mb","metric_id_list":["mem_total_all"],"required_raw_fields_list":["creation_time"],"service_id":"host_metrics"}
{"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"mem.usable_mb","metric_id_list":["mem_usable_all"],"required_raw_fields_list":["creation_time"],"service_id":"host_metrics"}
{"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"disk.total_space_mb","metric_id_list":["disk_total_all"],"required_raw_fields_list":["creation_time"],"service_id":"host_metrics"}
{"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"disk.total_used_space_mb","metric_id_list":["disk_usable_all"],"required_raw_fields_list":["creation_time"],"service_id":"host_metrics"}
{"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"nova.vm.disk.total_allocated_gb","metric_id_list":["disk_allocated_all"],"required_raw_fields_list":["creation_time"],"service_id":"host_metrics"}
{"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"cpu.total_logical_cores","metric_id_list":["cpu_total_all","cpu_util_all"],"required_raw_fields_list":["creation_time"],"service_id":"host_metrics"}
{"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"cpu.idle_perc","metric_id_list":["cpu_util_all"],"required_raw_fields_list":["creation_time"],"service_id":"host_metrics"}
{"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"vcpus","metric_id_list":["vcpus_all","vcpus_project"],"required_raw_fields_list":["creation_time","project_id"],"service_id":"host_metrics"}
{"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"vm.mem.total_mb","metric_id_list":["vm_mem_total_mb_all","vm_mem_total_mb_project"],"required_raw_fields_list":["creation_time","tenantId"],"service_id":"host_metrics"}

View File

@ -0,0 +1,11 @@
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"mem_total_all","metric_id":"mem_total_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"mem.usable_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"mem_usable_all","metric_id":"mem_usable_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"disk.total_space_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"disk_total_all","metric_id":"disk_total_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"disk.total_used_space_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"disk_usable_all","metric_id":"disk_usable_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"nova.vm.disk.total_allocated_gb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"disk_allocated_all","metric_id":"disk_allocated_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"cpu.total_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"cpu_total_all","metric_id":"cpu_total_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity_util","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"cpu.utilized_logical_cores_agg","aggregation_period":"hourly","aggregation_group_by_list": ["event_type", "host"],"usage_fetch_operation": "avg","usage_fetch_util_quantity_event_type": "cpu.total_logical_cores","usage_fetch_util_idle_perc_event_type": "cpu.idle_perc","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"cpu_util_all","metric_id":"cpu_util_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vcpus_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vcpus_all","metric_id":"vcpus_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vcpus_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vcpus_project","metric_id":"vcpus_project"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": [],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_mem_total__mb_all","metric_id":"vm_mem_total_mb_all"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],"insert":["prepare_data","insert_data"]},"aggregated_metric_name":"vm.mem.total_mb_agg","aggregation_period":"hourly","aggregation_group_by_list": ["host", "metric_id", "tenant_id"],"usage_fetch_operation": "avg","setter_rollup_group_by_list": ["tenant_id"],"setter_rollup_operation": "sum","dimension_list":["aggregation_period","host","project_id"]},"metric_group":"vm_mem_total_mb_project","metric_id":"vm_mem_total_mb_project"}

View File

View File

@ -0,0 +1,522 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import TopicAndPartition
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import lit
from pyspark.sql.functions import when
from pyspark.sql import SQLContext
import json
import logging
from oslo_config import cfg
import simport
import time
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.transform.builder.generic_transform_builder \
import GenericTransformBuilder
from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepo
from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepoFactory
from monasca_transform.transform import RddTransformContext
from monasca_transform.transform.transform_utils import MonMetricUtils
from monasca_transform.transform import TransformContextUtils
ConfigInitializer.basic_config()
# initialize logger
log = logging.getLogger(__name__)
_h = logging.FileHandler('%s/monasca_transform.log' %
cfg.CONF.service.service_log_path)
_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
"%(lineno)s - %(levelname)s - %(message)s'"))
log.addHandler(_h)
log.setLevel(logging.DEBUG)
class MonMetricsKafkaProcessor(object):
@staticmethod
def log_debug(message):
print(message)
log.debug(message)
@staticmethod
def store_offset_ranges(rdd):
if rdd.isEmpty():
MonMetricsKafkaProcessor.log_debug(
"storeOffsetRanges: nothing to process...")
return rdd
else:
my_offset_ranges = rdd.offsetRanges()
transform_context = \
TransformContextUtils.get_context(offset_info=my_offset_ranges)
rdd_transform_context = \
rdd.map(lambda x: RddTransformContext(x, transform_context))
return rdd_transform_context
@staticmethod
def print_offset_ranges(my_offset_ranges):
for o in my_offset_ranges:
print("printOffSetRanges: %s %s %s %s" % (
o.topic, o.partition, o.fromOffset, o.untilOffset))
@staticmethod
def get_kafka_stream(topic, streaming_context):
offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
saved_offset_spec = offset_specifications.get_kafka_offsets()
app_name = streaming_context.sparkContext.appName
if len(saved_offset_spec) < 1:
MonMetricsKafkaProcessor.log_debug(
"No saved offsets available..."
"connecting to kafka without specifying offsets")
kvs = KafkaUtils.createDirectStream(
streaming_context, [topic],
{"metadata.broker.list": cfg.CONF.messaging.brokers})
return kvs
else:
from_offsets = {}
for key, value in saved_offset_spec.items():
if key.startswith("%s_%s" % (app_name, topic)):
# spec_app_name = value.get_app_name()
spec_topic = value.get_topic()
spec_partition = int(value.get_partition())
# spec_from_offset = value.get_from_offset()
spec_until_offset = value.get_until_offset()
# composite_key = "%s_%s_%s" % (spec_app_name,
# spec_topic,
# spec_partition)
# partition = saved_offset_spec[composite_key]
from_offsets[
TopicAndPartition(spec_topic, spec_partition)
] = long(spec_until_offset)
MonMetricsKafkaProcessor.log_debug(
"get_kafka_stream: calling createDirectStream :"
" topic:{%s} : start " % topic)
for key, value in from_offsets.items():
MonMetricsKafkaProcessor.log_debug(
"get_kafka_stream: calling createDirectStream : "
"offsets : TopicAndPartition:{%s,%s}, value:{%s}" %
(str(key._topic), str(key._partition), str(value)))
MonMetricsKafkaProcessor.log_debug(
"get_kafka_stream: calling createDirectStream : "
"topic:{%s} : done" % topic)
kvs = KafkaUtils.createDirectStream(
streaming_context, [topic],
{"metadata.broker.list": cfg.CONF.messaging.brokers},
from_offsets)
return kvs
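# Note on the loop above (descriptive only): saved offset keys start
# with "<app_name>_<topic>", and each saved until-offset becomes the
# from-offset for the next run, so the direct stream resumes where the
# previous batch left off.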
@staticmethod
def save_rdd_contents(rdd):
file_name = "".join((
"/vagrant_home/uniq_metrics",
'-', time.strftime("%Y-%m-%d-%H-%M-%S"),
'-', str(rdd.id()),
'.log'))
rdd.saveAsTextFile(file_name)
@staticmethod
def print_unique_metric_count(kvs):
# print unique metric count
lines = kvs.map(lambda x: x[1])
counts = lines.map(
lambda x: json.loads(x)["metric"]["name"]
).map(
lambda name: (name, 1)
).reduceByKey(
lambda a, b: a + b)
counts.pprint(9999)
@staticmethod
def save_kafka_offsets(current_offsets, app_name):
"""save current offsets to offset specification."""
# get the offsets from global var
offset_specs = simport.load(cfg.CONF.repositories.offsets)()
for o in current_offsets:
MonMetricsKafkaProcessor.log_debug(
"adding offset: topic:{%s}, partition:{%s}, fromOffset:{%s}, "
"untilOffset:{%s}" % (
o.topic, o.partition, o.fromOffset, o.untilOffset))
offset_specs.add(
app_name, o.topic, o.partition, o.fromOffset, o.untilOffset)
@staticmethod
def reset_kafka_offsets():
"""delete all offsets from the offset specification."""
# get the offsets from global var
offset_specs = simport.load(cfg.CONF.repositories.offsets)()
offset_specs.delete_all_kafka_offsets()
@staticmethod
def _validate_raw_mon_metrics(row):
required_fields = row.required_raw_fields_list
invalid_list = []
for required_field in required_fields:
required_field_value = None
# Look for the field in the first layer of the row
try:
required_field_value = eval(".".join(("row", required_field)))
except Exception:
pass
if ((required_field_value is None or required_field_value == "") and
row.metric is not None and
row.metric.dimensions is not None):
# Look for the field in the dimensions layer of the row
try:
required_field_value = eval(
".".join(("row.metric.dimensions", required_field)))
except Exception:
pass
if ((required_field_value is None or required_field_value == "") and
row.meta is not None):
# Look for the field in the meta layer of the row
try:
required_field_value = eval(
".".join(("row.meta", required_field)))
except Exception:
pass
if required_field_value is None \
or required_field_value == "":
invalid_list.append("invalid")
if len(invalid_list) <= 0:
return row
else:
print("_validate_raw_mon_metrics : found invalid : ** %s: %s" % (
(".".join(("row", required_field))),
required_field_value))
@staticmethod
def process_metric(transform_context, record_store_df):
"""process (aggregate) metric data from record_store data
All the parameters to drive processing should be available
in transform_spec_df dataframe.
"""
# call processing chain
GenericTransformBuilder.do_transform(transform_context,
record_store_df)
@staticmethod
def process_metrics(transform_context, record_store_df):
"""start processing (aggregating) metrics
"""
#
# look in record_store_df for list of metrics to be processed
#
metric_ids_df = record_store_df.select("metric_id").distinct()
metric_ids_to_process = [row.metric_id
for row in metric_ids_df.collect()]
data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
get_data_driven_specs_repo()
sqlc = SQLContext.getOrCreate(record_store_df.rdd.context)
transform_specs_df = data_driven_specs_repo.get_data_driven_specs(
sql_context=sqlc,
data_driven_spec_type=DataDrivenSpecsRepo.transform_specs_type)
for metric_id in metric_ids_to_process:
transform_spec_df = transform_specs_df.select(
["aggregation_params_map", "metric_id"]
).where(transform_specs_df.metric_id == metric_id)
source_record_store_df = record_store_df.select("*").where(
record_store_df.metric_id == metric_id)
# set transform_spec_df in TransformContext
transform_context = \
TransformContextUtils.get_context(
transform_context_info=transform_context,
transform_spec_df_info=transform_spec_df)
MonMetricsKafkaProcessor.process_metric(
transform_context, source_record_store_df)
@staticmethod
def rdd_to_recordstore(rdd_transform_context_rdd):
if rdd_transform_context_rdd.isEmpty():
MonMetricsKafkaProcessor.log_debug(
"rdd_to_recordstore: nothing to process...")
else:
sql_context = SQLContext(rdd_transform_context_rdd.context)
data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
get_data_driven_specs_repo()
pre_transform_specs_df = data_driven_specs_repo.\
get_data_driven_specs(
sql_context=sql_context,
data_driven_spec_type=DataDrivenSpecsRepo.
pre_transform_specs_type)
#
# extract second column containing raw metric data
#
raw_mon_metrics = rdd_transform_context_rdd.map(
lambda nt: nt.rdd_info[1])
#
# convert raw metric data rdd to dataframe rdd
#
raw_mon_metrics_df = \
MonMetricUtils.create_mon_metrics_df_from_json_rdd(
sql_context,
raw_mon_metrics)
#
# filter out unwanted metrics and keep metrics we are interested in
#
cond = [
raw_mon_metrics_df.metric.name ==
pre_transform_specs_df.event_type]
filtered_metrics_df = raw_mon_metrics_df.join(
pre_transform_specs_df, cond)
#
# validate filtered metrics to check if required fields
# are present and not empty
# In order to be able to apply the filter function, the dataframe is
# converted to a plain rdd. After validation the rdd is
# converted back to a dataframe
#
# FIXME: find a way to apply filter function on dataframe rdd data
validated_mon_metrics_rdd = filtered_metrics_df.rdd.filter(
MonMetricsKafkaProcessor._validate_raw_mon_metrics)
validated_mon_metrics_df = sql_context.createDataFrame(
validated_mon_metrics_rdd, filtered_metrics_df.schema)
#
# record generator
# generate a new intermediate metric record if the metric_id_list for a
# given metric in the pre_transform_specs table has several
# intermediate metrics defined.
# intermediate metrics are a convenient way to process (aggregate) the
# same metric in multiple ways, by making a copy of the source data for
# each processing path
#
gen_mon_metrics_df = validated_mon_metrics_df.select(
validated_mon_metrics_df.meta,
validated_mon_metrics_df.metric,
validated_mon_metrics_df.event_processing_params,
validated_mon_metrics_df.event_type,
explode(validated_mon_metrics_df.metric_id_list).alias(
"this_metric_id"),
validated_mon_metrics_df.service_id)
#
# transform metrics data to record_store format
# record store format is the common format which will serve as
# source to aggregation processing.
# converting the metric to a common standard format helps in writing
# generic, reusable aggregation routines driven by configuration
# parameters
#
record_store_df = gen_mon_metrics_df.select(
(gen_mon_metrics_df.metric.timestamp / 1000).alias(
"event_timestamp_unix"),
from_unixtime(
gen_mon_metrics_df.metric.timestamp / 1000).alias(
"event_timestamp_string"),
gen_mon_metrics_df.event_type.alias("event_type"),
gen_mon_metrics_df.event_type.alias("event_quantity_name"),
(gen_mon_metrics_df.metric.value / 1.0).alias(
"event_quantity"),
when(gen_mon_metrics_df.metric.dimensions.state != '',
gen_mon_metrics_df.metric.dimensions.state).otherwise(
'NA').alias("event_status"),
lit('1.0').alias('event_version'),
lit('metrics').alias("record_type"),
# resource_uuid
when(gen_mon_metrics_df.metric.dimensions.instanceId != '',
gen_mon_metrics_df.metric.dimensions.
instanceId).otherwise('NA').alias("resource_uuid"),
when(gen_mon_metrics_df.metric.dimensions.tenantId != '',
gen_mon_metrics_df.metric.dimensions.tenantId).when(
gen_mon_metrics_df.metric.dimensions.project_id != '',
gen_mon_metrics_df.metric.dimensions.project_id).otherwise(
'NA').alias("tenant_id"),
when(gen_mon_metrics_df.meta.userId != '',
gen_mon_metrics_df.meta.userId).otherwise('NA').alias(
"user_id"),
when(gen_mon_metrics_df.meta.region != '',
gen_mon_metrics_df.meta.region).when(
gen_mon_metrics_df.event_processing_params
.set_default_region_to != '',
gen_mon_metrics_df.event_processing_params
.set_default_region_to).otherwise(
'NA').alias("region"),
when(gen_mon_metrics_df.meta.zone != '',
gen_mon_metrics_df.meta.zone).when(
gen_mon_metrics_df.event_processing_params
.set_default_zone_to != '',
gen_mon_metrics_df.event_processing_params
.set_default_zone_to).otherwise(
'NA').alias("zone"),
when(gen_mon_metrics_df.metric.dimensions.hostname != '',
gen_mon_metrics_df.metric.dimensions.hostname).when(
gen_mon_metrics_df.metric.value_meta.host != '',
gen_mon_metrics_df.metric.value_meta.host).otherwise(
'NA').alias("host"),
when(gen_mon_metrics_df.service_id != '',
gen_mon_metrics_df.service_id).otherwise(
'NA').alias("service_group"),
when(gen_mon_metrics_df.service_id != '',
gen_mon_metrics_df.service_id).otherwise(
'NA').alias("service_id"),
from_unixtime(gen_mon_metrics_df.metric.timestamp / 1000,
'yyyy-MM-dd').alias("event_date"),
from_unixtime(gen_mon_metrics_df.metric.timestamp / 1000,
'HH').alias("event_hour"),
from_unixtime(gen_mon_metrics_df.metric.timestamp / 1000,
'mm').alias("event_minute"),
from_unixtime(gen_mon_metrics_df.metric.timestamp / 1000,
'ss').alias("event_second"),
gen_mon_metrics_df.this_metric_id.alias("metric_group"),
gen_mon_metrics_df.this_metric_id.alias("metric_id"))
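# Rough example of the mapping performed by the select above (all field
# values here are assumed): an incoming metric envelope such as
#   {"metric": {"name": "mem.total_mb",
#               "dimensions": {"hostname": "h1", "tenantId": "t1"},
#               "timestamp": 1462000000000, "value": 1024},
#    "meta": {"userId": "u1", "region": "r1"}}
# joined with a matching pre_transform spec becomes a record_store row with
# event_quantity = 1024.0, host = "h1", tenant_id = "t1", user_id = "u1",
# region = "r1", and event_date/hour/minute/second derived from the
# millisecond timestamp.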
#
# get transform context
#
rdd_transform_context = rdd_transform_context_rdd.first()
transform_context = rdd_transform_context.transform_context_info
#
# start processing metrics available in record_store data
#
MonMetricsKafkaProcessor.process_metrics(transform_context,
record_store_df)
#
# extract kafka offsets stored in rdd and save
#
offsets = transform_context.offset_info
for o in offsets:
MonMetricsKafkaProcessor.log_debug(
"going to save: OffSetRanges: %s %s %s %s" % (
o.topic, o.partition, o.fromOffset, o.untilOffset))
MonMetricsKafkaProcessor.save_kafka_offsets(
offsets, rdd_transform_context_rdd.context.appName)
@staticmethod
def transform_to_recordstore(kvs):
"""Transform metrics data from kafka to record store format.
extracts, validates, filters and generates data from kafka to keep only
the data that has to be aggregated. The generate step produces multiple
records for the same incoming metric if the metric has multiple
intermediate metrics defined, so that each intermediate metric can
be processed independently.
"""
# capture kafka offset ranges via store_offset_ranges before further processing
# http://spark.apache.org/docs/latest/streaming-kafka-integration.html
# Note that the typecast to HasOffsetRanges will only succeed if it is
# done in the first method called on the directKafkaStream, not later
# down a chain of methods. You can use transform() instead of
# foreachRDD() as your first method call in order to access offsets,
# then call further Spark methods. However, be aware that the
# one-to-one mapping between RDD partition and Kafka partition does not
# remain after any methods that shuffle or repartition,
# e.g. reduceByKey() or window()
kvs.transform(
MonMetricsKafkaProcessor.store_offset_ranges
).foreachRDD(MonMetricsKafkaProcessor.rdd_to_recordstore)
def invoke():
# initialize configuration
ConfigInitializer.basic_config()
# app name
application_name = "mon_metrics_kafka"
my_spark_conf = SparkConf().setAppName(application_name)
spark_context = SparkContext(conf=my_spark_conf)
# read at the configured interval
spark_streaming_context = \
StreamingContext(spark_context, cfg.CONF.service.stream_interval)
kafka_stream = MonMetricsKafkaProcessor.get_kafka_stream(
cfg.CONF.messaging.topic,
spark_streaming_context)
# transform to recordstore
MonMetricsKafkaProcessor.transform_to_recordstore(kafka_stream)
# print unique metric count
MonMetricsKafkaProcessor.print_unique_metric_count(kafka_stream)
# catch interrupt, stop streaming context gracefully
# signal.signal(signal.SIGINT, signal_handler)
# start processing
spark_streaming_context.start()
# FIXME: stop spark context to relinquish resources
# FIXME: specify cores, so as not to use all the resources on the cluster.
# FIXME: HA deploy multiple masters, may be one on each control node
try:
# Wait for the Spark driver to "finish"
spark_streaming_context.awaitTermination()
except Exception as e:
MonMetricsKafkaProcessor.log_debug(
"Exception raised during Spark execution : " + str(e))
# One exception that can occur here is the result of the saved
# kafka offsets being obsolete/out of range. Delete the saved
# offsets to improve the chance of success on the next execution.
MonMetricsKafkaProcessor.log_debug(
"Deleting saved offsets for chance of success on next execution")
MonMetricsKafkaProcessor.reset_kafka_offsets()
if __name__ == "__main__":
invoke()
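# A minimal configuration sketch for this driver (the section and option
# names come from the cfg.CONF lookups above; every value shown is an
# assumption):
#
#   [messaging]
#   brokers = localhost:9092
#   topic = metrics
#
#   [repositories]
#   offsets = <simport path to an OffsetSpecs implementation>
#
#   [service]
#   stream_interval = 600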

View File

@ -0,0 +1,33 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
class LogUtils(object):
"""util methods for logging"""
@staticmethod
def log_debug(message):
log = logging.getLogger(__name__)
print(message)
log.debug(message)
@staticmethod
def who_am_i(obj):
sep = "*" * 10
debugstr = "\n".join((sep, "name: %s " % type(obj).__name__))
debugstr = "\n".join((debugstr, "type: %s" % (type(obj))))
debugstr = "\n".join((debugstr, "dir: %s" % (dir(obj)), sep))
LogUtils.log_debug(debugstr)

View File

View File

@ -0,0 +1,57 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import json
from kafka import KafkaClient
from kafka import SimpleProducer
from oslo_config import cfg
import simport
class MessageAdapter(object):
adapter_impl = None
@staticmethod
def init():
# load the configured messaging adapter implementation
MessageAdapter.adapter_impl = simport.load(
cfg.CONF.messaging.adapter)()
@staticmethod
def send_metric(metric):
if not MessageAdapter.adapter_impl:
MessageAdapter.init()
MessageAdapter.adapter_impl.do_send_metric(metric)
@abc.abstractmethod
def do_send_metric(self, metric):
raise NotImplementedError(
"Class %s doesn't implement do_send_metric(self, metric)"
% self.__class__.__name__)
class KafkaMessageAdapter(MessageAdapter):
def __init__(self):
client_for_writing = KafkaClient(cfg.CONF.messaging.brokers)
self.producer = SimpleProducer(client_for_writing)
self.topic = cfg.CONF.messaging.topic
def do_send_metric(self, metric):
self.producer.send_messages(
self.topic,
json.dumps(metric, separators=(',', ':')))
return
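# Hedged usage sketch (the metric payload is assumed): callers are expected
# to go through the MessageAdapter facade rather than this class directly,
# e.g.
#   MessageAdapter.send_metric({"metric": {"name": "mem.total_mb_agg",
#                                          "value": 1024.0}})
# which lazily loads the configured adapter_impl and, with this Kafka
# adapter, publishes the metric as compact JSON to cfg.CONF.messaging.topic.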

View File

@ -0,0 +1,86 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import sessionmaker
from monasca_transform.offset_specs import OffsetSpec
from monasca_transform.offset_specs import OffsetSpecs
Base = automap_base()
class MySQLOffsetSpec(Base, OffsetSpec):
__tablename__ = 'kafka_offsets'
class MySQLOffsetSpecs(OffsetSpecs):
def __init__(self):
database_name = cfg.CONF.database.database_name
database_server = cfg.CONF.database.host
database_uid = cfg.CONF.database.username
database_pwd = cfg.CONF.database.password
db = create_engine('mysql+pymysql://%s:%s@%s/%s' % (
database_uid,
database_pwd,
database_server,
database_name
), isolation_level="READ UNCOMMITTED")
db.echo = True
# reflect the tables
Base.prepare(db, reflect=True)
Session = sessionmaker(bind=db)
self.session = Session()
def add(self, app_name, topic, partition,
from_offset, until_offset):
try:
offset_spec = self.session.query(MySQLOffsetSpec).filter_by(
app_name=app_name, topic=topic,
partition=partition).one()
offset_spec.from_offset = from_offset
offset_spec.until_offset = until_offset
self.session.commit()
except NoResultFound:
offset_spec = MySQLOffsetSpec(
topic=topic,
app_name=app_name,
partition=partition,
from_offset=from_offset,
until_offset=until_offset)
self.session.add(offset_spec)
self.session.commit()
def get_kafka_offsets(self):
return {'%s_%s_%s' % (
offset.get_app_name(), offset.get_topic(), offset.get_partition()
): offset for offset in self.session.query(MySQLOffsetSpec).all()}
def delete_all_kafka_offsets(self):
try:
self.session.query(MySQLOffsetSpec).delete()
self.session.commit()
except Exception:
# Seems like there isn't much that can be done in this situation
pass
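# Hedged usage sketch (the app name, topic and offsets are assumed): the
# repository upserts one row per (app_name, topic, partition) and exposes
# the rows keyed as "<app_name>_<topic>_<partition>":
#   specs = MySQLOffsetSpecs()
#   specs.add("mon_metrics_kafka", "metrics", 0, 100, 200)
#   specs.get_kafka_offsets()["mon_metrics_kafka_metrics_0"].get_until_offset()
#   # -> 200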

View File

@ -0,0 +1,149 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import json
import logging
import os
import six
log = logging.getLogger(__name__)
class OffsetSpec(object):
def __init__(self, app_name=None, topic=None, partition=None,
from_offset=None, until_offset=None):
self.app_name = app_name
self.topic = topic
self.partition = partition
self.from_offset = from_offset
self.until_offset = until_offset
def get_app_name(self):
return self.app_name
def get_topic(self):
return self.topic
def get_partition(self):
return self.partition
def get_from_offset(self):
return self.from_offset
def get_until_offset(self):
return self.until_offset
@six.add_metaclass(abc.ABCMeta)
class OffsetSpecs(object):
"""Class representing offset specs to help recover.
From where processing should pick up in case of failure
"""
@abc.abstractmethod
def add(self, app_name, topic, partition,
from_offset, until_offset):
raise NotImplementedError(
"Class %s doesn't implement add(self, app_name, topic, "
"partition, from_offset, until_offset)"
% self.__class__.__name__)
@abc.abstractmethod
def get_kafka_offsets(self):
raise NotImplementedError(
"Class %s doesn't implement get_kafka_offsets()"
% self.__class__.__name__)
@abc.abstractmethod
def delete_all_kafka_offsets(self):
raise NotImplementedError(
"Class %s doesn't implement delete_all_kafka_offsets()"
% self.__class__.__name__)
class JSONOffsetSpecs(OffsetSpecs):
def __init__(self, path=None, filename=None):
self.kafka_offset_spec_file = os.path.join(
(path or "/tmp/"), (filename or 'kafka_offset_specs.json'))
self._kafka_offsets = {}
if os.path.exists(self.kafka_offset_spec_file):
try:
f = open(self.kafka_offset_spec_file)
kafka_offset_dict = json.load(f)
for key, value in kafka_offset_dict.items():
log.info("Found offset %s: %s", key, value)
self._kafka_offsets[key] = OffsetSpec(
app_name=value.get('app_name'),
topic=value.get('topic'),
until_offset=value.get('until_offset'),
from_offset=value.get('from_offset'),
partition=value.get('partition')
)
except Exception:
log.info('Invalid or corrupt offsets file found at %s,'
' starting over' % self.kafka_offset_spec_file)
else:
log.info('No kafka offsets found at startup')
def _save(self):
"""get the specs of last run time of offset
"""
log.info("Saving json offsets: %s", self._kafka_offsets)
with open(self.kafka_offset_spec_file, 'w') as offset_file:
offset_file.write('{')
# json_values = []
# for key, value in self._kafka_offsets.items():
# json_values.append({key: })
offset_file.write(','.join(
['\"%s\": %s' % (key, json.dumps(self.as_dict(value)))
for key, value in self._kafka_offsets.items()]))
offset_file.write('}')
@staticmethod
def as_dict(offset_value):
return {"app_name": offset_value.get_app_name(),
"topic": offset_value.get_topic(),
"partition": offset_value.get_partition(),
"from_offset": offset_value.get_from_offset(),
"until_offset": offset_value.get_until_offset()}
def add(self, app_name, topic, partition,
from_offset, until_offset):
key_name = "%s_%s_%s" % (
app_name, topic, partition)
offset = OffsetSpec(
app_name=app_name,
topic=topic,
partition=partition,
from_offset=from_offset,
until_offset=until_offset
)
log.info('Adding offset %s for key %s to current offsets: %s' %
(offset, key_name, self._kafka_offsets))
self._kafka_offsets[key_name] = offset
log.info('Added so kafka offset is now %s', self._kafka_offsets)
self._save()
def get_kafka_offsets(self):
return self._kafka_offsets
def delete_all_kafka_offsets(self):
log.info("Deleting json offsets file: %s", self.kafka_offset_spec_file)
os.remove(self.kafka_offset_spec_file)

View File

View File

@ -0,0 +1,181 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import signal
import socket
from subprocess import call
import sys
import threading
import time
from oslo_config import cfg
from oslo_log import log
from oslo_service import service as os_service
from tooz import coordination
from monasca_transform.config.config_initializer import ConfigInitializer
LOG = log.getLogger(__name__)
log.register_options(cfg.CONF)
log.set_defaults()
log.setup(cfg.CONF, 'transform')
CONF = cfg.CONF
def main():
transform_service = TransformService()
transform_service.start()
def shutdown_all_threads_and_die():
"""Shut down all threads and exit process.
Hit it with a hammer to kill all threads and die.
"""
LOG.info('Monasca Transform service stopping...')
os._exit(1)
class Transform(os_service.Service):
"""Class used with Openstack service.
"""
def __init__(self, threads=1):
super(Transform, self).__init__(threads)
def signal_handler(self, signal_number, stack_frame):
# Catch stop requests and appropriately shut down
shutdown_all_threads_and_die()
def start(self):
try:
# Register to catch stop requests
signal.signal(signal.SIGTERM, self.signal_handler)
main()
except Exception:
LOG.exception('Monasca Transform service encountered fatal error. '
'Shutting down all threads and exiting')
shutdown_all_threads_and_die()
class TransformService(threading.Thread):
previously_running = False
# A unique name used for establishing election candidacy
my_host_name = socket.getfqdn()
def __init__(self):
super(TransformService, self).__init__()
def when_i_am_elected_leader(self, event):
try:
LOG.info('Monasca Transform service running on ' +
self.my_host_name + ' has been elected leader')
self.previously_running = True
if CONF.service.spark_python_files:
pyfiles = " --py-files %s" % CONF.service.spark_python_files
else:
pyfiles = ''
# Build the command to start the Spark driver
spark_cmd = ("export SPARK_HOME=/opt/spark/current && "
"spark-submit --supervise --master " +
CONF.service.spark_master_list +
" --conf spark.eventLog.enabled=" +
CONF.service.spark_event_logging_enabled +
" --conf spark.eventLog.dir=file://" +
CONF.service.spark_event_logging_dest +
" --jars " + CONF.service.spark_jars_list +
pyfiles +
" " + CONF.service.spark_driver)
# Start the Spark driver (specify shell=True in order to
# correctly handle wildcards in the spark_cmd
call(spark_cmd, shell=True)
except Exception:
LOG.exception(
'TransformService on ' + self.my_host_name +
' encountered fatal exception. '
'Shutting down all threads and exiting')
shutdown_all_threads_and_die()
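# For reference, with assumed configuration values (every path and address
# below is a placeholder) the spark_cmd assembled above would expand to
# something like:
#   export SPARK_HOME=/opt/spark/current && spark-submit --supervise \
#     --master spark://master1:7077,master2:7077 \
#     --conf spark.eventLog.enabled=true \
#     --conf spark.eventLog.dir=file:///var/log/spark/events \
#     --jars /opt/spark/lib/spark-streaming-kafka.jar \
#     --py-files /opt/monasca/transform/monasca-transform.zip \
#     /opt/monasca/transform/driver.py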
def run(self):
LOG.info('The host of this Monasca Transform service is ' +
self.my_host_name)
# Loop until the service is stopped
while True:
self.previously_running = False
# Start an election coordinator
coordinator = coordination.get_coordinator(
CONF.service.coordinator_address, self.my_host_name)
coordinator.start()
# Create a coordination/election group
group = CONF.service.coordinator_group
try:
request = coordinator.create_group(group)
request.get()
except coordination.GroupAlreadyExist:
LOG.info('Group %s already exists' % group)
# Join the coordination/election group
try:
request = coordinator.join_group(group)
request.get()
except coordination.MemberAlreadyExist:
LOG.info('Host already joined to group %s as %s' %
(group, self.my_host_name))
# Announce the candidacy and wait to be elected
coordinator.watch_elected_as_leader(group,
self.when_i_am_elected_leader)
while self.previously_running is False:
LOG.info('Monasca Transform service on %s is checking'
' election results...' % self.my_host_name)
coordinator.heartbeat()
coordinator.run_watchers()
if self.previously_running is True:
# Leave/exit the coordination/election group
request = coordinator.leave_group(group)
request.get()
time.sleep(float(CONF.service.election_polling_frequency))
coordinator.stop()
def main_service():
"""Method to use with Openstack service.
"""
ConfigInitializer.basic_config()
launcher = os_service.ServiceLauncher(CONF)
launcher.launch_service(Transform())
launcher.wait()
# Used if run without Openstack service.
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,82 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
TransformContextBase = namedtuple("TransformContext",
["config_info",
"offset_info",
"transform_spec_df_info"])
class TransformContext(TransformContextBase):
"""A tuple which contains all the configuration information
to drive processing
namedtuple contains:
config_info - configuration information from oslo config
offset_info - current kafka offset information
transform_spec_df - processing information from
transform_spec aggregation driver table
"""
RddTransformContextBase = namedtuple("RddTransformContext",
["rdd_info",
"transform_context_info"])
class RddTransformContext(RddTransformContextBase):
"""A tuple which is a wrapper containing the RDD and transform_context
namedtuple contains:
rdd_info - rdd
transform_context_info - transform context
"""
class TransformContextUtils(object):
"""utility method to get TransformContext"""
@staticmethod
def get_context(transform_context_info=None,
config_info=None,
offset_info=None,
transform_spec_df_info=None):
if transform_context_info is None:
return TransformContext(config_info,
offset_info,
transform_spec_df_info)
else:
if config_info is None or config_info == "":
# get from passed in transform_context
config_info = transform_context_info.config_info
if offset_info is None or offset_info == "":
# get from passed in transform_context
offset_info = transform_context_info.offset_info
if transform_spec_df_info is None or \
transform_spec_df_info == "":
# get from passed in transform_context
transform_spec_df_info = \
transform_context_info.transform_spec_df_info
return TransformContext(config_info,
offset_info,
transform_spec_df_info)
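# Hedged usage sketch mirroring how process_metrics() uses this helper: an
# existing context is passed back in with only the per-metric transform
# spec overridden, while config_info and offset_info are carried over from
# the passed-in context:
#   per_metric_context = TransformContextUtils.get_context(
#       transform_context_info=transform_context,
#       transform_spec_df_info=transform_spec_df)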

View File

@ -0,0 +1,130 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import extension
from monasca_transform.log_utils import LogUtils
class GenericTransformBuilder (object):
"""Build transformation pipeline based on
aggregation_pipeline spec in metric processing
configuration
"""
_MONASCA_TRANSFORM_USAGE_NAMESPACE = 'monasca_transform.usage'
_MONASCA_TRANSFORM_SETTER_NAMESPACE = 'monasca_transform.setter'
_MONASCA_TRANSFORM_INSERT_NAMESPACE = 'monasca_transform.insert'
@staticmethod
def log_load_extension_error(manager, entry_point, error):
LogUtils.log_debug("GenericTransformBuilder: "
"log load extension error: manager: {%s},"
"entry_point: {%s}, error: {%s}"
% (str(manager),
str(entry_point),
str(error)))
@staticmethod
def _get_usage_component_manager():
"""stevedore extension manager for usage components."""
return extension.ExtensionManager(
namespace=GenericTransformBuilder
._MONASCA_TRANSFORM_USAGE_NAMESPACE,
on_load_failure_callback=GenericTransformBuilder.
log_load_extension_error,
invoke_on_load=False)
@staticmethod
def _get_setter_component_manager():
"""stevedore extension manager for setter components."""
return extension.ExtensionManager(
namespace=GenericTransformBuilder.
_MONASCA_TRANSFORM_SETTER_NAMESPACE,
on_load_failure_callback=GenericTransformBuilder.
log_load_extension_error,
invoke_on_load=False)
@staticmethod
def _get_insert_component_manager():
"""stevedore extension manager for insert components."""
return extension.ExtensionManager(
namespace=GenericTransformBuilder.
_MONASCA_TRANSFORM_INSERT_NAMESPACE,
on_load_failure_callback=GenericTransformBuilder.
log_load_extension_error,
invoke_on_load=False)
@staticmethod
def _parse_transform_pipeline(transform_spec_df):
"""parse aggregation pipeline from metric
processing configuration
"""
# get aggregation pipeline df
aggregation_pipeline_df = transform_spec_df\
.select("aggregation_params_map.aggregation_pipeline")
# call components
source_row = aggregation_pipeline_df\
.select("aggregation_pipeline.source").collect()[0]
source = source_row.source
usage_row = aggregation_pipeline_df\
.select("aggregation_pipeline.usage").collect()[0]
usage = usage_row.usage
setter_row_list = aggregation_pipeline_df\
.select("aggregation_pipeline.setters").collect()
setter_list = [setter_row.setters for setter_row in setter_row_list]
insert_row_list = aggregation_pipeline_df\
.select("aggregation_pipeline.insert").collect()
insert_list = [insert_row.insert for insert_row in insert_row_list]
return (source, usage, setter_list[0], insert_list[0])
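# Illustrative shape of the aggregation_pipeline section this parser
# expects inside aggregation_params_map (the component names below are
# placeholders; only the keys come from the selects above):
#   {"source": "<source component>",
#    "usage": "<usage component>",
#    "setters": ["<setter component>", "..."],
#    "insert": ["<insert component>"]}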
@staticmethod
def do_transform(transform_context,
record_store_df):
"""Build a dynamic aggregation pipeline and call components to
process record store dataframe
"""
transform_spec_df = transform_context.transform_spec_df_info
(source,
usage,
setter_list,
insert_list) = GenericTransformBuilder.\
_parse_transform_pipeline(transform_spec_df)
# FIXME: source is a placeholder for non-streaming source
# in the future?
usage_component = GenericTransformBuilder.\
_get_usage_component_manager()[usage].plugin
instance_usage_df = usage_component.usage(transform_context,
record_store_df)
for setter in setter_list:
setter_component = GenericTransformBuilder.\
_get_setter_component_manager()[setter].plugin
instance_usage_df = setter_component.setter(transform_context,
instance_usage_df)
for insert in insert_list:
insert_component = GenericTransformBuilder.\
_get_insert_component_manager()[insert].plugin
instance_usage_df = insert_component.insert(transform_context,
instance_usage_df)
return instance_usage_df

View File

@ -0,0 +1,67 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
RecordStoreWithGroupByBase = namedtuple("RecordStoreWithGroupBy",
["record_store_data",
"group_by_columns_list"])
class RecordStoreWithGroupBy(RecordStoreWithGroupByBase):
"""A tuple which is a wrapper containing record store data
and the group by columns
namedtuple contains:
record_store_data - record store data
group_by_columns_list - group by columns list
"""
GroupingResultsBase = namedtuple("GroupingResults",
["grouping_key",
"results",
"grouping_key_dict"])
class GroupingResults(GroupingResultsBase):
"""A tuple which is a wrapper containing grouping key
and grouped result set
namedtuple contains:
grouping_key - group by key
results - grouped results
grouping_key_dict - group by key as dictionary
"""
class Grouping(object):
"""Base class for all grouping classes."""
@staticmethod
def _parse_grouping_key(grouping_str):
"""parse grouping key which in "^key1=value1^key2=value2..." format
into a dictionary of key value pairs
"""
group_by_dict = {}
#
# convert key=value^key1=value1 string into a dict
#
for key_val_pair in grouping_str.split("^"):
if "=" in key_val_pair:
key_val = key_val_pair.split("=")
group_by_dict[key_val[0]] = key_val[1]
return group_by_dict
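# Example: "^host=h1^region=region-a" parses to
#   {"host": "h1", "region": "region-a"}
# (the leading "^" produces an empty first token, which is skipped because
# it contains no "=").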

View File

@ -0,0 +1,178 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_transform.transform.grouping import Grouping
from monasca_transform.transform.grouping import GroupingResults
from monasca_transform.transform.grouping import RecordStoreWithGroupBy
class GroupSortbyTimestamp(Grouping):
@staticmethod
def log_debug(logStr):
print(logStr)
# LOG.debug(logStr)
@staticmethod
def _get_group_by_key(row_decorated):
"""Build a group by key using the group by column list.
row_decorated: [[Rows(a=1, b=1, c=2, d=3)],[group_by_a,group_by_b]]
"""
# LOG.debug(whoami(row_decorated))
# LOG.debug(row_decorated)
group_by_columns_list = row_decorated[1]
group_by_key = ""
for gcol in group_by_columns_list:
group_by_key = "^".join((group_by_key,
eval(".".join(("row", gcol)))))
return group_by_key
@staticmethod
def _prepare_for_group_by(record_store_with_group_by_rdd):
"""creates a new rdd where the first element of each row
contains array of grouping key and event timestamp fields.
Grouping key and event timestamp fields are used by
partitioning and sorting function to partition the data
by grouping key and then sort the elements in a group by the
timestamp
"""
# get the record store data and group by columns
record_store_data = record_store_with_group_by_rdd.record_store_data
group_by_columns_list = \
record_store_with_group_by_rdd.group_by_columns_list
# construct a group by key
# key1=value1^key2=value2^...
group_by_key_value = ""
for gcol in group_by_columns_list:
group_by_key_value = \
"^".join((group_by_key_value,
"=".join((gcol,
eval(".".join(("record_store_data",
gcol)))))))
# return a key-value rdd
return [group_by_key_value, record_store_data]
@staticmethod
def _sort_by_timestamp(result_iterable):
# LOG.debug(whoami(result_iterable.data[0]))
# sort list might cause OOM, if the group has lots of items
# use group_sort_by_timestamp_partitions module instead if you run
# into OOM
sorted_list = sorted(result_iterable.data,
key=lambda row: row.event_timestamp_string)
return sorted_list
@staticmethod
def _group_sort_by_timestamp(record_store_df, group_by_columns_list):
# convert the dataframe rdd to normal rdd and add the group by column
# list
record_store_with_group_by_rdd = record_store_df.rdd.\
map(lambda x: RecordStoreWithGroupBy(x, group_by_columns_list))
# convert rdd into key-value rdd
record_store_with_group_by_rdd_key_val = \
record_store_with_group_by_rdd.\
map(GroupSortbyTimestamp._prepare_for_group_by)
first_step = record_store_with_group_by_rdd_key_val.groupByKey()
record_store_rdd_grouped_sorted = first_step.mapValues(
GroupSortbyTimestamp._sort_by_timestamp)
return record_store_rdd_grouped_sorted
@staticmethod
def _get_group_first_last_quantity_udf(grouplistiter):
"""Return stats that include first row key, first_event_timestamp,
first event quantity, last_event_timestamp and last event quantity
"""
first_row = None
last_row = None
# extract key and value list
group_key = grouplistiter[0]
grouped_values = grouplistiter[1]
count = 0.0
for row in grouped_values:
# set the first row
if first_row is None:
first_row = row
# set the last row
last_row = row
count = count + 1
first_event_timestamp_unix = None
first_event_timestamp_string = None
first_event_quantity = None
if first_row is not None:
first_event_timestamp_unix = first_row.event_timestamp_unix
first_event_timestamp_string = first_row.event_timestamp_string
first_event_quantity = first_row.event_quantity
last_event_timestamp_unix = None
last_event_timestamp_string = None
last_event_quantity = None
if last_row is not None:
last_event_timestamp_unix = last_row.event_timestamp_unix
last_event_timestamp_string = last_row.event_timestamp_string
last_event_quantity = last_row.event_quantity
results_dict = {"firstrecord_timestamp_unix":
first_event_timestamp_unix,
"firstrecord_timestamp_string":
first_event_timestamp_string,
"firstrecord_quantity": first_event_quantity,
"lastrecord_timestamp_unix":
last_event_timestamp_unix,
"lastrecord_timestamp_string":
last_event_timestamp_string,
"lastrecord_quantity": last_event_quantity,
"record_count": count}
group_key_dict = Grouping._parse_grouping_key(group_key)
return GroupingResults(group_key, results_dict, group_key_dict)
@staticmethod
def fetch_group_latest_oldest_quantity(record_store_df,
transform_spec_df,
group_by_columns_list):
"""function to group record store data, sort by timestamp within group
and get first and last timestamp along with quantity within each group
This function uses the key-value pair rdd's groupByKey function to do the group by
"""
# group and order elements in group
record_store_grouped_data_rdd = \
GroupSortbyTimestamp._group_sort_by_timestamp(
record_store_df, group_by_columns_list)
# find stats for a group
record_store_grouped_rows = \
record_store_grouped_data_rdd.\
map(GroupSortbyTimestamp.
_get_group_first_last_quantity_udf)
return record_store_grouped_rows

View File

@ -0,0 +1,226 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_transform.transform.grouping import Grouping
from monasca_transform.transform.grouping import GroupingResults
from monasca_transform.transform.grouping import RecordStoreWithGroupBy
class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def log_debug(logStr):
print(logStr)
# LOG.debug(logStr)
@staticmethod
def _get_group_first_last_quantity_udf(partition_list_iter):
"""user defined function to to through a list of partitions. Each
partition contains elements for a group. All the elements are sorted by
timestamp.
The stats include first row key, first_event_timestamp,
fist event quantity, last_event_timestamp and last event quantity
"""
first_row = None
last_row = None
count = 0.0
for row in partition_list_iter:
# set the first row
if first_row is None:
first_row = row
# set the last row
last_row = row
count = count + 1
first_event_timestamp_unix = None
first_event_timestamp_string = None
first_event_quantity = None
first_row_key = None
if first_row is not None:
first_event_timestamp_unix = first_row[1].event_timestamp_unix
first_event_timestamp_string = first_row[1].event_timestamp_string
first_event_quantity = first_row[1].event_quantity
# extract the grouping_key from composite grouping_key
# composite grouping key is a list, where first item is the
# grouping key and second item is the event_timestamp_string
first_row_key = first_row[0][0]
last_event_timestamp_unix = None
last_event_timestamp_string = None
last_event_quantity = None
if last_row is not None:
last_event_timestamp_unix = last_row[1].event_timestamp_unix
last_event_timestamp_string = last_row[1].event_timestamp_string
last_event_quantity = last_row[1].event_quantity
results_dict = {"firstrecord_timestamp_unix":
first_event_timestamp_unix,
"firstrecord_timestamp_string":
first_event_timestamp_string,
"firstrecord_quantity": first_event_quantity,
"lastrecord_timestamp_unix":
last_event_timestamp_unix,
"lastrecord_timestamp_string":
last_event_timestamp_string,
"lastrecord_quantity": last_event_quantity,
"record_count": count}
first_row_key_dict = Grouping._parse_grouping_key(first_row_key)
yield [GroupingResults(first_row_key, results_dict,
first_row_key_dict)]
@staticmethod
def _prepare_for_group_by(record_store_with_group_by_rdd):
"""creates a new rdd where the first element of each row
contains array of grouping key and event timestamp fields.
Grouping key and event timestamp fields are used by
partitioning and sorting function to partition the data
by grouping key and then sort the elements in a group by the
timestamp
"""
# get the record store data and group by columns
record_store_data = record_store_with_group_by_rdd.record_store_data
group_by_columns_list = \
record_store_with_group_by_rdd.group_by_columns_list
# construct a group by key
# key1=value1^key2=value2^...
group_by_key_value = ""
for gcol in group_by_columns_list:
group_by_key_value = \
"^".join((group_by_key_value,
"=".join((gcol, eval(".".join(("record_store_data",
gcol)))))))
# return a key-value rdd
# key is a composite key which consists of grouping key and
# event_timestamp_string
return [[group_by_key_value,
record_store_data.event_timestamp_string], record_store_data]
@staticmethod
def _get_partition_by_group(group_composite):
"""get a hash of the grouping key, which is then used by partitioning
function to get partition where the groups data should end up in.
It uses hash % num_partitions to get partition
"""
# FIXME: find out if the hash function in python gives the same value on
# different machines
# Look at using portable_hash method in spark rdd
grouping_key = group_composite[0]
grouping_key_hash = hash(grouping_key)
# log_debug("group_by_sort_by_timestamp_partition: got hash : %s" \
# % str(returnhash))
return grouping_key_hash
@staticmethod
def _sort_by_timestamp(group_composite):
"""get timestamp which will be used to sort grouped data
"""
event_timestamp_string = group_composite[1]
return event_timestamp_string
@staticmethod
def _group_sort_by_timestamp_partition(record_store_df,
group_by_columns_list,
num_of_groups):
"""component that does a group by and then sorts all
the items within the group by event timestamp.
"""
# convert the dataframe rdd to normal rdd and add the group by
# column list
record_store_with_group_by_rdd = record_store_df.rdd.\
map(lambda x: RecordStoreWithGroupBy(x, group_by_columns_list))
# prepare the data for repartitionAndSortWithinPartitions function
record_store_rdd_prepared = \
record_store_with_group_by_rdd.\
map(GroupSortbyTimestampPartition._prepare_for_group_by)
# repartition data based on a grouping key and sort the items within
# group by timestamp
# give high number of partitions
# numPartitions > number of groups expected, so that each group gets
# allocated a separate partition
record_store_rdd_partitioned_sorted = \
record_store_rdd_prepared.\
repartitionAndSortWithinPartitions(
numPartitions=num_of_groups,
partitionFunc=GroupSortbyTimestampPartition.
_get_partition_by_group,
keyfunc=GroupSortbyTimestampPartition.
_sort_by_timestamp)
return record_store_rdd_partitioned_sorted
@staticmethod
def _remove_none_filter(row):
"""remove any rows which have None as grouping key
[GroupingResults(grouping_key="key1", results={})] rows get created
when partition does not get any grouped data assigned to it
"""
if len(row[0].results) > 0 and row[0].grouping_key is not None:
return row
@staticmethod
def fetch_group_first_last_quantity(record_store_df,
transform_spec_df,
group_by_columns_list,
num_of_groups):
"""function to group record store data, sort by timestamp within group
and get first and last timestamp along with quantity within each group
To do the group by it uses a custom partitioning function which creates
a new partition for each group and uses RDD's
repartitionAndSortWithinPartitions function to do the grouping and
sorting within the group.
This is more scalable than just using RDD's group_by, since with this
technique the group is not materialized into a list held in memory;
instead it uses RDD's built-in partitioning capability to do the sort.
num_of_groups should be more than the number of expected groups,
otherwise the same partition can get used for two groups, which will
cause incorrect results.
"""
# group and order elements in group using repartition
record_store_grouped_data_rdd = \
GroupSortbyTimestampPartition.\
_group_sort_by_timestamp_partition(record_store_df,
group_by_columns_list,
num_of_groups)
# do some operations on all elements in the group
grouping_results_tuple_with_none = \
record_store_grouped_data_rdd.\
mapPartitions(GroupSortbyTimestampPartition.
_get_group_first_last_quantity_udf)
# filter all rows which have no data (where grouping key is None) and
# convert results into grouping results tuple
grouping_results_tuple1 = grouping_results_tuple_with_none.\
filter(GroupSortbyTimestampPartition._remove_none_filter)
grouping_results_tuple = grouping_results_tuple1.map(lambda x: x[0])
return grouping_results_tuple

View File

@ -0,0 +1,177 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_transform.transform.grouping import Grouping
from monasca_transform.transform.grouping import GroupingResults
from monasca_transform.transform.grouping import RecordStoreWithGroupBy
class GroupSortbyTimestamp(Grouping):
@staticmethod
def log_debug(logStr):
print(logStr)
# LOG.debug(logStr)
@staticmethod
def _get_groupby_key(row_decorated):
"""Build a group by key using the group by column list.
row_decorated: [[Rows(a=1, b=1, c=2, d=3)],[groupby_a,groupby_b]]
"""
# LOG.debug(whoami(row_decorated))
# LOG.debug(row_decorated)
groupby_columns_list = row_decorated[1]
groupby_key = ""
for gcol in groupby_columns_list:
groupby_key = "^".join((groupby_key,
eval(".".join(("row", gcol)))))
return groupby_key
@staticmethod
def _prepare_for_groupby(record_store_with_groupby_rdd):
"""creates a new rdd where the first element of each row
contains array of grouping key and event timestamp fields.
Grouping key and event timestamp fields are used by
partitioning and sorting function to partition the data
by grouping key and then sort the elements in a group by the
timestamp
"""
# get the record store data and group by columns
record_store_data = record_store_with_groupby_rdd.record_store_data
groupby_columns_list = \
record_store_with_groupby_rdd.groupby_columns_list
# construct a group by key
# key1=value1^key2=value2^...
groupby_key_value = ""
for gcol in groupby_columns_list:
groupby_key_value = \
"^".join((groupby_key_value,
"=".join((gcol,
eval(".".join(("record_store_data",
gcol)))))))
# return a key-value rdd
return [groupby_key_value, record_store_data]
@staticmethod
def _sortby_timestamp(result_iterable):
# LOG.debug(whoami(result_iterable.data[0]))
# sort list might cause OOM, if the group has lots of items
# use group_sortby_timestamp_partitions module instead if you run
# into OOM
sorted_list = sorted(result_iterable.data,
key=lambda row: row.event_timestamp_string)
return sorted_list
@staticmethod
def _group_sortby_timestamp(record_store_df, groupby_columns_list):
# convert the dataframe rdd to normal rdd and add the group by column
# list
record_store_with_groupby_rdd = record_store_df.rdd.\
map(lambda x: RecordStoreWithGroupBy(x, groupby_columns_list))
# convert rdd into key-value rdd
record_store_with_groupby_rdd_key_val = record_store_with_groupby_rdd.\
map(GroupSortbyTimestamp._prepare_for_groupby)
first_step = record_store_with_groupby_rdd_key_val.groupByKey()
record_store_rdd_grouped_sorted = first_step.mapValues(
GroupSortbyTimestamp._sortby_timestamp)
return record_store_rdd_grouped_sorted
@staticmethod
def _get_group_first_last_quantity_udf(grouplistiter):
"""Return stats that include first row key, first_event_timestamp,
first event quantity, last_event_timestamp and last event quantity
"""
first_row = None
last_row = None
# extract key and value list
group_key = grouplistiter[0]
grouped_values = grouplistiter[1]
count = 0.0
for row in grouped_values:
# set the first row
if first_row is None:
first_row = row
# set the last row
last_row = row
count = count + 1
first_event_timestamp_unix = None
first_event_timestamp_string = None
first_event_quantity = None
if first_row is not None:
first_event_timestamp_unix = first_row.event_timestamp_unix
first_event_timestamp_string = first_row.event_timestamp_string
first_event_quantity = first_row.event_quantity
last_event_timestamp_unix = None
last_event_timestamp_string = None
last_event_quantity = None
if last_row is not None:
last_event_timestamp_unix = last_row.event_timestamp_unix
last_event_timestamp_string = last_row.event_timestamp_string
last_event_quantity = last_row.event_quantity
results_dict = {"firstrecord_timestamp_unix":
first_event_timestamp_unix,
"firstrecord_timestamp_string":
first_event_timestamp_string,
"firstrecord_quantity": first_event_quantity,
"lastrecord_timestamp_unix":
last_event_timestamp_unix,
"lastrecord_timestamp_string":
last_event_timestamp_string,
"lastrecord_quantity": last_event_quantity,
"record_count": count}
group_key_dict = Grouping._parse_grouping_key(group_key)
return GroupingResults(group_key, results_dict, group_key_dict)
@staticmethod
def fetch_group_latest_oldest_quantity(record_store_df,
transform_spec_df,
groupby_columns_list):
"""function to group record store data, sort by timestamp within group
and get first and last timestamp along with quantity within each group
This function uses the key-value pair rdd's groupByKey function to do the group by
"""
# group and order elements in group
record_store_grouped_data_rdd = \
GroupSortbyTimestamp._group_sortby_timestamp(record_store_df,
groupby_columns_list)
# find stats for a group
record_store_grouped_rows = \
record_store_grouped_data_rdd.\
map(GroupSortbyTimestamp.
_get_group_first_last_quantity_udf)
return record_store_grouped_rows

View File

@ -0,0 +1,226 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_transform.transform.grouping import Grouping
from monasca_transform.transform.grouping import GroupingResults
from monasca_transform.transform.grouping import RecordStoreWithGroupBy
class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def log_debug(logStr):
print(logStr)
# LOG.debug(logStr)
@staticmethod
def _get_group_first_last_quantity_udf(partitionlistiter):
"""user defined function to to through a list of partitions. Each
partition contains elements for a group. All the elements are sorted by
timestamp.
The stats include first row key, first_event_timestamp,
fist event quantity, last_event_timestamp and last event quantity
"""
first_row = None
last_row = None
count = 0.0
for row in partitionlistiter:
# set the first row
if first_row is None:
first_row = row
# set the last row
last_row = row
count = count + 1
first_event_timestamp_unix = None
first_event_timestamp_string = None
first_event_quantity = None
first_row_key = None
if first_row is not None:
first_event_timestamp_unix = first_row[1].event_timestamp_unix
first_event_timestamp_string = first_row[1].event_timestamp_string
first_event_quantity = first_row[1].event_quantity
# extract the grouping_key from composite grouping_key
# composite grouping key is a list, where first item is the
# grouping key and second item is the event_timestamp_string
first_row_key = first_row[0][0]
last_event_timestamp_unix = None
last_event_timestamp_string = None
last_event_quantity = None
if last_row is not None:
last_event_timestamp_unix = last_row[1].event_timestamp_unix
last_event_timestamp_string = last_row[1].event_timestamp_string
last_event_quantity = last_row[1].event_quantity
results_dict = {"firstrecord_timestamp_unix":
first_event_timestamp_unix,
"firstrecord_timestamp_string":
first_event_timestamp_string,
"firstrecord_quantity": first_event_quantity,
"lastrecord_timestamp_unix":
last_event_timestamp_unix,
"lastrecord_timestamp_string":
last_event_timestamp_string,
"lastrecord_quantity": last_event_quantity,
"record_count": count}
first_row_key_dict = Grouping._parse_grouping_key(first_row_key)
yield [GroupingResults(first_row_key, results_dict,
first_row_key_dict)]
@staticmethod
def _prepare_for_groupby(record_store_with_groupby_rdd):
"""creates a new rdd where the first element of each row
contains array of grouping key and event timestamp fields.
Grouping key and event timestamp fields are used by
partitioning and sorting function to partition the data
by grouping key and then sort the elements in a group by the
timestamp
"""
# get the record store data and group by columns
record_store_data = record_store_with_groupby_rdd.record_store_data
groupby_columns_list = \
record_store_with_groupby_rdd.groupby_columns_list
# construct a group by key
# key1=value1^key2=value2^...
groupby_key_value = ""
for gcol in groupby_columns_list:
groupby_key_value = \
"^".join((groupby_key_value,
"=".join((gcol, eval(".".join(("record_store_data",
gcol)))))))
# return a key-value rdd
# key is a composite key which consists of grouping key and
# event_timestamp_string
return [[groupby_key_value,
record_store_data.event_timestamp_string], record_store_data]
@staticmethod
def _get_partition_by_group(group_composite):
"""get a hash of the grouping key, which is then used by partitioning
function to get partition where the groups data should end up in.
It uses hash % num_partitions to get partition
"""
# FIXME: find out if the hash function in python gives the same value on
# different machines
# Look at using portable_hash method in spark rdd
grouping_key = group_composite[0]
grouping_key_hash = hash(grouping_key)
# log_debug("groupby_sortby_timestamp_partition: got hash : %s" \
# % str(returnhash))
return grouping_key_hash
@staticmethod
def _sortby_timestamp(group_composite):
"""get timestamp which will be used to sort grouped data
"""
event_timestamp_string = group_composite[1]
return event_timestamp_string
@staticmethod
def _group_sortby_timestamp_partition(record_store_df,
groupby_columns_list,
num_of_groups):
"""component that does a group by and then sorts all
the items within the group by event timestamp.
"""
# convert the dataframe rdd to normal rdd and add the group by
# column list
record_store_with_groupby_rdd = record_store_df.rdd.\
map(lambda x: RecordStoreWithGroupBy(x, groupby_columns_list))
# prepare the data for repartitionAndSortWithinPartitions function
record_store_rdd_prepared = \
record_store_with_groupby_rdd.\
map(GroupSortbyTimestampPartition._prepare_for_groupby)
# repartition data based on a grouping key and sort the items within
# group by timestamp
# give high number of partitions
# numPartitions > number of groups expected, so that each group gets
# allocated a separate partition
record_store_rdd_partitioned_sorted = \
record_store_rdd_prepared.\
repartitionAndSortWithinPartitions(
numPartitions=num_of_groups,
partitionFunc=GroupSortbyTimestampPartition.
_get_partition_by_group,
keyfunc=GroupSortbyTimestampPartition.
_sortby_timestamp)
return record_store_rdd_partitioned_sorted
@staticmethod
def _remove_none_filter(row):
"""remove any rows which have None as grouping key
[GroupingResults(grouping_key="key1", results={})] rows get created
when partition does not get any grouped data assigned to it
"""
if len(row[0].results) > 0 and row[0].grouping_key is not None:
return row
@staticmethod
def fetch_group_first_last_quantity(record_store_df,
transform_spec_df,
groupby_columns_list,
num_of_groups):
"""function to group record store data, sort by timestamp within group
and get first and last timestamp along with quantity within each group
To do group by it uses custom partitioning function which creates a new
partition
for each group and uses RDD's repartitionAndSortWithinPartitions
function to do the grouping and sorting within the group.
This is more scalable than just using RDD's groupby as using this
technique
group is not materialized into a list and stored in memory, but rather
it uses RDD's in built partitioning capability to do the sort
num_of_groups should be more than expected groups, otherwise the same
partition can get used for two groups which will cause incorrect
results.
"""
# group and order elements in group using repartition
record_store_grouped_data_rdd = \
GroupSortbyTimestampPartition.\
_group_sortby_timestamp_partition(record_store_df,
groupby_columns_list,
num_of_groups)
# do some operations on all elements in the group
grouping_results_tuple_with_none = \
record_store_grouped_data_rdd.\
mapPartitions(GroupSortbyTimestampPartition.
_get_group_first_last_quantity_udf)
# filter out all rows which have no data (where the grouping key is None)
# and convert the results into a grouping results tuple
grouping_results_tuple1 = grouping_results_tuple_with_none.\
filter(GroupSortbyTimestampPartition._remove_none_filter)
grouping_results_tuple = grouping_results_tuple1.map(lambda x: x[0])
return grouping_results_tuple
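For reference, the repartitionAndSortWithinPartitions technique described in the docstring above can be sketched on its own as below; the toy rows, the two-element key layout and the partition count are assumptions made purely for illustration and are not part of this commit.

from pyspark import SparkContext

sc = SparkContext(appName="group-sort-sketch")
# rows keyed by (grouping_key, timestamp), mirroring _prepare_for_groupby
rows = [(("host1", "2016-02-08 18:02:00"), 5.0),
        (("host1", "2016-02-08 18:01:00"), 3.0),
        (("host2", "2016-02-08 18:03:00"), 7.0)]
sorted_rdd = sc.parallelize(rows).repartitionAndSortWithinPartitions(
    numPartitions=2,
    partitionFunc=lambda key: hash(key[0]),  # one partition per grouping key
    keyfunc=lambda key: key[1])              # order a partition by timestamp
# each partition now holds one group, ordered by timestamp
print(sorted_rdd.glom().collect())
sc.stop()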


@ -0,0 +1,363 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from pyspark.sql import SQLContext
from pyspark.sql.types import ArrayType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import MapType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
LOG = logging.getLogger(__name__)
class TransformUtils(object):
"""utility methods for different kinds of data."""
@staticmethod
def _rdd_to_df(rdd, schema):
"""convert rdd to dataframe using schema."""
spark_context = rdd.context
sql_context = SQLContext(spark_context)
if schema is None:
df = sql_context.createDataFrame(rdd)
else:
df = sql_context.createDataFrame(rdd, schema)
return df
class InstanceUsageUtils(TransformUtils):
"""utility methods to transform instance usage data."""
@staticmethod
def _get_instance_usage_schema():
"""get instance usage schema."""
# Initialize columns for all string fields
columns = ["tenant_id", "user_id", "resource_uuid",
"geolocation", "region", "zone", "host", "project_id",
"aggregated_metric_name", "firstrecord_timestamp_string",
"lastrecord_timestamp_string",
"service_group", "service_id",
"usage_date", "usage_hour", "usage_minute",
"aggregation_period"]
columns_struct_fields = [StructField(field_name, StringType(), True)
for field_name in columns]
# Add columns for non-string fields
columns_struct_fields.append(StructField("firstrecord_timestamp_unix",
DoubleType(), True))
columns_struct_fields.append(StructField("lastrecord_timestamp_unix",
DoubleType(), True))
columns_struct_fields.append(StructField("quantity",
DoubleType(), True))
columns_struct_fields.append(StructField("record_count",
DoubleType(), True))
columns_struct_fields.append(StructField("processing_meta",
MapType(StringType(),
StringType(),
True),
True))
schema = StructType(columns_struct_fields)
return schema
@staticmethod
def create_df_from_json_rdd(sql_context, jsonrdd):
"""create instance usage df from json rdd."""
schema = InstanceUsageUtils._get_instance_usage_schema()
instance_usage_schema_df = sql_context.jsonRDD(jsonrdd, schema)
return instance_usage_schema_df
class RecordStoreUtils(TransformUtils):
"""utility methods to transform record store data."""
@staticmethod
def _get_record_store_df_schema():
"""get instance usage schema."""
columns = ["event_timestamp_string",
"event_type", "event_quantity_name",
"event_status", "event_version",
"record_type", "resource_uuid", "tenant_id",
"user_id", "region", "zone",
"host", "project_id", "service_group", "service_id",
"event_date", "event_hour", "event_minute",
"event_second", "metric_group", "metric_id"]
columns_struct_fields = [StructField(field_name, StringType(), True)
for field_name in columns]
# Add columns for non-string fields
columns_struct_fields.insert(0,
StructField("event_timestamp_unix",
DoubleType(), True))
columns_struct_fields.insert(0,
StructField("event_quantity",
DoubleType(), True))
schema = StructType(columns_struct_fields)
return schema
@staticmethod
def recordstore_rdd_to_df(record_store_rdd):
"""convert record store rdd to a dataframe."""
schema = RecordStoreUtils._get_record_store_df_schema()
return TransformUtils._rdd_to_df(record_store_rdd, schema)
@staticmethod
def create_df_from_json(sql_context, jsonpath):
"""create a record store df from json file."""
schema = RecordStoreUtils._get_record_store_df_schema()
record_store_df = sql_context.read.json(jsonpath, schema)
return record_store_df
class TransformSpecsUtils(TransformUtils):
"""utility methods to transform_specs."""
@staticmethod
def _get_transform_specs_df_schema():
"""get transform_specs df schema."""
# FIXME: change when transform_specs df is finalized
source = StructField("source", StringType(), True)
usage = StructField("usage", StringType(), True)
setters = StructField("setters", ArrayType(StringType(),
containsNull=False), True)
insert = StructField("insert", ArrayType(StringType(),
containsNull=False), True)
aggregation_params_map = \
StructField("aggregation_params_map",
StructType([StructField("aggregation_period",
StringType(), True),
StructField("dimension_list",
ArrayType(StringType(),
containsNull=False),
True),
StructField("aggregation_group_by_list",
ArrayType(StringType(),
containsNull=False),
True),
StructField("usage_fetch_operation",
StringType(),
True),
StructField(
"usage_fetch_util_quantity_event_type",
StringType(),
True),
StructField(
"usage_fetch_util_idle_perc_event_type",
StringType(),
True),
StructField("setter_rollup_group_by_list",
ArrayType(StringType(),
containsNull=False),
True),
StructField("setter_rollup_operation",
StringType(), True),
StructField("aggregated_metric_name",
StringType(), True),
StructField("aggregation_pipeline",
StructType([source, usage,
setters, insert]),
True)
]), True)
metric_id = StructField("metric_id", StringType(), True)
schema = StructType([aggregation_params_map, metric_id])
return schema
@staticmethod
def transform_specs_rdd_to_df(transform_specs_rdd):
"""convert transform_specs rdd to a dataframe."""
schema = TransformSpecsUtils._get_transform_specs_df_schema()
return TransformUtils._rdd_to_df(transform_specs_rdd, schema)
@staticmethod
def create_df_from_json(sql_context, jsonpath):
"""create a metric processing df from json file."""
schema = TransformSpecsUtils._get_transform_specs_df_schema()
transform_specs_df = sql_context.read.json(jsonpath, schema)
return transform_specs_df
class MonMetricUtils(TransformUtils):
"""utility methods to transform raw metric."""
@staticmethod
def _get_mon_metric_json_schema():
"""get the schema of the incoming monasca metric."""
dimensions = ["apache_host", "apache_port", "component",
"consumer_group", "device", "hostname",
"mode", "mount_point", "observer_host",
"process_name", "project_id", "service", "test_type",
"tenantId", "topic", "url", "state", "state_description",
"instanceId"]
dimensions_struct_fields = [
StructField(field_name, StringType(), True)
for field_name in dimensions]
value_meta = ["host"]
value_meta_struct_fields = [
StructField(field_name, StringType(), True)
for field_name in value_meta]
metric_struct_field = StructField(
"metric",
StructType([StructField("dimensions",
StructType(dimensions_struct_fields)),
StructField("value_meta",
StructType(value_meta_struct_fields)),
StructField("name", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("value", StringType(), True)]), True)
meta_struct_field = StructField(
"meta",
StructType([StructField("timestamp", StringType(), True),
StructField("region", StringType(), True),
StructField("tenantId", StringType(), True),
StructField("userId", StringType(), True),
StructField("zone", StringType(), True),
StructField("geolocation", StringType(), True)]))
creation_time_struct_field = StructField("creation_time",
StringType(), True)
schema = StructType([creation_time_struct_field,
meta_struct_field, metric_struct_field])
return schema
@staticmethod
def create_mon_metrics_df_from_json_rdd(sql_context, jsonrdd):
"""create mon metrics df from json rdd."""
schema = MonMetricUtils._get_mon_metric_json_schema()
mon_metrics_df = sql_context.jsonRDD(jsonrdd, schema)
return mon_metrics_df
class PreTransformSpecsUtils(TransformUtils):
"""utility methods to transform pre_transform_specs"""
@staticmethod
def _get_pre_transform_specs_df_schema():
"""get pre_transform_specs df schema."""
# FIXME: change when pre_transform_specs df is finalized
event_type = StructField("event_type", StringType(), True)
metric_id_list = StructField("metric_id_list",
ArrayType(StringType(),
containsNull=False),
True)
required_raw_fields_list = StructField("required_raw_fields_list",
ArrayType(StringType(),
containsNull=False),
True)
service_id = StructField("service_id", StringType(), True)
event_processing_params = \
StructField("event_processing_params",
StructType([StructField("set_default_zone_to",
StringType(), True),
StructField("set_default_geolocation_to",
StringType(), True),
StructField("set_default_region_to",
StringType(), True),
]), True)
schema = StructType([event_processing_params, event_type,
metric_id_list, required_raw_fields_list,
service_id])
return schema
@staticmethod
def pre_transform_specs_rdd_to_df(pre_transform_specs_rdd):
"""convert pre_transform_specs processing rdd to a dataframe."""
schema = PreTransformSpecsUtils._get_pre_transform_specs_df_schema()
return TransformUtils._rdd_to_df(pre_transform_specs_rdd, schema)
@staticmethod
def create_df_from_json(sql_context, jsonpath):
"""create a pre_transform_specs df from json file."""
schema = PreTransformSpecsUtils._get_pre_transform_specs_df_schema()
pre_transform_specs_df = sql_context.read.json(jsonpath, schema)
return pre_transform_specs_df
class GroupingResultsUtils(TransformUtils):
"""utility methods to transform record store data."""
@staticmethod
def _get_grouping_results_df_schema(group_by_column_list):
"""get grouping results schema."""
group_by_field_list = [StructField(field_name, StringType(), True)
for field_name in group_by_column_list]
# Initialize columns for string fields
columns = ["firstrecord_timestamp_string",
"lastrecord_timestamp_string"]
columns_struct_fields = [StructField(field_name, StringType(), True)
for field_name in columns]
# Add columns for non-string fields
columns_struct_fields.append(StructField("firstrecord_timestamp_unix",
DoubleType(), True))
columns_struct_fields.append(StructField("lastrecord_timestamp_unix",
DoubleType(), True))
columns_struct_fields.append(StructField("firstrecord_quantity",
DoubleType(), True))
columns_struct_fields.append(StructField("lastrecord_quantity",
DoubleType(), True))
columns_struct_fields.append(StructField("record_count",
DoubleType(), True))
instance_usage_schema_part = StructType(columns_struct_fields)
grouping_results = \
StructType([StructField("grouping_key",
StringType(), True),
StructField("results",
instance_usage_schema_part,
True),
StructField("grouping_key_dict",
StructType(group_by_field_list))])
# schema = \
# StructType([StructField("GroupingResults", grouping_results)])
return grouping_results
@staticmethod
def grouping_results_rdd_to_df(grouping_results_rdd, group_by_list):
"""convert record store rdd to a dataframe."""
schema = GroupingResultsUtils._get_grouping_results_df_schema(
group_by_list)
return TransformUtils._rdd_to_df(grouping_results_rdd, schema)
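A minimal usage sketch of the helpers above; the local SparkContext setup and the JSON-lines path are assumptions for illustration only and are not part of this commit.

from pyspark import SparkContext
from pyspark.sql import SQLContext

from monasca_transform.transform.transform_utils import RecordStoreUtils

sc = SparkContext(appName="transform-utils-sketch")
sql_context = SQLContext(sc)
# build a record store dataframe using the schema defined above
record_store_df = RecordStoreUtils.create_df_from_json(
    sql_context, "/tmp/record_store.json")  # hypothetical path
record_store_df.printSchema()
sc.stop()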

10
requirements.txt Normal file

@ -0,0 +1,10 @@
PyMySQL
six>=1.7.0 # MIT
SQLAlchemy<1.1.0,>=0.9.9
kafka-python
simport>=0.0.dev0
stevedore>=1.5.0 # Apache-2.0
oslo.config>=1.2.1
oslo.log
oslo.service
tooz

21
scripts/create_zip.py Normal file

@ -0,0 +1,21 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zipfile import PyZipFile
with PyZipFile("monasca-transform.zip", "w") as spark_submit_zipfile:
spark_submit_zipfile.writepy(
"../monasca_transform"
)

8
scripts/create_zip.sh Executable file

@ -0,0 +1,8 @@
#!/usr/bin/env bash
SCRIPT_HOME=$(dirname $(readlink -f $BASH_SOURCE))
pushd $SCRIPT_HOME
python create_zip.py
popd

110
scripts/ddl/generate_ddl.py Normal file

@ -0,0 +1,110 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Generator for ddl
-t type of output to generate - either 'pre_transform_spec' or 'transform_spec'
-o output path
-i path to template file
"""
import getopt
import json
import os.path
import sys
class Generator(object):
key_name = None
def generate(self, template_path, source_json_path, output_path):
print("Generating content at %s with template at %s, using key %s" % (
output_path, template_path, self.key_name))
data = []
with open(source_json_path) as f:
for line in f:
json_line = json.loads(line)
data_line = '(\'%s\',\n\'%s\')' % (
json_line[self.key_name], json.dumps(json_line))
data.append(str(data_line))
print(data)
with open(template_path) as f:
template = f.read()
with open(output_path, 'w') as write_file:
write_file.write(template)
for record in data:
write_file.write(record)
write_file.write(',\n')
write_file.seek(-2, 1)
write_file.truncate()
write_file.write(';')
class TransformSpecsGenerator(Generator):
key_name = 'metric_id'
class PreTransformSpecsGenerator(Generator):
key_name = 'event_type'
def main():
# parse command line options
try:
opts, args = getopt.getopt(sys.argv[1:], "ht:o:i:s:")
print('Opts = %s' % opts)
print('Args = %s' % args)
except getopt.error as msg:
print(msg)
print("for help use --help")
sys.exit(2)
script_type = None
template_path = None
source_json_path = None
output_path = None
# process options
for o, a in opts:
if o in ("-h", "--help"):
print(__doc__)
sys.exit(0)
elif o == "-t":
script_type = a
if a not in ('pre_transform_spec', 'transform_spec'):
print('Incorrect output type specified: \'%s\'.\n %s' % (
a, __doc__))
sys.exit(1)
elif o == "-i":
template_path = a
if not os.path.isfile(a):
print('Cannot find template file at %s' % a)
sys.exit(1)
elif o == "-o":
output_path = a
elif o == "-s":
source_json_path = a
print("Called with type = %s, template_path = %s, source_json_path %s"
" and output_path = %s" % (
script_type, template_path, source_json_path, output_path))
generator = None
if script_type == 'pre_transform_spec':
generator = PreTransformSpecsGenerator()
elif script_type == 'transform_spec':
generator = TransformSpecsGenerator()
generator.generate(template_path, source_json_path, output_path)
if __name__ == '__main__':
main()
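The generator classes above can also be called directly from Python instead of through the command-line options; a small sketch, assuming the working directory is scripts/ddl and using the same spec and template files that scripts/generate_ddl.sh passes on the command line:

from generate_ddl import TransformSpecsGenerator

generator = TransformSpecsGenerator()
generator.generate(
    'transform_specs_template.sql',
    '../../monasca_transform/data_driven_specs/transform_specs/'
    'transform_specs.json',
    'transform_specs.sql')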


@ -0,0 +1,6 @@
DELETE FROM `monasca_transform`.`pre_transform_specs`;
INSERT IGNORE INTO `monasca_transform`.`pre_transform_specs`
(`event_type`,
`pre_transform_spec`)
VALUES


@ -0,0 +1,6 @@
DELETE FROM `monasca_transform`.`transform_specs`;
INSERT IGNORE INTO `monasca_transform`.`transform_specs`
(`metric_id`,
`transform_spec`)
VALUES

9
scripts/generate_ddl.sh Executable file

@ -0,0 +1,9 @@
#!/usr/bin/env bash
SCRIPT_HOME=$(dirname $(readlink -f $BASH_SOURCE))
pushd $SCRIPT_HOME
python ddl/generate_ddl.py -t pre_transform_spec -i ddl/pre_transform_specs_template.sql -s ../monasca_transform/data_driven_specs/pre_transform_specs/pre_transform_specs.json -o ddl/pre_transform_specs.sql
python ddl/generate_ddl.py -t transform_spec -i ddl/transform_specs_template.sql -s ../monasca_transform/data_driven_specs/transform_specs/transform_specs.json -o ddl/transform_specs.sql
popd


@ -0,0 +1,11 @@
#!/usr/bin/env bash
SCRIPT_HOME=$(dirname $(readlink -f $BASH_SOURCE))
pushd $SCRIPT_HOME
./generate_ddl.sh
cp ddl/pre_transform_specs.sql ../devstack/files/monasca-transform/pre_transform_specs.sql
cp ddl/transform_specs.sql ../devstack/files/monasca-transform/transform_specs.sql
popd

20
scripts/make_egg.sh Executable file

@ -0,0 +1,20 @@
#!/usr/bin/env bash
SCRIPT_HOME=$(dirname $(readlink -f $BASH_SOURCE))
pushd $SCRIPT_HOME
pushd ../
rm -rf build monasca-transform.egg-info dist
python setup.py bdist_egg
found_egg=`ls dist`
echo
echo
echo Created egg file at dist/$found_egg
dev=dev
find_dev_index=`expr index $found_egg $dev`
new_filename=${found_egg:0:$find_dev_index - 1 }egg
echo Copying dist/$found_egg to dist/$new_filename
cp dist/$found_egg dist/$new_filename
popd
popd

3
scripts/pyspark-shell.sh Executable file

@ -0,0 +1,3 @@
#!/bin/bash
JARS_PATH="/opt/spark/current/lib/spark-streaming-kafka.jar,/opt/spark/current/lib/scala-library-2.10.1.jar,/opt/spark/current/lib/kafka_2.10-0.8.1.1.jar,/opt/spark/current/lib/metrics-core-2.2.0.jar"
pyspark --master spark://192.168.10.4:7077 --jars $JARS_PATH


@ -0,0 +1,19 @@
#!/bin/bash
SCRIPT_HOME=$(dirname $(readlink -f $BASH_SOURCE))
pushd $SCRIPT_HOME
pushd ../
JARS_PATH="/opt/spark/current/lib/spark-streaming-kafka.jar,/opt/spark/current/lib/scala-library-2.10.1.jar,/opt/spark/current/lib/kafka_2.10-0.8.1.1.jar,/opt/spark/current/lib/metrics-core-2.2.0.jar,/usr/share/java/mysql.jar"
export SPARK_HOME=/opt/spark/current/
# There is a known issue where obsolete kafka offsets can cause the
# driver to crash. However when this occurs, the saved offsets get
# deleted such that the next execution should be successful. Therefore,
# create a loop to run spark-submit for two iterations or until
# control-c is pressed.
COUNTER=0
while [ $COUNTER -lt 2 ]; do
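# NOTE: $new_filename is not set anywhere in this script; it is computed in
# scripts/make_egg.sh, so it needs to be provided (e.g. exported) before
# this loop runs.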
spark-submit --supervise --master spark://192.168.10.4:7077,192.168.10.5:7077 --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=file:///var/log/spark-events --jars $JARS_PATH --py-files dist/$new_filename /opt/monasca/transform/lib/driver.py || break
let COUNTER=COUNTER+1
done
popd
popd

45
setup.cfg Normal file

@ -0,0 +1,45 @@
[metadata]
name=monasca_transform
description-file = README.md
keywords=
openstack
monitoring
license=Apache-2
summary=Data aggregation for metrics
author=Calvin, Flint;Agate, Ashwin;Kennedy, David
author-email=monasca-transform@hpe.com
home-page=https://wiki.hpcloud.net/display/iaas/Spark+Evaluation
version=0.0.1
[wheel]
universal = 1
[files]
packages =
monasca_transform
[entry_points]
monasca_transform.usage =
fetch_quantity = monasca_transform.component.usage.fetch_quantity:FetchQuantity
fetch_quantity_util = monasca_transform.component.usage.fetch_quantity_util:FetchQuantityUtil
monasca_transform.setter =
set_aggregated_metric_name = monasca_transform.component.setter.set_aggregated_metric_name:SetAggregatedMetricName
set_aggregated_period = monasca_transform.component.setter.set_aggregated_period:SetAggregatedPeriod
rollup_quantity = monasca_transform.component.setter.rollup_quantity:RollupQuantity
monasca_transform.insert =
prepare_data = monasca_transform.component.insert.prepare_data:PrepareData
insert_data = monasca_transform.component.insert.kafka_insert:KafkaInsert
[global]
setup-hooks =
pbr.hooks.setup_hook
[pbr]
autodoc_index_modules = True
[egg_info]
tag_build =
tag_date = 0
tag_svn_revision = 0
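The entry points declared above are the extension points the transform pipeline components hang off; a rough sketch of looking one of them up with stevedore (the namespace and name come from the [entry_points] section above, everything else is illustrative and not necessarily how the builder code is wired):

from stevedore import driver

# resolve the 'fetch_quantity' usage component registered above
mgr = driver.DriverManager(namespace='monasca_transform.usage',
                           name='fetch_quantity',
                           invoke_on_load=False)
print(mgr.driver)  # -> the FetchQuantity class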

20
setup.py Executable file

@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from setuptools import setup
setup(
setup_requires=['pbr>=1.8'],
pbr=True)

6
test-requirements.txt Normal file

@ -0,0 +1,6 @@
# mock object framework
hacking>=0.10.2
flake8>=2.2.4
nose==1.3.0
mock>=1.0.1
tox

0
tests/__init__.py Normal file

27
tests/unit/__init__.py Normal file

@ -0,0 +1,27 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Add the location of Spark to the path
# TODO(someone) Does the "/opt/spark/current" location need to be configurable?
import os
import sys
sys.path.append(os.path.join("/opt/spark/current", "python"))
sys.path.append(os.path.join("/opt/spark/current",
"python", "lib", "py4j-0.9-src.zip"))


@ -0,0 +1,85 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from pyspark.sql import SQLContext
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.transform.builder.generic_transform_builder \
import GenericTransformBuilder
from monasca_transform.transform.transform_utils import RecordStoreUtils
from monasca_transform.transform.transform_utils import TransformSpecsUtils
from monasca_transform.transform import TransformContextUtils
from tests.unit.spark_context_test import SparkContextTest
from tests.unit.test_resources.mem_total_all.data_provider import DataProvider
from tests.unit.test_resources.mock_component_manager \
import MockComponentManager
class TransformBuilderTest(SparkContextTest):
def setUp(self):
super(TransformBuilderTest, self).setUp()
# configure the system with a dummy messaging adapter
ConfigInitializer.basic_config(
default_config_files=[
'tests/unit/test_resources/config/test_config.conf'])
@mock.patch('monasca_transform.transform.builder.generic_transform_builder'
'.GenericTransformBuilder._get_insert_component_manager')
@mock.patch('monasca_transform.transform.builder.generic_transform_builder'
'.GenericTransformBuilder._get_setter_component_manager')
@mock.patch('monasca_transform.transform.builder.generic_transform_builder'
'.GenericTransformBuilder._get_usage_component_manager')
def test_transform_builder(self,
usage_manager,
setter_manager,
insert_manager):
usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr()
setter_manager.return_value = \
MockComponentManager.get_setter_cmpt_mgr()
insert_manager.return_value = \
MockComponentManager.get_insert_cmpt_mgr()
record_store_json_path = DataProvider.record_store_path
metric_proc_json_path = DataProvider.transform_spec_path
sql_context = SQLContext.getOrCreate(self.spark_context)
record_store_df = \
RecordStoreUtils.create_df_from_json(sql_context,
record_store_json_path)
transform_spec_df = TransformSpecsUtils.create_df_from_json(
sql_context, metric_proc_json_path)
transform_context = TransformContextUtils.get_context(
transform_spec_df_info=transform_spec_df)
# invoke the generic transformation builder
instance_usage_df = GenericTransformBuilder.do_transform(
transform_context, record_store_df)
result_list = [(row.usage_date, row.usage_hour,
row.tenant_id, row.host, row.quantity,
row.aggregated_metric_name)
for row in instance_usage_df.rdd.collect()]
expected_result = [('2016-02-08', '18', 'all',
'all', 12946.0,
'mem.total_mb_agg')]
self.assertItemsEqual(result_list, expected_result)


@ -0,0 +1,66 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
import unittest
from monasca_transform.config.config_initializer import ConfigInitializer
class TestConfigInitializer(unittest.TestCase):
def test_use_specific_config_file(self):
ConfigInitializer.basic_config(
default_config_files=[
'tests/unit/test_resources/config/test_config.conf'
])
self.assertEqual('test_offsets_repo_class',
cfg.CONF.repositories.offsets)
self.assertEqual('test_data_driven_specs_repo_class',
cfg.CONF.repositories.data_driven_specs)
self.assertEqual('test_server_type',
cfg.CONF.database.server_type)
self.assertEqual('test_host_name',
cfg.CONF.database.host)
self.assertEqual('test_database_name',
cfg.CONF.database.database_name)
self.assertEqual('test_database_user_name',
cfg.CONF.database.username)
self.assertEqual('test_database_password',
cfg.CONF.database.password)
def test_use_default_config_file(self):
ConfigInitializer.basic_config(default_config_files=[])
self.assertEqual(
'monasca_transform.mysql_offset_specs:MySQLOffsetSpecs',
cfg.CONF.repositories.offsets)
self.assertEqual(
'monasca_transform.data_driven_specs.'
'mysql_data_driven_specs_repo:MySQLDataDrivenSpecsRepo',
cfg.CONF.repositories.data_driven_specs)
self.assertEqual('mysql',
cfg.CONF.database.server_type)
self.assertEqual('localhost',
cfg.CONF.database.host)
self.assertEqual('monasca_transform',
cfg.CONF.database.database_name)
self.assertEqual('m-transform',
cfg.CONF.database.username)
self.assertEqual('password',
cfg.CONF.database.password)


@ -0,0 +1,24 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Add the location of Spark to the path
import os
import sys
sys.path.append(os.path.join("/opt/spark/current", "python"))
sys.path.append(os.path.join("/opt/spark/current",
"python", "lib", "py4j-0.9-src.zip"))


@ -0,0 +1,331 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from collections import Counter
from pyspark import SQLContext
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepo
from monasca_transform.data_driven_specs.json_data_driven_specs_repo \
import JSONDataDrivenSpecsRepo
from monasca_transform.data_driven_specs.mysql_data_driven_specs_repo \
import MySQLDataDrivenSpecsRepo
from tests.unit.spark_context_test import SparkContextTest
class TestDataDrivenSpecsRepo(SparkContextTest):
def setUp(self):
super(TestDataDrivenSpecsRepo, self).setUp()
if type(self) is not TestDataDrivenSpecsRepo:
self.sql_context = SQLContext(self.spark_context)
def check_transform_specs_data_frame(self, transform_specs_data_frame):
self.check_metric(
metric_id='mem_total_all',
expected_agg_metric_name='mem.total_mb_agg',
transform_specs_dataframe=transform_specs_data_frame)
self.check_metric(
metric_id='mem_usable_all',
expected_agg_metric_name='mem.usable_mb_agg',
transform_specs_dataframe=transform_specs_data_frame)
self.check_metric(
metric_id='disk_total_all',
expected_agg_metric_name='disk.total_space_mb_agg',
transform_specs_dataframe=transform_specs_data_frame)
self.check_metric(
metric_id='disk_usable_all',
expected_agg_metric_name='disk.total_used_space_mb_agg',
transform_specs_dataframe=transform_specs_data_frame)
self.check_metric(
metric_id='vcpus_all',
expected_agg_metric_name='vcpus_agg',
transform_specs_dataframe=transform_specs_data_frame)
self.check_metric(
metric_id='vm_mem_total_mb_all',
expected_agg_metric_name='vm.mem.total_mb_agg',
transform_specs_dataframe=transform_specs_data_frame)
self.check_metric(
metric_id='disk_allocated_all',
expected_agg_metric_name='nova.vm.disk.total_allocated_gb_agg',
transform_specs_dataframe=transform_specs_data_frame)
def check_metric(self, metric_id=None, expected_agg_metric_name=None,
transform_specs_dataframe=None):
transform_specs_data_frame = transform_specs_dataframe.select(
["aggregation_params_map",
"metric_id"]
).where(
transform_specs_dataframe.metric_id == metric_id)
agg_params_json = transform_specs_data_frame.select(
"aggregation_params_map.aggregated_metric_name").collect()[0].\
asDict()
self.assertEqual(expected_agg_metric_name,
agg_params_json["aggregated_metric_name"])
def check_pre_transform_specs_data_frame(
self, pre_transform_specs_data_frame):
# check that all the expected event types are present
self.assertEqual(
Counter([u'mem.usable_mb',
u'mem.total_mb',
u'disk.total_used_space_mb', u'disk.total_space_mb',
u'cpu.total_logical_cores',
u'cpu.idle_perc', u'vcpus',
u'vm.mem.total_mb', u'nova.vm.disk.total_allocated_gb']),
Counter([row.event_type for row in
pre_transform_specs_data_frame.collect()]))
# mem.usable_mb
event_type = 'mem.usable_mb'
mem_usable_mb_row = self.get_row_for_event_type(
event_type=event_type,
pre_transform_specs_data_frame=pre_transform_specs_data_frame)
self.check_list_field_for_row(
row=mem_usable_mb_row,
field_name='metric_id_list',
expected_list=['mem_usable_all']
)
self.check_list_field_for_row(
row=mem_usable_mb_row,
field_name='required_raw_fields_list',
expected_list=['creation_time'])
self.check_dict_field_for_row(
row=mem_usable_mb_row,
field_name='event_processing_params',
expected_dict={
"set_default_zone_to": "1",
"set_default_geolocation_to": "1",
"set_default_region_to": "W"})
self.check_value_field_for_row(
row=mem_usable_mb_row,
field_name='service_id',
expected_value='host_metrics'
)
# mem.total_mb
event_type = 'mem.total_mb'
mem_total_mb_row = self.get_row_for_event_type(
event_type=event_type,
pre_transform_specs_data_frame=pre_transform_specs_data_frame)
self.check_list_field_for_row(
row=mem_total_mb_row,
field_name='metric_id_list',
expected_list=['mem_total_all']
)
self.check_list_field_for_row(
row=mem_total_mb_row,
field_name='required_raw_fields_list',
expected_list=['creation_time'],
)
self.check_dict_field_for_row(
row=mem_total_mb_row,
field_name='event_processing_params',
expected_dict={
"set_default_zone_to": "1",
"set_default_geolocation_to": "1",
"set_default_region_to": "W"})
self.check_value_field_for_row(
row=mem_total_mb_row,
field_name='service_id',
expected_value='host_metrics'
)
event_type = 'vcpus'
vcpus_all_row = self.get_row_for_event_type(
event_type=event_type,
pre_transform_specs_data_frame=pre_transform_specs_data_frame)
self.check_list_field_for_row(
row=vcpus_all_row,
field_name='metric_id_list',
expected_list=['vcpus_all',
'vcpus_project']
)
self.check_list_field_for_row(
row=vcpus_all_row,
field_name='required_raw_fields_list',
expected_list=['creation_time', 'project_id'],
)
self.check_dict_field_for_row(
row=vcpus_all_row,
field_name='event_processing_params',
expected_dict={
"set_default_zone_to": "1",
"set_default_geolocation_to": "1",
"set_default_region_to": "W"})
self.check_value_field_for_row(
row=vcpus_all_row,
field_name='service_id',
expected_value='host_metrics'
)
# vm.mem.total_mb
event_type = 'vm.mem.total_mb'
vm_mem_total_mb_all_row = self.get_row_for_event_type(
event_type=event_type,
pre_transform_specs_data_frame=pre_transform_specs_data_frame)
self.check_list_field_for_row(
row=vm_mem_total_mb_all_row,
field_name='metric_id_list',
expected_list=['vm_mem_total_mb_all',
'vm_mem_total_mb_project']
)
self.check_list_field_for_row(
row=vm_mem_total_mb_all_row,
field_name='required_raw_fields_list',
expected_list=['creation_time', 'tenantId'],
)
self.check_dict_field_for_row(
row=vm_mem_total_mb_all_row,
field_name='event_processing_params',
expected_dict={
"set_default_zone_to": "1",
"set_default_geolocation_to": "1",
"set_default_region_to": "W"})
self.check_value_field_for_row(
row=vm_mem_total_mb_all_row,
field_name='service_id',
expected_value='host_metrics'
)
event_type = 'nova.vm.disk.total_allocated_gb'
disk_total_alloc_row = self.get_row_for_event_type(
event_type=event_type,
pre_transform_specs_data_frame=pre_transform_specs_data_frame)
self.check_list_field_for_row(
row=disk_total_alloc_row,
field_name='metric_id_list',
expected_list=['disk_allocated_all']
)
self.check_list_field_for_row(
row=disk_total_alloc_row,
field_name='required_raw_fields_list',
expected_list=['creation_time'],
)
self.check_dict_field_for_row(
row=disk_total_alloc_row,
field_name='event_processing_params',
expected_dict={
"set_default_zone_to": "1",
"set_default_geolocation_to": "1",
"set_default_region_to": "W"})
self.check_value_field_for_row(
row=disk_total_alloc_row,
field_name='service_id',
expected_value='host_metrics'
)
def get_row_for_event_type(self,
event_type=None,
pre_transform_specs_data_frame=None):
"""get row for event type
:rtype: Row
"""
rows = pre_transform_specs_data_frame.filter(
pre_transform_specs_data_frame.event_type == event_type
).collect()
self.assertEqual(
1, len(rows),
'There should be only one row for event_type %s' % event_type)
return rows[0]
def check_dict_field_for_row(
self, row=None, field_name=None, expected_dict=None):
field = getattr(row, field_name)
values = field.asDict()
self.assertEqual(expected_dict, values)
def check_list_field_for_row(
self, row=None, field_name=None, expected_list=None):
found_list = getattr(row, field_name)
self.assertEqual(Counter(expected_list), Counter(found_list))
def check_value_field_for_row(
self, row=None, field_name=None, expected_value=None):
found_value = getattr(row, field_name)
self.assertEqual(expected_value, found_value)
@abc.abstractmethod
def test_transform_specs_data_frame(self):
pass
@abc.abstractmethod
def test_pre_transform_specs_data_frame(self):
pass
class TestMySQLDataDrivenSpecsRepo(TestDataDrivenSpecsRepo):
def setUp(self):
ConfigInitializer.basic_config()
super(TestMySQLDataDrivenSpecsRepo, self).setUp()
self.data_driven_specs_repo = MySQLDataDrivenSpecsRepo()
def tearDown(self):
super(TestMySQLDataDrivenSpecsRepo, self).tearDown()
def test_transform_specs_data_frame(self):
db_transform_specs_data_frame = \
self.data_driven_specs_repo.get_data_driven_specs(
sql_context=self.sql_context,
data_driven_spec_type=DataDrivenSpecsRepo.transform_specs_type)
self.check_transform_specs_data_frame(db_transform_specs_data_frame)
def test_pre_transform_specs_data_frame(self):
db_pre_transform_specs_data_frame = \
self.data_driven_specs_repo.get_data_driven_specs(
sql_context=self.sql_context,
data_driven_spec_type=DataDrivenSpecsRepo.
pre_transform_specs_type)
self.check_pre_transform_specs_data_frame(
db_pre_transform_specs_data_frame)
class TestJSONDataDrivenSpecsRepo(TestDataDrivenSpecsRepo):
def setUp(self):
super(TestJSONDataDrivenSpecsRepo, self).setUp()
self.data_driven_specs_repo = JSONDataDrivenSpecsRepo()
def tearDown(self):
super(TestJSONDataDrivenSpecsRepo, self).tearDown()
def test_transform_specs_data_frame(self):
json_transform_specs_data_frame = \
self.data_driven_specs_repo.get_data_driven_specs(
sql_context=self.sql_context,
data_driven_spec_type=DataDrivenSpecsRepo.transform_specs_type)
self.check_transform_specs_data_frame(json_transform_specs_data_frame)
def test_pre_transform_specs_data_frame(self):
json_pre_transform_specs_data_frame = \
self.data_driven_specs_repo.get_data_driven_specs(
sql_context=self.sql_context,
data_driven_spec_type=DataDrivenSpecsRepo.
pre_transform_specs_type)
self.check_pre_transform_specs_data_frame(
json_pre_transform_specs_data_frame)


@ -0,0 +1,24 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Add the location of Spark to the path
import os
import sys
sys.path.append(os.path.join("/opt/spark/current", "python"))
sys.path.append(os.path.join("/opt/spark/current",
"python", "lib", "py4j-0.9-src.zip"))


@ -0,0 +1,463 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from oslo_config import cfg
from pyspark.streaming.kafka import OffsetRange
import mock
from mock import call
from mock import MagicMock
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.driver.mon_metrics_kafka \
import MonMetricsKafkaProcessor
from monasca_transform.messaging.adapter import MessageAdapter
from monasca_transform.transform import RddTransformContext
from monasca_transform.transform import TransformContextUtils
from tests.unit.spark_context_test import SparkContextTest
from tests.unit.test_resources.kafka_data.data_provider import DataProvider
from tests.unit.test_resources.mock_component_manager \
import MockComponentManager
class SparkUnitTest(unittest.TestCase):
def test_transform_to_recordstore(self):
# simply verify that the transform method is called first, then
# rdd to recordstore
kafka_stream = MagicMock(name='kafka_stream')
transformed_stream = MagicMock(name='transformed_stream')
kafka_stream.transform.return_value = transformed_stream
transformed_stream_expected = call.foreachRDD(
MonMetricsKafkaProcessor.rdd_to_recordstore
).call_list()
kafka_stream_expected = call.transform(
MonMetricsKafkaProcessor.store_offset_ranges
).call_list()
MonMetricsKafkaProcessor.transform_to_recordstore(
kafka_stream)
self.assertEqual(
kafka_stream_expected, kafka_stream.mock_calls)
self.assertEqual(
transformed_stream_expected, transformed_stream.mock_calls)
class SparkTest(SparkContextTest):
def setUp(self):
super(SparkTest, self).setUp()
# configure the system with a dummy messaging adapter
ConfigInitializer.basic_config(
default_config_files=[
'tests/unit/test_resources/config/'
'test_config_with_dummy_messaging_adapter.conf'])
# reset the metric list in the dummy adapter
if not MessageAdapter.adapter_impl:
MessageAdapter.init()
MessageAdapter.adapter_impl.metric_list = []
@mock.patch('monasca_transform.transform.builder.'
'generic_transform_builder.GenericTransformBuilder.'
'_get_insert_component_manager')
@mock.patch('monasca_transform.transform.builder.'
'generic_transform_builder.GenericTransformBuilder.'
'_get_setter_component_manager')
@mock.patch('monasca_transform.transform.builder.'
'generic_transform_builder.GenericTransformBuilder.'
'_get_usage_component_manager')
def test_rdd_to_recordstore(self,
usage_manager,
setter_manager,
insert_manager):
usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr()
setter_manager.return_value = \
MockComponentManager.get_setter_cmpt_mgr()
insert_manager.return_value = \
MockComponentManager.get_insert_cmpt_mgr()
# Create an emulated set of Kafka messages (these were gathered
# by extracting Monasca messages from the Metrics queue on mini-mon).
# Create an RDD out of the mocked Monasca metrics
with open(DataProvider.kafka_data_path) as f:
raw_lines = f.read().splitlines()
raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
rdd_monasca = self.spark_context.parallelize(raw_tuple_list)
# decorate mocked RDD with dummy kafka offsets
myOffsetRanges = [
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges)
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
# Do something simple with the RDD
result = simple_count_transform(rdd_monasca_with_offsets)
# Verify it worked
self.assertEqual(result, 307)
# Call the primary method in mon_metrics_kafka
MonMetricsKafkaProcessor.rdd_to_recordstore(
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = MessageAdapter.adapter_impl.metric_list
total_mb_agg_metric = [
value for value in metrics
if value.get('metric').get('name') == 'mem.total_mb_agg'][0]
self.assertEqual(3733.75,
total_mb_agg_metric.get('metric').get('value'))
self.assertEqual('useast',
total_mb_agg_metric.get('meta').get('region'))
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
total_mb_agg_metric.get('meta').get('tenantId'))
self.assertEqual('all',
total_mb_agg_metric.get('metric').get('dimensions')
.get('host'))
self.assertEqual('all',
total_mb_agg_metric.get('metric').get('dimensions')
.get('project_id'))
self.assertEqual('hourly',
total_mb_agg_metric.get('metric').get('dimensions')
.get('aggregation_period'))
self.assertEqual(4.0,
total_mb_agg_metric.get('metric').get('value_meta')
.get('record_count'))
self.assertEqual('2016-01-20 16:40:00',
total_mb_agg_metric.get('metric').get('value_meta')
.get('firstrecord_timestamp'))
self.assertEqual('2016-01-20 16:40:46',
total_mb_agg_metric.get('metric').get('value_meta')
.get('lastrecord_timestamp'))
usable_mb_agg_metric = [
value for value in metrics
if value.get('metric').get('name') == 'mem.usable_mb_agg'][0]
self.assertEqual(843.0,
usable_mb_agg_metric.get('metric').get('value'))
self.assertEqual('useast',
usable_mb_agg_metric.get('meta').get('region'))
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
usable_mb_agg_metric.get('meta').get('tenantId'))
self.assertEqual('all',
usable_mb_agg_metric.get('metric').get('dimensions')
.get('host'))
self.assertEqual('all',
usable_mb_agg_metric.get('metric').get('dimensions')
.get('project_id'))
self.assertEqual('hourly',
usable_mb_agg_metric.get('metric').get('dimensions')
.get('aggregation_period'))
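# NOTE: the next three assertions re-check total_mb_agg_metric (duplicating
# the checks above); they look as if they were meant to verify
# usable_mb_agg_metric instead.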
self.assertEqual(4.0,
total_mb_agg_metric.get('metric').get('value_meta')
.get('record_count'))
self.assertEqual('2016-01-20 16:40:00',
total_mb_agg_metric.get('metric').get('value_meta')
.get('firstrecord_timestamp'))
self.assertEqual('2016-01-20 16:40:46',
total_mb_agg_metric.get('metric').get('value_meta')
.get('lastrecord_timestamp'))
vcpus_agg_metric = [
value for value in metrics
if value.get('metric').get('name') ==
'vcpus_agg' and
value.get('metric').get('dimensions').get('project_id') ==
'all'][0]
self.assertTrue(vcpus_agg_metric is not None)
self.assertEqual(8.0,
vcpus_agg_metric
.get('metric').get('value'))
self.assertEqual('useast',
vcpus_agg_metric
.get('meta').get('region'))
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
vcpus_agg_metric
.get('meta').get('tenantId'))
self.assertEqual('all',
vcpus_agg_metric
.get('metric').get('dimensions').get('host'))
self.assertEqual('hourly',
vcpus_agg_metric
.get('metric').get('dimensions')
.get('aggregation_period'))
self.assertEqual(14.0,
vcpus_agg_metric
.get('metric').get('value_meta').get('record_count'))
self.assertEqual('2016-01-20 16:40:00',
vcpus_agg_metric
.get('metric').get('value_meta')
.get('firstrecord_timestamp'))
self.assertEqual('2016-01-20 16:40:46',
vcpus_agg_metric
.get('metric').get('value_meta')
.get('lastrecord_timestamp'))
vcpus_agg_metric = [
value for value in metrics
if value.get('metric').get('name') ==
'vcpus_agg' and
value.get('metric').get('dimensions').get('project_id') ==
'8647fd5030b04a799b0411cc38c4102d'][0]
self.assertTrue(vcpus_agg_metric is not None)
self.assertEqual(2.0,
vcpus_agg_metric
.get('metric').get('value'))
self.assertEqual('useast',
vcpus_agg_metric
.get('meta').get('region'))
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
vcpus_agg_metric
.get('meta').get('tenantId'))
self.assertEqual('all',
vcpus_agg_metric
.get('metric').get('dimensions').get('host'))
self.assertEqual('hourly',
vcpus_agg_metric
.get('metric').get('dimensions')
.get('aggregation_period'))
self.assertEqual(6.0,
vcpus_agg_metric
.get('metric').get('value_meta').get('record_count'))
self.assertEqual('2016-01-20 16:40:00',
vcpus_agg_metric
.get('metric').get('value_meta')
.get('firstrecord_timestamp'))
self.assertEqual('2016-01-20 16:40:42',
vcpus_agg_metric
.get('metric').get('value_meta')
.get('lastrecord_timestamp'))
vcpus_agg_metric = [
value for value in metrics
if value.get('metric').get('name') ==
'vcpus_agg' and
value.get('metric').get('dimensions').get('project_id') ==
'9647fd5030b04a799b0411cc38c4102d'][0]
self.assertTrue(vcpus_agg_metric is not None)
self.assertEqual(6.0,
vcpus_agg_metric
.get('metric').get('value'))
self.assertEqual('useast',
vcpus_agg_metric
.get('meta').get('region'))
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
vcpus_agg_metric
.get('meta').get('tenantId'))
self.assertEqual('all',
vcpus_agg_metric
.get('metric').get('dimensions').get('host'))
self.assertEqual('hourly',
vcpus_agg_metric
.get('metric').get('dimensions')
.get('aggregation_period'))
self.assertEqual(8.0,
vcpus_agg_metric
.get('metric').get('value_meta').get('record_count'))
self.assertEqual('2016-01-20 16:40:05',
vcpus_agg_metric
.get('metric').get('value_meta')
.get('firstrecord_timestamp'))
self.assertEqual('2016-01-20 16:40:46',
vcpus_agg_metric
.get('metric').get('value_meta')
.get('lastrecord_timestamp'))
vm_mem_total_mb_agg_metric = [
value for value in metrics
if value.get('metric').get('name') ==
'vm.mem.total_mb_agg' and
value.get('metric').get('dimensions').get('project_id') ==
'all'][0]
self.assertTrue(vm_mem_total_mb_agg_metric is not None)
self.assertEqual(9728.0,
vm_mem_total_mb_agg_metric
.get('metric').get('value'))
self.assertEqual('useast',
vm_mem_total_mb_agg_metric
.get('meta').get('region'))
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
vm_mem_total_mb_agg_metric
.get('meta').get('tenantId'))
self.assertEqual('all',
vm_mem_total_mb_agg_metric
.get('metric').get('dimensions').get('host'))
self.assertEqual('hourly',
vm_mem_total_mb_agg_metric
.get('metric').get('dimensions')
.get('aggregation_period'))
self.assertEqual(9.0,
vm_mem_total_mb_agg_metric
.get('metric').get('value_meta').get('record_count'))
self.assertEqual('2016-01-20 16:40:00',
vm_mem_total_mb_agg_metric
.get('metric').get('value_meta')
.get('firstrecord_timestamp'))
self.assertEqual('2016-01-20 16:40:46',
vm_mem_total_mb_agg_metric
.get('metric').get('value_meta')
.get('lastrecord_timestamp'))
vm_mem_total_mb_agg_metric = [
value for value in metrics
if value.get('metric').get('name') ==
'vm.mem.total_mb_agg' and
value.get('metric').get('dimensions').get('project_id') ==
'5f681592f7084c5fbcd4e8a20a4fef15'][0]
self.assertTrue(vm_mem_total_mb_agg_metric is not None)
self.assertEqual(1536.0,
vm_mem_total_mb_agg_metric
.get('metric').get('value'))
self.assertEqual('useast',
vm_mem_total_mb_agg_metric
.get('meta').get('region'))
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
vm_mem_total_mb_agg_metric
.get('meta').get('tenantId'))
self.assertEqual('all',
vm_mem_total_mb_agg_metric
.get('metric').get('dimensions').get('host'))
self.assertEqual('hourly',
vm_mem_total_mb_agg_metric
.get('metric').get('dimensions')
.get('aggregation_period'))
self.assertEqual(3.0,
vm_mem_total_mb_agg_metric
.get('metric').get('value_meta').get('record_count'))
self.assertEqual('2016-01-20 16:40:00',
vm_mem_total_mb_agg_metric
.get('metric').get('value_meta')
.get('firstrecord_timestamp'))
self.assertEqual('2016-01-20 16:40:40',
vm_mem_total_mb_agg_metric
.get('metric').get('value_meta')
.get('lastrecord_timestamp'))
vm_mem_total_mb_agg_metric = [
value for value in metrics
if value.get('metric').get('name') ==
'vm.mem.total_mb_agg' and
value.get('metric').get('dimensions').get('project_id') ==
'6f681592f7084c5fbcd4e8a20a4fef15'][0]
self.assertTrue(vm_mem_total_mb_agg_metric is not None)
self.assertEqual(8192.0,
vm_mem_total_mb_agg_metric
.get('metric').get('value'))
self.assertEqual('useast',
vm_mem_total_mb_agg_metric
.get('meta').get('region'))
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
vm_mem_total_mb_agg_metric
.get('meta').get('tenantId'))
self.assertEqual('all',
vm_mem_total_mb_agg_metric
.get('metric').get('dimensions').get('host'))
self.assertEqual('hourly',
vm_mem_total_mb_agg_metric
.get('metric').get('dimensions')
.get('aggregation_period'))
self.assertEqual(6.0,
vm_mem_total_mb_agg_metric
.get('metric').get('value_meta').get('record_count'))
self.assertEqual('2016-01-20 16:40:00',
vm_mem_total_mb_agg_metric
.get('metric').get('value_meta')
.get('firstrecord_timestamp'))
self.assertEqual('2016-01-20 16:40:46',
vm_mem_total_mb_agg_metric
.get('metric').get('value_meta')
.get('lastrecord_timestamp'))
total_allocated_disk_agg_metric = [
value for value in metrics
if value.get('metric').get('name') ==
'nova.vm.disk.total_allocated_gb_agg'][0]
self.assertEqual(180.0,
total_allocated_disk_agg_metric
.get('metric').get('value'))
self.assertEqual('useast',
total_allocated_disk_agg_metric.get('meta')
.get('region'))
self.assertEqual(cfg.CONF.messaging.publish_kafka_tenant_id,
total_allocated_disk_agg_metric.get('meta')
.get('tenantId'))
self.assertEqual('all',
total_allocated_disk_agg_metric.get('metric')
.get('dimensions').get('host'))
self.assertEqual('all',
total_allocated_disk_agg_metric.get('metric')
.get('dimensions').get('project_id'))
self.assertEqual('hourly',
total_allocated_disk_agg_metric.get('metric')
.get('dimensions').get('aggregation_period'))
self.assertEqual(5.0,
total_allocated_disk_agg_metric.get('metric')
.get('value_meta').get('record_count'))
self.assertEqual('2016-05-17 15:14:08',
total_allocated_disk_agg_metric.get('metric')
.get('value_meta').get('firstrecord_timestamp'))
self.assertEqual('2016-05-17 15:14:44',
total_allocated_disk_agg_metric.get('metric')
.get('value_meta').get('lastrecord_timestamp'))
def simple_count_transform(rdd):
return rdd.count()
if __name__ == "__main__":
print("PATH *************************************************************")
import sys
print(sys.path)
print("PATH==============================================================")
unittest.main()


@ -0,0 +1,23 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_transform.messaging.adapter import MessageAdapter
class DummyAdapter(MessageAdapter):
metric_list = []
def do_send_metric(self, metric):
self.metric_list.append(metric)
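For orientation, the metric envelopes that the tests find in DummyAdapter.metric_list look roughly like the sketch below; the field values are copied from the assertions in test_rdd_to_recordstore above, and the real payload may carry additional fields.

sample_aggregated_metric = {
    "metric": {"name": "mem.total_mb_agg",
               "value": 3733.75,
               "dimensions": {"host": "all",
                              "project_id": "all",
                              "aggregation_period": "hourly"},
               "value_meta": {"record_count": 4.0,
                              "firstrecord_timestamp": "2016-01-20 16:40:00",
                              "lastrecord_timestamp": "2016-01-20 16:40:46"}},
    "meta": {"region": "useast",
             "tenantId": "<publish_kafka_tenant_id from config>"}}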


@ -0,0 +1,68 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark.sql import SQLContext

from monasca_transform.transform.transform_utils import RecordStoreUtils
from monasca_transform.transform.transform_utils import TransformSpecsUtils
from monasca_transform.transform import TransformContextUtils

from monasca_transform.component.setter.rollup_quantity \
    import RollupQuantity
from monasca_transform.component.setter.set_aggregated_metric_name \
    import SetAggregatedMetricName
from monasca_transform.component.usage.fetch_quantity \
    import FetchQuantity

from tests.unit.spark_context_test import SparkContextTest
from tests.unit.test_resources.mem_total_all.data_provider import DataProvider


class SetAggregatedMetricNameTest(SparkContextTest):

    def setUp(self):
        super(SetAggregatedMetricNameTest, self).setUp()
        self.sql_context = SQLContext(self.spark_context)

    def test_set_aggregated_metric_name(self):
        record_store_df = RecordStoreUtils.create_df_from_json(
            self.sql_context,
            DataProvider.record_store_path)

        transform_spec_df = TransformSpecsUtils.create_df_from_json(
            self.sql_context,
            DataProvider.transform_spec_path)

        transform_context = \
            TransformContextUtils.get_context(
                transform_spec_df_info=transform_spec_df)

        instance_usage_df = FetchQuantity.usage(
            transform_context, record_store_df)

        instance_usage_df_1 = RollupQuantity.setter(
            transform_context, instance_usage_df)

        instance_usage_df_2 = SetAggregatedMetricName.setter(
            transform_context, instance_usage_df_1)

        result_list = [(row.usage_date, row.usage_hour,
                        row.tenant_id, row.host, row.quantity,
                        row.aggregated_metric_name)
                       for row in instance_usage_df_2.rdd.collect()]

        expected_result = [
            ('2016-02-08', '18', 'all', 'all', 12946.0, 'mem.total_mb_agg')]

        self.assertItemsEqual(result_list, expected_result)


@@ -0,0 +1,60 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark.sql import SQLContext

from monasca_transform.transform.transform_utils import RecordStoreUtils
from monasca_transform.transform.transform_utils import TransformSpecsUtils
from monasca_transform.transform import TransformContextUtils

from monasca_transform.component.setter.rollup_quantity \
    import RollupQuantity
from monasca_transform.component.usage.fetch_quantity \
    import FetchQuantity

from tests.unit.spark_context_test import SparkContextTest
from tests.unit.test_resources.mem_total_all.data_provider import DataProvider


class UsageComponentTest(SparkContextTest):

    def setUp(self):
        super(UsageComponentTest, self).setUp()
        self.sql_context = SQLContext(self.spark_context)

    def test_sum_quantity_all_component(self):
        record_store_df = RecordStoreUtils.create_df_from_json(
            self.sql_context, DataProvider.record_store_path)

        transform_spec_df = TransformSpecsUtils.create_df_from_json(
            self.sql_context, DataProvider.transform_spec_path)

        transform_context = \
            TransformContextUtils.get_context(
                transform_spec_df_info=transform_spec_df)

        instance_usage_df = FetchQuantity.usage(
            transform_context, record_store_df)

        instance_usage_df_all = RollupQuantity.setter(
            transform_context, instance_usage_df)

        result_list = [(row.usage_date, row.usage_hour,
                        row.tenant_id, row.host, row.quantity)
                       for row in instance_usage_df_all.rdd.collect()]

        expected_result = [
            ('2016-02-08', '18', 'all', 'all', 12946.0)]

        self.assertItemsEqual(result_list, expected_result)


@@ -0,0 +1,31 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pyspark.context import SparkContext
from pyspark import SparkConf

import unittest


class SparkContextTest(unittest.TestCase):

    def setUp(self):
        # Create a local Spark context with 4 cores
        spark_conf = SparkConf().setMaster('local[4]')
        self.spark_context = SparkContext.getOrCreate(conf=spark_conf)

    def tearDown(self):
        # We don't stop the Spark context here: stopping does not work
        # cleanly, because a stopped context is left behind and cannot be
        # reused by the next test. Instead we rely on the context being
        # shut down at the end of the test run.
        pass


@@ -0,0 +1,232 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import random
import sys
import unittest
import uuid

from monasca_transform.offset_specs import JSONOffsetSpecs
from monasca_transform.offset_specs import OffsetSpec


class TestJSONOffsetSpecs(unittest.TestCase):

    test_resources_path = 'tests/unit/test_resources'

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_read_offsets_on_start(self):
        json_offset_specs = JSONOffsetSpecs(
            path=self.test_resources_path,
            filename='test_read_offsets_on_start.json')
        kafka_offsets = json_offset_specs.get_kafka_offsets()
        self.assertEqual(1, len(kafka_offsets))
        offset_key_0 = kafka_offsets.iterkeys().next()
        self.assertEqual('mon_metrics_kafka_metrics_0', offset_key_0)
        offset_value_0 = kafka_offsets.get(offset_key_0)
        self.assertEqual('metrics', offset_value_0.get_topic())
        self.assertEqual(85081, offset_value_0.get_until_offset())
        self.assertEqual(0, offset_value_0.get_partition())
        self.assertEqual('mon_metrics_kafka', offset_value_0.get_app_name())
        self.assertEqual(84790, offset_value_0.get_from_offset())

    def test_write_offsets_each_add(self):
        filename = '%s.json' % str(uuid.uuid4())
        file_path = os.path.join(self.test_resources_path, filename)
        json_offset_specs = JSONOffsetSpecs(
            path=self.test_resources_path,
            filename=filename
        )
        topic_1 = str(uuid.uuid4())
        partition_1 = random.randint(0, 1024)
        until_offset_1 = random.randint(0, sys.maxsize)
        from_offset_1 = random.randint(0, sys.maxsize)
        app_name_1 = str(uuid.uuid4())
        used_values = {}
        json_offset_specs.add(topic=topic_1, partition=partition_1,
                              app_name=app_name_1, from_offset=from_offset_1,
                              until_offset=until_offset_1)
        kafka_offset_dict = self.load_offset_file_as_json(file_path)
        offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
        used_values[offset_key_1] = {
            "topic": topic_1, "partition": partition_1, "app_name": app_name_1,
            "from_offset": from_offset_1, "until_offset": until_offset_1
        }
        offset_value_1 = kafka_offset_dict.get(offset_key_1)
        self.assertions_on_offset(used_value=used_values.get(offset_key_1),
                                  offset_value=offset_value_1)

        topic_2 = str(uuid.uuid4())
        partition_2 = random.randint(0, 1024)
        until_offset_2 = random.randint(0, sys.maxsize)
        from_offset_2 = random.randint(0, sys.maxsize)
        app_name_2 = str(uuid.uuid4())
        json_offset_specs.add(topic=topic_2, partition=partition_2,
                              app_name=app_name_2, from_offset=from_offset_2,
                              until_offset=until_offset_2)
        offset_key_2 = "%s_%s_%s" % (app_name_2, topic_2, partition_2)
        used_values[offset_key_2] = {
            "topic": topic_2, "partition": partition_2, "app_name": app_name_2,
            "from_offset": from_offset_2, "until_offset": until_offset_2
        }
        kafka_offset_dict = self.load_offset_file_as_json(file_path)
        for key in [offset_key_1, offset_key_2]:
            self.assertions_on_offset(used_value=used_values.get(key),
                                      offset_value=kafka_offset_dict.get(key))
        # if assertions fail then file is left for analysis
        os.remove(file_path)

    def test_as_dict(self):
        topic_1 = str(uuid.uuid4())
        partition_1 = random.randint(0, 1024)
        until_offset_1 = random.randint(0, sys.maxsize)
        from_offset_1 = random.randint(0, sys.maxsize)
        app_name_1 = str(uuid.uuid4())
        offset_spec = OffsetSpec(
            app_name=app_name_1, topic=topic_1, partition=partition_1,
            from_offset=from_offset_1, until_offset=until_offset_1)
        offset_spec_dict = JSONOffsetSpecs.as_dict(offset_spec)
        self.assertions_on_offset(
            used_value={
                "topic": topic_1, "partition": partition_1,
                "app_name": app_name_1, "from_offset": from_offset_1,
                "until_offset": until_offset_1},
            offset_value=offset_spec_dict)

    def test_write_then_read(self):
        filename = '%s.json' % str(uuid.uuid4())
        json_offset_specs = JSONOffsetSpecs(
            path=self.test_resources_path,
            filename=filename
        )
        topic_1 = str(uuid.uuid4())
        partition_1 = random.randint(0, 1024)
        until_offset_1 = random.randint(0, sys.maxsize)
        from_offset_1 = random.randint(0, sys.maxsize)
        app_name_1 = str(uuid.uuid4())
        used_values = {}
        json_offset_specs.add(topic=topic_1, partition=partition_1,
                              app_name=app_name_1, from_offset=from_offset_1,
                              until_offset=until_offset_1)
        offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
        used_values[offset_key_1] = {
            "topic": topic_1, "partition": partition_1, "app_name": app_name_1,
            "from_offset": from_offset_1, "until_offset": until_offset_1
        }
        topic_2 = str(uuid.uuid4())
        partition_2 = random.randint(0, 1024)
        until_offset_2 = random.randint(0, sys.maxsize)
        from_offset_2 = random.randint(0, sys.maxsize)
        app_name_2 = str(uuid.uuid4())
        json_offset_specs.add(topic=topic_2, partition=partition_2,
                              app_name=app_name_2, from_offset=from_offset_2,
                              until_offset=until_offset_2)
        offset_key_2 = "%s_%s_%s" % (app_name_2, topic_2, partition_2)
        used_values[offset_key_2] = {
            "topic": topic_2, "partition": partition_2, "app_name": app_name_2,
            "from_offset": from_offset_2, "until_offset": until_offset_2
        }

        # now create a new JSONOffsetSpecs
        json_offset_specs_2 = JSONOffsetSpecs(self.test_resources_path,
                                              filename)
        found_offsets = json_offset_specs_2.get_kafka_offsets()
        json_found_offsets = {key: JSONOffsetSpecs.as_dict(value)
                              for key, value in found_offsets.items()}
        for key, value in used_values.items():
            found_value = json_found_offsets.get(key)
            self.assertEqual(value, found_value)
        os.remove(os.path.join(self.test_resources_path, filename))

    def test_update_offset_values(self):
        filename = '%s.json' % str(uuid.uuid4())
        file_path = os.path.join(self.test_resources_path, filename)
        json_offset_specs = JSONOffsetSpecs(
            path=self.test_resources_path,
            filename=filename
        )
        topic_1 = str(uuid.uuid4())
        partition_1 = random.randint(0, 1024)
        until_offset_1 = random.randint(0, sys.maxsize)
        from_offset_1 = random.randint(0, sys.maxsize)
        app_name_1 = str(uuid.uuid4())
        used_values = {}
        json_offset_specs.add(topic=topic_1, partition=partition_1,
                              app_name=app_name_1, from_offset=from_offset_1,
                              until_offset=until_offset_1)
        kafka_offset_dict = self.load_offset_file_as_json(file_path)
        offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
        used_values[offset_key_1] = {
            "topic": topic_1, "partition": partition_1, "app_name": app_name_1,
            "from_offset": from_offset_1, "until_offset": until_offset_1
        }
        offset_value_1 = kafka_offset_dict.get(offset_key_1)
        self.assertions_on_offset(used_value=used_values.get(offset_key_1),
                                  offset_value=offset_value_1)

        until_offset_2 = random.randint(0, sys.maxsize)
        while until_offset_2 == until_offset_1:
            until_offset_2 = random.randint(0, sys.maxsize)
        from_offset_2 = random.randint(0, sys.maxsize)
        while from_offset_2 == from_offset_1:
            from_offset_2 = random.randint(0, sys.maxsize)
        json_offset_specs.add(topic=topic_1, partition=partition_1,
                              app_name=app_name_1, from_offset=from_offset_2,
                              until_offset=until_offset_2)
        kafka_offset_dict = self.load_offset_file_as_json(file_path)
        offset_value_updated = kafka_offset_dict.get(offset_key_1)
        self.assertEqual(from_offset_2,
                         offset_value_updated.get('from_offset'))
        self.assertEqual(until_offset_2,
                         offset_value_updated.get('until_offset'))
        os.remove(file_path)

    def load_offset_file_as_json(self, file_path):
        with open(file_path, 'r') as f:
            json_file = json.load(f)
            return json_file
    @unittest.skip('not yet implemented')
    def test_get_offsets_is_obj_based(self):
        self.fail('We need to assert that we get objects back '
                  'from the get offsets method')

    def assertions_on_offset(self, used_value=None, offset_value=None):
        self.assertEqual(used_value.get('topic'),
                         offset_value.get('topic'))
        self.assertEqual(used_value.get('partition'),
                         int(offset_value.get('partition')))
        self.assertEqual(used_value.get('until_offset'),
                         int(offset_value.get('until_offset')))
        self.assertEqual(used_value.get('from_offset'),
                         int(offset_value.get('from_offset')))
        self.assertEqual(used_value.get('app_name'),
                         offset_value.get('app_name'))
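For reference, an illustrative sketch (not part of this change-set) of the
JSON document these tests read and write: the assertions in
test_read_offsets_on_start and assertions_on_offset imply a single top-level
object keyed by "<app_name>_<topic>_<partition>", e.g.

    {"mon_metrics_kafka_metrics_0": {"app_name": "mon_metrics_kafka",
                                     "topic": "metrics",
                                     "partition": 0,
                                     "from_offset": 84790,
                                     "until_offset": 85081}}

The exact key ordering and any extra bookkeeping fields in the real
test_read_offsets_on_start.json fixture are not shown in this diff.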


@@ -0,0 +1,126 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
import sys
import unittest
import uuid

from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.mysql_offset_specs import MySQLOffsetSpecs


class TestMySQLOffsetSpecs(unittest.TestCase):

    def setUp(self):
        ConfigInitializer.basic_config()
        self.kafka_offset_specs = MySQLOffsetSpecs()

    def tearDown(self):
        pass

    def test_add_offset(self):
        topic_1 = str(uuid.uuid4())
        partition_1 = random.randint(0, 1024)
        until_offset_1 = random.randint(0, sys.maxsize)
        from_offset_1 = random.randint(0, sys.maxsize)
        app_name_1 = str(uuid.uuid4())
        offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
        used_values = {}
        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                    app_name=app_name_1,
                                    from_offset=from_offset_1,
                                    until_offset=until_offset_1)
        used_values[offset_key_1] = {
            "topic": topic_1, "partition": partition_1, "app_name": app_name_1,
            "from_offset": from_offset_1, "until_offset": until_offset_1
        }
        kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets()
        offset_value_1 = kafka_offset_specs.get(offset_key_1)
        self.assertions_on_offset(used_value=used_values.get(offset_key_1),
                                  offset_value=offset_value_1)

    def test_add_another_offset(self):
        offset_specs_at_outset = self.kafka_offset_specs.get_kafka_offsets()
        offset_count = len(offset_specs_at_outset)
        topic_1 = str(uuid.uuid4())
        partition_1 = random.randint(0, 1024)
        until_offset_1 = random.randint(0, sys.maxsize)
        from_offset_1 = random.randint(0, sys.maxsize)
        app_name_1 = str(uuid.uuid4())
        offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
        used_values = {}
        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                    app_name=app_name_1,
                                    from_offset=from_offset_1,
                                    until_offset=until_offset_1)
        used_values[offset_key_1] = {
            "topic": topic_1, "partition": partition_1, "app_name": app_name_1,
            "from_offset": from_offset_1, "until_offset": until_offset_1
        }
        kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets()
        offset_value_1 = kafka_offset_specs.get(offset_key_1)
        self.assertions_on_offset(used_value=used_values.get(offset_key_1),
                                  offset_value=offset_value_1)
        self.assertEqual(offset_count + 1,
                         len(self.kafka_offset_specs.get_kafka_offsets()))

    def test_update_offset_values(self):
        topic_1 = str(uuid.uuid4())
        partition_1 = random.randint(0, 1024)
        until_offset_1 = random.randint(0, sys.maxsize)
        from_offset_1 = random.randint(0, sys.maxsize)
        app_name_1 = str(uuid.uuid4())
        offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                    app_name=app_name_1,
                                    from_offset=from_offset_1,
                                    until_offset=until_offset_1)
        until_offset_2 = random.randint(0, sys.maxsize)
        while until_offset_2 == until_offset_1:
            until_offset_2 = random.randint(0, sys.maxsize)
        from_offset_2 = random.randint(0, sys.maxsize)
        while from_offset_2 == from_offset_1:
            from_offset_2 = random.randint(0, sys.maxsize)
        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                    app_name=app_name_1,
                                    from_offset=from_offset_2,
                                    until_offset=until_offset_2)
        kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets()
        updated_offset_value = kafka_offset_specs.get(offset_key_1)
        self.assertEqual(from_offset_2, updated_offset_value.get_from_offset())
        self.assertEqual(until_offset_2,
                         updated_offset_value.get_until_offset())

    def assertions_on_offset(self, used_value=None, offset_value=None):
        self.assertEqual(used_value.get('topic'),
                         offset_value.get_topic())
        self.assertEqual(used_value.get('partition'),
                         int(offset_value.get_partition()))
        self.assertEqual(used_value.get('until_offset'),
                         int(offset_value.get_until_offset()))
        self.assertEqual(used_value.get('from_offset'),
                         int(offset_value.get_from_offset()))
        self.assertEqual(used_value.get('app_name'),
                         offset_value.get_app_name())
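The OffsetSpec value object exercised by both offset-spec test suites can
also be used on its own; a minimal sketch based purely on the constructor and
accessors that appear in the tests above:

    from monasca_transform.offset_specs import OffsetSpec

    spec = OffsetSpec(app_name='mon_metrics_kafka', topic='metrics',
                      partition=0, from_offset=84790, until_offset=85081)
    assert spec.get_topic() == 'metrics'
    assert spec.get_until_offset() - spec.get_from_offset() == 291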


@@ -0,0 +1,12 @@
[DEFAULTS]
[repositories]
offsets = test_offsets_repo_class
data_driven_specs = test_data_driven_specs_repo_class
[database]
server_type = test_server_type
host = test_host_name
database_name = test_database_name
username = test_database_user_name
password = test_database_password
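A quick way to sanity-check this test configuration outside of
ConfigInitializer (whose loading logic is not shown in this diff) is plain
ConfigParser; the file path below is hypothetical:

    # minimal sketch, assuming Python 3's configparser module
    import configparser

    config = configparser.ConfigParser()
    config.read('tests/unit/test_resources/config/test_config.conf')
    print(config.get('database', 'host'))         # test_host_name
    print(config.get('repositories', 'offsets'))  # test_offsets_repo_class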


@@ -0,0 +1,4 @@
[DEFAULTS]
[messaging]
adapter = tests.unit.messaging.adapter:DummyAdapter
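The "module:Class" string above names DummyAdapter as the messaging adapter
for the unit tests. The loader monasca-transform uses to resolve it is not
shown in this diff; a typical, hypothetical resolution helper looks like:

    # minimal sketch only; load_adapter is a hypothetical name
    import importlib

    def load_adapter(spec):
        module_name, class_name = spec.split(':')
        module = importlib.import_module(module_name)
        return getattr(module, class_name)

    adapter_cls = load_adapter('tests.unit.messaging.adapter:DummyAdapter')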


@@ -0,0 +1,93 @@
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366959000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366963})
('',{"metric":{"name":"vcpus","dimensions":{"region":"useast","unit":"vcpu","resource_id":"b1a7f31d-3f85-4475-a50b-77094b62b078","source":"openstack","project_id":"103e4d4d14bc4fdda4a9c73d1643e1d7","user_id":"a69aabac824c4e1688702b8eb7e22af2","type":"gauge"},"timestamp":1453308000000,"value":0.0,"value_meta":{"audit_period_beginning":"2016-05-13T23:00:00.000000","availability_zone":"None","host":"devstack","event_type":"compute.instance.update","launched_at":"2016-05-13T23:14:06.000000","deleted_at":"","state":"active","memory_mb":"512","root_gb":"1","vcpus":"1","audit_period_ending":"2016-05-13T23:15:29.476223","ephemeral_gb":"0","instance_flavor_id":"1","created_at":"2016-05-13 23:13:44+00:00","disk_gb":"1","image_ref_url":"http://10.241.104.4:9292/images/12990d03-8059-4052-8b93-d4c6376fea57"}},"meta":{"tenantId":"225bbc4c5c1c4dfaa910cb44b5a4d7b7","region":"unset"},"creation_time":1458166131})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366959000,"value":45.8},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366963})
('',{"metric":{"name":"apache.performance.cpu_load_perc","dimensions":{"apache_port":"80","component":"apache","service":"apache","hostname":"devstack","apache_host":"devstack"},"timestamp":1457366959000,"value":0.143361},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366963})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366959000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366963})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366959000,"value":3.6},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366963})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366959000,"value":2.7},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366963})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366963000,"value":3.3000000000000003},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366966})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366963000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366966})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366963000,"value":8.7},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366966})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366963000,"value":8.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366966})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366963000,"value":8.1},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366966})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366963000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366966})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366974000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366977})
('',{"metric":{"name":"vcpus","dimensions":{"region":"useast","unit":"vcpu","resource_id":"b1a7f31d-3f85-4475-a50b-77094b62b078","source":"openstack","project_id":"103e4d4d14bc4fdda4a9c73d1643e1d7","user_id":"a69aabac824c4e1688702b8eb7e22af2","type":"gauge"},"timestamp":1453308000000,"value":0.0,"value_meta":{"audit_period_beginning":"2016-05-13T23:00:00.000000","availability_zone":"None","host":"devstack","event_type":"compute.instance.update","launched_at":"2016-05-13T23:14:06.000000","deleted_at":"","state":"active","memory_mb":"512","root_gb":"1","vcpus":"1","audit_period_ending":"2016-05-13T23:15:29.476223","ephemeral_gb":"0","instance_flavor_id":"1","created_at":"2016-05-13 23:13:44+00:00","disk_gb":"1","image_ref_url":"http://10.241.104.4:9292/images/12990d03-8059-4052-8b93-d4c6376fea57"}},"meta":{"tenantId":"225bbc4c5c1c4dfaa910cb44b5a4d7b7","region":"unset"},"creation_time":1458166131})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366974000,"value":53.4},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366977})
('',{"metric":{"name":"apache.performance.cpu_load_perc","dimensions":{"apache_port":"80","component":"apache","service":"apache","hostname":"devstack","apache_host":"devstack"},"timestamp":1457366974000,"value":0.143374},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366977})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366974000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366977})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366974000,"value":3.7},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366977})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366974000,"value":2.6},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366977})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366977000,"value":3.5},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366980})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366977000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366980})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366977000,"value":5.7},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366980})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366977000,"value":8.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366980})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366977000,"value":6.6},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366980})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366977000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366980})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366989000,"value":0.1},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366991})
('',{"metric":{"name":"vcpus","dimensions":{"region":"useast","unit":"vcpu","resource_id":"b1a7f31d-3f85-4475-a50b-77094b62b078","source":"openstack","project_id":"103e4d4d14bc4fdda4a9c73d1643e1d7","user_id":"a69aabac824c4e1688702b8eb7e22af2","type":"gauge"},"timestamp":1453308000000,"value":0.0,"value_meta":{"audit_period_beginning":"2016-05-13T23:00:00.000000","availability_zone":"None","host":"devstack","event_type":"compute.instance.update","launched_at":"2016-05-13T23:14:06.000000","deleted_at":"","state":"active","memory_mb":"512","root_gb":"1","vcpus":"1","audit_period_ending":"2016-05-13T23:15:29.476223","ephemeral_gb":"0","instance_flavor_id":"1","created_at":"2016-05-13 23:13:44+00:00","disk_gb":"1","image_ref_url":"http://10.241.104.4:9292/images/12990d03-8059-4052-8b93-d4c6376fea57"}},"meta":{"tenantId":"225bbc4c5c1c4dfaa910cb44b5a4d7b7","region":"unset"},"creation_time":1458166131})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366989000,"value":75.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366991})
('',{"metric":{"name":"apache.performance.cpu_load_perc","dimensions":{"apache_port":"80","component":"apache","service":"apache","hostname":"devstack","apache_host":"devstack"},"timestamp":1457366989000,"value":0.143414},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366991})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366989000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366991})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366989000,"value":6.5},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366991})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"hostname":"devstack"},"timestamp":1457366989000,"value":4.4},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366991})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366993000,"value":4.9},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366994})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366993000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366994})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366993000,"value":5.4},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366994})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366993000,"value":16.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366994})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366993000,"value":93.3},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366994})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457366993000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366994})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367004000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367005})
('',{"metric":{"name":"vcpus","dimensions":{"region":"useast","unit":"vcpu","resource_id":"b1a7f31d-3f85-4475-a50b-77094b62b078","source":"openstack","project_id":"103e4d4d14bc4fdda4a9c73d1643e1d7","user_id":"a69aabac824c4e1688702b8eb7e22af2","type":"gauge"},"timestamp":1453308000000,"value":0.0,"value_meta":{"audit_period_beginning":"2016-05-13T23:00:00.000000","availability_zone":"None","host":"devstack","event_type":"compute.instance.update","launched_at":"2016-05-13T23:14:06.000000","deleted_at":"","state":"active","memory_mb":"512","root_gb":"1","vcpus":"1","audit_period_ending":"2016-05-13T23:15:29.476223","ephemeral_gb":"0","instance_flavor_id":"1","created_at":"2016-05-13 23:13:44+00:00","disk_gb":"1","image_ref_url":"http://10.241.104.4:9292/images/12990d03-8059-4052-8b93-d4c6376fea57"}},"meta":{"tenantId":"225bbc4c5c1c4dfaa910cb44b5a4d7b7","region":"unset"},"creation_time":1458166131})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367004000,"value":65.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367005})
('',{"metric":{"name":"apache.performance.cpu_load_perc","dimensions":{"apache_port":"80","component":"apache","service":"apache","hostname":"devstack","apache_host":"devstack"},"timestamp":1457367004000,"value":0.143409},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367005})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367004000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367005})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367004000,"value":4.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367005})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367004000,"value":2.5},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367005})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367008000,"value":3.5},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367008})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367008000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367008})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367008000,"value":5.2},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367008})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367008000,"value":8.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367008})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367008000,"value":75.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367008})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367008000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367008})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367019000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367026})
('',{"metric":{"name":"vcpus","dimensions":{"region":"useast","unit":"vcpu","resource_id":"b1a7f31d-3f85-4475-a50b-77094b62b078","source":"openstack","project_id":"103e4d4d14bc4fdda4a9c73d1643e1d7","user_id":"a69aabac824c4e1688702b8eb7e22af2","type":"gauge"},"timestamp":1453308000000,"value":0.0,"value_meta":{"audit_period_beginning":"2016-05-13T23:00:00.000000","availability_zone":"None","host":"devstack","event_type":"compute.instance.update","launched_at":"2016-05-13T23:14:06.000000","deleted_at":"","state":"active","memory_mb":"512","root_gb":"1","vcpus":"1","audit_period_ending":"2016-05-13T23:15:29.476223","ephemeral_gb":"0","instance_flavor_id":"1","created_at":"2016-05-13 23:13:44+00:00","disk_gb":"1","image_ref_url":"http://10.241.104.4:9292/images/12990d03-8059-4052-8b93-d4c6376fea57"}},"meta":{"tenantId":"225bbc4c5c1c4dfaa910cb44b5a4d7b7","region":"unset"},"creation_time":1458166131})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367019000,"value":29.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367026})
('',{"metric":{"name":"apache.performance.cpu_load_perc","dimensions":{"apache_port":"80","component":"apache","service":"apache","hostname":"devstack","apache_host":"devstack"},"timestamp":1457367019000,"value":0.143412},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367026})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367019000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367026})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367019000,"value":3.8},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367026})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367019000,"value":2.5},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367026})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367023000,"value":3.3},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367029})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367023000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367029})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367023000,"value":5.5},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367029})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367023000,"value":4.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367029})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367023000,"value":52.4},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367029})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367023000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367029})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367034000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367040})
('',{"metric":{"name":"vcpus","dimensions":{"region":"useast","unit":"vcpu","resource_id":"b1a7f31d-3f85-4475-a50b-77094b62b078","source":"openstack","project_id":"103e4d4d14bc4fdda4a9c73d1643e1d7","user_id":"a69aabac824c4e1688702b8eb7e22af2","type":"gauge"},"timestamp":1453308000000,"value":0.0,"value_meta":{"audit_period_beginning":"2016-05-13T23:00:00.000000","availability_zone":"None","host":"devstack","event_type":"compute.instance.update","launched_at":"2016-05-13T23:14:06.000000","deleted_at":"","state":"active","memory_mb":"512","root_gb":"1","vcpus":"1","audit_period_ending":"2016-05-13T23:15:29.476223","ephemeral_gb":"0","instance_flavor_id":"1","created_at":"2016-05-13 23:13:44+00:00","disk_gb":"1","image_ref_url":"http://10.241.104.4:9292/images/12990d03-8059-4052-8b93-d4c6376fea57"}},"meta":{"tenantId":"225bbc4c5c1c4dfaa910cb44b5a4d7b7","region":"unset"},"creation_time":1458166131})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367034000,"value":92.9},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367040})
('',{"metric":{"name":"apache.performance.cpu_load_perc","dimensions":{"apache_port":"80","component":"apache","service":"apache","hostname":"devstack","apache_host":"devstack"},"timestamp":1457367034000,"value":0.143419},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367040})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367034000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367040})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367034000,"value":3.9},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367040})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367034000,"value":3.2},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367040})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367038000,"value":3.5999999999999996},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367043})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367038000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367043})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367038000,"value":5.3},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367043})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367038000,"value":10.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367043})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367038000,"value":33.5},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367043})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367038000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367043})
('',{"metric":{"name":"cpu.wait_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367049000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367054})
('',{"metric":{"name":"vcpus","dimensions":{"region":"useast","unit":"vcpu","resource_id":"b1a7f31d-3f85-4475-a50b-77094b62b078","source":"openstack","project_id":"103e4d4d14bc4fdda4a9c73d1643e1d7","user_id":"a69aabac824c4e1688702b8eb7e22af2","type":"gauge"},"timestamp":1453308000000,"value":0.0,"value_meta":{"audit_period_beginning":"2016-05-13T23:00:00.000000","availability_zone":"None","host":"devstack","event_type":"compute.instance.update","launched_at":"2016-05-13T23:14:06.000000","deleted_at":"","state":"active","memory_mb":"512","root_gb":"1","vcpus":"1","audit_period_ending":"2016-05-13T23:15:29.476223","ephemeral_gb":"0","instance_flavor_id":"1","created_at":"2016-05-13 23:13:44+00:00","disk_gb":"1","image_ref_url":"http://10.241.104.4:9292/images/12990d03-8059-4052-8b93-d4c6376fea57"}},"meta":{"tenantId":"225bbc4c5c1c4dfaa910cb44b5a4d7b7","region":"unset"},"creation_time":1458166131})
('',{"metric":{"name":"cpu.idle_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367049000,"value":86.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367054})
('',{"metric":{"name":"apache.performance.cpu_load_perc","dimensions":{"apache_port":"80","component":"apache","service":"apache","hostname":"devstack","apache_host":"devstack"},"timestamp":1457367049000,"value":0.143422},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367054})
('',{"metric":{"name":"cpu.stolen_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367049000,"value":0.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367054})
('',{"metric":{"name":"cpu.user_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367049000,"value":3.6},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367054})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"hostname":"devstack"},"timestamp":1457367049000,"value":3.1},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367054})
('',{"metric":{"name":"cpu.system_perc","dimensions":{"service":"monitoring","hostname":"mini-mon"},"timestamp":1457367053000,"value":4.2},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367057})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"devstack"},"timestamp":1457366963000,"value":4.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366966})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"devstack"},"timestamp":1457366963000,"value":4.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366966})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"devstack"},"timestamp":1457366977000,"value":18.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366980})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"devstack"},"timestamp":1457366993000,"value":4.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457366994})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"devstack"},"timestamp":1457367008000,"value":2.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367008})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"devstack"},"timestamp":1457367023000,"value":2.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367029})
('',{"metric":{"name":"cpu.total_logical_cores","dimensions":{"service":"monitoring","hostname":"devstack"},"timestamp":1457367038000,"value":8.0},"meta":{"tenantId":"tenantId of metric writer","region":"useast"},"creation_time":1457367043})


@@ -0,0 +1,23 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os


class DataProvider(object):

    _resource_path = 'tests/unit/test_resources/cpu_kafka_data/'

    kafka_data_path = os.path.join(_resource_path,
                                   "cpu_kafka_data.txt")

Some files were not shown because too many files have changed in this diff.