Reorganize files for akanda.router package ns

Move the files around so that requirements, testing, and source are
clustered for the router component.  The new organization also modifies
import paths to move modules into the akanda.router namespace.  Tests
are also moved into a single heirarchy.
This commit is contained in:
Mark McClain 2012-08-21 17:15:39 -07:00
commit 41ebec2ba9
36 changed files with 2030 additions and 0 deletions

1
akanda/__init__.py Normal file
View File

@ -0,0 +1 @@
__import__('pkg_resources').declare_namespace(__name__)

View File

View File

View File

@ -0,0 +1,25 @@
"""Set up the API server application instance
"""
import flask
from akanda.router.api import v1
app = flask.Flask(__name__)
app.register_blueprint(v1.base)
app.register_blueprint(v1.system)
app.register_blueprint(v1.firewall)
@app.before_request
def attach_config():
'''
Attach any configuration before instantiating API
'''
pass
def main():
#TODO(mark): make this use a config file
app.debug = True
app.run(host='0.0.0.0', port=5000)

View File

@ -0,0 +1,3 @@
from .base import base
from .firewall import firewall
from .system import system

View File

@ -0,0 +1,21 @@
"""
Blueprint for the "base" portion of the version 1 of the API.
"""
from akanda.router import utils
base = utils.blueprint_factory(__name__)
@base.before_request
def attach_config():
#Use for attaching config prior to starting
pass
@base.route('/')
def welcome():
'''
Show welcome message
'''
return 'Welcome to the Akanda appliance'

View File

@ -0,0 +1,98 @@
"""
Blueprint for version 1 of the firewall API.
"""
from akanda.router import utils
from akanda.router.drivers import pf
firewall = utils.blueprint_factory(__name__)
@firewall.route('/rules')
def get_rules():
'''
Show loaded firewall rules by pfctl
'''
pf_mgr = pf.PFManager()
results = pf_mgr.get_rules()
return results
@firewall.route('/states')
def get_states():
'''
Show firewall state table
'''
pf_mgr = pf.PFManager()
results = pf_mgr.get_states()
return results
@firewall.route('/anchors')
def get_anchors():
'''
Show loaded firewall anchors by pfctl
'''
pf_mgr = pf.PFManager()
results = pf_mgr.get_anchors()
return results
@firewall.route('/sources')
def get_sources():
'''
Show loaded firewall sources by pfctl
'''
pf_mgr = pf.PFManager()
results = pf_mgr.get_sources()
return results
@firewall.route('/info')
def get_info():
'''
Show verbose running firewall information
'''
pf_mgr = pf.PFManager()
results = pf_mgr.get_info()
return results
@firewall.route('/tables')
def get_tables():
'''
Show loaded firewall tables by pfctl
'''
pf_mgr = pf.PFManager()
results = pf_mgr.get_tables()
return results
@firewall.route('/labels')
def get_labels():
'''
Show loaded firewall labels by pfctl
'''
pf_mgr = pf.PFManager()
results = pf_mgr.get_labels()
return results
@firewall.route('/timeouts')
def get_timeouts():
'''
Show firewall connection timeouts
'''
pf_mgr = pf.PFManager()
results = pf_mgr.get_timeouts()
return results
@firewall.route('/memory')
def get_memory():
'''
Show firewall memory
'''
pf_mgr = pf.PFManager()
results = pf_mgr.get_memory()
return results

View File

@ -0,0 +1,43 @@
"""
Blueprint for the "system" portion of the version 1 of the API.
"""
import json
from flask import Response
from akanda.router import utils
from akanda.router.drivers import ifconfig
system = utils.blueprint_factory(__name__)
@system.route('/check_route')
def check_route():
return Response("you got it! *** " + __name__ + " *** " + __file__)
@system.route('/interface/<ifname>')
def get_interface(ifname):
'''
Show interface parameters given an interface name.
For example ge1, ge2 for generic ethernet
'''
if_mgr = ifconfig.InterfaceManager()
result = if_mgr.get_interface(ifname)
js = json.dumps({"interface": result.to_dict()}, cls=utils.ModelSerializer)
resp = Response(js, status=200, mimetype='application/json')
return resp
@system.route('/interfaces')
def get_interfaces():
'''
Show all interfaces and parameters
'''
if_mgr = ifconfig.InterfaceManager()
results = if_mgr.get_interfaces()
interfaces = [x.to_dict() for x in results]
js = json.dumps({"interfaces": interfaces}, cls=utils.ModelSerializer)
resp = Response(js, status=200, mimetype='application/json')
return resp

View File

View File

@ -0,0 +1,33 @@
import re
import sys
from akanda.router.drivers import ifconfig
def configure_ssh():
"""
"""
mgr = ifconfig.InterfaceManager()
interfaces = mgr.get_interfaces(['em', 're'])
interfaces.sort(key=lambda x: x.ifname)
primary = interfaces[0]
if not primary.is_up:
mgr.up(primary)
primary = mgr.get_interface(primary)
for address in primary.addresses:
if str(address.ip).startswith('fe80'):
listen_ip = '%s%%%s' % (address.ip, primary.ifname)
else:
sys.stderr.write('Unable to bring up first interface (%s)!\n' %
primary.ifname)
sys.exit(1)
config = open('/etc/ssh/sshd_config', 'r').read()
config = re.sub('(^|\n)(#)?(ListenAddress|AddressFamily) .*', '', config)
config += '\n'.join(
['ListenAddress %s' % listen_ip, 'AddressFamily inet6'])
open('/etc/ssh/sshd_config', 'w+').write(config)
sys.stderr.write('sshd configured to listen on %s\n' % listen_ip)

View File

View File

@ -0,0 +1,12 @@
from akanda.router import utils
class Manager(object):
def __init__(self, root_helper='sudo'):
self.root_helper = root_helper
def sudo(self, *args):
return utils.execute([self.EXECUTABLE] + list(args), self.root_helper)
def do(self, *args):
return utils.execute([self.EXECUTABLE] + list(args))

View File

@ -0,0 +1,113 @@
import logging
import os
import re
from cStringIO import StringIO
from akanda.drivers import base
from akanda.utils import execute, replace_file
LOG = logging.getLogger(__name__)
RUN_DIR = '/var/run/dhcp'
PID_FILE = os.path.join(RUN_DIR, 'dnsmasq.pid')
HOSTS_FILE = os.path.join(RUN_DIR, 'dnsmasq.hosts')
OPTS_FILE = os.path.join(RUN_DIR, 'dnsmasq.opts')
class DnsManager(base.Manager):
"""
"""
EXECUTABLE = '/sbin/dnsmasq'
def __init__(self, interfaces, allocations,
domain='akanda.local', root_helper='sudo'):
super(DnsManager, self).__init__(root_helper=root_helper)
self.interfaces = interfaces
self.allocations = allocations
self.domain = domain
# XXX self.tags is referenced in a couple places but never explicitly
# set; this should probably be done here; please fix
self._make_tags()
cmd = [
'--no-hosts',
'--no-resolv',
'--strict-order',
'--bind-interfaces',
'--except-interface=lo',
'--domain=%s' % self.domain,
'--pid-file=%s' % PID_FILE,
'--dhcp-hostsfile=%s' % HOSTS_FILE,
'--dhcp-optsfile=%s' % OPTS_FILE,
'--leasefile-ro',
]
for interface in interfaces:
cmd.append('--interface=%s' % interface.ifname)
for address in interface.addresses:
cmd.append('--dhcp-range=set:%s,%s,%s,%ss' %
(self.tags[address.ip],
address.network,
'static',
120))
self._output_hosts_file()
self._output_opts_file()
self.sudo(cmd)
def __del__(self):
#FIXME: ensure the pid is actually dnsmasq
execute(['kill', '-9', self.pid], self.root_helper)
@property
def pid(self):
try:
return int(open(PID_FILE, 'r').read())
except:
return
def update_allocations(self, allocations):
"""Rebuilds the dnsmasq config and signal the dnsmasq to reload."""
self.allocations = allocations
self._output_hosts_file()
execute(['kill', '-HUP', self.pid], self.root_helper)
LOG.debug('Reloading allocations')
def _make_tags(self):
i = 0
for interface in self.interfaces:
for address in self.addresses:
# XXX tags is not defined anywhere... please fix
if address in tags:
raise ValueError('Duplicate network')
self.tags[address] = 'tag%d' % i
i += 1
def _output_hosts_file(self):
"""Writes a dnsmasq compatible hosts file."""
r = re.compile('[:.]')
buf = StringIO()
for alloc in self.allocations:
name = '%s.%s' % (r.sub('-', alloc.ip_address),
self.domain)
buf.write('%s,%s,%s\n' %
(alloc.mac_address, name, alloc.ip_address))
replace_file(HOSTS_FILE, buf.getvalue())
def _output_opts_file(self):
"""Write a dnsmasq compatible options file."""
# TODO (mark): add support for nameservers
options = []
for interface in self.interfaces:
options.append((self.tags[interface.ip],
'option',
'router',
interface.ip))
# XXX name is never used; please fix (remove it or use it)
name = self.get_conf_file_name('opts')
replace_file(OPTS_FILE,
'\n'.join(['tag:%s,%s:%s,%s' % o for o in options]))

