Import python_neutron_pd_driver from old reviews

Change-Id: I49cf970eed3283a47a73bdadd58ba193a9c1063d
This commit is contained in:
Sam Betts 2015-06-26 11:25:26 +01:00
parent 3e192155a7
commit 5d3c39b1c0
17 changed files with 1468 additions and 2 deletions

View File

@ -0,0 +1,7 @@
[DEFAULT]
# Location to store the unix sockets used to communicate with
# the neutron PD service.
# pd_socket_loc = /tmp
# Interface for external communication to the prefix delegation server.
# pd_interface = br-ex

View File

@ -0,0 +1,4 @@
#! /bin/sh
cd .tox/$1/src/neutron
git fetch https://review.openstack.org/openstack/neutron refs/changes/77/185977/12 && git checkout FETCH_HEAD

View File

@ -0,0 +1,158 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import signal
import stat
import sys
from threading import Thread
from oslo_config import cfg
from oslo_log import log as logging
from neutron.agent.linux import pd_driver # noqa
from neutron.common import config as common_config
from python_neutron_pd_driver import config # noqa
from python_neutron_pd_driver import constants
from python_neutron_pd_driver import listener
from python_neutron_pd_driver import subnetpd
from python_neutron_pd_driver import utils
LOG = logging.getLogger(__name__)
SUBNET_INFO_FILEPATH = "%s/subnet_%s"
def notify_l3_agent(pid):
try:
os.kill(int(pid), signal.SIGHUP)
except Exception as e:
LOG.warn(_("Failed to send SIGNUP to %(pid)s: %(exc)s"),
{'pid': pid, 'exc': e})
class DHCPV6Agent(Thread):
def __init__(self):
super(DHCPV6Agent, self).__init__()
self.pd_clients = {}
try:
subnet_ids = os.listdir(cfg.CONF.pd_confs)
for subnet_id in subnet_ids:
if subnet_id[0:7] != "subnet_":
continue
try:
with open("%s/%s" % (cfg.CONF.pd_confs, subnet_id)) as f:
content = f.readlines()
if content:
self.enable(subnet_id[7:], content[0])
except IOError:
LOG.warn(_("Failed to read subnet %s info from system!"),
subnet_id)
except OSError:
LOG.warn(_("Failed to read existing subnet info from system: %s!"),
cfg.CONF.pd_confs)
listener.start()
utils.socket_delete(constants.CONTROL_PATH)
self.server = utils.socket_bind(constants.CONTROL_PATH)
os.chmod(cfg.CONF.pd_socket_loc + '/' +
constants.CONTROL_PATH, stat.S_IRWXO)
self.running = True
def processor(self, task):
task = task.split(',')
if task[0] == 'enable':
self.enable(task[1], task[2])
elif task[0] == 'disable':
self.disable(task[1])
elif task[0] == 'get':
prefix = self.get_prefix(task[1])
path = constants.RESP_PATH % task[2]
re = utils.socket_connect(path)
re.send(prefix)
def run(self):
while self.running:
task = self.server.recv(1024)
utils.new_daemon_thread(self.processor, (task,))
def stop(self):
self.running = False
self.server.shutdown(utils.socket.SHUT_RDWR)
self.server.close()
def _write_subnet_info_to_system(self, subnet_id, pid):
try:
subnet_list = open(
SUBNET_INFO_FILEPATH % (cfg.CONF.pd_confs, subnet_id), "w")
subnet_list.write(pid)
subnet_list.close()
except IOError:
LOG.warn(_("Failed to write subnet info to system!"))
def _delete_subnet_info_from_system(self, subnet_id):
if os.path.exists(
SUBNET_INFO_FILEPATH % (cfg.CONF.pd_confs, subnet_id)):
os.remove(
SUBNET_INFO_FILEPATH % (cfg.CONF.pd_confs, subnet_id))
def _get_subnet_pd_object(self, subnet_id):
subnet = self.pd_clients.get(subnet_id)
if not subnet:
LOG.warn(_("Prefix delegation not running for %(subnet_id)s"),
{'subnet_id': subnet_id})
return subnet
def enable(self, subnet_id, pid):
"""Enable/Start a PD client for a subnet"""
def respond():
notify_l3_agent(pid)
if subnet_id not in self.pd_clients:
conf = {'subnet_id': str(subnet_id), 'pd_update_cb': respond}
self.pd_clients[subnet_id] = subnetpd.SubnetPD(conf)
self._write_subnet_info_to_system(subnet_id, pid)
else:
respond()
LOG.debug(_("Prefix delegation already running for %(subnet_id)s"),
{'subnet_id': subnet_id})
def disable(self, subnet_id):
"""Get process back from existing process and kill the process"""
subnet = self._get_subnet_pd_object(subnet_id)
if subnet:
subnet.shutdown()
del self.pd_clients[subnet_id]
self._delete_subnet_info_from_system(subnet_id)
def get_prefix(self, subnet_id):
"""Get Prefix from PD client"""
subnet = self._get_subnet_pd_object(subnet_id)
if subnet:
return subnet.get()
return "NOT_RUNNING"
def main():
common_config.init(sys.argv[1:])
pd_agent = DHCPV6Agent()
pd_agent.start()
def signal_handler(signal, frame):
pd_agent.stop()
signal.signal(signal.SIGINT, signal_handler)
signal.pause()

