OpenStack workload VMs info export to CSV

VMs info export to CSV is implemented. The export reads workload
records from the DB and streams the result as CSV.
The installation info exporter is renamed (csv_exporter to
clusters_to_csv).
Common CSV export logic is extracted into the export_utils module.
SQLAlchemy and psycopg2 are added to the requirements.
DB credentials are added to the config.
SQL debug output is enabled for the test configuration.

Implements: blueprint openstack-workload-statistics
Change-Id: I8a9a04e1a2462a3a8d3950e20daa92025a7de11b
Alexander Kislitsky 2015-02-04 14:42:49 +03:00
parent 5321a4e4b1
commit 6dd7081b96
16 changed files with 747 additions and 274 deletions
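For reference, the new /vms endpoint streams its CSV body chunk by chunk, so large reports can be consumed without buffering. A minimal consumer sketch (Python 2, matching the codebase; assumes the analytics service listens on http://localhost:5000 with the blueprint mounted at the root, and uses the requests library, which is not part of this change):

import csv

import requests

# stream=True makes requests iterate the response instead of buffering it.
resp = requests.get('http://localhost:5000/vms', stream=True)
for row in csv.reader(resp.iter_lines()):
    print(row)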

View File

@@ -13,8 +13,10 @@
# under the License.
from flask import Flask
import flask_sqlalchemy
app = Flask(__name__)
db = flask_sqlalchemy.SQLAlchemy(app)
# Registering blueprints
from fuel_analytics.api.resources.csv_exporter import bp as csv_exporter_bp

View File

@@ -0,0 +1,14 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,33 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
def make_enum(*values, **kwargs):
names = kwargs.get('names')
if names:
return namedtuple('Enum', names)(*values)
return namedtuple('Enum', values)(*values)
OSWL_RESOURCE_TYPES = make_enum(
'vm',
'tenant',
'volume',
'security_group',
'keystone_user',
'flavor',
'cluster_stats'
)
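A quick illustration of make_enum semantics (not part of the diff): without the names keyword each value doubles as the attribute name, so members compare equal to their own strings, which keeps code constants in sync with the values stored in the DB.

COLORS = make_enum('red', 'green')
assert COLORS.red == 'red'    # the attribute value is the string itself
assert 'green' in COLORS      # namedtuple instances support membership tests

SIZES = make_enum(1, 2, names=('small', 'large'))
assert SIZES.small == 1       # names= decouples attribute names from values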

View File

@@ -27,10 +27,12 @@ class Production(object):
ELASTIC_USE_SSL = True
ELASTIC_INDEX_FUEL = 'fuel'
ELASTIC_DOC_TYPE_STRUCTURE = 'structure'
SQLALCHEMY_DATABASE_URI = \
'postgresql://collector:*****@localhost/collector'
class Testing(Production):
DEBUG = False
DEBUG = True
LOG_FILE = os.path.realpath(os.path.join(
os.path.dirname(__file__), '..', 'test', 'logs', 'analytics.log'))
LOG_LEVEL = logging.DEBUG
@@ -40,3 +42,6 @@ class Testing(Production):
ELASTIC_HOST = 'localhost'
ELASTIC_PORT = 9200
ELASTIC_USE_SSL = False
SQLALCHEMY_DATABASE_URI = \
'postgresql://collector:collector@localhost/collector'
SQLALCHEMY_ECHO = True

View File

@@ -0,0 +1,13 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,30 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy.dialects.postgresql import JSON
from fuel_analytics.api.app import db
class OpenStackWorkloadStats(db.Model):
__tablename__ = 'oswl_stats'
id = db.Column(db.Integer, primary_key=True)
master_node_uid = db.Column(db.Text)
external_id = db.Column(db.Integer)
cluster_id = db.Column(db.Integer)
created_date = db.Column(db.Date)
updated_time = db.Column(db.Time)
resource_type = db.Column(db.Text)
resource_data = db.Column(JSON)
resource_checksum = db.Column(db.Text)
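One subtlety worth a sketch (illustration only, assuming a configured session): resource_data is a JSON column, so integer dict keys are serialized to strings when the row is written, and commit() expires the instance, meaning the next attribute access reloads the JSON-decoded value. This is why the exporter below compares vm ids via six.text_type.

oswl = OpenStackWorkloadStats(
    master_node_uid='x', external_id=1, resource_type='vm',
    resource_data={'current': [{'id': 1}], 'added': {1: {'time': '00:00:00'}}})
db.session.add(oswl)
db.session.commit()
# Reloaded from the DB: the numeric key is now the string '1'.
assert '1' in oswl.resource_data['added']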

View File

@@ -16,15 +16,19 @@ from flask import Blueprint
from flask import Response
from fuel_analytics.api.app import app
from fuel_analytics.api.app import db
from fuel_analytics.api.common.consts import OSWL_RESOURCE_TYPES as RT
from fuel_analytics.api.db.model import OpenStackWorkloadStats
from fuel_analytics.api.resources.utils.es_client import ElasticSearchClient
from fuel_analytics.api.resources.utils.oswl_stats_to_csv import OswlStatsToCsv
from fuel_analytics.api.resources.utils.stats_to_csv import StatsToCsv
bp = Blueprint('csv_exporter', __name__)
bp = Blueprint('clusters_to_csv', __name__)
@bp.route('/clusters', methods=['GET'])
def csv_exporter():
app.logger.debug("Handling csv_exporter get request")
def clusters_to_csv():
app.logger.debug("Handling clusters_to_csv get request")
es_client = ElasticSearchClient()
structures = es_client.get_structures()
@@ -33,4 +37,25 @@ def csv_exporter():
# NOTE: result is a generator, but streaming may not work with some
# WSGI middlewares: http://flask.pocoo.org/docs/0.10/patterns/streaming/
app.logger.debug("Get request for clusters_to_csv handled")
return Response(result, mimetype='text/csv')
def get_oswls(yield_per=1000):
app.logger.debug("Fetching oswls with yeld per %d", yield_per)
return db.session.query(OpenStackWorkloadStats).filter(
OpenStackWorkloadStats.resource_type == RT.vm).yield_per(yield_per)
@bp.route('/vms', methods=['GET'])
def vms_to_csv():
app.logger.debug("Handling vms_to_csv get request")
oswls = get_oswls()
exporter = OswlStatsToCsv()
result = exporter.export_vms(oswls)
# NOTE: result is a generator, but streaming may not work with some
# WSGI middlewares: http://flask.pocoo.org/docs/0.10/patterns/streaming/
app.logger.debug("Get request for vms_to_csv handled")
return Response(result, mimetype='text/csv')
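The linked Flask docs also suggest wrapping the generator in stream_with_context when row generation needs the request context; a hypothetical variant of the handler (not part of this commit):

from flask import stream_with_context

@bp.route('/vms_streamed', methods=['GET'])  # hypothetical route, for illustration
def vms_to_csv_streamed():
    oswls = get_oswls()
    exporter = OswlStatsToCsv()
    # stream_with_context keeps the request context alive during iteration.
    return Response(stream_with_context(exporter.export_vms(oswls)),
                    mimetype='text/csv')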

View File

@@ -0,0 +1,167 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import csv
import io
import six
from fuel_analytics.api.app import app
def get_keys_paths(skeleton):
"""Gets paths to leaf keys in the data
:param skeleton: data skeleton
:return: list of lists of dict keys
"""
def _keys_paths_helper(keys, skel):
result = []
if isinstance(skel, dict):
for k in sorted(six.iterkeys(skel)):
result.extend(_keys_paths_helper(keys + [k], skel[k]))
else:
result.append(keys)
return result
return _keys_paths_helper([], skeleton)
def get_flatten_data(keys_paths, data):
"""Creates flatten data from data by keys_paths
:param keys_paths: list of dict keys lists
:param data: dict with nested structures
:return: list of flatten data dicts
"""
flatten_data = []
for key_path in keys_paths:
d = data
for key in key_path:
if isinstance(d, dict):
d = d.get(key, None)
else:
d = getattr(d, key, None)
if d is None:
break
if isinstance(d, (list, tuple)):
flatten_data.append(' '.join(d))
else:
flatten_data.append(d)
return flatten_data
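A worked example of the two helpers above (consistent with the unit tests added later in this commit):

skeleton = {'a': {'e': None, 'g': None}, 'c': None}
paths = get_keys_paths(skeleton)
# Keys are walked in sorted order: [['a', 'e'], ['a', 'g'], ['c']]
row = get_flatten_data(paths, {'a': {'e': 1}, 'c': ['x', 'y']})
# Missing leaves become None and lists are space-joined: [1, None, 'x y']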
def construct_skeleton(data):
"""Creates structure for searching all key paths in given data
:param data: dict fetched from ES
:return: skeleton of data structure
"""
if isinstance(data, dict):
result = {}
for k in sorted(data.keys()):
result[k] = construct_skeleton(data[k])
return result
elif isinstance(data, (list, tuple)):
list_result = []
dict_result = {}
for d in data:
if isinstance(d, dict):
dict_result.update(construct_skeleton(d))
elif isinstance(d, (list, tuple)):
if not list_result:
list_result.append(construct_skeleton(d))
else:
list_result[0].extend(construct_skeleton(d))
if dict_result:
list_result.append(dict_result)
return list_result
else:
return data
def get_data_skeleton(structures):
"""Gets skeleton by structures list
:param structures: list of data dicts
:return: data structure skeleton
"""
def _merge_skeletons(lh, rh):
keys_paths = get_keys_paths(rh)
for keys_path in keys_paths:
merge_point = lh
data_point = rh
for key in keys_path:
data_point = data_point[key]
if isinstance(data_point, dict):
if key not in merge_point:
merge_point[key] = {}
elif isinstance(data_point, list):
if key not in merge_point:
merge_point[key] = [{}]
_merge_skeletons(merge_point[key][0],
get_data_skeleton(data_point))
else:
merge_point[key] = None
merge_point = merge_point[key]
skeleton = {}
for structure in structures:
app.logger.debug("Constructing skeleton by data: %s", structure)
app.logger.debug("Updating skeleton by %s",
construct_skeleton(structure))
_merge_skeletons(skeleton, construct_skeleton(structure))
app.logger.debug("Result skeleton is %s", skeleton)
return skeleton
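For example (illustration only), merging two structures yields the union of their key paths with all leaf values reset to None:

structures = [
    {'ci': {'p': True, 'e': '@'}},
    {'a': 'b', 'c': [{'s': 'v'}]},
]
skeleton = get_data_skeleton(structures)
# {'a': None, 'c': [{'s': None}], 'ci': {'e': None, 'p': None}}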
def align_enumerated_field_values(values, number):
"""Fills result list by the None values, if number is greater than
values len. The first element of result is bool value
len(values) > number
:param values:
:param number:
:return: aligned list to 'number' + 1 length, filled by Nones on
empty values positions and bool value on the first place. Bool value
is True if len(values) > number
"""
return ([len(values) > number] +
(values + [None] * (number - len(values)))[:number])
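Concretely, these are the same cases the new unit test exercises:

assert align_enumerated_field_values(['a'], 2) == [False, 'a', None]  # padded
assert align_enumerated_field_values(['a', 'b'], 1) == [True, 'a']    # truncated, overflow flagged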
def flatten_data_as_csv(keys_paths, flatten_data):
"""Returns flatten data in CSV
:param keys_paths: list of dict keys lists for columns names
generation
:param flatten_data: list of flatten data dicts
:return: stream with data in CSV format
"""
app.logger.debug("Saving flatten data as CSV is started")
names = []
for key_path in keys_paths:
names.append('.'.join(key_path))
yield names
output = six.BytesIO()
writer = csv.writer(output)
writer.writerow(names)
def read_and_flush():
output.seek(io.SEEK_SET)
data = output.read()
output.seek(io.SEEK_SET)
output.truncate()
return data
for d in flatten_data:
app.logger.debug("Writing row %s", d)
encoded_d = [s.encode("utf-8") if isinstance(s, unicode) else s
for s in d]
writer.writerow(encoded_d)
yield read_and_flush()
app.logger.debug("Saving flatten data as CSV is finished")

View File

@@ -0,0 +1,94 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import six
from fuel_analytics.api.app import app
from fuel_analytics.api.resources.utils import export_utils
from fuel_analytics.api.resources.utils.export_utils import get_keys_paths
from fuel_analytics.api.resources.utils.skeleton import \
OSWL_STATS_SKELETON
from fuel_analytics.api.resources.utils.skeleton import \
OSWL_VM_SKELETON
class OswlStatsToCsv(object):
def get_vm_keys_paths(self):
"""Gets key paths for vm. csv key paths is combination
of oswl, vm and additional vm key paths
:return: tuple of lists of oswl, vm, csv key paths
"""
app.logger.debug("Getting vm keys paths")
oswl_key_paths = get_keys_paths(OSWL_STATS_SKELETON)
vm_key_paths = get_keys_paths(OSWL_VM_SKELETON)
# Additional key paths for vm info
vm_additional_key_paths = [['vm', 'is_added'], ['vm', 'is_modified'],
['vm', 'is_removed']]
result_key_paths = oswl_key_paths + vm_key_paths + \
vm_additional_key_paths
app.logger.debug("Vm keys paths got")
return oswl_key_paths, vm_key_paths, result_key_paths
def get_additional_vm_info(self, vm, oswl):
"""Gets additional info about vm operations
:param vm: vm info
:param oswl: OpenStack workload
:return: list of is_added, is_modified, is_removed flags
"""
resource_data = oswl.resource_data
added = resource_data.get('added', {})
removed = resource_data.get('removed', {})
modified = resource_data.get('modified', {})
# After the object is saved as JSON, its dict keys are converted to strings
vm_id = six.text_type(vm.get('id'))
is_added = vm_id in added
is_modified = vm_id in modified
is_removed = vm_id in removed
return [is_added, is_modified, is_removed]
def get_flatten_vms(self, oswl_keys_paths, vm_keys_paths, oswls):
"""Gets flatten vms data
:param oswl_keys_paths: list of keys paths in the OpenStack workload
info
:param vm_keys_paths: list of keys paths in the vm
:param oswls: list of OpenStack workloads
:return: generator of flattened vms info
"""
app.logger.debug("Getting flatten vms info is started")
for oswl in oswls:
flatten_oswl = export_utils.get_flatten_data(oswl_keys_paths,
oswl)
resource_data = oswl.resource_data
current = resource_data.get('current', [])
removed = resource_data.get('removed', {})
for vm in itertools.chain(current, six.itervalues(removed)):
flatten_vm = export_utils.get_flatten_data(vm_keys_paths,
{'vm': vm})
vm_additional_info = self.get_additional_vm_info(vm, oswl)
yield flatten_oswl + flatten_vm + vm_additional_info
app.logger.debug("Flatten vms info is got")
def export_vms(self, oswls):
app.logger.info("Export oswls vms info into CSV is started")
oswl_keys_paths, vm_keys_paths, csv_keys_paths = \
self.get_vm_keys_paths()
flatten_vms = self.get_flatten_vms(oswl_keys_paths, vm_keys_paths,
oswls)
result = export_utils.flatten_data_as_csv(csv_keys_paths, flatten_vms)
app.logger.info("Export oswls vms info into CSV is finished")
return result
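End to end the exporter is driven as in the /vms handler; a condensed sketch (the output file is illustrative only). Since export_vms returns a lazy generator, no rows are fetched from the DB until iteration starts.

exporter = OswlStatsToCsv()
chunks = exporter.export_vms(get_oswls())
next(chunks)  # skip the raw header list; the CSV text repeats the header
with open('vms.csv', 'wb') as f:
    for chunk in chunks:
        f.write(chunk)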

View File

@@ -106,3 +106,27 @@ INSTALLATION_INFO_SKELETON = {
'name': None
}
}
OSWL_STATS_SKELETON = {
'id': None,
'master_node_uid': None,
'external_id': None,
'cluster_id': None,
'created_date': None,
'updated_time': None,
'resource_type': None,
'resource_checksum': None,
}
OSWL_VM_SKELETON = {
'vm': {
'id': None,
'status': None,
'tenant_id': None,
'host_id': None,
'created_at': None,
'power_state': None,
'flavor_id': None,
'image_id': None
}
}
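The final CSV column names are the dotted key paths of these skeletons; for the vm block (illustrative; order follows sorted keys):

from fuel_analytics.api.resources.utils.export_utils import get_keys_paths

headers = ['.'.join(p) for p in get_keys_paths(OSWL_VM_SKELETON)]
# ['vm.created_at', 'vm.flavor_id', 'vm.host_id', 'vm.id',
#  'vm.image_id', 'vm.power_state', 'vm.status', 'vm.tenant_id']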

View File

@@ -12,11 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import csv
import io
import six
from fuel_analytics.api.app import app
from fuel_analytics.api.resources.utils import export_utils
from fuel_analytics.api.resources.utils.skeleton import \
INSTALLATION_INFO_SKELETON
@@ -26,138 +23,10 @@ class StatsToCsv(object):
MANUFACTURERS_NUM = 3
PLATFORM_NAMES_NUM = 3
def construct_skeleton(self, data):
"""Creates structure for searching all key paths in given data
:param data: fetched from ES dict
:return: skeleton of data structure
"""
if isinstance(data, dict):
result = {}
for k in sorted(data.keys()):
result[k] = self.construct_skeleton(data[k])
return result
elif isinstance(data, (list, tuple)):
list_result = []
dict_result = {}
for d in data:
if isinstance(d, dict):
dict_result.update(self.construct_skeleton(d))
elif isinstance(d, (list, tuple)):
if not list_result:
list_result.append(self.construct_skeleton(d))
else:
list_result[0].extend(self.construct_skeleton(d))
if dict_result:
list_result.append(dict_result)
return list_result
else:
return data
def get_data_skeleton(self, structures):
"""Gets skeleton by structures list
:param structures:
:return: data structure skeleton
"""
def _merge_skeletons(lh, rh):
keys_paths = self.get_keys_paths(rh)
for keys_path in keys_paths:
merge_point = lh
data_point = rh
for key in keys_path:
data_point = data_point[key]
if isinstance(data_point, dict):
if key not in merge_point:
merge_point[key] = {}
elif isinstance(data_point, list):
if key not in merge_point:
merge_point[key] = [{}]
_merge_skeletons(merge_point[key][0],
self.get_data_skeleton(data_point))
else:
merge_point[key] = None
merge_point = merge_point[key]
skeleton = {}
for structure in structures:
app.logger.debug("Constructing skeleton by data: %s", structure)
app.logger.debug("Updating skeleton by %s",
self.construct_skeleton(structure))
_merge_skeletons(skeleton, self.construct_skeleton(structure))
app.logger.debug("Result skeleton is %s", skeleton)
return skeleton
def get_keys_paths(self, skeleton):
"""Gets paths to leaf keys in the data
:param skeleton: data skeleton
:return: list of lists of dict keys
"""
def _keys_paths_helper(keys, skel):
result = []
if isinstance(skel, dict):
for k in sorted(six.iterkeys(skel)):
result.extend(_keys_paths_helper(keys + [k], skel[k]))
else:
result.append(keys)
return result
return _keys_paths_helper([], skeleton)
def flatten_data_as_csv(self, keys_paths, flatten_data):
"""Returns flatten data in CSV
:param keys_paths: list of dict keys lists for columns names
generation
:param flatten_data: list of flatten data dicts
:return: stream with data in CSV format
"""
app.logger.debug("Saving flatten data as CSV is started")
names = []
for key_path in keys_paths:
names.append('.'.join(key_path))
yield names
output = six.BytesIO()
writer = csv.writer(output)
writer.writerow(names)
def read_and_flush():
output.seek(io.SEEK_SET)
data = output.read()
output.seek(io.SEEK_SET)
output.truncate()
return data
for d in flatten_data:
app.logger.debug("Writing row %s", d)
encoded_d = [s.encode("utf-8") if isinstance(s, unicode) else s
for s in d]
writer.writerow(encoded_d)
yield read_and_flush()
app.logger.debug("Saving flatten data as CSV is finished")
def get_flatten_data(self, keys_paths, data):
"""Creates flatten data from data by keys_paths
:param keys_paths: list of dict keys lists
:param data: dict with nested structures
:return: list of flatten data dicts
"""
flatten_data = []
for key_path in keys_paths:
d = data
for key in key_path:
d = d.get(key, None)
if d is None:
break
if isinstance(d, (list, tuple)):
flatten_data.append(' '.join(d))
else:
flatten_data.append(d)
return flatten_data
def get_cluster_keys_paths(self):
app.logger.debug("Getting cluster keys paths")
structure_skeleton = INSTALLATION_INFO_SKELETON
structure_key_paths = self.get_keys_paths(structure_skeleton)
structure_key_paths = export_utils.get_keys_paths(structure_skeleton)
clusters = structure_skeleton.get('clusters')
if not clusters:
clusters = [{}]
@@ -167,7 +36,7 @@ class StatsToCsv(object):
cluster_skeleton.pop('nodes', None)
cluster_skeleton.pop('node_groups', None)
cluster_skeleton.pop('openstack_info', None)
cluster_key_paths = self.get_keys_paths(cluster_skeleton)
cluster_key_paths = export_utils.get_keys_paths(cluster_skeleton)
result_key_paths = cluster_key_paths + structure_key_paths
@@ -194,20 +63,6 @@ class StatsToCsv(object):
app.logger.debug("Cluster keys paths got")
return structure_key_paths, cluster_key_paths, result_key_paths
@staticmethod
def align_enumerated_field_values(values, number):
"""Fills result list by the None values, if number is greater than
values len. The first element of result is bool value
len(values) > number
:param values:
:param number:
:return: aligned list to 'number' + 1 length, filled by Nones on
empty values positions and bool value on the first place. Bool value
is True if len(values) > number
"""
return ([len(values) > number] +
(values + [None] * (number - len(values)))[:number])
def get_flatten_clusters(self, structure_keys_paths, cluster_keys_paths,
structures):
"""Gets flatten clusters data
@@ -236,22 +91,22 @@ class StatsToCsv(object):
for structure in structures:
clusters = structure.pop('clusters', [])
flatten_structure = self.get_flatten_data(structure_keys_paths,
structure)
flatten_structure = export_utils.get_flatten_data(
structure_keys_paths, structure)
for cluster in clusters:
flatten_cluster = self.get_flatten_data(cluster_keys_paths,
cluster)
flatten_cluster = export_utils.get_flatten_data(
cluster_keys_paths, cluster)
flatten_cluster.extend(flatten_structure)
nodes = cluster.get('nodes', [])
# Adding enumerated manufacturers
manufacturers = extract_nodes_manufacturers(nodes)
flatten_cluster += StatsToCsv.align_enumerated_field_values(
flatten_cluster += export_utils.align_enumerated_field_values(
manufacturers, self.MANUFACTURERS_NUM)
# Adding enumerated platforms
platform_names = extract_nodes_platform_name(nodes)
flatten_cluster += StatsToCsv.align_enumerated_field_values(
flatten_cluster += export_utils.align_enumerated_field_values(
platform_names, self.PLATFORM_NAMES_NUM)
yield flatten_cluster
@@ -264,6 +119,7 @@ class StatsToCsv(object):
flatten_clusters = self.get_flatten_clusters(structure_keys_paths,
cluster_keys_paths,
structures)
result = self.flatten_data_as_csv(csv_keys_paths, flatten_clusters)
result = export_utils.flatten_data_as_csv(csv_keys_paths,
flatten_clusters)
app.logger.info("Export clusters info into CSV is finished")
return result

View File

@@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuel_analytics.test.base import BaseTest
from fuel_analytics.api.resources.utils import export_utils
class ExportUtilsTest(BaseTest):
def test_get_key_paths(self):
skeleton = {'a': 'b', 'c': 'd'}
paths = export_utils.get_keys_paths(skeleton)
self.assertListEqual([['a'], ['c']], paths)
skeleton = {'a': {'e': 'f', 'g': None}}
paths = export_utils.get_keys_paths(skeleton)
self.assertListEqual([['a', 'e'], ['a', 'g']], paths)
skeleton = [{'a': 'b', 'c': 'd'}]
paths = export_utils.get_keys_paths(skeleton)
self.assertListEqual([[]], paths)
def test_get_flatten_data(self):
class O(object):
def __init__(self, a, c, x):
self.a = a
self.c = c
self.x = x
data = [
{'a': 'b', 'c': {'e': 2.1}},
{'a': 'ee\nxx', 'c': {'e': 3.1415}, 'x': ['z', 'zz']},
O('y', {'e': 44}, None),
O('yy', {'e': 45}, ['b', 'e'])
]
expected_flatten_data = [
['b', 2.1, None],
['ee\nxx', 3.1415, 'z zz'],
['y', 44, None],
['yy', 45, 'b e']
]
skeleton = export_utils.get_data_skeleton(data)
key_paths = export_utils.get_keys_paths(skeleton)
for idx, expected in enumerate(expected_flatten_data):
actual = export_utils.get_flatten_data(key_paths, data[idx])
self.assertListEqual(expected, actual)
for idx, data in enumerate(data):
actual = export_utils.get_flatten_data(key_paths, data)
self.assertListEqual(expected_flatten_data[idx], actual)
def test_dict_construct_skeleton(self):
data = {'a': 'b'}
skeleton = export_utils.construct_skeleton(data)
self.assertDictEqual(data, skeleton)
data = {'a': 'b', 'x': None}
skeleton = export_utils.construct_skeleton(data)
self.assertDictEqual(data, skeleton)
def test_list_construct_skeleton(self):
data = ['a', 'b', 'c']
skeleton = export_utils.construct_skeleton(data)
self.assertListEqual([], skeleton)
data = [{'a': None}, {'b': 'x'}, {'a': 4, 'c': 'xx'}, {}]
skeleton = export_utils.construct_skeleton(data)
self.assertListEqual(
sorted(skeleton[0].keys()),
sorted(['a', 'b', 'c'])
)
data = [
'a',
['a', 'b', []],
[],
[{'x': 'z'}, 'zz', {'a': 'b'}],
['a'],
{'p': 'q'}
]
skeleton = export_utils.construct_skeleton(data)
self.assertListEqual([[[], {'a': 'b', 'x': 'z'}], {'p': 'q'}],
skeleton)
def test_get_skeleton(self):
data = [
{'ci': {'p': True, 'e': '@', 'n': 'n'}},
# reducing fields in nested dict
{'ci': {'p': False}},
# adding list values
{'c': [{'s': 'v', 'n': 2}, {'s': 'vv', 'n': 22}]},
# adding new value in the list
{'c': [{'z': 'p'}]},
# checking empty list
{'c': []},
# adding new value
{'a': 'b'},
]
skeleton = export_utils.get_data_skeleton(data)
self.assertDictEqual(
{'a': None, 'c': [{'s': None, 'n': None, 'z': None}],
'ci': {'p': None, 'e': None, 'n': None}},
skeleton)
def test_align_enumerated_field_values(self):
# Data for checks in format (source, num, expected)
checks = [
([], 0, [False]),
([], 1, [False, None]),
(['a'], 1, [False, 'a']),
(['a'], 2, [False, 'a', None]),
(['a', 'b'], 2, [False, 'a', 'b']),
(['a', 'b'], 1, [True, 'a'])
]
for source, num, expected in checks:
self.assertListEqual(
expected,
export_utils.align_enumerated_field_values(source, num)
)

View File

@@ -0,0 +1,180 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import csv
from datetime import datetime
from datetime import timedelta
import random
import six
import uuid
from fuel_analytics.test.base import BaseTest
from fuel_analytics.api.app import db
from fuel_analytics.api.common import consts
from fuel_analytics.api.db.model import OpenStackWorkloadStats
from fuel_analytics.api.resources.utils.oswl_stats_to_csv import OswlStatsToCsv
import types
class OswlStatsToCsvTest(BaseTest):
def generate_vms(self, vms_num, statuses=('on', 'off'),
created_at_range=(1, 10),
power_states_range=(1, 10)):
result = []
for i in xrange(vms_num):
result.append({
'id': i,
'status': random.choice(statuses),
'tenant_id': 'tenant_id_{}'.format(i),
'host_id': 'host_id_{}'.format(i),
'created_at': (datetime.utcnow() - timedelta(
days=random.randint(*created_at_range))).isoformat(),
'power_state': random.randint(*power_states_range),
'flavor_id': 'flavor_id_{}'.format(i),
'image_id': 'image_id_{}'.format(i),
})
return result
def generate_removed_vms(self, vms_num):
result = {}
for vm in self.generate_vms(vms_num):
vm['time'] = datetime.utcnow().time().isoformat()
result[vm['id']] = vm
return result
def generate_added_vms(self, vms_num):
result = {}
for i in xrange(vms_num):
result[i] = {'time': datetime.utcnow().time().isoformat()}
return result
def generate_modified_vms(self, vms_num, modifs_num_range=(0, 3),
power_states_range=(1, 10)):
result = {}
for i in xrange(vms_num):
for _ in xrange(random.randint(*modifs_num_range)):
result.setdefault(i, []).append({
'time': datetime.utcnow().time().isoformat(),
'power_state': random.choice(power_states_range)
})
return result
def generate_vm_oswls(self, oswl_num, current_vms_num_range=(0, 7),
created_date_range=(1, 10),
added_vms_num_range=(0, 5),
removed_vms_num_range=(0, 3),
modified_vms_num_range=(0, 15),
stats_per_mn_range=(1, 10),
cluster_ids_range=(1, 5)):
i = 1
current_mn_stats = 0
while i <= oswl_num:
if not current_mn_stats:
mn_uid = six.text_type(uuid.uuid4())
current_mn_stats = random.randint(*stats_per_mn_range)
if current_mn_stats:
i += 1
created_date = (datetime.utcnow() - timedelta(
days=random.randint(*created_date_range))).\
date().isoformat()
obj = OpenStackWorkloadStats(
master_node_uid=mn_uid,
external_id=i,
cluster_id=random.choice(cluster_ids_range),
created_date=created_date,
updated_time=datetime.utcnow().time().isoformat(),
resource_type=consts.OSWL_RESOURCE_TYPES.vm,
resource_checksum=six.text_type(uuid.uuid4()),
resource_data={
'current': self.generate_vms(
random.randint(*current_vms_num_range)),
'added': self.generate_added_vms(
random.randint(*added_vms_num_range)),
'modified': self.generate_modified_vms(
random.randint(*modified_vms_num_range)),
'removed': self.generate_removed_vms(
random.randint(*removed_vms_num_range))
}
)
current_mn_stats -= 1
yield obj
def test_get_vm_keys_paths(self):
exporter = OswlStatsToCsv()
oswl_keys_paths, vm_keys_paths, csv_keys_paths = \
exporter.get_vm_keys_paths()
self.assertTrue(['external_id'] in oswl_keys_paths)
self.assertTrue(['vm', 'id'] in vm_keys_paths)
self.assertTrue(['vm', 'is_added'] in csv_keys_paths)
self.assertTrue(['vm', 'is_modified'] in csv_keys_paths)
self.assertTrue(['vm', 'is_removed'] in csv_keys_paths)
def test_get_flatten_vms(self):
exporter = OswlStatsToCsv()
oswl_keys_paths, vm_keys_paths, csv_keys_paths = \
exporter.get_vm_keys_paths()
oswls = self.generate_vm_oswls(2)
flatten_vms = exporter.get_flatten_vms(oswl_keys_paths, vm_keys_paths,
oswls)
self.assertTrue(isinstance(flatten_vms, types.GeneratorType))
for _ in flatten_vms:
pass
def test_get_additional_vm_info(self):
exporter = OswlStatsToCsv()
added_num = 0
modified_num = 3
removed_num = 5
oswls = self.generate_vm_oswls(
1,
added_vms_num_range=(added_num, added_num),
modified_vms_num_range=(modified_num, modified_num),
removed_vms_num_range=(removed_num, removed_num)
)
oswl = oswls.next()
# Save the object so resource_data is reloaded from the DB as JSON
db.session.add(oswl)
db.session.commit()
resource_data = oswl.resource_data
for vm in resource_data['current']:
# After conversion into JSON, dict keys become strings
vm_id = six.text_type(vm['id'])
expected = [
vm_id in resource_data['added'],
vm_id in resource_data['modified'],
vm_id in resource_data['removed'],
]
self.assertListEqual(expected,
exporter.get_additional_vm_info(vm, oswl))
# Cleaning DB
db.session.delete(oswl)
db.session.commit()
def test_export_vms(self):
exporter = OswlStatsToCsv()
oswls = self.generate_vm_oswls(200)
result = exporter.export_vms(oswls)
self.assertTrue(isinstance(result, types.GeneratorType))
output = six.StringIO(list(result))
reader = csv.reader(output)
for _ in reader:
pass

View File

@@ -18,101 +18,14 @@ import csv
import six
import types
from fuel_analytics.test.base import BaseTest
from fuel_analytics.test.base import ElasticTest
from fuel_analytics.api.resources.utils.es_client import ElasticSearchClient
from fuel_analytics.api.resources.utils import export_utils
from fuel_analytics.api.resources.utils.stats_to_csv import StatsToCsv
class StatsToCsvTest(BaseTest):
def test_dict_construct_skeleton(self):
exporter = StatsToCsv()
data = {'a': 'b'}
skeleton = exporter.construct_skeleton(data)
self.assertDictEqual(data, skeleton)
data = {'a': 'b', 'x': None}
skeleton = exporter.construct_skeleton(data)
self.assertDictEqual(data, skeleton)
def test_list_construct_skeleton(self):
exporter = StatsToCsv()
data = ['a', 'b', 'c']
skeleton = exporter.construct_skeleton(data)
self.assertListEqual([], skeleton)
data = [{'a': None}, {'b': 'x'}, {'a': 4, 'c': 'xx'}, {}]
skeleton = exporter.construct_skeleton(data)
self.assertListEqual(
sorted(skeleton[0].keys()),
sorted(['a', 'b', 'c'])
)
data = [
'a',
['a', 'b', []],
[],
[{'x': 'z'}, 'zz', {'a': 'b'}],
['a'],
{'p': 'q'}
]
skeleton = exporter.construct_skeleton(data)
self.assertListEqual([[[], {'a': 'b', 'x': 'z'}], {'p': 'q'}],
skeleton)
def test_get_skeleton(self):
exporter = StatsToCsv()
data = [
{'ci': {'p': True, 'e': '@', 'n': 'n'}},
# reducing fields in nested dict
{'ci': {'p': False}},
# adding list values
{'c': [{'s': 'v', 'n': 2}, {'s': 'vv', 'n': 22}]},
# adding new value in the list
{'c': [{'z': 'p'}]},
# checking empty list
{'c': []},
# adding new value
{'a': 'b'},
]
skeleton = exporter.get_data_skeleton(data)
self.assertDictEqual(
{'a': None, 'c': [{'s': None, 'n': None, 'z': None}],
'ci': {'p': None, 'e': None, 'n': None}},
skeleton)
def test_get_key_paths(self):
exporter = StatsToCsv()
skeleton = {'a': 'b', 'c': 'd'}
paths = exporter.get_keys_paths(skeleton)
self.assertListEqual([['a'], ['c']], paths)
skeleton = {'a': {'e': 'f', 'g': None}}
paths = exporter.get_keys_paths(skeleton)
self.assertListEqual([['a', 'e'], ['a', 'g']], paths)
skeleton = [{'a': 'b', 'c': 'd'}]
paths = exporter.get_keys_paths(skeleton)
self.assertListEqual([[]], paths)
def test_get_flatten_data(self):
exporter = StatsToCsv()
data = [
{'a': 'b', 'c': {'e': 2.1}},
{'a': 'ee\nxx', 'c': {'e': 3.1415}, 'x': ['z', 'zz']},
]
expected_flatten_data = [
['b', 2.1, None],
['ee\nxx', 3.1415, 'z zz'],
]
skeleton = exporter.get_data_skeleton(data)
key_paths = exporter.get_keys_paths(skeleton)
for idx, expected in enumerate(expected_flatten_data):
actual = exporter.get_flatten_data(key_paths, data[idx])
self.assertListEqual(expected, actual)
class StatsToCsvExportTest(ElasticTest):
def test_get_cluster_keys_paths(self):
exporter = StatsToCsv()
@@ -127,25 +40,6 @@ class StatsToCsvTest(BaseTest):
self.assertTrue(['manufacturer_2'] in csv_keys_paths)
self.assertTrue(['attributes', 'heat'] in csv_keys_paths)
def test_align_enumerated_field_values(self):
# Data for checks in format (source, num, expected)
checks = [
([], 0, [False]),
([], 1, [False, None]),
(['a'], 1, [False, 'a']),
(['a'], 2, [False, 'a', None]),
(['a', 'b'], 2, [False, 'a', 'b']),
(['a', 'b'], 1, [True, 'a'])
]
for source, num, expected in checks:
self.assertListEqual(
expected,
StatsToCsv.align_enumerated_field_values(source, num)
)
class StatsToCsvExportTest(ElasticTest):
def test_new_param_handled_by_structures_skeleton(self):
installations_num = 5
self.generate_data(installations_num=installations_num)
@@ -157,8 +51,7 @@ class StatsToCsvExportTest(ElasticTest):
structures = list(structures)
structures[-1]['mixed_param'] = 'xx'
exporter = StatsToCsv()
skeleton = exporter.get_data_skeleton(structures)
skeleton = export_utils.get_data_skeleton(structures)
self.assertTrue('mixed_param' in skeleton)
def test_get_flatten_clusters(self):
@@ -190,7 +83,7 @@ class StatsToCsvExportTest(ElasticTest):
cluster_paths,
structures)
self.assertTrue(isinstance(flatten_clusters, types.GeneratorType))
result = exporter.flatten_data_as_csv(csv_paths, flatten_clusters)
result = export_utils.flatten_data_as_csv(csv_paths, flatten_clusters)
self.assertTrue(isinstance(result, types.GeneratorType))
output = six.StringIO(list(result))
reader = csv.reader(output)
@@ -220,7 +113,7 @@ class StatsToCsvExportTest(ElasticTest):
structures)
flatten_clusters = list(flatten_clusters)
flatten_clusters[1][0] = u'эюя'
list(exporter.flatten_data_as_csv(csv_paths, flatten_clusters))
list(export_utils.flatten_data_as_csv(csv_paths, flatten_clusters))
def test_export_clusters(self):
installations_num = 100

View File

@@ -1,3 +1,7 @@
elasticsearch==1.2.0
psycopg2==2.5.4
uWSGI==2.0.9
Flask==0.10.1
Flask-Script==2.0.5
Flask-SQLAlchemy==2.0
SQLAlchemy==0.9.8

View File

@@ -5,7 +5,4 @@ nose==1.3.4
nose2==0.4.7
tox==1.8.0
unittest2==0.5.1
# required for use tests from migration
SQLAlchemy==0.9.8
psycopg2==2.5.4