View File

@ -0,0 +1,188 @@
import re
import netaddr
from akanda.router import models
from akanda.router.drivers import base
GENERIC_IFNAME = 'ge'
PHYSICAL_INTERFACES = ['em', 're', 'en']
class InterfaceManager(base.Manager):
"""
"""
EXECUTABLE = '/sbin/ifconfig'
def __init__(self, root_helper='sudo'):
super(InterfaceManager, self).__init__(root_helper)
self.next_generic_index = 0
self.host_mapping = {}
self.generic_mapping = {}
def _ensure_mapping(self):
if not self.host_mapping:
self.get_interfaces()
def get_interfaces(self):
interfaces = _parse_interfaces(self.do('-a'),
filters=PHYSICAL_INTERFACES)
interfaces.sort(key=lambda x: x.ifname)
for i in interfaces:
if i.ifname not in self.host_mapping:
generic_name = 'ge%d' % self.next_generic_index
self.host_mapping[i.ifname] = generic_name
self.next_generic_index += 1
# change ifname to generic version
i.ifname = self.host_mapping[i.ifname]
self.generic_mapping = dict((v, k) for k, v in
self.host_mapping.iteritems())
return interfaces
def get_interface(self, ifname):
real_ifname = self.generic_to_host(ifname)
retval = _parse_interface(self.do(real_ifname))
retval.ifname = ifname
return retval
def is_valid(self, ifname):
self._ensure_mapping()
return ifname in self.generic_mapping
def generic_to_host(self, generic_name):
self._ensure_mapping()
return self.generic_mapping.get(generic_name)
def host_to_generic(self, real_name):
self._ensure_mapping()
return self.host_mapping.get(real_name)
def update_interfaces(self, interfaces):
for i in interfaces:
self.update_interface(i)
def up(self, interface):
real_ifname = self.generic_to_host(interface.ifname)
self.sudo(real_ifname, 'up')
def down(self, interface):
real_ifname = self.generic_to_host(interface.ifname)
self.sudo(real_ifname, 'down')
def update_interface(self, interface):
real_ifname = self.generic_to_host(interface.ifname)
old_interface = self.get_interface(real_ifname)
self._update_description(real_ifname, interface)
self._update_groups(real_ifname, interface, old_interface)
# Must update primary before aliases otherwise will lose address
# in case where primary and alias are swapped.
self._update_addresses(real_ifname, interface, old_interface)
def _update_description(self, real_ifname, interface):
self.sudo(real_ifname, 'description', interface.description)
def _update_groups(self, real_ifname, interface, old_interface):
add = lambda g: (real_ifname, 'group', g)
delete = lambda g: (real_ifname, '-group', g)
self._update_set(real_ifname, interface, old_interface, 'groups',
add, delete)
def _update_addresses(self, real_ifname, interface, old_interface):
add = lambda a: (real_ifname, 'alias',
str(a.ip), 'prefixlen', a.prefixlen)
delete = lambda a: (real_ifname, '-alias',
str(a.ip), 'prefixlen', a.prefixlen)
self._update_set(real_ifname, interface, old_interface,
'addresses', add, delete)
def _update_set(self, real_ifname, interface, old_interface, attribute,
fmt_args_add, fmt_args_delete):
next_set = set(getattr(interface, attribute))
prev_set = set(getattr(old_interface, attribute))
if next_set == prev_set:
return
for item in (next_set - prev_set):
self.sudo(fmt_args_add(item))
for item in (prev_set - next_set):
self.sudo(fmt_args_delete(item))
def _parse_interfaces(data, filters=None):
retval = []
for iface_data in re.split('(^|\n)(?=\w+\d{1,3}: flag)', data, re.M):
if not iface_data.strip():
continue
for f in filters or ['']:
if iface_data.startswith(f):
break
else:
continue
retval.append(_parse_interface(iface_data))
return retval
def _parse_interface(data):
retval = dict(addresses=[])
for line in data.split('\n'):
if line.startswith('\t'):
line = line.strip()
if line.startswith('inet'):
retval['addresses'].append(_parse_inet(line))
else:
retval.update(_parse_other_params(line))
else:
retval.update(_parse_head(line))
return models.Interface.from_dict(retval)
def _parse_head(line):
retval = {}
m = re.match(
'(?P<ifname>\w*): flags=\d*<(?P<flags>[\w,]*)> mtu (?P<mtu>\d*)',
line)
if m:
retval['ifname'] = m.group('ifname')
retval['flags'] = m.group('flags').split(',')
retval['mtu'] = int(m.group('mtu'))
return retval
def _parse_inet(line):
tokens = line.split()
if tokens[0] == 'inet6':
ip = tokens[1].split('%')[0]
mask = tokens[3]
else:
ip = tokens[1]
mask = str(netaddr.IPAddress(int(tokens[3], 16)))
return netaddr.IPNetwork('%s/%s' % (ip, mask))
def _parse_other_params(line):
# TODO (mark): remove the no cover for FreeBSD variant of ifconfig
if line.startswith('options'): # pragma nocover
m = re.match('options=[0-9a-f]*<(?P<options>[\w,]*)>', line)
return m.groupdict()
else:
key, value = line.split(' ', 1)
if key == 'ether': # pragma nocover
key = 'lladdr'
elif key.endswith(':'):
key = key[:-1]
return [(key, value)]

View File

@ -0,0 +1,86 @@
from akanda.router.drivers import base
from akanda.router.utils import execute, replace_file
from akanda.router import models
class PFManager(base.Manager):
"""
"""
EXECUTABLE = '/sbin/pfctl'
def _show(self, flag):
return self.sudo('-s' + flag)
def get_rules(self):
# -sr
return self._show('r')
def get_states(self):
# -ss
return self._show('s')
def get_anchors(self):
# -sA
return self._show('A')
def get_sources(self):
# -sS
return self._show('S')
def get_info(self):
# -si
return self._show('i')
def get_tables(self):
# -sT
return self._show('T')
def get_labels(self):
# -sl
return self._show('l')
def get_timeouts(self):
# -st
return self._show('t')
def get_memory(self):
# -sm
return self._show('m')
def update_conf(self, conf_data):
replace_file('/tmp/pf.ctl', conf_data)
execute(['mv', '/tmp/pf.ctl', '/etc/pf.ctl'], self.root_helper)
class TableManager(base.Manager):
"""
"""
EXECUTABLE = '/sbin/pfctl'
def __init__(self, name):
self.name = name
def add(self, cidr):
self._sudo('-t', self.name, '-T', 'add', str(cidr))
def delete(self, cidr):
self._sudo('-t', self.name, '-T', 'delete', str(cidr))
def show(self):
return self._sudo('-t', self.name, '-T', self.name)
def _parse_pf_rules(data, filters=None):
'''
Parser for pfctl -sr
'''
retval = []
return retval
def _parse_pf_rule(line):
'''
Parser for pfctl -sr
'''
retval = {}
return models.PFManager.from_dict(retval)

