SkyDive agent app

Initial sketch of an app that reports DF objects to the Skydive analyzer

TODO:
[X] Fix issues with Skydive - use 0.4.1
[X] Investigate why main loop does not exit on stop
[X] Add docstrings
[X] Add links to the owner objects to get a more complete topology view
[X] Add mechanism to remove deleted items

The following will be in separate patches:
[ ] Start as a separate daemon (see the entry-point sketch after this list)
[ ] Handle DB disconnect/reconnect
[ ] Get add/update/delete events from DF/DB
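
A possible shape for the separate-daemon item above (sketch only: it assumes the
usual pbr/setup.cfg console_scripts convention, and the module path shown is
hypothetical; the real wiring is left to the follow-up patch):

[entry_points]
console_scripts =
    df-skydive-service = dragonflow.cli.df_skydive_service:main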

Co-Authored-By: Omer Anson <omer.anson@toganetworks.com>
Co-Authored-By: Shachar Snapiri <shachar.snapiri@huawei.com>
Related-Bug: #1749429
Depends-On: I89f37e9590de86ca8d6f6a48cf1673ea214b6d29
Change-Id: Ie715e340f9df1b1e250480d05ecd0341d28bda06
Authored by Dima Kuznetsov on 2017-11-01 16:27:07 +02:00, committed by Omer Anson
Parent: 3945b4eb7f
Commit: 439e528add
4 changed files with 352 additions and 0 deletions


@@ -0,0 +1,311 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import signal
import sys
import uuid

from jsonmodels import fields
from oslo_log import log
from skydive.rest.client import RESTClient
from skydive.websocket import client as skydive_client

from dragonflow.common import utils as df_utils
from dragonflow import conf as cfg
from dragonflow.controller import df_config
from dragonflow.db import api_nb
from dragonflow.db import model_framework as mf
from dragonflow.db import model_proxy
from dragonflow.db.models import all  # noqa

LOG = log.getLogger(__name__)

DRAGONFLOW_HOST_ID = 'dragonflow-skydive'
DF_SKYDIVE_NAMESPACE_UUID = uuid.UUID('8a527b24-f0f5-4c1f-8f3d-6de400aa0145')
global_skydive_client = None


class SkydiveClient(object):
    """Main class that manages all the skydive operations."""
    def __init__(self, nb_api):
        # Keep a reference to the protocol so that graph messages can also
        # be sent outside the protocol callbacks (see clear_dragonflow_items).
        self.protocol = WSClientDragonflowProtocol(nb_api)
        self.websocket_client = skydive_client.WSClient(
            host_id=DRAGONFLOW_HOST_ID,
            endpoint='ws://{0}/ws/publisher'.format(
                cfg.CONF.df_skydive.analyzer_endpoint),
            protocol=lambda: self.protocol
        )
        logged_in = self.websocket_client.login(
            cfg.CONF.df_skydive.analyzer_endpoint,
            cfg.CONF.df_skydive.user,
            cfg.CONF.df_skydive.password)
        if not logged_in:
            # TODO(snapiri) raise an exception
            LOG.error('Failed authenticating with SkyDive analyzer at %s',
                      cfg.CONF.df_skydive.analyzer_endpoint)
            return
        self.websocket_client.connect()

    @staticmethod
    def create():
        """Create a new SkydiveClient

        :return: a newly allocated SkydiveClient
        :rtype: SkydiveClient
        """
        df_utils.config_parse()
        nb_api = api_nb.NbApi.get_instance(False, True)
        return SkydiveClient(nb_api)

    def clear_dragonflow_items(self):
        """Delete all the items created by DragonFlow"""
        restclient = RESTClient(cfg.CONF.df_skydive.analyzer_endpoint)
        edges = restclient.lookup_edges("G.E().Has('source', 'dragonflow')")
        for edge in edges:
            edge_del_msg = skydive_client.WSMessage(
                "Graph",
                skydive_client.EdgeDeletedMsgType,
                edge
            )
            self.protocol.sendWSMessage(edge_del_msg)
        nodes = restclient.lookup_nodes("G.V().Has('source', 'dragonflow')")
        for node in nodes:
            node_del_msg = skydive_client.WSMessage(
                "Graph",
                skydive_client.NodeDeletedMsgType,
                node
            )
            self.protocol.sendWSMessage(node_del_msg)

    def start(self):
        """Start communication with the SkyDive analyzer

        This starts the operation of periodically querying the nb_api and
        sending all the objects to the SkyDive analyzer.
        """
        self.websocket_client.start()

    def schedule_stop(self, wait_time):
        """Schedule a loop stop event

        :param wait_time: number of seconds until stop
        :type wait_time: int
        """
        loop = self.websocket_client.loop
        loop.call_later(wait_time, self.stop)

    def stop(self):
        """Stop the process of sending the updates to the SkyDive analyzer"""
        self.websocket_client.stop()


class WSClientDragonflowProtocol(skydive_client.WSClientDebugProtocol):
    """Protocol handler for the SkyDive client.

    This class does the actual work of sending the updates to the analyzer.
    """
    def __init__(self, nb_api):
        super(WSClientDragonflowProtocol, self).__init__()
        self.nb_api = nb_api

    def reschedule_send(self):
        # Schedule next update
        loop = self.factory.client.loop
        wait_time = cfg.CONF.df_skydive.update_interval
        loop.call_later(wait_time, self.send_df_updates)

    def send_df_updates(self):
        """Send the current set of DF objects to the analyzer

        Reads all the objects from the nb_api, sends them to the SkyDive
        analyzer as graph nodes and edges, and then schedules the next update.
        """
        df_objects = self._get_df_objects()
        LOG.debug('Sending to skydive: %s', df_objects)
        for node in df_objects["Nodes"]:
            node_add_msg = skydive_client.WSMessage(
                "Graph",
                skydive_client.NodeAddedMsgType,
                node
            )
            self.sendWSMessage(node_add_msg)
        for edge in df_objects["Edges"]:
            edge_add_msg = skydive_client.WSMessage(
                "Graph",
                skydive_client.EdgeAddedMsgType,
                edge
            )
            self.sendWSMessage(edge_add_msg)
        self.reschedule_send()
    def _build_edge_message(self, src_type, src_id, dst_type, dst_id):
        # The edge ID is derived deterministically from the endpoint IDs so
        # that the same DF relationship always maps to the same SkyDive edge.
        id_str = '{}->{}'.format(src_id, dst_id)
        metadata = {
            'source': 'dragonflow',
            'source_type': src_type,
            'dest_type': dst_type,
        }
        result = {
            'ID': str(uuid.uuid5(DF_SKYDIVE_NAMESPACE_UUID, id_str)),
            'Child': "DF-{}".format(src_id),
            'Parent': "DF-{}".format(dst_id),
            'Host': 'dragonflow',
            'Metadata': metadata,
        }
        return result

    def _add_edge_message(self, edges, instance, field):
        if model_proxy.is_model_proxy(field):
            field = self.nb_api.get(field)
        if not hasattr(field, 'id'):
            return
        result = self._build_edge_message(
            type(instance).__name__, instance.id,
            type(field).__name__, field.id)
        edges.append(result)

    def _output_edge(self, edges, instance, field_name, multi_value):
        field = getattr(instance, field_name)
        if multi_value:
            for _field in field:
                self._add_edge_message(edges, instance, _field)
        else:
            self._add_edge_message(edges, instance, field)

    def _output_table_node_edges(self, edges, instance):
        for key, field in type(instance).iterate_over_fields():
            if key == 'id':
                continue
            multi_value = isinstance(field, fields.ListField)
            try:
                self._output_edge(edges, instance, key, multi_value)
            except AttributeError:
                pass  # ignore
    @staticmethod
    def _has_owner(instance):
        if not hasattr(instance, "device_owner"):
            return False
        return hasattr(instance, "device_id")

    @staticmethod
    def _get_instance_type(instance):
        return type(instance).__name__

    def _output_table_node(self, nodes, edges, instance):
        metadata = {
            'ID': "DF-{}".format(instance.id),
            'Type': WSClientDragonflowProtocol._get_instance_type(instance),
            'source': 'dragonflow',
            'data': instance.to_struct(),
            'Name': getattr(instance, 'name', None) or instance.id
        }
        result = {
            'Metadata': metadata,
            'ID': "DF-{}".format(instance.id),
            'Host': 'dragonflow'}
        nodes.append(result)
        self._output_table_node_edges(edges, instance)
        # If we have an owner, add the edge from it to this instance
        if not WSClientDragonflowProtocol._has_owner(instance):
            return
        # TODO(snapiri) Fix this code as it is not working correctly
        owner_class = mf.get_model(instance.device_owner)
        if not owner_class:
            return
        edge = self._build_edge_message(owner_class.__name__,
                                        instance.device_id,
                                        type(instance).__name__,
                                        instance.id)
        edges.append(edge)
    def _output_table(self, nodes, edges, table_name):
        model = mf.get_model(table_name)
        instances = self.nb_api.get_all(model)
        for instance in instances:
            self._output_table_node(nodes, edges, instance)

    def _get_df_objects(self):
        nodes = []
        edges = []
        for table_name in mf.iter_tables():
            self._output_table(nodes, edges, table_name)
        result = {
            'Nodes': nodes,
            'Edges': edges,
        }
        return result

    def onOpen(self):
        """Callback that is called when the client connects to the analyzer

        As the client is working asynchronously, this is where our work is
        actually being done.
        We now start sending the updates to skydive periodically.
        """
        LOG.debug('onOpen')
        # TODO(snapiri) have to handle a case in which we got disconnected
        # and then reconnected.
        self.reschedule_send()

    def onClose(self, wasClean, code, reason):
        """Callback that is called when the client disconnects

        Makes sure that the loop is stopped in case the connection was not
        closed by the client side.
        This is done to prevent the client from getting stuck in the loop
        when the connection is closed.

        :param wasClean: was the connection closed cleanly
        :type wasClean: bool
        :param code: error code of the current error
        :type code: integer
        :param reason: description of the error that occurred
        :type reason: string
        """
        LOG.debug("Client closing %s %s %s", wasClean, code, reason)
        if not wasClean:
            self.factory.loop.stop()
        super(WSClientDragonflowProtocol, self).onClose(wasClean, code, reason)


def signal_handler(sig, frame):
    if global_skydive_client:
        LOG.info('Stopping SkyDive service')
        global_skydive_client.stop()


def set_signal_handler():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGHUP, signal_handler)


def main():
    """Main entry point of the SkyDive integration service"""
    df_config.init(sys.argv)
    parser = argparse.ArgumentParser(
        description='SkyDive integration service')
    parser.add_argument('-t', '--runtime', type=int,
                        help='Total runtime (default 0 = infinite)')
    args = parser.parse_args()
    global global_skydive_client
    global_skydive_client = SkydiveClient.create()
    if args.runtime:
        global_skydive_client.schedule_stop(args.runtime)
    global_skydive_client.clear_dragonflow_items()
    set_signal_handler()
    global_skydive_client.start()


if __name__ == '__main__':
    main()
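
For illustration, this is the shape of one node and one edge message as built by
_output_table_node() and _build_edge_message() above. The LogicalPort/LogicalSwitch
instance IDs are hypothetical, and the snippet assumes the module's imports and
constants:

# Illustrative only: the object IDs below are made up.
example_node = {
    'ID': 'DF-port-1',
    'Host': 'dragonflow',
    'Metadata': {
        'ID': 'DF-port-1',
        'Type': 'LogicalPort',
        'source': 'dragonflow',
        'data': {},  # instance.to_struct() payload goes here
        'Name': 'port-1',
    },
}
example_edge = {
    # Deterministic ID derived from 'port-1->switch-1'
    'ID': str(uuid.uuid5(DF_SKYDIVE_NAMESPACE_UUID, 'port-1->switch-1')),
    'Child': 'DF-port-1',
    'Parent': 'DF-switch-1',
    'Host': 'dragonflow',
    'Metadata': {
        'source': 'dragonflow',
        'source_type': 'LogicalPort',
        'dest_type': 'LogicalSwitch',
    },
}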


@@ -25,6 +25,7 @@ from dragonflow.conf import df_metadata_service
from dragonflow.conf import df_provider_networks
from dragonflow.conf import df_redis
from dragonflow.conf import df_ryu
from dragonflow.conf import df_skydive
from dragonflow.conf import df_snat
@@ -45,3 +46,4 @@ df_ryu.register_opts()
df_provider_networks.register_opts()
df_snat.register_opts()
df_bgp.register_opts()
df_skydive.register_opts()


@@ -0,0 +1,38 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg

from dragonflow._i18n import _

df_skydive_app_opts = [
    cfg.StrOpt('analyzer_endpoint',
               default='127.0.0.1:8082',
               help=_('IP:Port of skydive analyzer.')),
    cfg.StrOpt('user',
               default='admin',
               help=_('Username to authenticate to the skydive analyzer.')),
    cfg.StrOpt('password',
               help=_('Password to authenticate to the skydive analyzer.')),
    cfg.IntOpt('update_interval',
               default=10,
               help=_('Interval (in seconds) between data updates.')),
]


def register_opts():
    cfg.CONF.register_opts(df_skydive_app_opts, group='df_skydive')


def list_opts():
    return {'df_skydive': df_skydive_app_opts}
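
For illustration, these options live in the [df_skydive] group of the Dragonflow
configuration file; a minimal sample section (the endpoint and credentials shown
here are placeholder values and depend on the deployment):

[df_skydive]
analyzer_endpoint = 192.0.2.10:8082
user = admin
password = secret
update_interval = 10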


@@ -26,6 +26,7 @@ six>=1.10.0 # MIT
httplib2>=0.9.1 # MIT
WebOb>=1.7.1 # MIT
jsonmodels>=2.1.3 # BSD License (3 clause)
skydive-client>=0.4.1 # Apache-2.0
# These repos are installed from git in OpenStack CI if the job
# configures them as required-projects: