From 5f3fdd5248091ae52498251295d7d58c63cd6f10 Mon Sep 17 00:00:00 2001 From: Volodymyr Samotiy Date: Tue, 24 May 2016 15:15:46 +0300 Subject: [PATCH] Add Apache Kafka BST plugin and documentation Change-Id: I8e896ee5a3d4ba39dbaedd5e8bf0e0b9e83480b0 --- .../config/broadviewcollector.conf | 10 ++ broadview_collector/plugins/kafkapublisher.py | 74 +++++++++++++ doc/kafka.md | 100 ++++++++++++++++++ 3 files changed, 184 insertions(+) create mode 100644 broadview_collector/plugins/kafkapublisher.py create mode 100644 doc/kafka.md diff --git a/broadview_collector/config/broadviewcollector.conf b/broadview_collector/config/broadviewcollector.conf index 01ce729..7b862a7 100644 --- a/broadview_collector/config/broadviewcollector.conf +++ b/broadview_collector/config/broadviewcollector.conf @@ -55,3 +55,13 @@ port: 8082 #port: 8088 #timeout: 1.0 +# uncomment for kafka, add kafkapublisher to the [plugins] publishers var + +#[kafka] + +# ip address, port, and topic for kafka + +#ip_address: 172.16.170.184 +#port: 9092 +#topic: broadview-bst + diff --git a/broadview_collector/plugins/kafkapublisher.py b/broadview_collector/plugins/kafkapublisher.py new file mode 100644 index 0000000..a064d51 --- /dev/null +++ b/broadview_collector/plugins/kafkapublisher.py @@ -0,0 +1,74 @@ +# (C) Copyright Broadcom Corporation 2016 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from broadviewpublisherbase import BroadViewPublisherBase +from kafka import KafkaProducer +from broadview_collector.serializers.bst_to_monasca import BSTToMonasca +import json +import ConfigParser + +from oslo_log import log + +LOG = log.getLogger(__name__) + +class BroadViewPublisher(BroadViewPublisherBase): + def readConfig(self): + try: + bvcfg = ConfigParser.ConfigParser() + bvcfg.read("/etc/broadviewcollector.conf") + self._ip_address = bvcfg.get("kafka", "ip_address") + self._port = bvcfg.get("kafka", "port") + self._topic = bvcfg.get("kafka", "topic") + except: + LOG.info("BroadViewPublisher: unable to read configuration") + + def getKafkaProducer(self): + try: + self._producer = KafkaProducer(bootstrap_servers=['{}:{}'.format(self._ip_address, self._port)]) + except kafka.errors.NoBrokersAvailable as e: + LOG.error("BroadViewPublisher: NoBrokersAvailable {}".format(e)) + except: + LOG.error("Unexpected error: {}".format(sys.exc_info()[0])) + + def __init__(self): + self._ip_address = "127.0.0.1" + self._port = "9092" + self._topic = "broadview-bst" + self.readConfig() + self._producer = None + + def publish(self, host, data): + LOG.info('kafka publish enter') + code = 500 + # get a producer if needed + if not self._producer: + self.getKafkaProducer() + if self._producer: + code = 200 + success, sdata = BSTToMonasca().serialize(host, data) + sdata = json.loads(sdata) + if success: + for x in sdata: + try: + self._producer.send(self._topic, json.dumps(x)) + except: + LOG.info('unable to send to kafka topic {}: {}'.format(self._topic, sys.exc_info()[0])) + else: + code = 500 + LOG.info('kafka publish code {}'.format(code)) + return code + + def __repr__(self): + return "BroadView kafka Publisher {} {}:{}".format(self._topic, self._ip_address, self._port) + diff --git a/doc/kafka.md b/doc/kafka.md new file mode 100644 index 0000000..b3f040d --- /dev/null +++ b/doc/kafka.md @@ -0,0 +1,100 @@ +Using the kafka plugin +====================== + +This document runs through the steps needed to configure and test out the +kafka plugin for BroadView Collector. + +Enabling the kafka plugin +------------------------- + +The kafka plugin is located in broadview_collector/plugins/kafkapublisher.py. +In the plugin constructor (__init__), the following defaults related to the +kafka message bus are assigned: + +* ip address of kafka - 127.0.0.1 +* port - 9092 +* topic - broadview-bst + +You can override any or all of these settings in /etc/broadviewcollector.conf +in the [kafka] section of the file. The checked in version of this file in +github has these settings commented out. To enable them, uncomment the +[kafka] section and the settings that you would like to override, and give +appropriate values. The following example configures the IP address, port +and topic and will override the default settings of the plugin: + + [kafka] + + # ip address, port, and topic for kafka + + ip_address: 192.168.0.120 + port: 8088 + topic: broadview-bst + +You'll also need to enable the kafka plugin in /etc/broadviewcollector.conf. +To do this, add the string "kafkapublisher" (no quotes) to the publishers +setting in the [plugins] section. The following example enables both the +log publisher and the kafka publisher plugins: + + [plugins] + publishers: logpublisher, kafkapublisher + +Testing the Kafka Publisher Plugin +---------------------------------- + +To test out, or experiment with, the kafka publisher plugin, consider doing +the following: + +* Install broadview-collector. See the instructions in the main README.md +* Configure the collector for kafka as described above. +* Install kafka. There are tutorials on the web for this. One that is known +to work for Ubuntu 14.04 can be found at DigitalOcean's website: + + https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04 + +* Start up the collector, and then launch kafka from a terminal window. If you +are using the DigitalOcean kafka tutorial: + + $ nohup ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties > ~/kafka/kafka.log 2>&1 & + +* In a separate terminal window, run the following to view the data that is +being written to the kafka queue by BroadView Collector: + + $ ~/kafka/bin/kafka-console-consumer --zookeeper 127.0.0.1:2181 --topic broadview.bst + +Note in the above, the path to kafka-console-consumber may be different based +on how you installed kafka. The IP address and port should work if you +installed everything on a single host, and zookeeper is configured to use the +default listen port of 2181. On Ubuntu 14.04, the settings can be found in +/etc/zookeeper/zooinspector/defaultConnectionSettings.cfg in the "hosts" +setting. + +* Run the BST burst simulator. Instructions for this are provided in the next +section. + +Starting the Simulator +---------------------- + +To start the bst simulator, go to where broadview-collector was cloned from +github. Then, cd into broadview-collector/broadview_collector/tools. Edit the +script bst_burst.py to set the host and port variables to the IP address and +port that the collector is running on, then run the following in a bash window: + + $ while true; do sleep 90; python bst_burst.py; done + +This will get data flowing into the collector and then into the configured +plugin. + +If everything is working, you should see output like the following displayed +from kafka-console-consumer: + + {"timestamp": 1464032262000.0, "name": "broadview.bst.ingress-port-priority-group", "value": 15, "dimensions": {"asic-id": "20", "stat": "um-share-buffer-count", "priority-group": 5, "port": "2", "bv-agent": "192.168.0.120"}} + {"timestamp": 1464032262000.0, "name": "broadview.bst.ingress-port-priority-group", "value": 15, "dimensions": {"asic-id": "20", "stat": "um-headroom-buffer-count", "priority-group": 5, "port": "2", "bv-agent": "192.168.0.120"}} + {"timestamp": 1464032262000.0, "name": "broadview.bst.ingress-port-priority-group", "value": 15, "dimensions": {"asic-id": "20", "stat": "um-share-buffer-count", "priority-group": 6, "port": "3", "bv-agent": "192.168.0.120"}} + {"timestamp": 1464032262000.0, "name": "broadview.bst.ingress-port-priority-group", "value": 15, "dimensions": {"asic-id": "20", "stat": "um-headroom-buffer-count", "priority-group": 6, "port": "3", "bv-agent": "192.168.0.120"}} + {"timestamp": 1464032262000.0, "name": "broadview.bst.ingress-port-service-pool", "value": 15, "dimensions": {"asic-id": "20", "stat": "um-share-buffer-count", "service-pool": 5, "port": "2", "bv-agent": "192.168.0.120"}} + {"timestamp": 1464032262000.0, "name": "broadview.bst.ingress-port-service-pool", "value": 15, "dimensions": {"asic-id": "20", "stat": "um-share-buffer-count", "service-pool": 6, "port": "3", "bv-agent": "192.168.0.120"}} + {"timestamp": 1464032262000.0, "name": "broadview.bst.ingress-service-pool", "value": 15, "dimensions": {"asic-id": "20", "stat": "um-share-buffer-count", "service-pool": 1, "bv-agent": "192.168.0.120"}} + +Congratulations - you've properly installed and configured the BroadView +Collector to accept BST data from an agent, and publish it to kafka. +