Boost get_unit func in impl_stacklight.py

The get_unit function sends requests to the database to obtain the unit
of a meter. There is only a small set of default meters in Ceilometer.
This patch replaces impl_stacklight.py with a modified version.
The new version of the script implements a dictionary of units for the
default meters to speed up the get_unit function.

Change-Id: I25d212e8b14e5b1ef07285add33317e3c80ff089
This commit is contained in:
Ildar Svetlov 2016-12-07 17:36:21 +04:00
parent 1590db0459
commit a8f15cdc6d
4 changed files with 649 additions and 0 deletions

View File

@ -130,6 +130,47 @@ file {'/usr/lib/python2.7/dist-packages/ceilometer/event/storage/impl_elasticsea
alias => 'impl_elasticsearch.pyc',
}
# Install the patched storage driver and drop the stale byte-compiled copy
# so Python cannot keep executing the old implementation from the .pyc.
file { '/usr/lib/python2.7/dist-packages/ceilometer/storage/impl_stacklight.py':
  ensure  => 'present',
  content => file( 'telemetry/ceilometer_fixes/impl_stacklight.py' ),
  mode    => '0644',
  owner   => 'root',
  group   => 'root',
  # Restart the services so they pick up the replaced module.
  notify  => Service['ceilometer-service','ceilometer-agent-notification'],
  # Remove the old .pyc before (or together with) placing the new source.
  require => File['impl_stacklight.pyc'],
}
file {'/usr/lib/python2.7/dist-packages/ceilometer/storage/impl_stacklight.pyc':
  ensure => 'absent',
  alias  => 'impl_stacklight.pyc',
}
# Package directory for the new ceilometer.storage.metrics module.
# NOTE: directories need the execute (search) bit to be traversable;
# with the previous mode '0644' any non-root process (e.g. the
# ceilometer service user) could not enter the directory, making the
# Python package inside it unimportable. '0755' fixes that.
file { '/usr/lib/python2.7/dist-packages/ceilometer/storage/metrics':
  ensure => 'directory',
  mode   => '0755',
  owner  => 'root',
  group  => 'root',
  notify => Service['ceilometer-service','ceilometer-agent-notification'],
}
# Ship the ceilometer.storage.metrics package: an (empty) __init__.py to
# make it importable, and units.py with the default-meter unit dictionary
# used to avoid per-request unit lookups in the database.
file { '/usr/lib/python2.7/dist-packages/ceilometer/storage/metrics/__init__.py':
  ensure  => 'present',
  content => file( 'telemetry/ceilometer_fixes/metrics/__init__.py' ),
  mode    => '0644',
  owner   => 'root',
  group   => 'root',
  notify  => Service['ceilometer-service','ceilometer-agent-notification'],
}
file { '/usr/lib/python2.7/dist-packages/ceilometer/storage/metrics/units.py':
  ensure  => 'present',
  content => file( 'telemetry/ceilometer_fixes/metrics/units.py' ),
  mode    => '0644',
  owner   => 'root',
  group   => 'root',
  notify  => Service['ceilometer-service','ceilometer-agent-notification'],
}
service {'ceilometer-agent-notification':
ensure => $service_ensure,
name => $::ceilometer::params::agent_notification_service_name,

View File

@ -0,0 +1,348 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Backend implementation compatible with StackLight storages"""
import elasticsearch as es
import influxdb
import influxdb.exceptions
import influxdb.resultset
from oslo_config import cfg
from oslo_log import log
from oslo_utils import netutils
import ceilometer
from ceilometer.i18n import _LE
from ceilometer.storage import base
from ceilometer.storage.es import utils as es_utils
from ceilometer.storage.influx import utils as influx_utils
from ceilometer.storage.metrics import units
from ceilometer import utils
LOG = log.getLogger(__name__)

OPTS = [
    # NOTE: a replication factor is a copy count, not a duration; the
    # previous help text wrongly said "in seconds".
    cfg.IntOpt('influxdb_replication',
               min=1,
               default=1,
               help="Replication factor for the InfluxDB retention "
                    "policy."),
    cfg.StrOpt('resource_connection',
               secret=True,
               help='The connection string used to connect to the resource '
                    'database.'),
]

cfg.CONF.register_opts(OPTS, group='database')

# Query/statistics features this backend supports, merged on top of the
# defaults declared by base.Connection.CAPABILITIES.
AVAILABLE_CAPABILITIES = {
    'resources': {'query': {'simple': False,
                            'metadata': False}},
    'statistics': {'groupby': True,
                   'query': {'simple': True,
                             'metadata': True},
                   'aggregation': {'standard': True,
                                   'selectable': {'max': True,
                                                  'min': True,
                                                  'sum': True,
                                                  'avg': True,
                                                  'count': True,
                                                  'stddev': True,
                                                  'cardinality': False}}},
    'meters': {'query': {'simple': False,
                         'metadata': False}},
    'samples': {'query': {'simple': False,
                          'metadata': False,
                          'complex': False}},
}

AVAILABLE_STORAGE_CAPABILITIES = {
    'storage': {'production_ready': True},
}
class Connection(base.Connection):
    """Get Ceilometer data from InfluxDB and ElasticSearch databases.

    Samples are stored in the following format in InfluxDB:

    - measurement: sample
    - tags (indexed): user_id, resource_id, project_id, source and
      configured metadata fields
    - fields (not indexed): counter_type -> type, counter_unit -> unit,
      counter_volume -> value, counter_name -> meter, message_id,
      message_signature, timestamp and recorded_at.

    Resources and meters are stored in ElasticSearch.
    Resources:
    {
        "_index": "ceilometer_resource",
        "_type": "<source>",
        "_id": "<resource_id>",
        "_source": {
            "first_sample_timestamp": "<datetime in isoformat>",
            "last_sample_timestamp": "<datetime in isoformat>",
            "project_id": "<project_id>",
            "user_id": "<user_id>",
            "metadata": {
                "foo": "bar",
                "foofoo": {"barbar": {"foo": "bar"}}
            },
            "meters": {"<meter_name>": {"unit": "<meter_unit>",
                                        "type": "<meter_type>"}}
        }
    }

    This class has 'record_metering_data' implementation, but it is used only
    for testing needs. In real life, data will be recorded by StackLight.
    """

    CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
                                       AVAILABLE_CAPABILITIES)
    STORAGE_CAPABILITIES = utils.update_nested(
        base.Connection.STORAGE_CAPABILITIES,
        AVAILABLE_STORAGE_CAPABILITIES,
    )

    resource_index = "ceilometer_resource"
    # Enabled only by tests: forces an ES index refresh after each write so
    # the written documents become immediately visible to searches.
    _refresh_on_write = False

    def __init__(self, url):
        """Set up connections to the sample and resource storages.

        :param url: InfluxDB connection URL for sample data. The resource
            (ElasticSearch) URL is read from the ``[database]
            resource_connection`` option; if that option is unset,
            meter/resource requests raise ``NoResultFound``.
        """
        if cfg.CONF.database.resource_connection:
            url_split = netutils.urlsplit(
                cfg.CONF.database.resource_connection)
            self.resource_connection = es.Elasticsearch(url_split.netloc)
        else:
            self.resource_connection = None
        user, pwd, host, port, self.database = influx_utils.split_url(url)
        self.sample_connection = influxdb.InfluxDBClient(host, port, user, pwd,
                                                         self.database)

    def upgrade(self):
        """Create both storages if they do not exist yet."""
        self.upgrade_resource_database()
        self.upgrade_sample_database()

    def upgrade_resource_database(self):
        """Create the ElasticSearch resource index and its mapping template."""
        if not self.resource_connection:
            return
        iclient = es.client.IndicesClient(self.resource_connection)
        template = {
            'template': 'ceilometer_*',
            'mappings': {
                '_default_': {
                    'properties': {
                        'first_sample_timestamp': {'type': 'date'},
                        'last_sample_timestamp': {'type': 'date'},
                    },
                    # Keep string fields unanalyzed so exact-match queries
                    # on ids and metadata values behave as expected.
                    "dynamic_templates": [
                        {
                            "string_fields": {
                                "match": "*",
                                "match_mapping_type": "string",
                                "mapping": {
                                    "type": "string",
                                    "index": "not_analyzed"
                                }
                            }
                        }
                    ]
                }
            }
        }
        iclient.put_template(name='ceilometer_resource_template',
                             body=template)
        iclient.create(self.resource_index)

    def upgrade_sample_database(self):
        """Create the InfluxDB database and configure its retention policy."""
        try:
            self.sample_connection.create_database(self.database)
        except influxdb.exceptions.InfluxDBClientError as e:
            # Re-running upgrade must be idempotent.
            if "database already exists" not in e.content:
                raise
        self.sample_connection.create_retention_policy(
            name=influx_utils.RETENTION_POLICY_NAME,
            duration="INF",
            replication=cfg.CONF.database.influxdb_replication,
            database=self.database,
            default=True)
        # A positive TTL overrides the infinite duration set above.
        if cfg.CONF.database.metering_time_to_live > 0:
            duration = "%ss" % cfg.CONF.database.metering_time_to_live
            self.sample_connection.alter_retention_policy(
                name=influx_utils.RETENTION_POLICY_NAME,
                database=self.database,
                duration=duration,
                replication=cfg.CONF.database.influxdb_replication,
                default=True
            )

    def get_meters(self, user=None, project=None, resource=None, source=None,
                   metaquery=None, limit=None, unique=None):
        """Return an iterable of meters matching the given filters."""
        if not self.resource_connection:
            raise base.NoResultFound(
                "Resource connection url is not defined and "
                "meter requests could not be processed")
        if limit == 0:
            return
        q_args = es_utils.make_query(self.resource_index, resource=resource,
                                     user=user, project=project, source=source,
                                     metaquery=metaquery, limit=limit)
        results = self.resource_connection.search(
            fields=['_type', '_id', '_source'],
            **q_args)
        return es_utils.search_results_to_meters(results, limit, unique)

    def get_resources(self, user=None, project=None, source=None,
                      start_timestamp=None, start_timestamp_op=None,
                      end_timestamp=None, end_timestamp_op=None,
                      metaquery=None, resource=None, limit=None):
        """Return an iterable of resources matching the given filters."""
        if not self.resource_connection:
            raise base.NoResultFound(
                "Resource connection url is not defined and "
                "resource requests could not be processed")
        if limit == 0:
            return
        q_args = es_utils.make_query(self.resource_index, user, project,
                                     source, start_timestamp,
                                     start_timestamp_op, end_timestamp,
                                     end_timestamp_op, metaquery, resource,
                                     limit)
        results = self.resource_connection.search(
            fields=['_type', '_id', '_source'],
            **q_args)
        return es_utils.search_results_to_resources(results)

    def get_meter_statistics(self, sample_filter, period=None, groupby=None,
                             aggregate=None):
        """Return a list of statistics for the matching samples.

        :raises ceilometer.NotImplementedError: for unsupported groupby
            fields or the ``cardinality`` aggregate.
        """
        # Note InfluxDB should have a lower time bound in query,
        # otherwise it will be defined as 1970-01-01T00:00:00.
        if (groupby and set(groupby) -
                set(['user_id', 'project_id', 'resource_id', 'source',
                     'resource_metadata.instance_type'])):
            raise ceilometer.NotImplementedError(
                "Unable to group by these fields")
        if any([aggr.func == 'cardinality' for aggr in (aggregate or [])]):
            raise ceilometer.NotImplementedError(
                "Cardinality aggregation is not supported "
                "by StackLight backends"
            )
        try:
            if (not sample_filter.start_timestamp or
                    not sample_filter.end_timestamp):
                # NOTE(review): only the start bound is backfilled here;
                # the end bound stays unset — presumably intentional
                # (defaults to "now"), but worth confirming.
                first, last = self.get_time_boundary(sample_filter)
                sample_filter.start_timestamp = \
                    sample_filter.start_timestamp or first
            unit = self.get_unit(sample_filter)
        except base.NoResultFound:
            return []
        query = influx_utils.make_aggregate_query(sample_filter, period,
                                                  groupby, aggregate)
        response = self._query(query)
        stats = []
        for serie, points in response.items():
            measurement, tags = serie
            for point in points or []:
                stats.append(
                    influx_utils.point_to_stat(point, tags, period, aggregate,
                                               unit))
        return [stat for stat in stats if stat]

    def get_samples(self, sample_filter, limit=None):
        """Yield samples matching the filter, newest interpretation first."""
        # NOTE: was `limit is 0` — identity comparison on an int literal is
        # implementation-defined; use equality.
        if limit == 0:
            return
        response = self._query(
            influx_utils.make_list_query(sample_filter, limit))
        for point in response.get_points(influx_utils.MEASUREMENT):
            yield influx_utils.point_to_sample(point)

    def query_samples(self, filter_expr=None, orderby=None, limit=None):
        """Return samples matching a complex filter expression, ordered."""
        q = influx_utils.make_complex_query(filter_expr, limit)
        response = self._query(q)
        samples = []
        for point in response.get_points(influx_utils.MEASUREMENT):
            samples.append(influx_utils.point_to_sample(point))
        return influx_utils.sort_samples(samples, orderby)

    def get_unit(self, sample_filter):
        """Return the unit for the filter's meter, caching DB lookups.

        Known default meters are answered from units.UNITS_BY_METRIC
        without touching the database; unknown meters are looked up once
        and then memoized in the same dictionary.

        :raises base.NoResultFound: if no sample for the meter exists.
        """
        meter = sample_filter.meter
        if meter in units.UNITS_BY_METRIC:
            return units.UNITS_BY_METRIC[meter]
        response = self._query(
            influx_utils.make_unit_query(sample_filter))
        try:
            # Builtin next() works on both Python 2 and 3 iterators,
            # unlike the Py2-only .next() method.
            point = next(response.get_points(influx_utils.MEASUREMENT))
        except StopIteration:
            raise base.NoResultFound()
        units.UNITS_BY_METRIC[meter] = point['unit']
        return point['unit']

    def get_time_boundary(self, sample_filter):
        """Find timestamps of the first and last matching samples.

        :returns: (start_timestamp, end_timestamp) tuple.
        :raises base.NoResultFound: if no sample matches the filter.
        """
        response = self._query(
            influx_utils.make_time_bounds_query(sample_filter))
        try:
            first_point = next(response.get_points(influx_utils.MEASUREMENT))
        except StopIteration:
            raise base.NoResultFound()
        start_timestamp = utils.sanitize_timestamp(first_point['first'])
        end_timestamp = utils.sanitize_timestamp(first_point['last'])
        return start_timestamp, end_timestamp

    def _query(self, q):
        """Make a query to InfluxDB database.

        Client errors are logged and swallowed, returning an empty
        result set so callers see "no data" rather than an exception.

        :param q: Query string in InfluxDB query format.
        :returns: a response ResultSet
        """
        # Lazy %-args: the string is only formatted if DEBUG is enabled.
        LOG.debug("InfluxDB query requested: %s", q)
        try:
            return self.sample_connection.query(q)
        except influxdb.exceptions.InfluxDBClientError as e:
            LOG.exception(_LE("Client error during the InfluxDB query: %s"), e)
            return influxdb.resultset.ResultSet({})

    def record_metering_data(self, data):
        """Records data into databases.

        Method is needed for testing needs only. In real life, data will be
        written to the databases by StackLight.
        """
        data['counter_name'] = utils.decode_unicode(data['counter_name'])
        self.resource_connection.update(
            index=self.resource_index, doc_type='source',
            id=data['resource_id'], body=es_utils.sample_to_resource(data)
        )
        self.sample_connection.write_points(
            [influx_utils.sample_to_point(data)], "n", self.database,
            influx_utils.RETENTION_POLICY_NAME)
        if self._refresh_on_write:
            # Make writes synchronously visible (test-only behavior) and
            # wait until the cluster has no pending local tasks.
            self.resource_connection.indices.refresh(self.resource_index)
            while self.resource_connection.cluster.pending_tasks(
                    local=True)['tasks']:
                pass

    def clear(self):
        """Drop both storages (used by tests)."""
        self.resource_connection.indices.delete(index=self.resource_index,
                                                ignore=[400, 404])
        self.sample_connection.drop_database(self.database)

View File

@ -0,0 +1,260 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Units of the default Ceilometer meters, keyed by meter name. Used by
# Connection.get_unit() to avoid a database round-trip for well-known
# meters; unknown meters are looked up once and memoized here.
# NOTE: fixed two typoed keys that contained a stray space
# ('disk.device.read.bytes .rate' / 'disk.device.write.bytes .rate') —
# the real meter names have no space, so those cache entries never hit.
UNITS_BY_METRIC = {
    'bandwidth': 'B',
    'cluster.create': 'cluster',
    'cluster.delete': 'cluster',
    'cluster.update': 'cluster',
    'compute.node.cpu.frequency': 'MHz',
    'compute.node.cpu.idle.percent': '%',
    'compute.node.cpu.idle.time': 'ns',
    'compute.node.cpu.iowait.percent': '%',
    'compute.node.cpu.iowait.time': 'ns',
    'compute.node.cpu.kernel.percent': '%',
    'compute.node.cpu.kernel.time': 'ns',
    'compute.node.cpu.percent': '%',
    'compute.node.cpu.user.percent': '%',
    'compute.node.cpu.user.time': 'ns',
    'cpu': 'ns',
    'cpu.delta': 'ns',
    'cpu_l3_cache': 'B',
    'cpu_util': '%',
    'disk.allocation': 'B',
    'disk.capacity': 'B',
    'disk.device.allocation': 'B',
    'disk.device.capacity': 'B',
    'disk.device.iops': 'count/s',
    'disk.device.latency': 'ms',
    'disk.device.read.bytes.rate': 'B/s',
    'disk.device.read.bytes': 'B',
    'disk.device.read.requests': 'request',
    'disk.device.read.requests.rate': 'request/s',
    'disk.device.usage': 'B',
    'disk.device.write.bytes.rate': 'B/s',
    'disk.device.write.bytes': 'B',
    'disk.device.write.requests': 'request',
    'disk.device.write.requests.rate': 'request/s',
    'disk.ephemeral.size': 'GB',
    'disk.iops': 'count/s',
    'disk.latency': 'ms',
    'disk.read.bytes': 'B',
    'disk.read.bytes.rate': 'B/s',
    'disk.read.requests': 'request',
    'disk.read.requests.rate': 'request/s',
    'disk.root.size': 'GB',
    'disk.usage': 'B',
    'disk.write.bytes': 'B',
    'disk.write.bytes.rate': 'B/s',
    'disk.write.requests': 'request',
    'disk.write.requests.rate': 'request/s',
    'hardware.cpu.load.10min': 'process',
    'hardware.cpu.load.1min': 'process',
    'hardware.cpu.load.5min': 'process',
    'hardware.disk.size.total': 'KB',
    'hardware.disk.size.used': 'KB',
    'hardware.ipmi.current': 'W',
    'hardware.ipmi.fan': 'RPM',
    'hardware.ipmi.node.airflow': 'CFM',
    'hardware.ipmi.node.cpu_util': '%',
    'hardware.ipmi.node.cups': 'CUPS',
    'hardware.ipmi.node.inlet_temperature': 'C',
    'hardware.ipmi.node.io_util': '%',
    'hardware.ipmi.node.mem_util': '%',
    'hardware.ipmi.node.outlet_temperature': 'C',
    'hardware.ipmi.node.power': 'W',
    'hardware.ipmi.node.temperature': 'C',
    'hardware.ipmi.temperature': 'C',
    'hardware.ipmi.voltage': 'V',
    'hardware.memory.buffer': 'KB',
    'hardware.memory.cached': 'KB',
    'hardware.memory.swap.avail': 'KB',
    'hardware.memory.swap.total': 'KB',
    'hardware.memory.total': 'KB',
    'hardware.memory.used': 'KB',
    'hardware.network.incoming.bytes': 'B',
    'hardware.network.ip.incoming.datagrams': 'datagrams',
    'hardware.network.ip.outgoing.datagrams': 'datagrams',
    'hardware.network.outgoing.bytes': 'B',
    'hardware.network.outgoing.errors': 'packet',
    'hardware.system_stats.cpu.idle': '%',
    'hardware.system_stats.io.incoming.blocks': 'blocks',
    'hardware.system_stats.io.outgoing.blocks': 'blocks',
    'identity.authenticate.failure': 'user',
    'identity.authenticate.pending': 'user',
    'identity.authenticate.success': 'user',
    'identity.group.created': 'group',
    'identity.group.deleted': 'group',
    'identity.group.updated': 'group',
    'identity.project.created': 'project',
    'identity.project.deleted': 'project',
    'identity.project.updated': 'project',
    'identity.role.created': 'role',
    'identity.role.deleted': 'role',
    'identity.role.updated': 'role',
    'identity.role_assignment.created': 'role_assignment',
    'identity.role_assignment.deleted': 'role_assignment',
    'identity.trust.created': 'trust',
    'identity.trust.deleted': 'trust',
    'identity.user.created': 'user',
    'identity.user.deleted': 'user',
    'identity.user.updated': 'user',
    'image': 'image',
    'image.delete': 'image',
    'image.download': 'B',
    'image.serve': 'B',
    'image.size': 'image',
    'image.update': 'image',
    'image.upload': 'image',
    'instance': 'instance',
    'ip.floating': 'ip',
    'ip.floating.create': 'ip',
    'ip.floating.update': 'ip',
    'memory': 'MB',
    'memory.bandwidth.local': 'B/s',
    'memory.bandwidth.total': 'B/s',
    'memory.resident': 'MB',
    'memory.usage': 'MB',
    'network': 'network',
    'network.create': 'network',
    'network.incoming.bytes': 'B',
    'network.incoming.bytes.rate': 'B/s',
    'network.incoming.packets': 'packet',
    'network.incoming.packets.rate': 'packet/s',
    'network.outgoing.bytes': 'B',
    'network.outgoing.bytes.rate': 'B/s',
    'network.outgoing.packets': 'packet',
    'network.outgoing.packets.rate': 'packet/s',
    'network.services.firewall': 'firewall',
    'network.services.firewall.create': 'firewall',
    'network.services.firewall.policy': 'firewall_policy',
    'network.services.firewall.policy.create': 'firewall_policy',
    'network.services.firewall.policy.update': 'firewall_policy',
    'network.services.firewall.rule': 'firewall_rule',
    'network.services.firewall.rule.create': 'firewall_rule',
    'network.services.firewall.rule.update': 'firewall_rule',
    'network.services.firewall.update': 'firewall',
    'network.services.lb.active.connections': 'connection',
    'network.services.lb.health_monitor': 'health_monitor',
    'network.services.lb.health_monitor.create': 'health_monitor',
    'network.services.lb.health_monitor.update': 'health_monitor',
    'network.services.lb.healthmonitor.create': 'health_monitor',
    'network.services.lb.healthmonitor.update': 'health_monitor',
    'network.services.lb.incoming.bytes': 'B',
    'network.services.lb.listener': 'listener',
    'network.services.lb.listener.create': 'listener',
    'network.services.lb.listener.update': 'listener',
    'network.services.lb.loadbalancer': 'loadbalancer',
    'network.services.lb.loadbalancer.create': 'loadbalancer',
    'network.services.lb.loadbalancer.update': 'loadbalancer',
    'network.services.lb.member': 'member',
    'network.services.lb.member.create': 'member',
    'network.services.lb.member.update': 'member',
    'network.services.lb.outgoing.bytes': 'B',
    'network.services.lb.pool': 'pool',
    'network.services.lb.pool.create': 'pool',
    'network.services.lb.pool.update': 'pool',
    'network.services.lb.total.connections': 'connection',
    'network.services.lb.vip': 'vip',
    'network.services.lb.vip.create': 'vip',
    'network.services.lb.vip.update': 'vip',
    'network.services.vpn': 'vpnservice',
    'network.services.vpn.connections': 'ipsec_site_connection',
    'network.services.vpn.connections.create': 'ipsec_site_connection',
    'network.services.vpn.connections.update': 'ipsec_site_connection',
    'network.services.vpn.create': 'vpnservice',
    'network.services.vpn.ikepolicy': 'ikepolicy',
    'network.services.vpn.ikepolicy.create': 'ikepolicy',
    'network.services.vpn.ikepolicy.update': 'ikepolicy',
    'network.services.vpn.ipsecpolicy': 'ipsecpolicy',
    'network.services.vpn.ipsecpolicy.create': 'ipsecpolicy',
    'network.services.vpn.ipsecpolicy.update': 'ipsecpolicy',
    'network.services.vpn.update': 'vpnservice',
    'network.update': 'network',
    'perf.cache.misses': 'count',
    'perf.cache.references': 'count',
    'perf.cpu.cycles': 'cycle',
    'perf.instructions': 'instruction',
    'port': 'port',
    'port.create': 'port',
    'port.update': 'port',
    'router': 'router',
    'router.create': 'router',
    'router.update': 'router',
    'snapshot': 'snapshot',
    'snapshot.create.start': 'snapshot',
    'snapshot.create.end': 'snapshot',
    'snapshot.delete.start': 'snapshot',
    'snapshot.delete.end': 'snapshot',
    'snapshot.size': 'GB',
    'stack.create': 'stack',
    'stack.delete': 'stack',
    'stack.resume': 'stack',
    'stack.suspend': 'stack',
    'stack.update': 'stack',
    'storage.api.request': 'request',
    'storage.containers.objects': 'object',
    'storage.containers.objects.size': 'B',
    'storage.objects': 'object',
    'storage.objects.containers': 'container',
    'storage.objects.incoming.bytes': 'B',
    'storage.objects.outgoing.bytes': 'B',
    'storage.objects.size': 'B',
    'subnet': 'subnet',
    'subnet.create': 'subnet',
    'subnet.update': 'subnet',
    'switch': 'switch',
    'switch.flow': 'flow',
    'switch.flow.bytes': 'B',
    'switch.flow.duration.nanoseconds': 'ns',
    'switch.flow.duration.seconds': 's',
    'switch.flow.packets': 'packet',
    'switch.port': 'port',
    'switch.port.collision.count': 'count',
    'switch.port.receive.bytes': 'B',
    'switch.port.receive.crc_error': 'packet',
    'switch.port.receive.drops': 'packet',
    'switch.port.receive.errors': 'packet',
    'switch.port.receive.frame_error': 'packet',
    'switch.port.receive.overrun_error': 'packet',
    'switch.port.receive.packets': 'packet',
    'switch.port.transmit.bytes': 'B',
    'switch.port.transmit.drops': 'packet',
    'switch.port.transmit.errors': 'packet',
    'switch.port.transmit.packets': 'packet',
    'switch.table': 'table',
    'switch.table.active.entries': 'entry',
    'switch.table.lookup.packets': 'packet',
    'switch.table.matched.packets': 'packet',
    'vcpus': 'vcpu',
    'volume': 'volume',
    'volume.attach.start': 'volume',
    'volume.attach.end': 'volume',
    'volume.backup.create.start': 'volume',
    'volume.backup.create.end': 'volume',
    'volume.backup.delete.start': 'volume',
    'volume.backup.delete.end': 'volume',
    'volume.backup.restore.start': 'volume',
    'volume.backup.restore.end': 'volume',
    'volume.create.start': 'volume',
    'volume.create.end': 'volume',
    'volume.delete.start': 'volume',
    'volume.delete.end': 'volume',
    'volume.detach.start': 'volume',
    'volume.detach.end': 'volume',
    'volume.resize.start': 'volume',
    'volume.resize.end': 'volume',
    'volume.size': 'GB',
    'volume.update.start': 'volume',
    'volume.update.end': 'volume'
}