227
akanda/router/models.py Normal file
View File

@ -0,0 +1,227 @@
import os
import re
import netaddr
class Interface(object):
"""
"""
def __init__(self, ifname=None, addresses=[], groups=None, flags=None,
lladdr=None, mtu=1500, media=None, description=None,
**extra_params):
self.ifname = ifname
self.description = description
self.addresses = addresses
self.groups = groups or []
self.flags = flags or []
self.lladdr = lladdr
self.mtu = mtu
self.media = media
self.extra_params = extra_params
def __repr__(self):
return '<Interface: %s %s>' % (self.ifname,
[str(a) for a in self.addresses])
@property
def description(self):
return self._description
@description.setter
def description(self, value):
if not value:
self._description = ''
elif re.match('\w*$', value):
self._description = value
else:
raise ValueError('Description must be chars from [a-zA-Z0-9_]')
@property
def addresses(self):
return self._addresses
@addresses.setter
def addresses(self, value):
self._addresses = [netaddr.IPNetwork(a) for a in value]
@property
def is_up(self):
if ('state' in self.extra_params
and self.extra_params['state'].lower() == 'up'):
return 'UP'
return 'UP' in self.flags
@classmethod
def from_dict(cls, d):
return Interface(**d)
def to_dict(self, extended=False):
include = ['ifname', 'groups', 'mtu', 'lladdr', 'media']
if extended:
include.extend(['flags', 'extra_params'])
retval = dict(
[(k, v) for k, v in vars(self).iteritems() if k in include])
retval['description'] = self.description
retval['addresses'] = self.addresses
retval['state'] = (self.is_up and 'up') or 'down'
return retval
class FilterRule(object):
"""
"""
def __init__(self, action=None, interface=None, family=None,
protocol=None, source=None, source_port=None,
destination_interface=None,
destination=None, destination_port=None,
redirect=None, redirect_port=None):
self.action = action
self.interface = interface
self.family = family
self.protocol = protocol
self.source = source
self.source_port = source_port
self.destination_interface = destination_interface
self.destination = destination
self.destination_port = destination_port
self.redirect = redirect
self.redirect_port = redirect_port
def __setattr__(self, name, value):
if name != 'action' and not value:
pass
elif name == 'action':
if value not in ('pass', 'block'):
raise ValueError("Action must be 'pass' or 'block' not '%s'" %
value)
elif name in ('source', 'destination'):
if '/' in value:
value = netaddr.IPNetwork(value)
elif name == 'redirect':
value = netaddr.IPAddress(value)
elif name.endswith('_port'):
value = int(value)
elif name == 'family':
if value not in ('inet', 'inet6'):
raise ValueError("Family must be 'inet', 'inet6', None and not"
" %s" % value)
elif name == 'protocol':
if value not in ('tcp', 'udp', 'imcp'):
raise ValueError("Protocol must be tcp|udp|imcp not '%s'." %
value)
super(FilterRule, self).__setattr__(name, value)
@property
def pf_rule(self):
retval = [self.action]
if self.interface:
retval.append('on %s' % self.interface)
if self.family:
retval.append(self.family)
if self.protocol:
retval.append('proto %s' % self.protocol)
if self.source or self.source_port:
retval.append('from')
if self.source:
retval.append(str(self.source))
if self.source_port:
retval.append('port %s' % self.source_port)
if (self.destination_interface
or self.destination
or self.destination_port):
retval.append('to')
if self.destination_interface:
retval.append(self.destination_interface)
if self.destination:
retval.append(str(self.destination))
if self.destination_port:
retval.append('port %s' % self.destination_port)
if self.redirect or self.redirect_port:
retval.append('rdr-to')
if self.redirect:
retval.append(str(self.redirect))
if self.redirect_port:
retval.append('port %s' % self.redirect_port)
return ' '.join(retval)
@classmethod
def from_dict(cls, d):
return FilterRule(**d)
def to_dict(self):
return vars(self)
class Anchor(object):
def __init__(self, name, rules=[]):
self.name = name
self.rules = rules
@property
def pf_rule(self):
pf_rules = '\n\t'.join([r.pf_rule for r in self.rules])
return "anchor %s {\n%s\n}\n" % (self.name, pf_rules)
def external_pf_rule(self, base_dir):
path = os.path.abspath(os.path.join(base_dir, self.name))
return 'anchor %s\nload anchor %s from %s' % (self.name,
self.name,
path)
class AddressBookEntry(object):
def __init__(self, name, cidrs=[]):
self.name = name
self.cidrs = cidrs
@property
def cidrs(self):
return self._cidrs
@cidrs.setter
def cidrs(self, values):
self._cidrs = [netaddr.IPNetwork(a) for a in values]
@property
def pf_rule(self):
return 'table <%s> {%s}' % (self.name, ', '.join(map(str, self.cidrs)))
def external_pf_rule(self, base_dir):
path = os.path.abspath(os.path.join(base_dir, self.name))
return 'table %s\npersist file "%s"' % (self.name,
path)
def external_table_data(self):
return '\n'.join(map(str, self.cidrs))
class Allocation(object):
def __init__(self, lladdr, hostname, ip_address):
self.lladdr = lladdr
self.hostname = hostname
self.ip_address = ip_address
class StaticRoute(object):
def __init__(self, destination, next_hop):
self.destination = destination
self.next_hop = next_hop
@property
def destination(self):
return self._destination
@destination.setter
def destination(self, value):
self._destination = netaddr.IPNetwork(value)
@property
def next_hop(self):
return self._next_hop
@next_hop.setter
def next_hop(self, value):
self._next_hop = netaddr.IPAddress(value)

53
akanda/router/utils.py Normal file
View File

@ -0,0 +1,53 @@
from json import JSONEncoder
import os
import shlex
import subprocess
import tempfile
import flask
def execute(args, root_helper=None):
if root_helper:
cmd = shlex.split(root_helper) + args
else:
cmd = args
return subprocess.check_output(map(str, cmd))
def replace_file(file_name, data):
"""Replaces the contents of file_name with data in a safe manner.
First write to a temp file and then rename. Since POSIX renames are
atomic, the file is unlikely to be corrupted by competing writes.
We create the tempfile on the same device to ensure that it can be renamed.
"""
base_dir = os.path.dirname(os.path.abspath(file_name))
tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False)
tmp_file.write(data)
tmp_file.close()
os.chmod(tmp_file.name, 0644)
os.rename(tmp_file.name, file_name)
class ModelSerializer(JSONEncoder):
"""
"""
def default(self, obj):
# import here to avoid circualar imports... ugh; we may need to move
# this serializer as part of a long-term fix
import netaddr
if isinstance(obj, set):
return list(obj)
if isinstance(obj, netaddr.IPNetwork):
return str(obj)
return super(ModelSerializer, self).default(obj)
def blueprint_factory(name):
name_parts = name.split(".")[-2:]
blueprint_name = "_".join(name_parts)
url_prefix = "/" + "/".join(name_parts)
return flask.Blueprint(blueprint_name, name, url_prefix=url_prefix)

5
setup.cfg Normal file
View File

@ -0,0 +1,5 @@
[nosetests]
where = test
verbosity = 2
detailed-errors = 1
cover-package = akanda

29
setup.py Normal file
View File

