diff --git a/dragonflow/cmd/df_skydive_service.py b/dragonflow/cmd/df_skydive_service.py new file mode 100644 index 000000000..dedfe1348 --- /dev/null +++ b/dragonflow/cmd/df_skydive_service.py @@ -0,0 +1,311 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import argparse +import signal +import sys +import uuid + +from jsonmodels import fields +from oslo_log import log +from skydive.rest.client import RESTClient +from skydive.websocket import client as skydive_client + +from dragonflow.common import utils as df_utils +from dragonflow import conf as cfg +from dragonflow.controller import df_config +from dragonflow.db import api_nb +from dragonflow.db import model_framework as mf +from dragonflow.db import model_proxy +from dragonflow.db.models import all # noqa + +LOG = log.getLogger(__name__) + +DRAGONFLOW_HOST_ID = 'dragonflow-skydive' +DF_SKYDIVE_NAMESPACE_UUID = uuid.UUID('8a527b24-f0f5-4c1f-8f3d-6de400aa0145') + +global_skydive_client = None + + +class SkydiveClient(object): + """Main class that manages all the skydive operation.""" + def __init__(self, nb_api): + protocol = WSClientDragonflowProtocol(nb_api) + self.websocket_client = skydive_client.WSClient( + host_id=DRAGONFLOW_HOST_ID, + endpoint='ws://{0}/ws/publisher'.format( + cfg.CONF.df_skydive.analyzer_endpoint), + protocol=lambda: protocol + ) + logged_in = self.websocket_client.login( + cfg.CONF.df_skydive.analyzer_endpoint, + cfg.CONF.df_skydive.user, + cfg.CONF.df_skydive.password) + if not logged_in: + # TODO(snapiri) raise an exception + LOG.error('Failed authenticating with SkyDive analyzer at %s', + cfg.CONF.df_skydive.analyzer_endpoint) + return + self.websocket_client.connect() + + @staticmethod + def create(): + """Create a new SkydiveClient + :return a newly allocated SkydiveClient + :rtype SkydiveClient + """ + df_utils.config_parse() + nb_api = api_nb.NbApi.get_instance(False, True) + return SkydiveClient(nb_api) + + def clear_dragonflow_items(self): + """Delete all the items created by DragonFlow""" + restclient = RESTClient(cfg.CONF.df_skydive.analyzer_endpoint) + edges = restclient.lookup_edges("G.E().Has('source': 'dragonflow')") + for edge in edges: + edge_del_msg = skydive_client.WSMessage( + "Graph", + skydive_client.EdgeDeletedMsgType, + edge + ) + self.sendWSMessage(edge_del_msg) + nodes = restclient.lookup_nodes("G.V().Has('source': 'dragonflow')") + for node in nodes: + node_del_msg = skydive_client.WSMessage( + "Graph", + skydive_client.NodeDeletedMsgType, + node + ) + self.sendWSMessage(node_del_msg) + + def start(self): + """Start communication with the SkyDive analyzer + + This starts the operaiton of periodically querying the nb_api and + sending all the objects to the SkyDive analyzer. + """ + self.websocket_client.start() + + def schedule_stop(self, wait_time): + """Schedule a loop stop event + :param wait_time: number of seconds until stop + :type wait_time: int + """ + loop = self.websocket_client.loop + loop.call_later(wait_time, self.stop) + + def stop(self): + """Stop the process of sending the updates to the SkyDive analyzer""" + self.websocket_client.stop() + + +class WSClientDragonflowProtocol(skydive_client.WSClientDebugProtocol): + """Protocol handler for the SkyDive client. + + This class does the actual work of sending the updates to the analyzer + """ + def __init__(self, nb_api): + super(WSClientDragonflowProtocol, self).__init__() + self.nb_api = nb_api + + def reschedule_send(self): + # Schedule next update + loop = self.factory.client.loop + wait_time = cfg.CONF.df_skydive.update_interval + loop.call_later(wait_time, self.send_df_updates) + + def send_df_updates(self): + """Callback that is called when the client connects to the analyzer + + As the client is working asynchronously, this is where our work is + actually being done. + We now start sending the updates to skydive periodically. + """ + df_objects = self._get_df_objects() + LOG.debug('Sending to skydive: %s', df_objects) + for node in df_objects["Nodes"]: + node_add_msg = skydive_client.WSMessage( + "Graph", + skydive_client.NodeAddedMsgType, + node + ) + self.sendWSMessage(node_add_msg) + + for edge in df_objects["Edges"]: + edge_add_msg = skydive_client.WSMessage( + "Graph", + skydive_client.EdgeAddedMsgType, + edge + ) + self.sendWSMessage(edge_add_msg) + + self.reschedule_send() + + def _build_edge_message(self, src_type, src_id, dst_type, dst_id): + id_str = '{}->{}'.format(src_id, dst_id) + metadata = { + 'source': 'dragonflow', + 'source_type': src_type, + 'dest_type': dst_type, + } + result = { + 'ID': str(uuid.uuid5(DF_SKYDIVE_NAMESPACE_UUID, id_str)), + 'Child': "DF-{}".format(src_id), + 'Parent': "DF-{}".format(dst_id), + 'Host': 'dragonflow', + 'Metadata': metadata, + } + return result + + def _add_edge_message(self, edges, instance, field): + if model_proxy.is_model_proxy(field): + field = self.nb_api.get(field) + if not hasattr(field, 'id'): + return + result = self._build_edge_message( + type(instance).__name__, instance.id, + type(field).__name__, field.id) + edges.append(result) + + def _output_edge(self, edges, instance, field_name, multi_value): + field = getattr(instance, field_name) + if multi_value: + for _field in field: + self._add_edge_message(edges, instance, _field) + else: + self._add_edge_message(edges, instance, field) + + def _output_table_node_edges(self, edges, instance): + for key, field in type(instance).iterate_over_fields(): + if key == 'id': + continue + multi_value = isinstance(field, fields.ListField) + try: + self._output_edge(edges, instance, key, multi_value) + except AttributeError: + pass # ignore + + @staticmethod + def _has_owner(instance): + if not hasattr(instance, "device_owner"): + return False + return hasattr(instance, "device_id") + + @staticmethod + def _get_instance_type(instance): + return type(instance).__name__ + + def _output_table_node(self, nodes, edges, instance): + metadata = { + 'ID': "DF-{}".format(instance.id), + 'Type': WSClientDragonflowProtocol._get_instance_type(instance), + 'source': 'dragonflow', + 'data': instance.to_struct(), + 'Name': getattr(instance, 'name', None) or instance.id + } + result = { + 'Metadata': metadata, + 'ID': "DF-{}".format(instance.id), + 'Host': 'dragonflow'} + nodes.append(result) + self._output_table_node_edges(edges, instance) + # If we have an owner, add the edge from it to this instance + if WSClientDragonflowProtocol._has_owner(instance): + return + # TODO(snapiri) Fix this code as it is not working correctly + owner_class = mf.get_model(instance.device_owner) + if not owner_class: + return + edge = self._build_edge_message(owner_class.__name__, + instance.device_id, + type(instance).__name__, + instance.id) + edges.append(edge) + + def _output_table(self, nodes, edges, table_name): + model = mf.get_model(table_name) + instances = self.nb_api.get_all(model) + for instance in instances: + self._output_table_node(nodes, edges, instance) + + def _get_df_objects(self): + nodes = [] + edges = [] + for table_name in mf.iter_tables(): + self._output_table(nodes, edges, table_name) + result = { + 'Nodes': nodes, + 'Edges': edges, + } + return result + + def onOpen(self): + """Callback that is called when the client connects to the analyzer + + As the client is working asynchronously, this is where our work is + actually being done. + We now start sending the updates to skydive periodically. + """ + LOG.debug('onOpen') + # TODO(snapiri) have to handle a case in which we got disconnected + # and then reconnected. + self.reschedule_send() + + def onClose(self, wasClean, code, reason): + """Callback that is called when the client disconnects + + Makes sure that the loop is stopped in case the connection was not + closed by the client side. + This is done to prevent the client from getting stuck in the loop + when the connection is closed. + + :param wasClean: was the connection closed cleanly + :type wasClean: bool + :param code: error code of the current error + :type code: integer + :param reason: description of the error that occured + :type reason: string + """ + LOG.debug("Client closing %s %s %s", wasClean, code, reason) + if not wasClean: + self.factory.loop.stop() + super(WSClientDragonflowProtocol, self).onClose(wasClean, code, reason) + + +def signal_handler(signal, frame): + if global_skydive_client: + LOG.info('Stopping SkyDive service') + global_skydive_client.stop() + + +def set_signal_handler(): + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGHUP, signal_handler) + + +def main(): + """main method""" + df_config.init(sys.argv) + parser = argparse.ArgumentParser(description='SkyDive integration service') + parser.add_argument('-t', '--runtime', type=int, + help='Total runtime (default 0 = infinite)') + args = parser.parse_args() + global global_skydive_client + global_skydive_client = SkydiveClient.create() + if args.runtime: + global_skydive_client.schedule_stop(args.runtime) + global_skydive_client.clear_dragonflow_items() + set_signal_handler() + global_skydive_client.start() + + +if __name__ == '__main__': + main() diff --git a/dragonflow/conf/__init__.py b/dragonflow/conf/__init__.py index dbb42d57b..4616d7c09 100644 --- a/dragonflow/conf/__init__.py +++ b/dragonflow/conf/__init__.py @@ -25,6 +25,7 @@ from dragonflow.conf import df_metadata_service from dragonflow.conf import df_provider_networks from dragonflow.conf import df_redis from dragonflow.conf import df_ryu +from dragonflow.conf import df_skydive from dragonflow.conf import df_snat @@ -45,3 +46,4 @@ df_ryu.register_opts() df_provider_networks.register_opts() df_snat.register_opts() df_bgp.register_opts() +df_skydive.register_opts() diff --git a/dragonflow/conf/df_skydive.py b/dragonflow/conf/df_skydive.py new file mode 100644 index 000000000..896993218 --- /dev/null +++ b/dragonflow/conf/df_skydive.py @@ -0,0 +1,38 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg + +from dragonflow._i18n import _ + + +df_skydive_app_opts = [ + cfg.StrOpt('analyzer_endpoint', + default='127.0.0.1:8082', + help=_('IP:Port of skydive analyzer.')), + cfg.StrOpt('user', + default='admin', + help=_('Username to authenticate to the skydive analyzer.')), + cfg.StrOpt('password', + help=_('password to authenticate to the skydive analyzer.')), + cfg.IntOpt('update_interval', + default=10, + help=_('Interval (in seconds) between data updates.')), +] + + +def register_opts(): + cfg.CONF.register_opts(df_skydive_app_opts, group='df_skydive') + + +def list_opts(): + return {'df_skydive': df_skydive_app_opts} diff --git a/requirements.txt b/requirements.txt index dc5869546..cb0edadd7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,6 +26,7 @@ six>=1.10.0 # MIT httplib2>=0.9.1 # MIT WebOb>=1.7.1 # MIT jsonmodels>=2.1.3 # BSD License (3 clause) +skydive-client>=0.4.1 # Apache-2.0 # These repos are installed from git in OpenStack CI if the job # configures them as required-projects: