From e095cb81c736abf642139766d4aa4fe0c1e6ea8a Mon Sep 17 00:00:00 2001 From: Ofer Ben-Yacov Date: Thu, 29 Dec 2016 09:57:55 +0200 Subject: [PATCH] Add RPC callbacks --- setup.cfg | 4 +- setup.py | 29 +++++ wan_qos/agent/ovsdb/commands.py | 63 ----------- wan_qos/agent/ovsdb/impl_idl.py | 29 ----- wan_qos/agent/tc_agent.py | 1 - wan_qos/agent/tc_driver.py | 1 - wan_qos/agent/tc_manager.py | 17 ++- wan_qos/common/api.py | 49 ++++++++ .../_wan_qos.py => common/constants.py} | 2 + wan_qos/common/topics.py | 4 +- .../{agent/ovsdb => db/migration}/__init__.py | 0 wan_qos/extensions/wanqos.py | 105 ++++++++++++++++++ wan_qos/services/plugin.py | 98 ++++++++++++++++ .../tests/{unit/ovsdb => system}/__init__.py | 0 wan_qos/tests/system/plugin.py | 39 +++++++ wan_qos/tests/unit/ovsdb/test_ovsdb.py | 42 ------- wan_qos/tests/unit/test_tc_agent.py | 39 ++++++- 17 files changed, 378 insertions(+), 144 deletions(-) create mode 100644 setup.py delete mode 100644 wan_qos/agent/ovsdb/commands.py delete mode 100644 wan_qos/agent/ovsdb/impl_idl.py create mode 100644 wan_qos/common/api.py rename wan_qos/{wanqos_client/_wan_qos.py => common/constants.py} (94%) rename wan_qos/{agent/ovsdb => db/migration}/__init__.py (100%) create mode 100644 wan_qos/extensions/wanqos.py create mode 100644 wan_qos/services/plugin.py rename wan_qos/tests/{unit/ovsdb => system}/__init__.py (100%) create mode 100644 wan_qos/tests/system/plugin.py delete mode 100644 wan_qos/tests/unit/ovsdb/test_ovsdb.py diff --git a/setup.cfg b/setup.cfg index 2e7bfc7..4363a56 100644 --- a/setup.cfg +++ b/setup.cfg @@ -21,7 +21,7 @@ classifier = [files] packages = - wan-qos + wan_qos data_files = etc/neutron = etc/wan_qos_agent.ini @@ -33,4 +33,4 @@ console_scripts = neutron.db.alembic_migrations = wan-qos = wan_qos.db.migration:alembic_migrations neutronclient.extension = - wan_qos =wan_qos.wanqos_client._wan_qos + wan_qos = wan_qos.wanqos_client._wanqos diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..782bb21 --- /dev/null +++ b/setup.py @@ -0,0 +1,29 @@ +# Copyright (c) 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT +import setuptools + +# In python < 2.7.4, a lazy loading of package `pbr` will break +# setuptools if some other modules registered functions in `atexit`. +# solution from: http://bugs.python.org/issue15881#msg170215 +try: + import multiprocessing # noqa +except ImportError: + pass + +setuptools.setup( + setup_requires=['pbr>=1.8'], + pbr=True) diff --git a/wan_qos/agent/ovsdb/commands.py b/wan_qos/agent/ovsdb/commands.py deleted file mode 100644 index 59b3d6f..0000000 --- a/wan_qos/agent/ovsdb/commands.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright (c) 2015 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.agent.ovsdb import api -from neutron.agent.ovsdb import impl_idl -from neutron.agent.ovsdb.native import commands as cmd -from neutron.agent.ovsdb.native import connection -from neutron.agent.ovsdb.native import idlutils -from neutron.agent.ovsdb.native import vlog - - -def _get_queue_id_list(api, port_name): - port_row = idlutils.row_by_value(api.idl, 'Port', 'name', port_name) - if port_row and port_row.qos: - qos_row = api._tables['QoS'].rows[port_row.qos[0].uuid] - if qos_row: - queues = idlutils.get_column_value(qos_row, 'queues') - return queues.keys() - - -class GetQueueIdList(cmd.BaseCommand): - def __init__(self, api, port_name): - super(GetQueueIdList, self).__init__(api) - self.port_name = port_name - - def run_idl(self, txn): - self.result = _get_queue_id_list(self.api, self.port_name) - - -class AddQueue(cmd.BaseCommand): - def __init__(self, api, port_name, queue_id, min_rate, max_rate): - super(AddQueue, self).__init__(api) - self.port_name = port_name - self.queue_id = queue_id - self.min_rate = min_rate - self.max_rate = max_rate - - def run_idl(self, txn): - port_row = idlutils.row_by_value(self.api.idl, 'Port', 'name', - self.port_name) - qos_row = self.api._tables['QoS'].rows[port_row.qos[0].uuid] - queues = getattr(qos_row, 'queues', []) - if self.queue_id in queues.keys(): - raise Exception - queue_row = txn.insert(self.api._tables['Queue']) - queue_row.other_config = {'min-rate': self.min_rate, - 'max-rate': self.max_rate} - queues[self.queue_id] = queue_row - qos_row.verify('queues') - qos_row.queues = queues - - self.result = 'Done' diff --git a/wan_qos/agent/ovsdb/impl_idl.py b/wan_qos/agent/ovsdb/impl_idl.py deleted file mode 100644 index 6979971..0000000 --- a/wan_qos/agent/ovsdb/impl_idl.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright (c) 2015 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.agent.ovsdb import api -from neutron.agent.ovsdb import impl_idl -from neutron.agent.ovsdb.native import commands as cmd -from neutron.agent.ovsdb.native import connection -from neutron.agent.ovsdb.native import idlutils -from neutron.agent.ovsdb.native import vlog - - -class OvsdbQosIdl(impl_idl.OvsdbIdl): - def __init__(self, context, conn, timeout): - super(OvsdbQosIdl, self).__init__(context) - self.ovsdb_connection = connection.Connection(conn, - timeout, - 'Open_vSwitch') - diff --git a/wan_qos/agent/tc_agent.py b/wan_qos/agent/tc_agent.py index 4eb233d..7d59402 100644 --- a/wan_qos/agent/tc_agent.py +++ b/wan_qos/agent/tc_agent.py @@ -45,7 +45,6 @@ def main(): common_config.init(sys.argv[1:]) config.setup_logging() server = neutron_service.Service.create( - binary='neutron-wan-qos-agent', topic=topics.TC_AGENT, report_interval=10, manager='wan_qos.agent.tc_manager.TcAgentManager') diff --git a/wan_qos/agent/tc_driver.py b/wan_qos/agent/tc_driver.py index bc35874..02e80f8 100644 --- a/wan_qos/agent/tc_driver.py +++ b/wan_qos/agent/tc_driver.py @@ -16,7 +16,6 @@ from subprocess import call from subprocess import check_call -from oslo_config import cfg from oslo_log import log as logging import agent_api diff --git a/wan_qos/agent/tc_manager.py b/wan_qos/agent/tc_manager.py index fc72dbf..623a479 100644 --- a/wan_qos/agent/tc_manager.py +++ b/wan_qos/agent/tc_manager.py @@ -15,22 +15,33 @@ from oslo_config import cfg from oslo_log import log as logging +import oslo_messaging as messaging + +from neutron import context as ctx from wan_qos.agent import tc_driver +from wan_qos.common import api +from wan_qos.common import topics LOG = logging.getLogger(__name__) class TcAgentManager: - def __init__(self, host, conf=None): + + target = messaging.Target(version='1.0') + + def __init__(self, host=None, conf=None): self.agent = tc_driver.TcDriver() if not conf: self.conf = cfg.CONF else: self.conf = conf + if not host: + host = self.conf.host lan_port = self.conf.WANQOS.lan_port_name wan_port = self.conf.WANQOS.wan_port_name self.agent.set_ports(lan_port, wan_port) + self.plugin_rpc = api.TcPluginApi(host, topics.TC_PLUGIN) def init_host(self): self.agent.clear_all() @@ -45,9 +56,9 @@ class TcAgentManager: } self.agent.set_root_queue(tc_dict) - def after_start(self): LOG.info("WAN QoS agent started") def periodic_tasks(self, context, raise_on_error=False): - pass \ No newline at end of file + LOG.info( + self.plugin_rpc.agent_up_notification(ctx.get_admin_context())) diff --git a/wan_qos/common/api.py b/wan_qos/common/api.py new file mode 100644 index 0000000..e457689 --- /dev/null +++ b/wan_qos/common/api.py @@ -0,0 +1,49 @@ +# Copyright 2016 Huawei corp. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import oslo_messaging + +from neutron.common import rpc as n_rpc + +from wan_qos.common import topics + + +class TcPluginApi(object): + def __init__(self, host, topic=topics.TC_PLUGIN): + self.host = host + target = oslo_messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) + + def agent_up_notification(self, context): + cctxt = self.client.prepare() + return cctxt.cast(context, 'agent_up_notification', host=self.host) + + def get_configuration_from_db(self, context): + cctxt = self.client.prepare() + return cctxt.call(context, 'get_configuration_from_db', host=self.host) + + +class TcAgentApi(object): + def __init__(self, host, topic=topics.TC_AGENT): + self.host = host + target = oslo_messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) + + def create_wan_qos(self, context, wan_qos_dict): + cctxt = self.client.prepare() + return cctxt.call(context, + 'create_wan_qos', + wan_qos_dict) + diff --git a/wan_qos/wanqos_client/_wan_qos.py b/wan_qos/common/constants.py similarity index 94% rename from wan_qos/wanqos_client/_wan_qos.py rename to wan_qos/common/constants.py index b49e332..1183adb 100644 --- a/wan_qos/wanqos_client/_wan_qos.py +++ b/wan_qos/common/constants.py @@ -13,3 +13,5 @@ # License for the specific language governing permissions and limitations # under the License. +WAN_QOS = 'WAN_QOS' +WANQOS = 'wanqos' \ No newline at end of file diff --git a/wan_qos/common/topics.py b/wan_qos/common/topics.py index 7f1c200..a244cca 100644 --- a/wan_qos/common/topics.py +++ b/wan_qos/common/topics.py @@ -13,5 +13,5 @@ # License for the specific language governing permissions and limitations # under the License. -TC_AGENT = 'wan_tc_agent' -TC_PLUGIN = 'wan_tc_plugin' \ No newline at end of file +TC_AGENT = 'wan_qos_agent' +TC_PLUGIN = 'wan_qos_plugin' diff --git a/wan_qos/agent/ovsdb/__init__.py b/wan_qos/db/migration/__init__.py similarity index 100% rename from wan_qos/agent/ovsdb/__init__.py rename to wan_qos/db/migration/__init__.py diff --git a/wan_qos/extensions/wanqos.py b/wan_qos/extensions/wanqos.py new file mode 100644 index 0000000..71b3ed6 --- /dev/null +++ b/wan_qos/extensions/wanqos.py @@ -0,0 +1,105 @@ +# Copyright 2016 Huawei corp. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +from neutron_lib.api import extensions +from neutron.api.v2 import resource_helper + +from wan_qos.common import constants + +RESOURCE_ATTRIBUTE_MAP = { + constants.WAN_QOS: { + 'id': {'allow_post': False, 'allow_put': False, + 'is_visible': True}, + 'max_rate': {'allow_post': True, 'allow_put': False, + 'validate': {'type:string': None}, + 'is_visible': True, 'default': ''}, + 'min_rate': {'allow_post': True, 'allow_put': False, + 'validate': {'type:string': None}, + 'is_visible': True, 'default': ''}, + 'network_id': {'allow_post': True, 'allow_put': False, + 'validate': {'type:string': None}, + 'is_visible': True}, + 'tenant_id': {'allow_post': True, 'allow_put': False, + 'validate': {'type:string': None}, + 'required_by_policy': True, + 'is_visible': True} + }, +} + + +class WanQos(extensions.ExtensionDescriptor): + + @classmethod + def get_name(cls): + return "WAN QoS" + + @classmethod + def get_alias(cls): + return "wan_qos" + + @classmethod + def get_description(cls): + return "Limit traffic on WAN links" + + @classmethod + def get_updated(cls): + return "2016-12-01T00:00:00-00:00" + + @classmethod + def get_resources(cls): + """Returns Ext Resources.""" + + mem_actions = {} + plural_mappings = resource_helper.build_plural_mappings( + {}, RESOURCE_ATTRIBUTE_MAP) + resources = resource_helper.build_resource_info(plural_mappings, + RESOURCE_ATTRIBUTE_MAP, + constants.WANQOS, + action_map=mem_actions, + register_quota=True, + translate_name=True) + + return resources + + def get_extended_resources(self, version): + if version == "2.0": + return RESOURCE_ATTRIBUTE_MAP + else: + return {} + + +class WanQosPluginBase(object): + + @abc.abstractmethod + def create_wan_qos(self, context, wan_qos): + pass + + @abc.abstractmethod + def get_wan_qos(self, id): + pass + + @abc.abstractmethod + def get_wan_qoss(self): + pass + + @abc.abstractmethod + def update_wan_qos(self, context, id, wan_qos): + pass + + @abc.abstractmethod + def delete_wan_qos(self, context, id): + pass diff --git a/wan_qos/services/plugin.py b/wan_qos/services/plugin.py new file mode 100644 index 0000000..ff9e801 --- /dev/null +++ b/wan_qos/services/plugin.py @@ -0,0 +1,98 @@ +# Copyright 2016 Huawei corp. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.common import rpc as n_rpc +from neutron.db import agents_db + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import importutils +import oslo_messaging as messaging + +from neutron.services import service_base + +from wan_qos.common import api +from wan_qos.common import constants +from wan_qos.common import topics +from wan_qos.extensions import wanqos + +LOG = logging.getLogger(__name__) + + +class PluginRpcCallback(object): + target = messaging.Target(version='1.0') + + def __init__(self, plugin): + super(PluginRpcCallback, self).__init__() + self.plugin = plugin + LOG.debug('rpc callback started.') + + def agent_up_notification(self, context, host): + LOG.debug('got up notification from %s' % host) + self.plugin.agent_up_notification(host) + + +class WanQosDriver(service_base.ServicePluginBase): + def get_plugin_description(self): + pass + + def get_plugin_type(self): + pass + + @property + def service_type(self): + return 'wan_qos' + + +class WanQosPlugin(wanqos.WanQosPluginBase): + def __init__(self): + rpc_callback = importutils.import_object( + 'wan_qos.services.plugin.PluginRpcCallback', self) + endpoints = ( + [rpc_callback, agents_db.AgentExtRpcCallback()]) + self.agent_rpc = api.TcAgentApi(cfg.CONF.host) + self.conn = n_rpc.create_connection() + self.conn.create_consumer(topics.TC_PLUGIN, + endpoints, + fanout=False) + self.conn.consume_in_threads() + + def get_plugin_type(self): + """Get type of the plugin.""" + return constants.WANQOS + + def get_plugin_description(self): + """Get description of the plugin.""" + return 'Plugin for rate limiting on WAN links.' + + def get_wan_qos(self, id): + pass + + def get_wan_qoss(self): + pass + + def delete_wan_qos(self, context, id): + pass + + def update_wan_qos(self, context, id, wan_qos): + pass + + def create_wan_qos(self, context, wan_qos): + pass + # self.agent_rpc.create_wan_qos(context, wan_qos) + + def agent_up_notification(self, host): + LOG.debug('agent %s is up' % host) + return 'OK' diff --git a/wan_qos/tests/unit/ovsdb/__init__.py b/wan_qos/tests/system/__init__.py similarity index 100% rename from wan_qos/tests/unit/ovsdb/__init__.py rename to wan_qos/tests/system/__init__.py diff --git a/wan_qos/tests/system/plugin.py b/wan_qos/tests/system/plugin.py new file mode 100644 index 0000000..49487ae --- /dev/null +++ b/wan_qos/tests/system/plugin.py @@ -0,0 +1,39 @@ +# Copyright 2016 Huawei corp. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import time +import sys + +from oslo_config import cfg +from oslo_service import service + +from neutron.agent.common import config +from neutron.common import config as common_config +from neutron import service as neutron_service + +from wan_qos.common import topics +from wan_qos.services import plugin + + +def main(): + common_config.init(sys.argv[1:]) + config.setup_logging() + wanqos_plugin = plugin.WanQosPlugin() + while True: + time.sleep(3) + +if __name__ == '__main__': + main() diff --git a/wan_qos/tests/unit/ovsdb/test_ovsdb.py b/wan_qos/tests/unit/ovsdb/test_ovsdb.py deleted file mode 100644 index c2a9693..0000000 --- a/wan_qos/tests/unit/ovsdb/test_ovsdb.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright (c) 2016 Huawei, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock -import testtools - -from neutron.tests import base - -import wan_qos.agent.ovsdb.impl_idl as impl_idl -import wan_qos.agent.ovsdb.commands as cmd - - -class OvsdbIdlTestCase(base.BaseTestCase): - def setUp(self): - super(OvsdbIdlTestCase, self).setUp() - self.vsctl_timeout = 10 - self.ovsdb_idl = impl_idl.OvsdbQosIdl(self, 'tcp:127.0.0.1:6640', 30) - self.ovsdb_idl.ovsdb_connection.start() - self.idl = self.ovsdb_idl.ovsdb_connection.idl - - def test1(self): - assert self.ovsdb_idl.br_exists('tc-br').execute()==True - - def get_queue_list(self): - print (cmd.GetQueueIdList(self.ovsdb_idl, 'enp1s0f1').execute()) - - def add_queue(self): - print (cmd.AddQueue(self.ovsdb_idl, 'enp1s0f1', 1, '1000000', '1000000') - .execute()) - - diff --git a/wan_qos/tests/unit/test_tc_agent.py b/wan_qos/tests/unit/test_tc_agent.py index 0913198..fbd21a1 100644 --- a/wan_qos/tests/unit/test_tc_agent.py +++ b/wan_qos/tests/unit/test_tc_agent.py @@ -12,14 +12,40 @@ # License for the specific language governing permissions and limitations # under the License. -from wan_qos.agent import tc_driver +import time from neutron.tests import base +from oslo_config import cfg + +from wan_qos.agent import tc_driver +from wan_qos.agent import tc_manager +from wan_qos.services import plugin + +wanqos_group = cfg.OptGroup(name='WANQOS', + title='WAN QoS options') + +opts = [ + cfg.StrOpt('lan_port_name', + default='enp1s0f0', + help='LAN side port name'), + cfg.StrOpt('lan_max_rate', + default='100mbit', + help='LAN side port rate'), + cfg.StrOpt('wan_port_name', + default='enp1s0f1', + help='WAN side port name'), + cfg.StrOpt('wan_max_rate', + default='100mbit', + help='WAN side port rate') +] + class TestTcDriver(base.BaseTestCase): def setUp(self): super(TestTcDriver, self).setUp() + cfg.CONF.register_group(wanqos_group) + cfg.CONF.register_opts(opts, group='WANQOS') self.tc_agent = tc_driver.TcDriver() self.tc_agent.set_ports('enp1s0f0', 'enp1s0f1') @@ -112,3 +138,14 @@ class TestTcDriver(base.BaseTestCase): 'child': '10' } self.tc_agent.remove_traffic_limiter(tc_dict) + + +class TestApiMessages(base.BaseTestCase): + def setUp(self): + super(TestApiMessages, self).setUp() + cfg.CONF.register_group(wanqos_group) + cfg.CONF.register_opts(opts, group='WANQOS') + self.plugin = plugin.WanQosPlugin() + + +