View File

@ -0,0 +1,28 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
OPTS = [
cfg.StrOpt('pd_socket_loc',
default='/tmp',
help=_("Location for storing unix sockets for comunication"
"between L3 Agent and DHCPv6 Client")),
cfg.StrOpt('pd_interface',
default='',
help=_('Interface to bind to send/receive packets for DHCPv6')),
]
cfg.CONF.register_opts(OPTS)

View File

@ -0,0 +1,17 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
CONTROL_PATH = 'neutron-dhcpv6_pd_agent-control'
RESP_PATH = 'neutron-dhcpv6_pd_agent-%s'

View File

@ -0,0 +1,252 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import logging
import netaddr
import random
logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger(__name__)
def gen_trid():
return random.randint(0x00, 0xffffff)
def bytes_to_int(byt):
return int(binascii.hexlify(byt), 16)
def int_to_bytes(num, padding=2):
return bytearray.fromhex(
'{num:0{padding}x}'.format(padding=padding, num=num))
def options_to_dict(data, pos=4):
options = {}
next_pos = pos
done = False
while not done:
option_id = bytes_to_int(data[next_pos: next_pos + 2])
next_pos = next_pos + 2
length = bytes_to_int(data[next_pos: next_pos + 2])
next_pos = next_pos + 2
if option_id not in options:
options[option_id] = []
options[option_id].append(data[next_pos: next_pos + length])
next_pos = next_pos + length
if next_pos >= len(data):
done = True
return options
def prefix_to_string(prefix):
pos = 0
string = binascii.hexlify(prefix[pos: pos + 2]).decode()
pos = pos + 2
while pos < len(prefix):
part = binascii.hexlify(prefix[pos: pos + 2]).decode()
string += ":%s" % part
pos = pos + 2
address = netaddr.IPAddress(string, 6)
return str(address)
class Packet(object):
def to_bytes(self):
return b''
def __str__(self):
return str(self.to_bytes())
def __repr__(self):
return repr(self.to_bytes())
class DHCPOption(Packet):
option_type = 0
def __init__(self, data=None):
self.data = b''
if data:
self._process_data(data)
def _process_data(self, data):
self.option_type = data[0:2]
self.data = data[2:]
def to_bytes(self):
packet = int_to_bytes(self.option_type, 4)
packet += int_to_bytes(len(self.data), 4)
packet += binascii.a2b_qp(self.data)
return packet
def gen_client_id(unique_id):
clientid = ClientIdentifier()
clientid.xargs['EnterpriseNum'] = 8888
clientid.xargs['EnterpriseID'] = unique_id
return clientid.to_bytes()
class ClientIdentifier(DHCPOption):
option_type = 1
def __init__(self, data=None):
super(ClientIdentifier, self).__init__(data)
self.duid_type = 2
self.duid = None
self.xargs = {}
def _process_data(self, data):
super(ClientIdentifier, self)._process_data(data)
self.duid = data[4:]
self.duid_type = bytes_to_int(data[4:6])
if self.duid_type == 2:
self.xargs['EnterpriseNum'] = bytes_to_int(data[6: 10])
self.xargs['EnterpriseID'] = data[10:]
def to_bytes(self):
self.data = int_to_bytes(self.duid_type, 4) # DUID Type
if self.duid_type == 2:
self.data += int_to_bytes(self.xargs['EnterpriseNum'], 8)
self.data += binascii.a2b_qp(
self.xargs['EnterpriseID']) # Enterprise ID
return super(ClientIdentifier, self).to_bytes()
class ServerIdentifier(DHCPOption):
option_type = 2
def __init__(self, data=None):
super(ServerIdentifier, self).__init__()
self.data = data
class OptionRequest(DHCPOption):
option_type = 6
def __init__(self, options=[23, 24], data=None):
super(OptionRequest, self).__init__(data)
self.options = options
def to_bytes(self):
self.data = b''
for op in self.options:
self.data += int_to_bytes(op, 4)
return super(OptionRequest, self).to_bytes()
class ElapsedTime(DHCPOption):
option_type = 8
def __init__(self, time=0, data=None):
super(ElapsedTime, self).__init__(data)
self.time = 0
def to_bytes(self):
if self.time > 0xffff:
self.time = 0xffff
self.data = int_to_bytes(self.time, 4)
return super(ElapsedTime, self).to_bytes()
class IAPDOption(DHCPOption):
option_type = 26
def __init__(self, data=None):
super(IAPDOption, self).__init__()
self.data = data
class IAPD(DHCPOption):
option_type = 25
def __init__(self, unique_id, pd_options=None, data=None):
super(IAPD, self).__init__(data)
self.pd_options = pd_options
self.unique_id = unique_id
def to_bytes(self):
self.data = b''
self.data += binascii.a2b_qp(''.join(self.unique_id.split('-'))[0:4])
self.data += int_to_bytes(3600, 8)
self.data += int_to_bytes(5400, 8)
if self.pd_options:
self.data += IAPDOption(self.pd_options).to_bytes()
return super(IAPD, self).to_bytes()
class DHCPMessage(Packet):
message_type = 0
def __init__(self, trid, unique_id, data=None):
self.trid = trid
self.unique_id = unique_id
if data:
self._process_data(data)
def _process_data(self, data):
pass
def to_bytes(self):
packet = int_to_bytes(self.message_type, 2)
packet += int_to_bytes(self.trid, 6)
packet += gen_client_id(self.unique_id)
packet += OptionRequest().to_bytes()
return packet
class Solicit(DHCPMessage):
message_type = 1
def to_bytes(self):
packet = super(Solicit, self).to_bytes()
packet += ElapsedTime().to_bytes()
packet += IAPD(self.unique_id).to_bytes()
return packet
class DHCPResponseMessage(DHCPMessage):
def __init__(self, trid, unique_id, serverid, pd_choice, data=None):
super(DHCPResponseMessage, self).__init__(trid, unique_id, data)
self.serverid = serverid
self.pd_choice = pd_choice
def to_bytes(self):
packet = super(DHCPResponseMessage, self).to_bytes()
packet += ServerIdentifier(self.serverid).to_bytes()
packet += IAPD(self.unique_id, self.pd_choice).to_bytes()
return packet
class Request(DHCPResponseMessage):
message_type = 3
class Renew(DHCPResponseMessage):
message_type = 5
class Release(DHCPResponseMessage):
message_type = 8
def to_bytes(self):
packet = super(Release, self).to_bytes()
packet += ElapsedTime().to_bytes()
return packet