@ -0,0 +1,29 @@
import os
from setuptools import setup, find_packages
setup(
name='Akanda Router Appliance',
version='0.1.0',
description='A packet filter based router appliance',
author='DreamHost',
author_email='dev-community@dreamhost.com',
url='http://github.com/dreamhost/akanda',
license='BSD',
install_requires=[
'flask>=0.9',
'netaddr>=0.7.7',
],
namespace_packages=['akanda'],
packages=find_packages(),
include_package_data=True,
zip_safe=False,
entry_points={
'console_scripts': [
'akanda-configure-ssh ='
'akanda.router.commands.management:configure_ssh',
'akanda-api-service =akanda.router.api.server:main',
]
},
)

0
test/__init__.py Normal file
View File

0
test/unit/__init__.py Normal file
View File

View File

View File

152
test/unit/api/v1/fakes.py Normal file
View File

@ -0,0 +1,152 @@
from akanda.router import models
class FakeIFManager(object):
"""
The methods implemented here in the fake interface manager should not be
built using the payloads, since that's what we're using to verify the data.
Instead, each method should create akanda objects as needed that will
serialize to the appropriate data to return the proper payload.
"""
@classmethod
def fake_get_interface(cls, ifname):
return models.Interface(
media="Ethernet autoselect (1000baseT full-duplex,master)",
state="up",
ifname="ge1",
groups="egress",
lladdr="00:0c:29:e8:f9:2e",
addresses=["fe80::20c:29ff:fee8:f92e/64", "192.168.229.129/24"])
@classmethod
def fake_get_interfaces(cls):
iface1 = models.Interface(
media="null", state="down", ifname="ge0", groups="enc",
lladdr="null", addresses=[])
iface2 = models.Interface(
media="Ethernet autoselect (1000baseT full-duplex,master)",
state="up", ifname="ge1", groups="egress",
lladdr="00:0c:29:e8:f9:2e",
addresses=["fe80::20c:29ff:fee8:f92e/64", "192.168.229.129/24"])
iface3 = models.Interface(
media="Ethernet autoselect (1000baseT full-duplex,master)",
state="up", ifname="ge2", groups=[],
lladdr="00:0c:29:e8:f9:38",
addresses=["192.168.57.101/24", "fe80::20c:29ff:fee8:f938/64"])
return [iface1, iface2, iface3]
class FakePFManager(object):
"""
The methods implemented here in the fake PF manager should not be
built using the payloads, since that's what we're using to verify the data.
Instead, each method should create akanda objects as needed that will
serialize to the appropriate data to return the proper payload.
However, since for version 1 we are simply presenting the actual textual
results of the commands and not converting them to models, we just do
straight-up text here.
"""
@classmethod
def fake_get_rules(self):
return ('pass all flags S/SA\n'
'block drop in on ! lo0 proto tcp from '
'any to any port 6000:6010')
@classmethod
def fake_get_states(self):
return ('all tcp 192.168.229.129:22 <- 192.168.229.1:52130'
' ESTABLISHED:ESTABLISHED\n'
'all udp 192.168.229.255:17500 <- 192.168.229.1:17500'
' NO_TRAFFIC:SINGLE\n'
'all udp 172.16.5.255:17500 <- 172.16.5.1:17500'
' NO_TRAFFIC:SINGLE')
@classmethod
def fake_get_anchors(self):
return ('dh\n'
'dh-ssh\n'
'dh-www\n'
'goodguys')
@classmethod
def fake_get_sources(self):
return ("""
No ALTQ support in kernel
ALTQ related functions disabled
""")
@classmethod
def fake_get_info(self):
return("""
Status: Enabled for 0 days 01:57:48 Debug: err
State Table Total Rate
current entries 4
searches 5638 0.8/s
inserts 86 0.0/s
removals 82 0.0/s
Counters
match 86 0.0/s
bad-offset 0 0.0/s
fragment 0 0.0/s
short 0 0.0/s
normalize 0 0.0/s
memory 0 0.0/s
bad-timestamp 0 0.0/s
congestion 0 0.0/s
ip-option 0 0.0/s
proto-cksum 0 0.0/s
state-mismatch 0 0.0/s
state-insert 0 0.0/s
state-limit 0 0.0/s
src-limit 0 0.0/s
synproxy 0 0.0/s
""")
@classmethod
def fake_get_timeouts(self):
return ("""
tcp.first 120s
tcp.opening 30s
tcp.established 86400s
tcp.closing 900s
tcp.finwait 45s
tcp.closed 90s
tcp.tsdiff 30s
udp.first 60s
udp.single 30s
udp.multiple 60s
icmp.first 20s
icmp.error 10s
other.first 60s
other.single 30s
other.multiple 60s
frag 30s
interval 10s
adaptive.start 6000 states
adaptive.end 12000 states
src.track 0s
""")
@classmethod
def fake_get_labels(self):
return ("""
No ALTQ support in kernel
ALTQ related functions disabled
""")
@classmethod
def fake_get_memory(self):
return ('states hard limit 10000\n'
'src-nodes hard limit 10000\n'
'frags hard limit 5000\n'
'tables hard limit 1000\n'
'table-entries hard limit 200000')
@classmethod
def fake_get_tables(self):
return ("""
table <block_hosts> persist
table <private> const { 10/8, 172.16/12, 192.168/16, 224/8 }
""")

View File

View File

@ -0,0 +1,104 @@
"""
The text output generated by the Firewall API.
"""
sample_firewall_rules = ('pass all flags S/SA block drop in on ! lo0 proto '
'tcp from any to any port 6000:6010')
sample_pfctl_sr = """
pass all flags S/SA
block drop in on ! lo0 proto tcp from any to any port 6000:6010
"""
sample_pfctl_ss = """
all tcp 192.168.229.129:22 <- 192.168.229.1:52130 ESTABLISHED:ESTABLISHED
all udp 192.168.229.255:17500 <- 192.168.229.1:17500 NO_TRAFFIC:SINGLE
all udp 172.16.5.255:17500 <- 172.16.5.1:17500 NO_TRAFFIC:SINGLE
"""
sample_pfctl_si = """
Status: Enabled for 0 days 01:57:48 Debug: err
State Table Total Rate
current entries 4
searches 5638 0.8/s
inserts 86 0.0/s
removals 82 0.0/s
Counters
match 86 0.0/s
bad-offset 0 0.0/s
fragment 0 0.0/s
short 0 0.0/s
normalize 0 0.0/s
memory 0 0.0/s
bad-timestamp 0 0.0/s
congestion 0 0.0/s
ip-option 0 0.0/s
proto-cksum 0 0.0/s
state-mismatch 0 0.0/s
state-insert 0 0.0/s
state-limit 0 0.0/s
src-limit 0 0.0/s
synproxy 0 0.0/s
"""
sample_pfctl_st = """
tcp.first 120s
tcp.opening 30s
tcp.established 86400s
tcp.closing 900s
tcp.finwait 45s
tcp.closed 90s
tcp.tsdiff 30s
udp.first 60s
udp.single 30s
udp.multiple 60s
icmp.first 20s
icmp.error 10s
other.first 60s
other.single 30s
other.multiple 60s
frag 30s
interval 10s
adaptive.start 6000 states
adaptive.end 12000 states
src.track 0s
"""
sample_pfctl_sm = """
states hard limit 10000
src-nodes hard limit 10000
frags hard limit 5000
tables hard limit 1000
table-entries hard limit 200000
"""
sample_pfctl_sl = """
No ALTQ support in kernel
ALTQ related functions disabled
"""
sample_pfctl_sA = """
dh
dh-ssh
dh-www
goodguys
"""
sample_pfctl_sS = """
No ALTQ support in kernel
ALTQ related functions disabled
"""
sample_pfctl_sT = """
table <block_hosts> persist
table <private> const { 10/8, 172.16/12, 192.168/16, 224/8 }
"""

View File

@ -0,0 +1,24 @@
"""
Various JSON and text payloads generated by the Router API.
"""
sample_root = """Welcome to the Akanda appliance"""
sample_system_interface = (
'{"interface": {"description": "", "media": "Ethernet autoselect '
'(1000baseT full-duplex,master)", "mtu": 1500, "state": "up", '
'"groups": "egress", "ifname": "ge1", "lladdr": "00:0c:29:e8:f9:2e", '
'"addresses": ["fe80::20c:29ff:fee8:f92e/64", "192.168.229.129/24"]}}')
sample_system_interfaces = (
'{"interfaces": [{"description": "", "media": "null", "mtu": 1500, '
'"state": "down", "groups": "enc", "ifname": "ge0", "lladdr": "null", '
'"addresses": []}, {"description": "", "media": "Ethernet autoselect '
'(1000baseT full-duplex,master)", "mtu": 1500, "state": "up", '
'"groups": "egress", "ifname": "ge1", "lladdr": "00:0c:29:e8:f9:2e", '
'"addresses": ["fe80::20c:29ff:fee8:f92e/64", "192.168.229.129/24"]}, '
'{"description": "", "media": "Ethernet autoselect (1000baseT '
'full-duplex,master)", "mtu": 1500, "state": "up", "groups": [], '
'"ifname": "ge2", "lladdr": "00:0c:29:e8:f9:38", "addresses": '
'["192.168.57.101/24", "fe80::20c:29ff:fee8:f938/64"]}]}')

View File

@ -0,0 +1,26 @@
"""
Base classes for Router API tests.
"""
from unittest import TestCase
import flask
from akanda.router.api import v1
from .payloads import routerapi_system as payload
class BaseAPITestCase(TestCase):
"""
This test case contains the unit tests for the Python server implementation
of the Router API. The focus of these tests is to ensure that the server is
behaving appropriately.
"""
def setUp(self):
self.app = flask.Flask('base_test')
self.app.register_blueprint(v1.base)
self.test_app = self.app.test_client()
def test_root(self):
rv = self.test_app.get('/v1/base', follow_redirects=True)
expected = payload.sample_root
self.assertEqual(rv.data, expected)

View File

@ -0,0 +1,75 @@
"""
Base classes for Router API tests.
"""
from unittest import TestCase
import flask
from mock import patch
from akanda.router.api import v1
from akanda.router.drivers.pf import PFManager
from .fakes import FakePFManager
from .payloads import routerapi_firewall as payload
class FirewallAPITestCase(TestCase):
"""
"""
def setUp(self):
self.app = flask.Flask('firewall_test')
self.app.register_blueprint(v1.firewall)
self.test_app = self.app.test_client()
@patch.object(PFManager, 'get_rules', FakePFManager.fake_get_rules)
def test_get_rules(self):
result = self.test_app.get('/v1/firewall/rules').data.strip()
expected = payload.sample_pfctl_sr.strip()
self.assertEqual(result, expected)
@patch.object(PFManager, 'get_states', FakePFManager.fake_get_states)
def test_get_states(self):
result = self.test_app.get('/v1/firewall/states').data.strip()
expected = payload.sample_pfctl_ss.strip()
self.assertEqual(result, expected)
@patch.object(PFManager, 'get_anchors', FakePFManager.fake_get_anchors)
def test_get_anchors(self):
result = self.test_app.get('/v1/firewall/anchors').data.strip()
expected = payload.sample_pfctl_sA.strip()
self.assertEqual(result, expected)
@patch.object(PFManager, 'get_sources', FakePFManager.fake_get_sources)
def test_get_sources(self):
result = self.test_app.get('/v1/firewall/sources').data.strip()
expected = payload.sample_pfctl_sS.strip()
self.assertEqual(result, expected)
@patch.object(PFManager, 'get_info', FakePFManager.fake_get_info)
def test_get_info(self):
result = self.test_app.get('/v1/firewall/info').data.strip()
expected = payload.sample_pfctl_si.strip()
self.assertEqual(result, expected)
@patch.object(PFManager, 'get_timeouts', FakePFManager.fake_get_timeouts)
def test_get_timeouts(self):
result = self.test_app.get('/v1/firewall/timeouts').data.strip()
expected = payload.sample_pfctl_st.strip()
self.assertEqual(result, expected)
@patch.object(PFManager, 'get_labels', FakePFManager.fake_get_labels)
def test_get_labels(self):
result = self.test_app.get('/v1/firewall/labels').data.strip()
expected = payload.sample_pfctl_sl.strip()
self.assertEqual(result, expected)
@patch.object(PFManager, 'get_tables', FakePFManager.fake_get_tables)
def test_get_tables(self):
result = self.test_app.get('/v1/firewall/tables').data.strip()
expected = payload.sample_pfctl_sT.strip()
self.assertEqual(result, expected)
@patch.object(PFManager, 'get_memory', FakePFManager.fake_get_memory)
def test_get_memory(self):
result = self.test_app.get('/v1/firewall/memory').data.strip()
expected = payload.sample_pfctl_sm.strip()
self.assertEqual(result, expected)

View File

@ -0,0 +1,38 @@
"""
Base classes for System Router API tests.
"""
from unittest import TestCase
import flask
from mock import patch
from akanda.router.api import v1
from akanda.router.drivers.ifconfig import InterfaceManager as IFManager
from .fakes import FakeIFManager
from .payloads import routerapi_system as payload
class SystemAPITestCase(TestCase):
"""
This test case contains the unit tests for the Python server implementation
of the Router API. The focus of these tests is to ensure that the server is
behaving appropriately.
"""
def setUp(self):
self.app = flask.Flask('system_test')
self.app.register_blueprint(v1.system)
self.test_app = self.app.test_client()
@patch.object(IFManager, 'get_interface', FakeIFManager.fake_get_interface)
def test_get_interface(self):
result = self.test_app.get('/v1/system/interface/ge1')
expected = payload.sample_system_interface
self.assertEqual(result.data, expected)
@patch.object(
IFManager, 'get_interfaces', FakeIFManager.fake_get_interfaces)
def test_get_interfaces(self):
result = self.test_app.get('/v1/system/interfaces')
expected = payload.sample_system_interfaces
self.assertEqual(result.data, expected)

View File

View File

@ -0,0 +1,317 @@
from unittest2 import TestCase
import mock
import netaddr
from akanda.router.drivers import ifconfig
SAMPLE_OUTPUT = """lo0: flags=8049<UP,LOOPBACK,RUNNING,MULTICAST> mtu 33152
\tpriority: 0
\tgroups: lo
\tinet6 ::1 prefixlen 128
\tinet6 fe80::1%lo0 prefixlen 64 scopeid 0x5
\tinet 127.0.0.1 netmask 0xff000000
em0: flags=8843<UP,BROADCAST,RUNNING,SIMPLEX,MULTICAST> mtu 1500
\tlladdr 08:00:27:7a:6f:46
\tpriority: 0
\tmedia: Ethernet autoselect (1000baseT full-duplex)
\tstatus: active
\tinet6 fe80::a00:27ff:fe7a:6f46%em0 prefixlen 64 scopeid 0x1
em1: flags=8843<UP,BROADCAST,RUNNING,SIMPLEX,MULTICAST> mtu 1500
\tlladdr 08:00:27:32:1f:d1
\tpriority: 0
\tgroups: egress
\tmedia: Ethernet autoselect (1000baseT full-duplex)
\tstatus: active
\tinet6 fe80::a00:27ff:fe32:1fd1%em1 prefixlen 64 scopeid 0x2
\tinet 10.0.3.15 netmask 0xffffff00 broadcast 10.0.3.255
em2: flags=8802<BROADCAST,SIMPLEX,MULTICAST> mtu 1500
\tlladdr 08:00:27:53:cd:c8
\tpriority: 0
\tmedia: Ethernet autoselect (1000baseT full-duplex)
\tstatus: active
enc0: flags=0<>
\tpriority: 0
\tgroups: enc
\tstatus: active
pflog0: flags=141<UP,RUNNING,PROMISC> mtu 33152
\tpriority: 0
\tgroups: pflog
"""
SAMPLE_SINGLE_OUTPUT = (
"""em1: flags=8843<UP,BROADCAST,RUNNING,SIMPLEX,MULTICAST> mtu 1500
\tlladdr 08:00:27:32:1f:d1
\tpriority: 0
\tgroups: egress
\tmedia: Ethernet autoselect (1000baseT full-duplex)
\tstatus: active
\tinet6 fe80::a00:27ff:fe32:1fd1%em1 prefixlen 64 scopeid 0x2
\tinet 10.0.3.15 netmask 0xffffff00 broadcast 10.0.3.255
""")
class IfconfigTestCase(TestCase):
"""
"""
def setUp(self):
self.execute_patch = mock.patch('akanda.router.utils.execute')
self.mock_execute = self.execute_patch.start()
def tearDown(self):
self.execute_patch.stop()
def test_init(self):
mgr = ifconfig.InterfaceManager()
self.assertEqual(mgr.host_mapping.keys(), [])
def test_get_interfaces(self):
iface_a = mock.Mock()
iface_a.ifname = 'em0'
iface_b = mock.Mock()
iface_b.ifname = 'em1'
ifaces = 'akanda.router.drivers.ifconfig._parse_interfaces'
with mock.patch(ifaces) as parse:
parse.return_value = [iface_a, iface_b]
mgr = ifconfig.InterfaceManager()
interfaces = mgr.get_interfaces()
self.assertEqual(interfaces, [iface_a, iface_b])
self.mock_execute.assert_has_calls(
[mock.call(['/sbin/ifconfig', '-a'])])
def test_get_interface(self):
iface_a = mock.Mock()
iface_a.ifname = 'em0'
iface = 'akanda.router.drivers.ifconfig._parse_interface'
ifaces = 'akanda.router.drivers.ifconfig._parse_interfaces'
with mock.patch(iface) as parse:
with mock.patch(ifaces) as pi:
pi.return_value = [iface_a]
parse.return_value = iface_a
mgr = ifconfig.InterfaceManager()
interface = mgr.get_interface('ge0')
self.assertEqual(interface, iface_a)
self.assertEqual(iface_a.ifname, 'ge0')
self.mock_execute.assert_has_calls(
[mock.call(['/sbin/ifconfig', '-a'])])
def test_ensure_mapping_uninitialized(self):
attr = 'get_interfaces'
with mock.patch.object(ifconfig.InterfaceManager, attr) as get_ifaces:
mgr = ifconfig.InterfaceManager()
mgr._ensure_mapping()
get_ifaces.assert_called_once_with()
def test_ensure_mapping_initialized(self):
attr = 'get_interfaces'
with mock.patch.object(ifconfig.InterfaceManager, attr) as get_ifaces:
mgr = ifconfig.InterfaceManager()
mgr.host_mapping['em0'] = 'ge0'
mgr._ensure_mapping()
self.assertEqual(get_ifaces.call_count, 0)
def test_is_valid(self):
mgr = ifconfig.InterfaceManager()
mgr.host_mapping = {'em0': 'ge0'}
mgr.generic_mapping = {'ge0': 'em0'}
self.assertTrue(mgr.is_valid('ge0'))
def test_generic_to_host(self):
mgr = ifconfig.InterfaceManager()
mgr.host_mapping = {'em0': 'ge0'}
mgr.generic_mapping = {'ge0': 'em0'}
self.assertEqual(mgr.generic_to_host('ge0'), 'em0')
self.assertIsNone(mgr.generic_to_host('ge1'))
def test_host_to_generic(self):
mgr = ifconfig.InterfaceManager()
mgr.host_mapping = {'em0': 'ge0'}
mgr.generic_mapping = {'ge0': 'em0'}
self.assertEqual(mgr.host_to_generic('em0'), 'ge0')
self.assertIsNone(mgr.host_to_generic('em1'))
def test_update_interfaces(self):
iface_a = mock.Mock()
iface_b = mock.Mock()
attr = 'update_interface'
with mock.patch.object(ifconfig.InterfaceManager, attr) as update:
mgr = ifconfig.InterfaceManager()
mgr.update_interfaces([iface_a, iface_b])
update.assert_has_calls([mock.call(iface_a), mock.call(iface_b)])
def test_up(self):
iface = mock.Mock()
iface.ifname = 'ge0'
mgr = ifconfig.InterfaceManager()
mgr.host_mapping = {'em0': 'ge0'}
mgr.generic_mapping = {'ge0': 'em0'}
mgr.up(iface)
self.mock_execute.assert_has_calls(
[mock.call(['/sbin/ifconfig', 'em0', 'up'], 'sudo')])
def test_down(self):
iface = mock.Mock()
iface.ifname = 'ge0'
mgr = ifconfig.InterfaceManager()
mgr.host_mapping = {'em0': 'ge0'}
mgr.generic_mapping = {'ge0': 'em0'}
mgr.down(iface)
self.mock_execute.assert_has_calls(
[mock.call(['/sbin/ifconfig', 'em0', 'down'], 'sudo')])
def test_update_interface(self):
iface = mock.Mock()
iface.ifname = 'ge0'
old_iface = mock.Mock(name='old')
old_iface.ifname = 'ge0'
mock_methods = {
'generic_to_host': mock.Mock(return_value='em0'),
'get_interface': mock.Mock(return_value=old_iface),
'_update_description': mock.Mock(),
'_update_groups': mock.Mock(),
'_update_addresses': mock.Mock()}
with mock.patch.multiple(ifconfig.InterfaceManager, **mock_methods):
mgr = ifconfig.InterfaceManager()
mgr.update_interface(iface)
mock_methods['generic_to_host'].assert_called_once_with('ge0')
mock_methods['get_interface'].assert_called_once_with('em0')
mock_methods['_update_description'].assert_called_once_with(
'em0', iface)
mock_methods['_update_groups'].assert_called_once_with(
'em0', iface, old_iface)
mock_methods['_update_addresses'].assert_called_once_with(
'em0', iface, old_iface)
def test_update_description(self):
iface = mock.Mock()
iface.description = 'internal'
mgr = ifconfig.InterfaceManager()
mgr._update_description('em0', iface)
self.mock_execute.assert_has_calls(
[mock.call(['/sbin/ifconfig', 'em0', 'description', 'internal'],
'sudo')])
def test_update_groups(self):
iface = mock.Mock()
old_iface = mock.Mock()
with mock.patch.object(ifconfig.InterfaceManager, '_update_set') as us:
mgr = ifconfig.InterfaceManager()
mgr._update_groups('em0', iface, old_iface)
us.assert_called_once_with('em0', iface, old_iface, 'groups',
mock.ANY, mock.ANY)
def test_update_addresses(self):
iface = mock.Mock()
old_iface = mock.Mock()
with mock.patch.object(ifconfig.InterfaceManager, '_update_set') as us:
mgr = ifconfig.InterfaceManager()
mgr._update_addresses('em0', iface, old_iface)
us.assert_called_once_with('em0', iface, old_iface, 'addresses',
mock.ANY, mock.ANY)
def test_update_set(self):
iface = mock.Mock()
iface.groups = ['a', 'b']
old_iface = mock.Mock()
old_iface.groups = ['b', 'c']
add = lambda g: ('em0', 'group', g)
delete = lambda g: ('em0', '-group', g)
mgr = ifconfig.InterfaceManager()
mgr._update_set('em0', iface, old_iface, 'groups', add, delete)
self.mock_execute.assert_has_calls([
mock.call(['/sbin/ifconfig', ('em0', 'group', 'a')], 'sudo'),
mock.call(['/sbin/ifconfig', ('em0', '-group', 'c')], 'sudo')
])
def test_update_set_no_diff(self):
iface = mock.Mock()
iface.groups = ['a', 'b']
old_iface = mock.Mock()
old_iface.groups = ['a', 'b']
add = lambda g: ('em0', 'group', g)
delete = lambda g: ('em0', '-group', g)
mgr = ifconfig.InterfaceManager()
mgr._update_set('em0', iface, old_iface, 'groups', add, delete)
self.assertEqual(self.mock_execute.call_count, 0)
class ParseTestCase(TestCase):
def test_parse_interfaces(self):
with mock.patch.object(ifconfig, '_parse_interface') as parse:
parse.side_effect = lambda x: x
retval = ifconfig._parse_interfaces(SAMPLE_OUTPUT)
self.assertEqual(len(retval), 6)
def test_parse_interfaces_with_filter(self):
with mock.patch.object(ifconfig, '_parse_interface') as parse:
parse.side_effect = lambda x: x
retval = ifconfig._parse_interfaces(SAMPLE_OUTPUT, ['em'])
self.assertEqual(len(retval), 3)
for chunk in retval:
self.assertTrue(chunk.startswith('em'))
def test_parse_interface(self):
retval = ifconfig._parse_interface(SAMPLE_SINGLE_OUTPUT)
self.assertEqual(retval.ifname, 'em1')
self.assertEqual(retval.flags,
['UP', 'BROADCAST', 'RUNNING',
'SIMPLEX', 'MULTICAST'])
self.assertEqual(retval.mtu, 1500)
def test_parse_head(self):
expected = dict(
ifname='em1',
flags=['UP', 'BROADCAST', 'RUNNING', 'SIMPLEX', 'MULTICAST'],
mtu=1500)
retval = ifconfig._parse_head(SAMPLE_SINGLE_OUTPUT.split('\n')[0])
self.assertEqual(retval, expected)
def test_parse_inet(self):
inet_sample = SAMPLE_SINGLE_OUTPUT.split('\n')[-2].strip()
retval = ifconfig._parse_inet(inet_sample)
self.assertEqual(retval, netaddr.IPNetwork('10.0.3.15/24'))
def test_parse_inet6(self):
inet_sample = SAMPLE_SINGLE_OUTPUT.split('\n')[-3].strip()
retval = ifconfig._parse_inet(inet_sample)
self.assertEqual(retval,
netaddr.IPNetwork('fe80::a00:27ff:fe32:1fd1/64'))
def test_parse_other_options(self):
lladdr_sample = SAMPLE_SINGLE_OUTPUT.split('\n')[1].strip()
retval = ifconfig._parse_other_params(lladdr_sample)
self.assertEqual(retval, [('lladdr', '08:00:27:32:1f:d1')])

