This commit is contained in:
Dima Kuznetsov 2017-04-18 16:14:06 +03:00
parent 8625c47f16
commit 56173f841e
17 changed files with 81 additions and 105 deletions

View File

@ -19,39 +19,40 @@ import six
@six.add_metaclass(abc.ABCMeta)
class AgentInterface(object):
""" Base class that defines the contract for TC agent"""
"Base class that defines the contract for TC agent"
@abc.abstractmethod
def clear_all(self):
""" delete all traffic control configurations """
"Delete all traffic control configurations"
@abc.abstractmethod
def set_ports(self, in_port, out_port):
""" set the names of the LAN and WAN facing ports """
"Set the names of the LAN and WAN facing ports"
@abc.abstractmethod
def set_root_queue(self, tc_dict):
""" sets the root qdisc with its max rate of the WAN link to be set
as upper limit"""
"""Sets the root qdisc with its max rate of the WAN link to be set
as upper limit
"""
@abc.abstractmethod
def create_traffic_class(self, tc_dict):
""" Add traffic class using traffic information from the
dictionary. """
"Add traffic class using traffic information from the dictionary."
@abc.abstractmethod
def update_traffic_class(self, tc_dict):
""" update traffic control using information from tc dictionary. """
"Update traffic control using information from tc dictionary."
@abc.abstractmethod
def remove_traffic_class(self, tc_dict):
""" update traffic control using information from tc dictionary. """
"Update traffic control using information from tc dictionary."
@abc.abstractmethod
def create_filter(self, tc_dict):
""" create traffic filter that is used to route packets to the
right queue"""
"""Create traffic filter that is used to route packets to the
right queue
"""
@abc.abstractmethod
def remove_filter(self, tc_dict):
""" remove traffic filter """
"Remove traffic filter"

View File

@ -15,16 +15,15 @@
import sys
import eventlet
from neutron.common import config as common_config
from neutron.conf.agent import common as config
from neutron import service as neutron_service
from oslo_config import cfg
from oslo_service import service
from neutron.conf.agent import common as config
from neutron.common import config as common_config
from neutron import service as neutron_service
from wan_qos.common import topics
import eventlet
eventlet.monkey_patch()
WANTC_OPTS = [
@ -54,5 +53,6 @@ def main():
manager='wan_qos.agent.tc_manager.TcAgentManager')
service.launch(cfg.CONF, server).wait()
if __name__ == '__main__':
main()

View File

@ -13,14 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from subprocess import call
from subprocess import check_call
from oslo_log import log as logging
import subprocess
from neutron_lib import exceptions
from oslo_log import log as logging
import agent_api
from wan_qos.agent import agent_api
LOG = logging.getLogger(__name__)
@ -38,19 +36,20 @@ class TcDriver(agent_api.AgentInterface):
def clear_all(self):
for port in self.ports.values():
call('sudo tc qdisc del dev %s root' % port, shell=True)
subprocess.call('sudo tc qdisc del dev %s root' % port, shell=True)
def set_root_queue(self, tc_dict):
check_call('sudo tc qdisc add dev %s handle 1: root htb' %
self.ports[tc_dict['port_side']], shell=True)
subprocess.check_call('sudo tc qdisc add dev %s handle 1: root htb' %
self.ports[tc_dict['port_side']], shell=True)
class_str = 'sudo tc class add dev %s parent 1: classid 1:1 ' \
'htb rate %s ceil %s'
check_call(class_str % (self.ports[tc_dict['port_side']],
str(tc_dict['max_rate']),
str(tc_dict['max_rate'])), shell=True)
subprocess.check_call(class_str % (self.ports[tc_dict['port_side']],
str(tc_dict['max_rate']),
str(tc_dict['max_rate'])),
shell=True)
def create_traffic_class(self, tc_dict):
""" Create new traffic class.
"""Create new traffic class.
Parameters:
port_side - lan_port / wan_port
parent - the parent class
@ -59,7 +58,8 @@ class TcDriver(agent_api.AgentInterface):
max - maximum traffic rate. if not provide, the maximum rate will
be limitted by parent maximum rate.
"""
LOG.debug('got request for new class: %s' % tc_dict)
LOG.debug('got request for new class: %s', tc_dict)
tc_dict['command'] = 'add'
self._create_or_update_class(tc_dict)
LOG.debug('new class created.')
@ -75,7 +75,7 @@ class TcDriver(agent_api.AgentInterface):
self.ports[tc_dict['port_side']],
tc_dict['child']
)
check_call(cmd, shell=True)
subprocess.check_call(cmd, shell=True)
def _create_or_update_class(self, tc_dict):
cmd = 'sudo tc class %s dev %s parent 1:%s classid 1:%s htb' % (
@ -90,7 +90,7 @@ class TcDriver(agent_api.AgentInterface):
cmd += ' rate 1kbit'
if 'max' in tc_dict:
cmd += ' ceil %s' % tc_dict['max']
check_call(cmd, shell=True)
subprocess.check_call(cmd, shell=True)
def create_filter(self, tc_dict):
@ -109,11 +109,11 @@ class TcDriver(agent_api.AgentInterface):
cmd += ' match u16 0x12B5 0xFFFF at 22' # VxLAN port
cmd += ' match u32 0x%0.6X00 0xFFFFFF00 at 32' % int(vni)
cmd += ' flowid 1:%s' % tc_dict['child']
LOG.debug('creating filter: %s' % cmd)
check_call(cmd, shell=True)
LOG.debug('creating filter: %s', cmd)
subprocess.check_call(cmd, shell=True)
def remove_filter(self, tc_dict):
cmd = 'sudo tc filter del dev %s ' % self.ports[tc_dict['port_side']]
cmd += ' parent 1:0 protocol ip prio 1 u32'
cmd += ' flowid 1:%s' % tc_dict['child']
check_call(cmd, shell=True)
subprocess.check_call(cmd, shell=True)

View File

@ -86,7 +86,7 @@ class TcAgentManager(manager.Manager):
self.plugin_rpc.device_heartbeat(context, self.host)
def create_wtc_class(self, context, wtc_class_dict):
LOG.debug('got request for new class: %s' % wtc_class_dict)
LOG.debug('got request for new class: %s', wtc_class_dict)
class_dict = {
'parent': wtc_class_dict['parent_class_ext_id'],
'child': wtc_class_dict['class_ext_id']
@ -97,12 +97,16 @@ class TcAgentManager(manager.Manager):
class_dict['min'] = wtc_class_dict['min']
if wtc_class_dict['max']:
class_dict['max'] = wtc_class_dict['max']
if wtc_class_dict['direction'] == 'in' or wtc_class_dict[
'direction'] == 'both':
if (
wtc_class_dict['direction'] == 'in' or
wtc_class_dict['direction'] == 'both'
):
class_dict['port_side'] = 'lan_port'
self._create_wtc_class(class_dict)
if wtc_class_dict['direction'] == 'out' or wtc_class_dict[
'direction'] == 'both':
if (
wtc_class_dict['direction'] == 'out' or
wtc_class_dict['direction'] == 'both'
):
class_dict['port_side'] = 'wan_port'
self._create_wtc_class(class_dict)

View File

@ -78,4 +78,4 @@ class TcAgentApi(object):
cctxt = self.client.prepare()
return cctxt.call(context,
'delete_wtc_filter',
wtc_filter=wtc_filter)
wtc_filter=wtc_filter)

View File

@ -15,18 +15,13 @@
import threading
from oslo_utils import uuidutils
from oslo_utils import timeutils
from oslo_log import log as logging
import sqlalchemy as sa
from neutron import context as ctx
from neutron.db.models import segment
from neutron_lib import exceptions
from oslo_log import log as logging
from oslo_utils import timeutils
from oslo_utils import uuidutils
from wan_qos.db.models import wan_tc as models
from wan_qos.common import constants
LOG = logging.getLogger(__name__)
@ -58,7 +53,7 @@ class WanTcDb(object):
with context.session.begin(subtransactions=True):
if not device:
LOG.debug('New device connected: %s' % host_info)
LOG.debug('New device connected: %s', host_info)
now = timeutils.utcnow()
wan_tc_device = models.WanTcDevice(
id=uuidutils.generate_uuid(),
@ -70,7 +65,7 @@ class WanTcDb(object):
)
return context.session.add(wan_tc_device)
else:
LOG.debug('updating uptime for device: %s' % host_info['host'])
LOG.debug('updating uptime for device: %s', host_info['host'])
device.uptime = timeutils.utcnow()
def device_heartbeat(self, context, host):
@ -81,7 +76,7 @@ class WanTcDb(object):
with context.session.begin(subtransactions=True):
device.heartbeat_timestamp = timeutils.utcnow()
else:
LOG.error('Got heartbeat for non-existing device: %s' % host)
LOG.error('Got heartbeat for non-existing device: %s', host)
def get_all_devices(self, context, filters=None,
fields=None,
@ -209,7 +204,7 @@ class WanTcDb(object):
with context.session.begin(subtransactions=True):
context.session.delete(device)
else:
LOG.error('Trying to delete none existing device. id=%s' % id)
LOG.error('Trying to delete none existing device. id=%s', id)
def get_device(self, context, id):
device = context.session.query(models.WanTcDevice).filter_by(
@ -291,7 +286,7 @@ class WanTcDb(object):
with context.session.begin(subtransactions=True):
context.session.delete(filter_db)
else:
LOG.error('Trying to delete none existing tc filter. id=%s' % id)
LOG.error('Trying to delete none existing tc filter. id=%s', id)
def _get_collection(self, context, model, dict_func, filters=None,
fields=None, sorts=None, limit=None, marker_obj=None,

View File

@ -15,8 +15,8 @@
import abc
from neutron_lib.api import extensions
from neutron.api.v2 import resource_helper
from neutron_lib.api import extensions
from wan_qos.common import constants

View File

@ -15,8 +15,8 @@
import abc
from neutron_lib.api import extensions
from neutron.api.v2 import resource_helper
from neutron_lib.api import extensions
from wan_qos.common import constants

View File

@ -15,8 +15,8 @@
import abc
from neutron_lib.api import extensions
from neutron.api.v2 import resource_helper
from neutron_lib.api import extensions
from wan_qos.common import constants

View File

@ -15,8 +15,8 @@
import abc
from neutron_lib.api import extensions
from neutron.api.v2 import resource_helper
from neutron_lib.api import extensions
from wan_qos.common import constants
@ -60,7 +60,7 @@ class Wantcfilter(extensions.ExtensionDescriptor):
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
"Returns Ext Resources."
mem_actions = {}
plural_mappings = resource_helper.build_plural_mappings(

View File

@ -13,25 +13,23 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.plugins import directory
from neutron.common import rpc as n_rpc
from neutron.db import agents_db
from neutron_lib import exceptions
from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
import oslo_messaging as messaging
from oslo_utils import importutils
from wan_qos.common import api
from wan_qos.common import constants
from wan_qos.common import topics
from wan_qos.db import wan_qos_db
from wan_qos.extensions import wantcfilter
from wan_qos.extensions import wantcdevice
from wan_qos.extensions import wantcclass
from wan_qos.extensions import wantc
from wan_qos.extensions import wantcclass
from wan_qos.extensions import wantcdevice
from wan_qos.extensions import wantcfilter
LOG = logging.getLogger(__name__)
@ -45,7 +43,7 @@ class PluginRpcCallback(object):
LOG.debug('rpc callback started.')
def agent_up_notification(self, context, host_info):
LOG.debug('got up notification from %s' % host_info['host'])
LOG.debug('got up notification from %s', host_info['host'])
self.plugin.db.agent_up_notification(context, host_info)
def device_heartbeat(self, context, host):
@ -114,7 +112,7 @@ class WanQosPlugin(wantcfilter.WanTcFilterPluginBase,
pass
def create_wan_tc_class(self, context, wan_tc_class):
LOG.debug('got new class request: %s' % wan_tc_class)
LOG.debug('got new class request: %s', wan_tc_class)
wtc_class_db = self.db.create_wan_tc_class(context,
wan_tc_class[
'wan_tc_class'])
@ -122,7 +120,7 @@ class WanQosPlugin(wantcfilter.WanTcFilterPluginBase,
return wtc_class_db
def delete_wan_tc_class(self, context, id):
LOG.debug('Got request to delete class id: %s' % id)
LOG.debug('Got request to delete class id: %s', id)
class_tree = self.db.get_class_tree(id)
self.db.delete_wtc_class(context, id)
self.agent_rpc.delete_wtc_class(context, class_tree)
@ -167,8 +165,10 @@ class WanQosPlugin(wantcfilter.WanTcFilterPluginBase,
"""Get tenant id for creation of resources."""
if context.is_admin and 'tenant_id' in resource:
tenant_id = resource['tenant_id']
elif ('tenant_id' in resource and
resource['tenant_id'] != context.tenant_id):
elif (
'tenant_id' in resource and
resource['tenant_id'] != context.tenant_id
):
reason = 'Cannot create resource for another tenant'
raise exceptions.AdminRequired(reason=reason)
else:
@ -194,7 +194,7 @@ class WanQosPlugin(wantcfilter.WanTcFilterPluginBase,
return filters
def create_wan_tc(self, context, wan_tc):
LOG.debug('got WAN_TC: %s' % wan_tc)
LOG.debug('got WAN_TC: %s', wan_tc)
wan_tc_req = wan_tc['wan_tc']
filter_db = self.get_wan_tc_filters(context, filters={
@ -233,7 +233,7 @@ class WanQosPlugin(wantcfilter.WanTcFilterPluginBase,
raise exceptions.BadRequest(msg='Not implemented yet!')
def delete_wan_tc(self, context, id):
LOG.debug('Deleting TC: %s' % id)
LOG.debug('Deleting TC: %s', id)
tc_filter = self.get_wan_tc_filter(context, id)
class_id = tc_filter['class_id']
self.delete_wan_tc_filter(context, id)

View File

@ -14,24 +14,19 @@
# under the License.
import time
import sys
from oslo_config import cfg
from oslo_service import service
import time
from neutron.agent.common import config
from neutron.common import config as common_config
from neutron import service as neutron_service
from wan_qos.common import topics
from wan_qos.services import plugin
def main():
common_config.init(sys.argv[1:])
config.setup_logging()
wanqos_plugin = plugin.WanQosPlugin()
plugin.WanQosPlugin()
while True:
time.sleep(3)

View File

@ -1,9 +1,6 @@
from neutron import context as ctx
from neutron.tests import base
from neutron.tests.unit import testlib_api
from oslo_config import cfg
from wan_qos.db import wan_qos_db
from wan_qos.services import plugin
@ -28,8 +25,8 @@ class TestTcDb(testlib_api.SqlTestCase):
class_db_1 = self._add_class(None, 'both', '1mbit', '2mbit')
class_db_2 = self._add_class(class_db_1['id'], 'both', '2mbit',
'3mbit')
class_db_3 = self._add_class(class_db_2['id'], 'both', '3mbit',
'4mbit')
self._add_class(class_db_2['id'], 'both', '3mbit',
'4mbit')
class_by_id = self.db.get_class_by_id(self.context, class_db_1['id'])
# class_by_id = self.db.get_class_by_id(self.context, '111')
@ -102,14 +99,13 @@ class TestPlugin(testlib_api.SqlTestCase):
class_db_1 = self._add_class(None, 'both', '1mbit', '2mbit')
class_db_2 = self._add_class(class_db_1['id'], 'both', '2mbit',
'3mbit')
class_db_3 = self._add_class(class_db_2['id'], 'both', '3mbit',
'4mbit')
self._add_class(class_db_2['id'], 'both', '3mbit',
'4mbit')
tc_class = self.plugin.get_wan_tc_class(ctx.get_admin_context(),
class_db_1['id'])
print(tc_class)
filters = {'id': [class_db_1['id']]}
tc_classes = self.plugin.get_wan_tc_classs(ctx.get_admin_context())
print(tc_classes)
@ -135,12 +131,6 @@ class TestPlugin(testlib_api.SqlTestCase):
print(tc_classes)
def test_add_filter(self):
class_db = self._add_class(None, 'both', '1mbit', '2mbit')

View File

@ -12,14 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from neutron.tests import base
from oslo_config import cfg
from wan_qos.agent import tc_driver
from wan_qos.agent import tc_manager
from wan_qos.services import plugin
WANTC_group = cfg.OptGroup(name='WANTC',
@ -146,5 +143,3 @@ class TestApiMessages(base.BaseTestCase):
cfg.CONF.register_group(WANTC_group)
cfg.CONF.register_opts(opts, group='WANTC')
self.plugin = plugin.WanQosPlugin()

View File

@ -15,9 +15,8 @@
from neutronclient._i18n import _
from neutronclient.common import extension
from neutronclient.common import exceptions
from neutronclient.common import extension
from wan_qos.common import constants

View File

@ -15,9 +15,8 @@
from neutronclient._i18n import _
from neutronclient.common import extension
from neutronclient.common import exceptions
from neutronclient.common import extension
from wan_qos.common import constants

View File

@ -15,9 +15,7 @@
from neutronclient._i18n import _
from neutronclient.common import extension
from neutronclient.common import exceptions
from wan_qos.common import constants