View File

@ -0,0 +1,61 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import uuid
from oslo_log import log as logging
from neutron.agent.linux import pd_driver
from python_neutron_pd_driver import constants
from python_neutron_pd_driver import exceptions
from python_neutron_pd_driver import utils
LOG = logging.getLogger(__name__)
class PDDriver(pd_driver.PDDriverBase):
def __init__(self, router_id, subnet_id, ri_ifname):
super(PDDriver, self).__init__(router_id, subnet_id, ri_ifname)
self.l3_pid = os.getpid()
def _send_command(self, command, misc):
control_socket = utils.control_socket_connect()
control_socket.send('%s,%s,%s,' % (command, self.subnet_id, misc))
def enable(self, *args, **kwargs):
self._send_command('enable', self.l3_pid)
def disable(self, *args, **kwargs):
self._send_command('disable', self.l3_pid)
def get_prefix(self):
response_id = uuid.uuid4()
path = constants.RESP_PATH % response_id
sw = utils.socket_bind(path)
sw.settimeout(3)
self._send_command('get', response_id)
result = sw.recv(1024)
utils.socket_delete(path)
if result == "NOT_RUNNING":
raise exceptions.DHCPv6AgentException(
msg=("Prefix Delegation not running for %s" % self.subnet_id))
return result
@staticmethod
def get_sync_data():
return []

View File

@ -0,0 +1,20 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import exceptions
class DHCPv6AgentException(exceptions.NeutronException):
message = _("A DHCP v6 Client Exception occured: %(msg)s")

View File

@ -0,0 +1,87 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
try:
import Queue
except Exception:
import queue as Queue
import threading
import time
from python_neutron_pd_driver import socketv6
from python_neutron_pd_driver import utils
logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger(__name__)
LISTENERS = []
RUNNING = False
SOCKET = None
def processor(data, sender, listeners):
for validate, queue in listeners:
if validate(data):
queue.put((data, sender))
def main_listener_thread():
while RUNNING:
data, sender = SOCKET.recvfrom(1024)
utils.new_daemon_thread(processor, (data, sender, LISTENERS))
class Listener(threading.Thread):
def __init__(self, validator, timetowait=2):
super(Listener, self).__init__()
self.validator = validator
self.timetowait = timetowait
self.result = Queue.Queue()
self.start()
def get(self, *args, **kwargs):
return self.result.get(*args, **kwargs)
def run(self):
packets = Queue.Queue()
LISTENERS.append((self.validator, packets))
time.sleep(self.timetowait)
LISTENERS.remove((self.validator, packets))
results = []
while True:
if packets.empty():
break
results.append(packets.get(False))
self.result.put(results)
def stop():
global RUNNING
RUNNING = False
if SOCKET:
SOCKET.shutdown(utils.socket.SHUT_RDWR)
SOCKET.close()
def start():
global RUNNING, SOCKET
if not RUNNING:
RUNNING = True
SOCKET = socketv6.socket_v6()
utils.new_daemon_thread(main_listener_thread)

View File

@ -0,0 +1,43 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import struct
from oslo_config import cfg
from oslo_log import log as logging
from python_neutron_pd_driver import config # noqa
LOG = logging.getLogger(__name__)
SO_BINDTODEVICE = 25
def socket_v6():
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, SO_BINDTODEVICE, cfg.CONF.pd_interface)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS,
struct.pack('@i', 1))
s.bind(('', 546))
return s
def send_packet(packet, address="ff02::1:2"):
s = socket_v6()
s.settimeout(3)
s.sendto(str(packet), (address, 547))

View File

@ -0,0 +1,200 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
try:
import Queue
except Exception:
import queue as Queue
from threading import Timer
import time
from oslo_log import log as logging
from python_neutron_pd_driver import dhcpv6
from python_neutron_pd_driver import listener
from python_neutron_pd_driver import socketv6
from python_neutron_pd_driver import utils
LOG = logging.getLogger(__name__)
class Server(object):
def __init__(self, serverid, address):
self.serverid = serverid
self.address = address[0]
class Prefix(object):
def __init__(self, ia_prefix):
self.pref_liftime = dhcpv6.bytes_to_int(ia_prefix[0:4])
self.valid_liftime = dhcpv6.bytes_to_int(ia_prefix[4:8])
self.pref_length = dhcpv6.bytes_to_int(ia_prefix[8:9])
self.prefix = ia_prefix[9: 9 + self.pref_length]
def __str__(self):
return ("%s/%s" %
(dhcpv6.prefix_to_string(self.prefix), self.pref_length))
def create_validator(ty, trid):
def validator(data):
if (data[0:1] == ty and
dhcpv6.bytes_to_int(data[1:4]) == trid):
return True
return False
return validator
class SubnetPD(object):
def __init__(self, conf):
super(SubnetPD, self).__init__()
self.subnet_id = conf['subnet_id']
self.ready = conf['pd_update_cb']
self.prefix = "::/64"
self.server = None
self.ias = None
self.renew_timer = None
self.setup()
def get(self):
return str(self.prefix)
def reset_renew_timer(self):
self.renew_timer = Timer(self.prefix.pref_liftime, self.renew_prefix)
self.renew_timer.start()
def process_REPLY(self, responses):
# Process reply
data, sender = responses[0]
options = dhcpv6.options_to_dict(data)
ia_pd = options[25][0]
# T1 = bytes_to_int(ia_pd[4:8])
# T2 = bytes_to_int(ia_pd[8:12])
ia_options = dhcpv6.options_to_dict(ia_pd, 12)
self.prefix = Prefix(ia_options[26][0])
def renew_prefix(self, pd_choice):
rn_trid = dhcpv6.gen_trid()
lis = listener.Listener(create_validator("\x07", rn_trid))
socketv6.send_packet(dhcpv6.Renew(
rn_trid, self.subnet_id, self.server.serverid, pd_choice),
self.server.address)
res = lis.get()
if not res:
raise Exception('RENEW: Failed to get valid REPLY...')
self.process_REPLY(res)
self.reset_renew_timer()
def _solicit(self):
sol_trid = dhcpv6.gen_trid()
res = None
retrys = 3
validator = create_validator("\x02", sol_trid)
while not res and retrys > 0:
lis = listener.Listener(validator, 5)
time.sleep(1)
socketv6.send_packet(dhcpv6.Solicit(sol_trid, self.subnet_id))
res = lis.get()
retrys = retrys - 1
if not res:
raise Exception('Failed to get valid Advertise after 3 retries...')
highest = -1
serverid = None
ia_pd = None
s_address = None
for data, sender in res:
options = dhcpv6.options_to_dict(data)
preference = dhcpv6.bytes_to_int(options.get(7, [b'\x00'])[0])
if preference <= highest:
continue
highest = preference
serverid = options[2][0]
ia_pd = options[25][0]
s_address = sender
ia_options = dhcpv6.options_to_dict(ia_pd, 12)
self.ias = ia_options[26]
self.server = Server(serverid, s_address)
def _request(self):
req_trid = dhcpv6.gen_trid()
res = None
retrys = 3
validator = create_validator("\x07", req_trid)
while not res and retrys > 0:
lis = listener.Listener(validator)
time.sleep(1)
socketv6.send_packet(dhcpv6.Request(
req_trid, self.subnet_id, self.server.serverid, self.ias[0]),
self.server.address)
res = lis.get()
retrys = retrys - 1
if not res:
raise Exception('Failed to get valid REPLY!')
self.process_REPLY(res)
def _release(self):
rel_trid = dhcpv6.gen_trid()
validator = create_validator("\x07", rel_trid)
lis = listener.Listener(validator)
socketv6.send_packet(dhcpv6.Release(
rel_trid, self.subnet_id, self.server.serverid, self.ias[0]),
self.server.address)
while True:
try:
res = lis.get(timeout=10)
break
except Queue.Empty:
socketv6.send_packet(
dhcpv6.release(rel_trid, self.server.serverid,
self.subnet_id, self.ias[0]),
self.server.address)
return res
def setup(self):
def processor():
LOG.debug(
"Starting new prefix delegation for %s...", self.subnet_id)
self._solicit()
self._request()
self.reset_renew_timer()
self.ready()
LOG.debug("Prefix delegation ready for %s...", self.subnet_id)
utils.new_daemon_thread(processor, None)
def shutdown(self):
try:
self.renew_timer.cancel()
self._release()
except Exception:
pass
LOG.debug("Ending prefix delegation for %s, bye!! First try!",
self.subnet_id)
return True

View File

@ -0,0 +1,522 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import signal
import socket
import struct
from oslo_config import cfg
from oslo_log import log
from neutron.agent.common import config as agent_config
from neutron.agent.linux import pd_driver
from neutron.common import config as base_config
from neutron.tests import base
from python_neutron_pd_driver import agent
from python_neutron_pd_driver import config # noqa
from python_neutron_pd_driver import constants
from python_neutron_pd_driver import dhcpv6
from python_neutron_pd_driver import driver
from python_neutron_pd_driver import exceptions
from python_neutron_pd_driver import listener
from python_neutron_pd_driver import socketv6
from python_neutron_pd_driver import utils
try:
import __builtin__
except Exception:
import builtins as __builtin__
OPTION_REQUEST = bytearray.fromhex('0006000400170018')
CLIENT_ID = bytearray.fromhex('0001000A0002000022b8') + b'fake'
SERVER_ID = b'\x00\x02\x00\x08' + b'serverid'
ELAPSED = bytearray.fromhex('000800020000')
IAPD = (bytearray.fromhex('0019000c') +
b'fake' + bytearray.fromhex('00000e1000001518'))
IAPD_WITH_OPTIONS = (
b'\x00\x19\x00\x18' + b'fake' +
b'\x00\x00\x0e\x10\x00\x00\x15\x18\x00\x1a\x00\x08' + b'pdoption')
class TestDHCPV6(base.BaseTestCase):
def test_gen_trid(self):
id1 = dhcpv6.gen_trid()
id2 = dhcpv6.gen_trid()
self.assertTrue(id1 <= 0xffffff)
self.assertTrue(id1 >= 0x00)
self.assertTrue(id2 <= 0xffffff)
self.assertTrue(id2 >= 0x00)
self.assertNotEqual(id1, id2)
def test_bytes_to_int(self):
results = dhcpv6.bytes_to_int(b'\x19')
self.assertEqual(results, 25)
def test_options_to_dict(self):
data = bytearray.fromhex(
"021008740019002927fe8f950000000000000000001"
"a00190000119400001c2040200100000000fe0000000000000000000001000e0"
"00100011c39cf88080027fe8f950002000e000100011c3825e8080027d410bb")
results = dhcpv6.options_to_dict(data)
self.assertTrue(1 in results)
self.assertTrue(25 in results)
self.assertTrue(2 in results)
def test_prefix_to_string(self):
data = (b'\x22\x22\x00\x00\x22\x22\x45\x76'
b'\x00\x00\x00\x00\x00\x00\x00\x00')
result = '2222:0:2222:4576::'
self.assertEqual(dhcpv6.prefix_to_string(data), result)
def test_DHCPOption(self):
data = b'fake'
option = dhcpv6.DHCPOption()
option.data = data
expected = b'\x00\x00\x00\x04'
expected += data
self.assertEqual(expected, option.to_bytes())
def test_ClientIdentifier_bytes(self):
clientid = dhcpv6.ClientIdentifier()
clientid.xargs['EnterpriseNum'] = 8888
clientid.xargs['EnterpriseID'] = b'fake'
self.assertEqual(CLIENT_ID, clientid.to_bytes())
def test_ServerIdentifier_bytes(self):
serverid = dhcpv6.ServerIdentifier(b'serverid')
self.assertEqual(SERVER_ID, serverid.to_bytes())
def test_OptionRequest_bytes(self):
optReq = dhcpv6.OptionRequest()
self.assertEqual(OPTION_REQUEST, optReq.to_bytes())
def test_ElapsedTime_bytes(self):
time = dhcpv6.ElapsedTime()
self.assertEqual(ELAPSED, time.to_bytes())
def test_IAPD_bytes(self):
ia_pd = dhcpv6.IAPD('fake-name')
self.assertEqual(IAPD, ia_pd.to_bytes())
def test_IAPD_with_options_bytes(self):
ia_pd = dhcpv6.IAPD('fake-name', 'pdoption')
self.assertEqual(IAPD_WITH_OPTIONS, ia_pd.to_bytes())
def _test_message_bytes(self, packet, expected_type, extra_args=[]):
msg = packet(9999, 'fake', *extra_args).to_bytes()
self.assertEqual(expected_type, msg[0:1])
self.assertEqual(dhcpv6.int_to_bytes(9999, 6), msg[1:4])
self.assertTrue(OPTION_REQUEST in msg)
self.assertTrue(CLIENT_ID in msg)
return msg
def test_DHCPMessage_bytes(self):
self._test_message_bytes(dhcpv6.DHCPMessage, b'\x00')
def test_Solicit_bytes(self):
sol = self._test_message_bytes(dhcpv6.Solicit, b'\x01')
self.assertTrue(ELAPSED in sol)
self.assertTrue(IAPD in sol)
def _test_response_message_bytes(self, packet, expected_type):
args = ['serverid', 'pdoption']
msg = self._test_message_bytes(packet, expected_type, args)
self.assertTrue(SERVER_ID in msg)
self.assertTrue(IAPD_WITH_OPTIONS in msg)
return msg
def test_DHCPResponseMessage_bytes(self):
self._test_response_message_bytes(dhcpv6.DHCPResponseMessage, b'\x00')
def test_Request_bytes(self):
self._test_response_message_bytes(dhcpv6.Request, b'\x03')
def test_Renew_bytes(self):
self._test_response_message_bytes(dhcpv6.Renew, b'\x05')
def test_Release_bytes(self):
msg = self._test_response_message_bytes(dhcpv6.Release, b'\x08')
self.assertTrue(ELAPSED in msg)
class TestDHCPv6API(base.BaseTestCase):
def setUp(self):
super(TestDHCPv6API, self).setUp()
mock.patch('os.getpid', return_value='12345').start()
mock.patch('uuid.uuid4', return_value='uuid').start()
self.pd_manager = driver.PDDriver("router", "subnet", "blah")
self.socket = mock.patch('socket.socket').start()
def _send_command(self):
self.pd_manager._send_command("command", "misc")
self.socket().send.assert_called_once_with('command,subnet,misc,')
def test_enable(self):
self.pd_manager.enable()
self.socket().send.assert_called_once_with('enable,subnet,12345,')
def test_disable(self):
self.pd_manager.disable()
self.socket().send.assert_called_once_with('disable,subnet,12345,')
def test_get_prefix(self):
self.socket().recv.return_value = "prefix"
prefix = self.pd_manager.get_prefix()
self.socket().send.assert_called_once_with('get,subnet,uuid,')
self.assertEqual(prefix, "prefix")
def test_get_prefix_error_handling(self):
self.socket().recv.return_value = "NOT_RUNNING"
self.assertRaises(
exceptions.DHCPv6AgentException, self.pd_manager.get_prefix)
self.socket().send.assert_called_once_with('get,subnet,uuid,')
def test_sync_data(self):
response = driver.PDDriver.get_sync_data()
self.assertEqual(response, [])
class TestAgent(base.BaseTestCase):
def setUp(self):
super(TestAgent, self).setUp()
self.subnetpd = mock.patch(
'python_neutron_pd_driver.subnetpd.SubnetPD').start()
self.listener = mock.patch(
'python_neutron_pd_driver.listener.start').start()
self.listener.isAlive.return_value = False
self.socket = mock.patch('socket.socket').start()
self.conf = agent_config.setup_conf()
cfg.CONF.set_override('state_path', "/tmp/neutron")
self.conf.set_override('state_path', "/tmp/neutron")
self.conf.register_opts(base_config.core_opts)
log.register_options(self.conf)
self.conf.register_opts(config.OPTS)
self.conf.register_opts(pd_driver.OPTS)
def test_notify_l3_agent(self):
with mock.patch('os.kill') as kill:
agent.notify_l3_agent(12345)
kill.assert_called_once_with(12345, signal.SIGHUP)
def test_notify_l3_agent_exception(self):
with mock.patch('os.kill') as kill:
kill.side_effect = Exception("BOOM!")
agent.notify_l3_agent(12345)
kill.assert_called_once_with(12345, signal.SIGHUP)
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
def test_DHCPv6Agent___init__(self, mock_open, mock_listdir,
mock_chmod, mock_remove):
subnet_ids = ["subnet_1", "subnet_2", "subnet_3", "subnet_4"]
mock_listdir.return_value = subnet_ids
dc = agent.DHCPV6Agent()
pd_clients_keys = dc.pd_clients.keys()
pd_clients_keys = sorted(pd_clients_keys)
self.assertEqual(['1', '2', '3', '4'], pd_clients_keys)
self.listener.assert_called_once_with()
self.socket().bind.assert_called_once_with(
self.conf.pd_socket_loc + "/" + constants.CONTROL_PATH)
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
def test_DHCPv6Agent_system_load_listdir_exception(
self, mock_open, mock_listdir, mock_chmod, mock_remove):
mock_listdir.side_effect = OSError()
dc = agent.DHCPV6Agent()
self.assertEqual({}, dc.pd_clients)
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
def test_DHCPv6Agent_system_load_open_exception(
self, mock_open, mock_listdir, mock_chmod, mock_remove):
subnet_ids = ["subnet_1", "subnet_2", "subnet_3", "subnet_4"]
mock_listdir.return_value = subnet_ids
def side_effect(*args, **kwargs):
if "subnet_4" in args[0]:
raise IOError()
return mock.MagicMock()
mock_open.side_effect = side_effect
dc = agent.DHCPV6Agent()
pd_clients_keys = dc.pd_clients.keys()
pd_clients_keys = sorted(pd_clients_keys)
self.assertEqual(['1', '2', '3'], pd_clients_keys)
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
def test_DHCPv6Agent_processor_enable(
self, mock_open, mock_listdir, mock_chmod, mock_remove):
dc = agent.DHCPV6Agent()
dc.processor('enable,1,12345')
pd_clients_keys = dc.pd_clients.keys()
pd_clients_keys = sorted(pd_clients_keys)
self.assertEqual(['1'], pd_clients_keys)
mock_open.assert_called_once_with(
"%s/subnet_%s" % (self.conf.pd_confs, 1), 'w')
mock_open().write.assert_called_once_with('12345')
mock_open().close.assert_called_once_with()
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
@mock.patch('os.path.exists')
def test_DHCPv6Agent_processor_disable(
self, mock_exists, mock_open, mock_listdir, mock_chmod,
mock_remove):
dc = agent.DHCPV6Agent()
subnetpd = mock.Mock()
dc.pd_clients = {'1': subnetpd}
dc.processor('disable,1,12345')
pd_clients_keys = dc.pd_clients.keys()
pd_clients_keys = sorted(pd_clients_keys)
self.assertEqual([], pd_clients_keys)
subnetpd.shutdown.assert_called_once_with()
mock_exists.return_value = True
mock_remove.assert_called_with(
"%s/subnet_%s" % (self.conf.pd_confs, 1))
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
def test_DHCPv6Agent_processor_get(
self, mock_open, mock_listdir, mock_chmod, mock_remove):
dc = agent.DHCPV6Agent()
subnetpd = mock.Mock()
subnetpd.get.return_value = "prefix"
dc.pd_clients = {'1': subnetpd}
dc.processor('get,1,12345')
pd_clients_keys = dc.pd_clients.keys()
pd_clients_keys = sorted(pd_clients_keys)
subnetpd.get.assert_called_once_with()
self.socket().connect.assert_called_once_with(
self.conf.pd_socket_loc + "/" + constants.RESP_PATH % '12345')
self.socket().send.assert_called_once_with("prefix")
class TestUtils(base.BaseTestCase):
def setUp(self):
super(TestUtils, self).setUp()
self.socket = mock.patch('socket.socket').start()
self.conf = agent_config.setup_conf()
self.conf.register_opts(base_config.core_opts)
log.register_options(self.conf)
self.conf.register_opts(config.OPTS)
self.conf.register_opts(pd_driver.OPTS)
def test_socket_connect(self):
s = utils.socket_connect("hello")
self.socket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
s.connect.assert_called_once_with(
self.conf.pd_socket_loc + "/" + "hello")
def test_socket_bind(self):
s = utils.socket_bind("hello")
self.socket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
s.bind.assert_called_once_with(
self.conf.pd_socket_loc + "/" + "hello")
def test_control_socket_connect(self):
s = utils.control_socket_connect()
self.socket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
s.connect.assert_called_once_with(
self.conf.pd_socket_loc + "/" + constants.CONTROL_PATH)
@mock.patch('os.remove')
@mock.patch('os.path.exists')
def test_socket_delete(self, mock_exists, mock_remove):
utils.socket_delete("hello")
mock_exists.return_value = True
mock_remove.assert_called_once_with(
self.conf.pd_socket_loc + "/" + "hello")
@mock.patch('threading.Thread')
def test_new_daemon_thread(self, mock_thread):
func = mock.Mock()
utils.new_daemon_thread(func)
mock_thread.assert_called_once_with(target=func, args=())
th = mock_thread()
self.assertTrue(th.daemon)
th.start.assert_called_once_with()
@mock.patch('threading.Thread')
def test_new_daemon_thread_with_args(self, mock_thread):
func = mock.Mock()
arg1 = mock.Mock()
utils.new_daemon_thread(func, (arg1,))
mock_thread.assert_called_once_with(target=func, args=(arg1,))
th = mock_thread()
self.assertTrue(th.daemon)
th.start.assert_called_once_with()
class TestSubnetPD(base.BaseTestCase):
pass
class TestSocketV6(base.BaseTestCase):
def setUp(self):
super(TestSocketV6, self).setUp()
self.socket = mock.patch('socket.socket').start()
self.conf = agent_config.setup_conf()
self.conf.register_opts(base_config.core_opts)
log.register_options(self.conf)
self.conf.register_opts(config.OPTS)
self.conf.register_opts(pd_driver.OPTS)
def test_socket_v6(self):
sock = socketv6.socket_v6()
self.socket.assert_called_once_with(socket.AF_INET6, socket.SOCK_DGRAM)
sock.setsockopt.assert_any_call(socket.SOL_SOCKET,
socketv6.SO_BINDTODEVICE,
self.conf.pd_interface)
sock.setsockopt.assert_any_call(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
sock.setsockopt.assert_any_call(socket.SOL_SOCKET,
socket.SO_BROADCAST, 1)
sock.setsockopt.assert_any_call(socket.IPPROTO_IPV6,
socket.IPV6_MULTICAST_HOPS,
struct.pack('@i', 1))
sock.bind.assert_called_once_with(('', 546))
@mock.patch('python_neutron_pd_driver.socketv6.socket_v6')
def test_send_packet(self, mock_socketv6):
socketv6.send_packet("hello")
mock_socketv6.assert_called_once_with()
mock_socketv6().settimeout.assert_called_once_with(3)
mock_socketv6().sendto.assert_called_once_with(
"hello", ("ff02::1:2", 547))
@mock.patch('python_neutron_pd_driver.socketv6.socket_v6')
def test_send_packet_with_non_string(self, mock_socketv6):
def str(s):
return "hello"
pack = mock.Mock()
pack.__str__ = str
socketv6.send_packet(pack)
mock_socketv6.assert_called_once_with()
mock_socketv6().settimeout.assert_called_once_with(3)
mock_socketv6().sendto.assert_called_once_with(
"hello", ("ff02::1:2", 547))
@mock.patch('python_neutron_pd_driver.socketv6.socket_v6')
def test_send_packet_with_address(self, mock_socketv6):
socketv6.send_packet("hello", address="address")
mock_socketv6.assert_called_once_with()
mock_socketv6().settimeout.assert_called_once_with(3)
mock_socketv6().sendto.assert_called_once_with(
"hello", ("address", 547))
class TestListener(base.BaseTestCase):
def test_processor(self):
def validator1(data):
if data == "good1":
return True
return False
def validator2(data):
if data == "good2":
return True
return False
def validator3(data):
if data == "good3":
return True
return False
queue1 = mock.Mock()
queue2 = mock.Mock()
queue3 = mock.Mock()
queue4 = mock.Mock()
listeners = [(validator1, queue1), (validator1, queue2),
(validator2, queue3), (validator3, queue4)]
listener.processor("good1", "sender1", listeners)
listener.processor("good2", "sender2", listeners)
listener.processor("good2", "sender3", listeners)
queue1.put.assert_called_once_with(("good1", "sender1"))
queue2.put.assert_called_once_with(("good1", "sender1"))
queue3.put.assert_any_call(("good2", "sender2"))
queue3.put.assert_any_call(("good2", "sender3"))
self.assertFalse(queue4.put.called)
@mock.patch('python_neutron_pd_driver.socketv6.socket_v6')
@mock.patch('python_neutron_pd_driver.utils.new_daemon_thread')
@mock.patch('python_neutron_pd_driver.listener.SOCKET')
def test_start_not_running(self, mock_SOC, mock_daemon, mock_sock):
listener.RUNNING = False
listener.start()
self.assertEqual(mock_sock(), listener.SOCKET)
self.assertTrue(listener.RUNNING)
mock_daemon.assert_called_once_with(listener.main_listener_thread)
@mock.patch('python_neutron_pd_driver.socketv6.socket_v6')
@mock.patch('python_neutron_pd_driver.utils.new_daemon_thread')
@mock.patch('python_neutron_pd_driver.listener.SOCKET')
def test_start_already_running(self, mock_SOC, mock_daemon, mock_sock):
listener.RUNNING = True
mock_sock.return_value = "hello"
old = listener.SOCKET
listener.start()
self.assertEqual(listener.SOCKET, old)
self.assertFalse(mock_daemon.called)
def test_stop_running(self):
listener.SOCKET = mock.Mock()
listener.RUNNING = True
listener.stop()
listener.SOCKET.shutdown.assert_called_once_with(socket.SHUT_RDWR)
listener.SOCKET.close.assert_called_once_with()
self.assertFalse(listener.RUNNING)
def test_stop_not_running(self):
listener.SOCKET = mock.Mock()
listener.RUNNING = False
listener.stop()
listener.SOCKET.shutdown.assert_called_once_with(socket.SHUT_RDWR)
listener.SOCKET.close.assert_called_once_with()
self.assertFalse(listener.RUNNING)
def test_stop_no_socket(self):
listener.SOCKET = None
listener.RUNNING = False
listener.stop()
self.assertFalse(listener.RUNNING)

View File

@ -0,0 +1,51 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import socket
import threading
from oslo_config import cfg
from python_neutron_pd_driver import config # noqa
from python_neutron_pd_driver import constants
def socket_connect(path):
s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
s.connect(cfg.CONF.pd_socket_loc + "/" + path)
return s
def socket_bind(path):
s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
s.bind(cfg.CONF.pd_socket_loc + "/" + path)
return s
def control_socket_connect():
return socket_connect(constants.CONTROL_PATH)
def socket_delete(path):
if os.path.exists(cfg.CONF.pd_socket_loc + "/" + path):
os.remove(cfg.CONF.pd_socket_loc + "/" + path)
def new_daemon_thread(target, params=None):
th = threading.Thread(target=target, args=params or ())
th.daemon = True
th.start()
return th

View File

@ -4,3 +4,5 @@
pbr>=0.6,!=0.7,<1.0
Babel>=1.3
oslo.config>=1.11.0 # Apache-2.0
oslo.log>=1.2.0 # Apache-2.0

View File

@ -20,6 +20,12 @@ classifier =
Programming Language :: Python :: 3.3
Programming Language :: Python :: 3.4
[entry_points]
console_scripts =
neutron-pd-agent = python_neutron_pd_driver.agent:main
neutron.agent.linux.pd_drivers =
neutron_pd_agent = python_neutron_pd_driver.driver:PDDriver
[files]
packages =
python_neutron_pd_driver

View File

@ -13,3 +13,5 @@ oslotest>=1.2.0 # Apache-2.0
testrepository>=0.0.18
testscenarios>=0.4
testtools>=0.9.36,!=1.2.0
-e git+git://git.openstack.org/openstack/neutron.git#egg=neutron

10
tox.ini
View File

@ -1,6 +1,6 @@
[tox]
minversion = 1.6
envlist = py33,py34,py26,py27,pypy,pep8
envlist = docs,py34,py27,pep8
skipsdist = True
[testenv]
@ -10,7 +10,13 @@ setenv =
VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py test --slowest --testr-args='{posargs}'
commands =
{toxinidir}/install_dependant_neutron_commit {envname}
python setup.py test --slowest --testr-args='{posargs}'
[testenv:py34]
commands = python -m testtools.run \
python_neutron_pd_driver.tests.test_dhcpv6_pd_agent
[testenv:pep8]
commands = flake8