287
test/unit/test_models.py Normal file
View File

@ -0,0 +1,287 @@
from unittest2 import TestCase
import netaddr
from akanda.router import models
class InterfaceModelTestCase(TestCase):
"""
"""
def test_ifname(self):
iface = models.Interface(ifname="em0")
self.assertEquals(iface.ifname, "em0")
def test_to_dict(self):
iface = models.Interface()
result = iface.to_dict()
expected = [
'addresses', 'description', 'groups', 'ifname', 'lladdr',
'media', 'mtu', 'state']
self.assertTrue(isinstance(result, dict))
self.assertItemsEqual(result.keys(), expected)
def test_to_dict_extended(self):
iface = models.Interface()
result = iface.to_dict(True)
expected = [
'addresses', 'description', 'groups', 'ifname', 'lladdr',
'media', 'mtu', 'state', 'flags', 'extra_params']
self.assertTrue(isinstance(result, dict))
self.assertItemsEqual(result.keys(), expected)
def test_repr(self):
iface = models.Interface(ifname='ge0', addresses=['192.168.1.1/24'])
expected = "<Interface: ge0 ['192.168.1.1/24']>"
self.assertEqual(expected, repr(iface))
def test_description(self):
iface = models.Interface()
iface.description = 'the_description'
self.assertEqual('the_description', iface.description)
def test_description_failure(self):
iface = models.Interface()
with self.assertRaises(ValueError):
iface.description = 'the description'
def test_is_up_extra_params(self):
self.assertFalse(models.Interface().is_up)
iface = models.Interface(state='up')
self.assertTrue(iface.is_up)
def test_is_up_flags(self):
self.assertFalse(models.Interface().is_up)
iface = models.Interface(flags=['UP'])
self.assertTrue(iface.is_up)
def test_from_dict(self):
d = {'ifname': 'ge0',
'addresses': ['192.168.1.1/24'],
'state': 'up',
'flags': ['UP', 'BROADCAST'],
'lladdr': 'aa:bb:cc:dd:ee:ff'}
iface = models.Interface.from_dict(d)
self.assertEqual(iface.ifname, 'ge0')
self.assertEqual(iface.addresses,
[netaddr.IPNetwork('192.168.1.1/24')])
self.assertEqual(iface.extra_params["state"], 'up')
self.assertEqual(iface.flags, ['UP', 'BROADCAST'])
self.assertEqual(iface.lladdr, 'aa:bb:cc:dd:ee:ff')
def test_from_dict_function(self):
d = dict(ifname='ge0',
addresses=['192.168.1.1/24'],
flags=['UP', 'BROADCAST'],
lladdr='aa:bb:cc:dd:ee:ff')
iface = models.Interface.from_dict(d)
self.assertEqual(iface.ifname, 'ge0')
self.assertEqual(iface.addresses,
[netaddr.IPNetwork('192.168.1.1/24')])
self.assertEqual(iface.flags, ['UP', 'BROADCAST'])
self.assertEqual(iface.lladdr, 'aa:bb:cc:dd:ee:ff')
class FilterRuleModelTestCase(TestCase):
def test_filter_rule(self):
fr = models.FilterRule(action='pass', family='inet',
destination='192.168.1.1/32')
self.assertEqual(fr.action, 'pass')
self.assertEqual(fr.family, 'inet')
self.assertEqual(fr.destination, netaddr.IPNetwork('192.168.1.1/32'))
def test_setattr_action_valid(self):
fr = models.FilterRule(action='block')
self.assertEqual(fr.action, 'block')
def test_setattr_action_invalid(self):
with self.assertRaises(ValueError):
models.FilterRule(action='reject')
def test_setattr_invalid_family(self):
with self.assertRaises(ValueError):
models.FilterRule(action='pass', family='raw')
def test_setattr_source_destination_cidr(self):
fr = models.FilterRule(action='pass',
destination='192.168.1.2/32')
self.assertEqual(fr.destination, netaddr.IPNetwork('192.168.1.2/32'))
fr = models.FilterRule(action='pass',
source='192.168.1.2/32')
self.assertEqual(fr.source, netaddr.IPNetwork('192.168.1.2/32'))
def test_setattr_source_destination_label(self):
fr = models.FilterRule(action='pass',
destination='foo')
self.assertEqual(fr.destination, 'foo')
fr = models.FilterRule(action='pass',
source='bar')
self.assertEqual(fr.source, 'bar')
def test_setattr_redirect(self):
fr = models.FilterRule(action='pass',
redirect='192.168.1.1')
self.assertEqual(fr.redirect, netaddr.IPAddress('192.168.1.1'))
def test_setattr_port(self):
fr = models.FilterRule(action='pass',
source_port='22')
self.assertEqual(fr.source_port, 22)
fr = models.FilterRule(action='pass',
destination_port='23')
self.assertEqual(fr.destination_port, 23)
def test_setattr_port_none(self):
fr = models.FilterRule(action='pass',
destination_port=None)
self.assertIs(fr.destination_port, None)
def test_setattr_protocol_valid(self):
for p in ['tcp', 'udp', 'imcp']:
fr = models.FilterRule(action='pass', protocol=p)
self.assertEqual(fr.protocol, p)
def test_setattr_protocol_invalid(self):
with self.assertRaises(ValueError):
models.FilterRule(action='pass', protocol='made_up_proto')
def _pf_rule_test_helper(self, d, expected):
fr = models.FilterRule(**d)
self.assertEqual(fr.pf_rule, expected)
def test_pf_rule_basic(self):
self._pf_rule_test_helper(dict(action='pass'), 'pass')
self._pf_rule_test_helper(dict(action='block'), 'block')
def test_pf_rule_interface(self):
self._pf_rule_test_helper(dict(action='pass', interface='ge0'),
'pass on ge0')
def test_pf_rule_family(self):
self._pf_rule_test_helper(dict(action='block', family='inet6'),
'block inet6')
def test_pf_rule_protocol(self):
self._pf_rule_test_helper(dict(action='block', protocol='tcp'),
'block proto tcp')
def test_pf_rule_source_table(self):
self._pf_rule_test_helper(dict(action='block', source='foo'),
'block from foo')
def test_pf_rule_source_address(self):
args = dict(action='block', source='192.168.1.0/24')
self._pf_rule_test_helper(args, 'block from 192.168.1.0/24')
def test_pf_rule_source_port(self):
args = dict(action='block', source_port=22)
self._pf_rule_test_helper(args, 'block from port 22')
def test_pf_rule_source_address_and_port(self):
args = dict(action='pass', source='192.168.1.1/32', source_port=22)
self._pf_rule_test_helper(args, 'pass from 192.168.1.1/32 port 22')
def test_pf_rule_destination_interface(self):
args = dict(action='block', destination_interface="ge1")
self._pf_rule_test_helper(args, 'block to ge1')
def test_pf_rule_destination_table(self):
args = dict(action='block', destination="foo")
self._pf_rule_test_helper(args, 'block to foo')
def test_pf_rule_destination_address(self):
args = dict(action='block', destination="192.168.1.0/24")
self._pf_rule_test_helper(args, 'block to 192.168.1.0/24')
def test_pf_rule_destination_port(self):
args = dict(action='block', destination_port="23")
self._pf_rule_test_helper(args, 'block to port 23')
def test_pf_rule_destination_address_and_port(self):
args = dict(action='block', destination='192.168.1.2/32',
destination_port="23")
self._pf_rule_test_helper(args, 'block to 192.168.1.2/32 port 23')
def test_pf_rule_redirect(self):
args = dict(action='pass',
destination_port="23",
redirect="192.168.1.1")
self._pf_rule_test_helper(args, 'pass to port 23 rdr-to 192.168.1.1')
def test_pf_rule_redirect_port(self):
args = dict(action='pass',
destination_port="23",
redirect_port="24")
self._pf_rule_test_helper(args, 'pass to port 23 rdr-to port 24')
def test_pf_rule_from_dict(self):
args = dict(action='pass',
destination_port="23",
redirect="192.168.1.2")
pr = models.FilterRule.from_dict(args)
self.assertEqual(pr.action, 'pass')
self.assertEqual(pr.destination_port, 23)
self.assertEqual(pr.redirect, netaddr.IPAddress('192.168.1.2'))
class AnchorTestCase(TestCase):
def test_anchor(self):
a = models.Anchor('foo', [])
self.assertEqual(a.name, 'foo')
self.assertEqual(a.rules, [])
def test_anchor_external_pf_rule(self):
a = models.Anchor('foo', [])
self.assertEqual(a.external_pf_rule('/etc/pf'),
'anchor foo\nload anchor foo from /etc/pf/foo')
def test_anchor_pf_rule_empty(self):
a = models.Anchor('foo', [])
self.assertEqual(a.pf_rule, 'anchor foo {\n\n}\n')
def test_anchor_pf_rule(self):
fr = models.FilterRule(action='block', interface="ge0")
a = models.Anchor('foo', [fr])
self.assertEqual(a.pf_rule, 'anchor foo {\nblock on ge0\n}\n')
class AddressBookTestCase(TestCase):
def test_entry(self):
ab = models.AddressBookEntry('foo', ['192.168.1.0/24'])
self.assertEqual(ab.name, 'foo')
self.assertEqual(ab.cidrs, [netaddr.IPNetwork('192.168.1.0/24')])
def test_pf_rule(self):
ab = models.AddressBookEntry('foo', ['192.168.1.0/24'])
self.assertEqual(ab.pf_rule, 'table <foo> {192.168.1.0/24}')
def test_external_pf_rule(self):
ab = models.AddressBookEntry('foo', ['192.168.1.0/24'])
self.assertEqual(ab.external_pf_rule('/etc'),
'table foo\npersist file "/etc/foo"')
def test_external_table_data(self):
ab = models.AddressBookEntry('foo', ['192.168.1.0/24',
'172.16.16.0/16'])
self.assertEqual(ab.external_table_data(),
'192.168.1.0/24\n172.16.16.0/16')
class AllocationTestCase(TestCase):
def test_allocation(self):
a = models.Allocation('aa:bb:cc:dd:ee:ff', 'hosta.com', '192.168.1.1')
self.assertEqual(a.lladdr, 'aa:bb:cc:dd:ee:ff')
self.assertEqual(a.hostname, 'hosta.com')
self.assertEqual(a.ip_address, '192.168.1.1')
class StaticRouteTestCase(TestCase):
def test_static_route(self):
sr = models.StaticRoute('0.0.0.0/0', '192.168.1.1')
self.assertEqual(sr.destination, netaddr.IPNetwork('0.0.0.0/0'))
self.assertEqual(sr.next_hop, netaddr.IPAddress('192.168.1.1'))

37
test/unit/test_utils.py Normal file
View File

@ -0,0 +1,37 @@
import json
from unittest import TestCase
import netaddr
from akanda.router import utils
class ModelSerializerTestCase(TestCase):
"""
"""
def test_default(self):
data = {
"a": [1, 2, 3],
"b": {"c": 4},
"d": "e",
"f": u"g",
"h": 42,
"i": float(3),
"j": False,
"k": None,
"l": (4, 5, 6),
"m": 12345671238792347L,
"n": netaddr.IPNetwork('192.168.1.1/24'),
}
expected = (
'{"a": [1, 2, 3], "b": {"c": 4}, "d": "e", "f": "g", '
'"i": 3.0, "h": 42, "k": null, "j": false, '
'"m": 12345671238792347, "l": [4, 5, 6], "n": "192.168.1.1/24"}')
serialized = json.dumps(data, cls=utils.ModelSerializer)
self.assertEqual(serialized, expected)
def test_default_with_set(self):
data = {"a": set([1, 2, 3])}
expected = '{"a": [1, 2, 3]}'
serialized = json.dumps(data, cls=utils.ModelSerializer)
self.assertEqual(serialized, expected)

7
test_requirements.txt Normal file
View File

@ -0,0 +1,7 @@
tox
unittest2
nose
coverage
mock>=0.8.0
pep8
pyflakes

26
tox.ini Normal file
View File

@ -0,0 +1,26 @@
[tox]
envlist = py26,py27,pep8,pyflakes
[testenv]
setenv = VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/test_requirements.txt
commands = nosetests {posargs}
sitepackages = True
[tox:jenkins]
[testenv:pep8]
deps = pep8
setuptools_git>=0.4
commands = pep8 --repeat --show-source --ignore=E125 --exclude=.venv,.tox,dist,doc,*egg .
[testenv:cover]
setenv = NOSE_WITH_COVERAGE=1
[testenv:venv]
commands = {posargs}
[testenv:pyflakes]
deps = pyflakes
commands = pyflakes akanda