Retire the project

Change-Id: I2b44230be9b5d7395de35a0113e8641973abead9
Depends-on: I77099ba826b8c7d28379a823b4dc74aa65e653d8
This commit is contained in:
Armando Migliaccio 2016-10-19 15:49:18 -07:00
parent 5d3c39b1c0
commit ef9ce481d9
40 changed files with 11 additions and 2092 deletions

View File

@ -1,7 +0,0 @@
[run]
branch = True
source = python_neutron_pd_driver
omit = python_neutron_pd_driver/openstack/*
[report]
ignore-errors = True

54
.gitignore vendored
View File

@ -1,54 +0,0 @@
*.py[cod]
# C extensions
*.so
# Packages
*.egg
*.egg-info
dist
build
.eggs
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg
lib
lib64
# Installer logs
pip-log.txt
# Unit test / coverage reports
.coverage
.tox
nosetests.xml
.testrepository
.venv
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# Complexity
output/*.html
output/*/index.html
# Sphinx
doc/build
# pbr generates these
AUTHORS
ChangeLog
# Editors
*~
.*.swp
.*sw?

View File

@ -1,4 +0,0 @@
[gerrit]
host=review.openstack.org
port=29418
project=openstack/python-neutron-pd-driver.git

View File

@ -1,3 +0,0 @@
# Format is:
# <preferred e-mail> <other e-mail 1>
# <preferred e-mail> <other e-mail 2>

View File

@ -1,7 +0,0 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -1,17 +0,0 @@
If you would like to contribute to the development of OpenStack, you must
follow the steps in this page:
http://docs.openstack.org/infra/manual/developers.html
If you already have a good understanding of how the system works and your
OpenStack accounts are set up, you can skip to the development workflow
section of this documentation to learn how changes to OpenStack should be
submitted for review via the Gerrit tool:
http://docs.openstack.org/infra/manual/developers.html#development-workflow
Pull requests submitted through GitHub will be ignored.
Bugs should be filed on Launchpad, not GitHub:
https://bugs.launchpad.net/python-neutron-pd-driver

View File

@ -1,4 +0,0 @@
python-neutron-pd-driver Style Commandments
===============================================
Read the OpenStack Style Commandments http://docs.openstack.org/developer/hacking/

176
LICENSE
View File

@ -1,176 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.

View File

@ -1,6 +0,0 @@
include AUTHORS
include ChangeLog
exclude .gitignore
exclude .gitreview
global-exclude *.pyc

View File

@ -1,15 +1,14 @@
===============================
python-neutron-pd-driver
===============================
This project is no longer maintained.
A prefix delegation driver written using pure python with no dependencies for use with OpenStack Neutron.
The contents of this repository are still available in the Git
source code management system. To see the contents of this
repository before it reached its end of life, please check out the
previous commit with "git checkout HEAD^1".
* Free software: Apache license
* Documentation: http://docs.openstack.org/developer/python-neutron-pd-driver
* Source: http://git.openstack.org/cgit/openstack/python-neutron-pd-driver
* Bugs: http://bugs.launchpad.net/python-neutron-pd-driver
(Optional:)
For an alternative project, please see Neutron and in-tree dibbler
driver.
Features
--------
* TODO
For any further questions, please email
openstack-dev@lists.openstack.org or join #openstack-dev on
Freenode.

View File

@ -1,2 +0,0 @@
[python: **.py]

View File

@ -1,75 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
sys.path.insert(0, os.path.abspath('../..'))
# -- General configuration ----------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = [
'sphinx.ext.autodoc',
#'sphinx.ext.intersphinx',
'oslosphinx'
]
# autodoc generation is a bit aggressive and a nuisance when doing heavy
# text edit cycles.
# execute "export SPHINX_DEBUG=1" in your terminal to disable
# The suffix of source filenames.
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'python-neutron-pd-driver'
copyright = u'2013, OpenStack Foundation'
# If true, '()' will be appended to :func: etc. cross-reference text.
add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
add_module_names = True
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# -- Options for HTML output --------------------------------------------------
# The theme to use for HTML and HTML Help pages. Major themes that come with
# Sphinx are currently 'default' and 'sphinxdoc'.
# html_theme_path = ["."]
# html_theme = '_theme'
# html_static_path = ['static']
# Output file base name for HTML help builder.
htmlhelp_basename = '%sdoc' % project
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass
# [howto/manual]).
latex_documents = [
('index',
'%s.tex' % project,
u'%s Documentation' % project,
u'OpenStack Foundation', 'manual'),
]
# Example configuration for intersphinx: refer to the Python standard library.
#intersphinx_mapping = {'http://docs.python.org/': None}

View File

@ -1,4 +0,0 @@
============
Contributing
============
.. include:: ../../CONTRIBUTING.rst

View File

@ -1,25 +0,0 @@
.. python-neutron-pd-driver documentation master file, created by
sphinx-quickstart on Tue Jul 9 22:26:36 2013.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to python-neutron-pd-driver's documentation!
========================================================
Contents:
.. toctree::
:maxdepth: 2
readme
installation
usage
contributing
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

View File

@ -1,12 +0,0 @@
============
Installation
============
At the command line::
$ pip install python-neutron-pd-driver
Or, if you have virtualenvwrapper installed::
$ mkvirtualenv python-neutron-pd-driver
$ pip install python-neutron-pd-driver

View File

@ -1 +0,0 @@
.. include:: ../../README.rst

View File

@ -1,7 +0,0 @@
========
Usage
========
To use python-neutron-pd-driver in a project::
import python_neutron_pd_driver

View File

@ -1,7 +0,0 @@
[DEFAULT]
# Location to store the unix sockets used to communicate with
# the neutron PD service.
# pd_socket_loc = /tmp
# Interface for external communication to the prefix delegation server.
# pd_interface = br-ex

View File

@ -1,4 +0,0 @@
#! /bin/sh
cd .tox/$1/src/neutron
git fetch https://review.openstack.org/openstack/neutron refs/changes/77/185977/12 && git checkout FETCH_HEAD

View File

@ -1,6 +0,0 @@
[DEFAULT]
# The list of modules to copy from oslo-incubator.git
# The base module to hold the copy of openstack.common
base=python_neutron_pd_driver

View File

@ -1,19 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pbr.version
__version__ = pbr.version.VersionInfo(
'python_neutron_pd_driver').version_string()

View File

@ -1,158 +0,0 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import signal
import stat
import sys
from threading import Thread
from oslo_config import cfg
from oslo_log import log as logging
from neutron.agent.linux import pd_driver # noqa
from neutron.common import config as common_config
from python_neutron_pd_driver import config # noqa
from python_neutron_pd_driver import constants
from python_neutron_pd_driver import listener
from python_neutron_pd_driver import subnetpd
from python_neutron_pd_driver import utils
LOG = logging.getLogger(__name__)
SUBNET_INFO_FILEPATH = "%s/subnet_%s"
def notify_l3_agent(pid):
try:
os.kill(int(pid), signal.SIGHUP)
except Exception as e:
LOG.warn(_("Failed to send SIGNUP to %(pid)s: %(exc)s"),
{'pid': pid, 'exc': e})
class DHCPV6Agent(Thread):
def __init__(self):
super(DHCPV6Agent, self).__init__()
self.pd_clients = {}
try:
subnet_ids = os.listdir(cfg.CONF.pd_confs)
for subnet_id in subnet_ids:
if subnet_id[0:7] != "subnet_":
continue
try:
with open("%s/%s" % (cfg.CONF.pd_confs, subnet_id)) as f:
content = f.readlines()
if content:
self.enable(subnet_id[7:], content[0])
except IOError:
LOG.warn(_("Failed to read subnet %s info from system!"),
subnet_id)
except OSError:
LOG.warn(_("Failed to read existing subnet info from system: %s!"),
cfg.CONF.pd_confs)
listener.start()
utils.socket_delete(constants.CONTROL_PATH)
self.server = utils.socket_bind(constants.CONTROL_PATH)
os.chmod(cfg.CONF.pd_socket_loc + '/' +
constants.CONTROL_PATH, stat.S_IRWXO)
self.running = True
def processor(self, task):
task = task.split(',')
if task[0] == 'enable':
self.enable(task[1], task[2])
elif task[0] == 'disable':
self.disable(task[1])
elif task[0] == 'get':
prefix = self.get_prefix(task[1])
path = constants.RESP_PATH % task[2]
re = utils.socket_connect(path)
re.send(prefix)
def run(self):
while self.running:
task = self.server.recv(1024)
utils.new_daemon_thread(self.processor, (task,))
def stop(self):
self.running = False
self.server.shutdown(utils.socket.SHUT_RDWR)
self.server.close()
def _write_subnet_info_to_system(self, subnet_id, pid):
try:
subnet_list = open(
SUBNET_INFO_FILEPATH % (cfg.CONF.pd_confs, subnet_id), "w")
subnet_list.write(pid)
subnet_list.close()
except IOError:
LOG.warn(_("Failed to write subnet info to system!"))
def _delete_subnet_info_from_system(self, subnet_id):
if os.path.exists(
SUBNET_INFO_FILEPATH % (cfg.CONF.pd_confs, subnet_id)):
os.remove(
SUBNET_INFO_FILEPATH % (cfg.CONF.pd_confs, subnet_id))
def _get_subnet_pd_object(self, subnet_id):
subnet = self.pd_clients.get(subnet_id)
if not subnet:
LOG.warn(_("Prefix delegation not running for %(subnet_id)s"),
{'subnet_id': subnet_id})
return subnet
def enable(self, subnet_id, pid):
"""Enable/Start a PD client for a subnet"""
def respond():
notify_l3_agent(pid)
if subnet_id not in self.pd_clients:
conf = {'subnet_id': str(subnet_id), 'pd_update_cb': respond}
self.pd_clients[subnet_id] = subnetpd.SubnetPD(conf)
self._write_subnet_info_to_system(subnet_id, pid)
else:
respond()
LOG.debug(_("Prefix delegation already running for %(subnet_id)s"),
{'subnet_id': subnet_id})
def disable(self, subnet_id):
"""Get process back from existing process and kill the process"""
subnet = self._get_subnet_pd_object(subnet_id)
if subnet:
subnet.shutdown()
del self.pd_clients[subnet_id]
self._delete_subnet_info_from_system(subnet_id)
def get_prefix(self, subnet_id):
"""Get Prefix from PD client"""
subnet = self._get_subnet_pd_object(subnet_id)
if subnet:
return subnet.get()
return "NOT_RUNNING"
def main():
common_config.init(sys.argv[1:])
pd_agent = DHCPV6Agent()
pd_agent.start()
def signal_handler(signal, frame):
pd_agent.stop()
signal.signal(signal.SIGINT, signal_handler)
signal.pause()

View File

@ -1,28 +0,0 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
OPTS = [
cfg.StrOpt('pd_socket_loc',
default='/tmp',
help=_("Location for storing unix sockets for comunication"
"between L3 Agent and DHCPv6 Client")),
cfg.StrOpt('pd_interface',
default='',
help=_('Interface to bind to send/receive packets for DHCPv6')),
]
cfg.CONF.register_opts(OPTS)

View File

@ -1,17 +0,0 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
CONTROL_PATH = 'neutron-dhcpv6_pd_agent-control'
RESP_PATH = 'neutron-dhcpv6_pd_agent-%s'

View File

@ -1,252 +0,0 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import logging
import netaddr
import random
logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger(__name__)
def gen_trid():
return random.randint(0x00, 0xffffff)
def bytes_to_int(byt):
return int(binascii.hexlify(byt), 16)
def int_to_bytes(num, padding=2):
return bytearray.fromhex(
'{num:0{padding}x}'.format(padding=padding, num=num))
def options_to_dict(data, pos=4):
options = {}
next_pos = pos
done = False
while not done:
option_id = bytes_to_int(data[next_pos: next_pos + 2])
next_pos = next_pos + 2
length = bytes_to_int(data[next_pos: next_pos + 2])
next_pos = next_pos + 2
if option_id not in options:
options[option_id] = []
options[option_id].append(data[next_pos: next_pos + length])
next_pos = next_pos + length
if next_pos >= len(data):
done = True
return options
def prefix_to_string(prefix):
pos = 0
string = binascii.hexlify(prefix[pos: pos + 2]).decode()
pos = pos + 2
while pos < len(prefix):
part = binascii.hexlify(prefix[pos: pos + 2]).decode()
string += ":%s" % part
pos = pos + 2
address = netaddr.IPAddress(string, 6)
return str(address)
class Packet(object):
def to_bytes(self):
return b''
def __str__(self):
return str(self.to_bytes())
def __repr__(self):
return repr(self.to_bytes())
class DHCPOption(Packet):
option_type = 0
def __init__(self, data=None):
self.data = b''
if data:
self._process_data(data)
def _process_data(self, data):
self.option_type = data[0:2]
self.data = data[2:]
def to_bytes(self):
packet = int_to_bytes(self.option_type, 4)
packet += int_to_bytes(len(self.data), 4)
packet += binascii.a2b_qp(self.data)
return packet
def gen_client_id(unique_id):
clientid = ClientIdentifier()
clientid.xargs['EnterpriseNum'] = 8888
clientid.xargs['EnterpriseID'] = unique_id
return clientid.to_bytes()
class ClientIdentifier(DHCPOption):
option_type = 1
def __init__(self, data=None):
super(ClientIdentifier, self).__init__(data)
self.duid_type = 2
self.duid = None
self.xargs = {}
def _process_data(self, data):
super(ClientIdentifier, self)._process_data(data)
self.duid = data[4:]
self.duid_type = bytes_to_int(data[4:6])
if self.duid_type == 2:
self.xargs['EnterpriseNum'] = bytes_to_int(data[6: 10])
self.xargs['EnterpriseID'] = data[10:]
def to_bytes(self):
self.data = int_to_bytes(self.duid_type, 4) # DUID Type
if self.duid_type == 2:
self.data += int_to_bytes(self.xargs['EnterpriseNum'], 8)
self.data += binascii.a2b_qp(
self.xargs['EnterpriseID']) # Enterprise ID
return super(ClientIdentifier, self).to_bytes()
class ServerIdentifier(DHCPOption):
option_type = 2
def __init__(self, data=None):
super(ServerIdentifier, self).__init__()
self.data = data
class OptionRequest(DHCPOption):
option_type = 6
def __init__(self, options=[23, 24], data=None):
super(OptionRequest, self).__init__(data)
self.options = options
def to_bytes(self):
self.data = b''
for op in self.options:
self.data += int_to_bytes(op, 4)
return super(OptionRequest, self).to_bytes()
class ElapsedTime(DHCPOption):
option_type = 8
def __init__(self, time=0, data=None):
super(ElapsedTime, self).__init__(data)
self.time = 0
def to_bytes(self):
if self.time > 0xffff:
self.time = 0xffff
self.data = int_to_bytes(self.time, 4)
return super(ElapsedTime, self).to_bytes()
class IAPDOption(DHCPOption):
option_type = 26
def __init__(self, data=None):
super(IAPDOption, self).__init__()
self.data = data
class IAPD(DHCPOption):
option_type = 25
def __init__(self, unique_id, pd_options=None, data=None):
super(IAPD, self).__init__(data)
self.pd_options = pd_options
self.unique_id = unique_id
def to_bytes(self):
self.data = b''
self.data += binascii.a2b_qp(''.join(self.unique_id.split('-'))[0:4])
self.data += int_to_bytes(3600, 8)
self.data += int_to_bytes(5400, 8)
if self.pd_options:
self.data += IAPDOption(self.pd_options).to_bytes()
return super(IAPD, self).to_bytes()
class DHCPMessage(Packet):
message_type = 0
def __init__(self, trid, unique_id, data=None):
self.trid = trid
self.unique_id = unique_id
if data:
self._process_data(data)
def _process_data(self, data):
pass
def to_bytes(self):
packet = int_to_bytes(self.message_type, 2)
packet += int_to_bytes(self.trid, 6)
packet += gen_client_id(self.unique_id)
packet += OptionRequest().to_bytes()
return packet
class Solicit(DHCPMessage):
message_type = 1
def to_bytes(self):
packet = super(Solicit, self).to_bytes()
packet += ElapsedTime().to_bytes()
packet += IAPD(self.unique_id).to_bytes()
return packet
class DHCPResponseMessage(DHCPMessage):
def __init__(self, trid, unique_id, serverid, pd_choice, data=None):
super(DHCPResponseMessage, self).__init__(trid, unique_id, data)
self.serverid = serverid
self.pd_choice = pd_choice
def to_bytes(self):
packet = super(DHCPResponseMessage, self).to_bytes()
packet += ServerIdentifier(self.serverid).to_bytes()
packet += IAPD(self.unique_id, self.pd_choice).to_bytes()
return packet
class Request(DHCPResponseMessage):
message_type = 3
class Renew(DHCPResponseMessage):
message_type = 5
class Release(DHCPResponseMessage):
message_type = 8
def to_bytes(self):
packet = super(Release, self).to_bytes()
packet += ElapsedTime().to_bytes()
return packet

View File

@ -1,61 +0,0 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import uuid
from oslo_log import log as logging
from neutron.agent.linux import pd_driver
from python_neutron_pd_driver import constants
from python_neutron_pd_driver import exceptions
from python_neutron_pd_driver import utils
LOG = logging.getLogger(__name__)
class PDDriver(pd_driver.PDDriverBase):
def __init__(self, router_id, subnet_id, ri_ifname):
super(PDDriver, self).__init__(router_id, subnet_id, ri_ifname)
self.l3_pid = os.getpid()
def _send_command(self, command, misc):
control_socket = utils.control_socket_connect()
control_socket.send('%s,%s,%s,' % (command, self.subnet_id, misc))
def enable(self, *args, **kwargs):
self._send_command('enable', self.l3_pid)
def disable(self, *args, **kwargs):
self._send_command('disable', self.l3_pid)
def get_prefix(self):
response_id = uuid.uuid4()
path = constants.RESP_PATH % response_id
sw = utils.socket_bind(path)
sw.settimeout(3)
self._send_command('get', response_id)
result = sw.recv(1024)
utils.socket_delete(path)
if result == "NOT_RUNNING":
raise exceptions.DHCPv6AgentException(
msg=("Prefix Delegation not running for %s" % self.subnet_id))
return result
@staticmethod
def get_sync_data():
return []

View File

@ -1,20 +0,0 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import exceptions
class DHCPv6AgentException(exceptions.NeutronException):
message = _("A DHCP v6 Client Exception occured: %(msg)s")

View File

@ -1,87 +0,0 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
try:
import Queue
except Exception:
import queue as Queue
import threading
import time
from python_neutron_pd_driver import socketv6
from python_neutron_pd_driver import utils
logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger(__name__)
LISTENERS = []
RUNNING = False
SOCKET = None
def processor(data, sender, listeners):
for validate, queue in listeners:
if validate(data):
queue.put((data, sender))
def main_listener_thread():
while RUNNING:
data, sender = SOCKET.recvfrom(1024)
utils.new_daemon_thread(processor, (data, sender, LISTENERS))
class Listener(threading.Thread):
def __init__(self, validator, timetowait=2):
super(Listener, self).__init__()
self.validator = validator
self.timetowait = timetowait
self.result = Queue.Queue()
self.start()
def get(self, *args, **kwargs):
return self.result.get(*args, **kwargs)
def run(self):
packets = Queue.Queue()
LISTENERS.append((self.validator, packets))
time.sleep(self.timetowait)
LISTENERS.remove((self.validator, packets))
results = []
while True:
if packets.empty():
break
results.append(packets.get(False))
self.result.put(results)
def stop():
global RUNNING
RUNNING = False
if SOCKET:
SOCKET.shutdown(utils.socket.SHUT_RDWR)
SOCKET.close()
def start():
global RUNNING, SOCKET
if not RUNNING:
RUNNING = True
SOCKET = socketv6.socket_v6()
utils.new_daemon_thread(main_listener_thread)

View File

@ -1,43 +0,0 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import struct
from oslo_config import cfg
from oslo_log import log as logging
from python_neutron_pd_driver import config # noqa
LOG = logging.getLogger(__name__)
SO_BINDTODEVICE = 25
def socket_v6():
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, SO_BINDTODEVICE, cfg.CONF.pd_interface)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS,
struct.pack('@i', 1))
s.bind(('', 546))
return s
def send_packet(packet, address="ff02::1:2"):
s = socket_v6()
s.settimeout(3)
s.sendto(str(packet), (address, 547))

View File

@ -1,200 +0,0 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
try:
import Queue
except Exception:
import queue as Queue
from threading import Timer
import time
from oslo_log import log as logging
from python_neutron_pd_driver import dhcpv6
from python_neutron_pd_driver import listener
from python_neutron_pd_driver import socketv6
from python_neutron_pd_driver import utils
LOG = logging.getLogger(__name__)
class Server(object):
def __init__(self, serverid, address):
self.serverid = serverid
self.address = address[0]
class Prefix(object):
def __init__(self, ia_prefix):
self.pref_liftime = dhcpv6.bytes_to_int(ia_prefix[0:4])
self.valid_liftime = dhcpv6.bytes_to_int(ia_prefix[4:8])
self.pref_length = dhcpv6.bytes_to_int(ia_prefix[8:9])
self.prefix = ia_prefix[9: 9 + self.pref_length]
def __str__(self):
return ("%s/%s" %
(dhcpv6.prefix_to_string(self.prefix), self.pref_length))
def create_validator(ty, trid):
def validator(data):
if (data[0:1] == ty and
dhcpv6.bytes_to_int(data[1:4]) == trid):
return True
return False
return validator
class SubnetPD(object):
def __init__(self, conf):
super(SubnetPD, self).__init__()
self.subnet_id = conf['subnet_id']
self.ready = conf['pd_update_cb']
self.prefix = "::/64"
self.server = None
self.ias = None
self.renew_timer = None
self.setup()
def get(self):
return str(self.prefix)
def reset_renew_timer(self):
self.renew_timer = Timer(self.prefix.pref_liftime, self.renew_prefix)
self.renew_timer.start()
def process_REPLY(self, responses):
# Process reply
data, sender = responses[0]
options = dhcpv6.options_to_dict(data)
ia_pd = options[25][0]
# T1 = bytes_to_int(ia_pd[4:8])
# T2 = bytes_to_int(ia_pd[8:12])
ia_options = dhcpv6.options_to_dict(ia_pd, 12)
self.prefix = Prefix(ia_options[26][0])
def renew_prefix(self, pd_choice):
rn_trid = dhcpv6.gen_trid()
lis = listener.Listener(create_validator("\x07", rn_trid))
socketv6.send_packet(dhcpv6.Renew(
rn_trid, self.subnet_id, self.server.serverid, pd_choice),
self.server.address)
res = lis.get()
if not res:
raise Exception('RENEW: Failed to get valid REPLY...')
self.process_REPLY(res)
self.reset_renew_timer()
def _solicit(self):
sol_trid = dhcpv6.gen_trid()
res = None
retrys = 3
validator = create_validator("\x02", sol_trid)
while not res and retrys > 0:
lis = listener.Listener(validator, 5)
time.sleep(1)
socketv6.send_packet(dhcpv6.Solicit(sol_trid, self.subnet_id))
res = lis.get()
retrys = retrys - 1
if not res:
raise Exception('Failed to get valid Advertise after 3 retries...')
highest = -1
serverid = None
ia_pd = None
s_address = None
for data, sender in res:
options = dhcpv6.options_to_dict(data)
preference = dhcpv6.bytes_to_int(options.get(7, [b'\x00'])[0])
if preference <= highest:
continue
highest = preference
serverid = options[2][0]
ia_pd = options[25][0]
s_address = sender
ia_options = dhcpv6.options_to_dict(ia_pd, 12)
self.ias = ia_options[26]
self.server = Server(serverid, s_address)
def _request(self):
req_trid = dhcpv6.gen_trid()
res = None
retrys = 3
validator = create_validator("\x07", req_trid)
while not res and retrys > 0:
lis = listener.Listener(validator)
time.sleep(1)
socketv6.send_packet(dhcpv6.Request(
req_trid, self.subnet_id, self.server.serverid, self.ias[0]),
self.server.address)
res = lis.get()
retrys = retrys - 1
if not res:
raise Exception('Failed to get valid REPLY!')
self.process_REPLY(res)
def _release(self):
rel_trid = dhcpv6.gen_trid()
validator = create_validator("\x07", rel_trid)
lis = listener.Listener(validator)
socketv6.send_packet(dhcpv6.Release(
rel_trid, self.subnet_id, self.server.serverid, self.ias[0]),
self.server.address)
while True:
try:
res = lis.get(timeout=10)
break
except Queue.Empty:
socketv6.send_packet(
dhcpv6.release(rel_trid, self.server.serverid,
self.subnet_id, self.ias[0]),
self.server.address)
return res
def setup(self):
def processor():
LOG.debug(
"Starting new prefix delegation for %s...", self.subnet_id)
self._solicit()
self._request()
self.reset_renew_timer()
self.ready()
LOG.debug("Prefix delegation ready for %s...", self.subnet_id)
utils.new_daemon_thread(processor, None)
def shutdown(self):
try:
self.renew_timer.cancel()
self._release()
except Exception:
pass
LOG.debug("Ending prefix delegation for %s, bye!! First try!",
self.subnet_id)
return True

View File

@ -1,23 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base
class TestCase(base.BaseTestCase):
"""Test case base class for all unit tests."""

View File

@ -1,522 +0,0 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import signal
import socket
import struct
from oslo_config import cfg
from oslo_log import log
from neutron.agent.common import config as agent_config
from neutron.agent.linux import pd_driver
from neutron.common import config as base_config
from neutron.tests import base
from python_neutron_pd_driver import agent
from python_neutron_pd_driver import config # noqa
from python_neutron_pd_driver import constants
from python_neutron_pd_driver import dhcpv6
from python_neutron_pd_driver import driver
from python_neutron_pd_driver import exceptions
from python_neutron_pd_driver import listener
from python_neutron_pd_driver import socketv6
from python_neutron_pd_driver import utils
try:
import __builtin__
except Exception:
import builtins as __builtin__
OPTION_REQUEST = bytearray.fromhex('0006000400170018')
CLIENT_ID = bytearray.fromhex('0001000A0002000022b8') + b'fake'
SERVER_ID = b'\x00\x02\x00\x08' + b'serverid'
ELAPSED = bytearray.fromhex('000800020000')
IAPD = (bytearray.fromhex('0019000c') +
b'fake' + bytearray.fromhex('00000e1000001518'))
IAPD_WITH_OPTIONS = (
b'\x00\x19\x00\x18' + b'fake' +
b'\x00\x00\x0e\x10\x00\x00\x15\x18\x00\x1a\x00\x08' + b'pdoption')
class TestDHCPV6(base.BaseTestCase):
def test_gen_trid(self):
id1 = dhcpv6.gen_trid()
id2 = dhcpv6.gen_trid()
self.assertTrue(id1 <= 0xffffff)
self.assertTrue(id1 >= 0x00)
self.assertTrue(id2 <= 0xffffff)
self.assertTrue(id2 >= 0x00)
self.assertNotEqual(id1, id2)
def test_bytes_to_int(self):
results = dhcpv6.bytes_to_int(b'\x19')
self.assertEqual(results, 25)
def test_options_to_dict(self):
data = bytearray.fromhex(
"021008740019002927fe8f950000000000000000001"
"a00190000119400001c2040200100000000fe0000000000000000000001000e0"
"00100011c39cf88080027fe8f950002000e000100011c3825e8080027d410bb")
results = dhcpv6.options_to_dict(data)
self.assertTrue(1 in results)
self.assertTrue(25 in results)
self.assertTrue(2 in results)
def test_prefix_to_string(self):
data = (b'\x22\x22\x00\x00\x22\x22\x45\x76'
b'\x00\x00\x00\x00\x00\x00\x00\x00')
result = '2222:0:2222:4576::'
self.assertEqual(dhcpv6.prefix_to_string(data), result)
def test_DHCPOption(self):
data = b'fake'
option = dhcpv6.DHCPOption()
option.data = data
expected = b'\x00\x00\x00\x04'
expected += data
self.assertEqual(expected, option.to_bytes())
def test_ClientIdentifier_bytes(self):
clientid = dhcpv6.ClientIdentifier()
clientid.xargs['EnterpriseNum'] = 8888
clientid.xargs['EnterpriseID'] = b'fake'
self.assertEqual(CLIENT_ID, clientid.to_bytes())
def test_ServerIdentifier_bytes(self):
serverid = dhcpv6.ServerIdentifier(b'serverid')
self.assertEqual(SERVER_ID, serverid.to_bytes())
def test_OptionRequest_bytes(self):
optReq = dhcpv6.OptionRequest()
self.assertEqual(OPTION_REQUEST, optReq.to_bytes())
def test_ElapsedTime_bytes(self):
time = dhcpv6.ElapsedTime()
self.assertEqual(ELAPSED, time.to_bytes())
def test_IAPD_bytes(self):
ia_pd = dhcpv6.IAPD('fake-name')
self.assertEqual(IAPD, ia_pd.to_bytes())
def test_IAPD_with_options_bytes(self):
ia_pd = dhcpv6.IAPD('fake-name', 'pdoption')
self.assertEqual(IAPD_WITH_OPTIONS, ia_pd.to_bytes())
def _test_message_bytes(self, packet, expected_type, extra_args=[]):
msg = packet(9999, 'fake', *extra_args).to_bytes()
self.assertEqual(expected_type, msg[0:1])
self.assertEqual(dhcpv6.int_to_bytes(9999, 6), msg[1:4])
self.assertTrue(OPTION_REQUEST in msg)
self.assertTrue(CLIENT_ID in msg)
return msg
def test_DHCPMessage_bytes(self):
self._test_message_bytes(dhcpv6.DHCPMessage, b'\x00')
def test_Solicit_bytes(self):
sol = self._test_message_bytes(dhcpv6.Solicit, b'\x01')
self.assertTrue(ELAPSED in sol)
self.assertTrue(IAPD in sol)
def _test_response_message_bytes(self, packet, expected_type):
args = ['serverid', 'pdoption']
msg = self._test_message_bytes(packet, expected_type, args)
self.assertTrue(SERVER_ID in msg)
self.assertTrue(IAPD_WITH_OPTIONS in msg)
return msg
def test_DHCPResponseMessage_bytes(self):
self._test_response_message_bytes(dhcpv6.DHCPResponseMessage, b'\x00')
def test_Request_bytes(self):
self._test_response_message_bytes(dhcpv6.Request, b'\x03')
def test_Renew_bytes(self):
self._test_response_message_bytes(dhcpv6.Renew, b'\x05')
def test_Release_bytes(self):
msg = self._test_response_message_bytes(dhcpv6.Release, b'\x08')
self.assertTrue(ELAPSED in msg)
class TestDHCPv6API(base.BaseTestCase):
def setUp(self):
super(TestDHCPv6API, self).setUp()
mock.patch('os.getpid', return_value='12345').start()
mock.patch('uuid.uuid4', return_value='uuid').start()
self.pd_manager = driver.PDDriver("router", "subnet", "blah")
self.socket = mock.patch('socket.socket').start()
def _send_command(self):
self.pd_manager._send_command("command", "misc")
self.socket().send.assert_called_once_with('command,subnet,misc,')
def test_enable(self):
self.pd_manager.enable()
self.socket().send.assert_called_once_with('enable,subnet,12345,')
def test_disable(self):
self.pd_manager.disable()
self.socket().send.assert_called_once_with('disable,subnet,12345,')
def test_get_prefix(self):
self.socket().recv.return_value = "prefix"
prefix = self.pd_manager.get_prefix()
self.socket().send.assert_called_once_with('get,subnet,uuid,')
self.assertEqual(prefix, "prefix")
def test_get_prefix_error_handling(self):
self.socket().recv.return_value = "NOT_RUNNING"
self.assertRaises(
exceptions.DHCPv6AgentException, self.pd_manager.get_prefix)
self.socket().send.assert_called_once_with('get,subnet,uuid,')
def test_sync_data(self):
response = driver.PDDriver.get_sync_data()
self.assertEqual(response, [])
class TestAgent(base.BaseTestCase):
def setUp(self):
super(TestAgent, self).setUp()
self.subnetpd = mock.patch(
'python_neutron_pd_driver.subnetpd.SubnetPD').start()
self.listener = mock.patch(
'python_neutron_pd_driver.listener.start').start()
self.listener.isAlive.return_value = False
self.socket = mock.patch('socket.socket').start()
self.conf = agent_config.setup_conf()
cfg.CONF.set_override('state_path', "/tmp/neutron")
self.conf.set_override('state_path', "/tmp/neutron")
self.conf.register_opts(base_config.core_opts)
log.register_options(self.conf)
self.conf.register_opts(config.OPTS)
self.conf.register_opts(pd_driver.OPTS)
def test_notify_l3_agent(self):
with mock.patch('os.kill') as kill:
agent.notify_l3_agent(12345)
kill.assert_called_once_with(12345, signal.SIGHUP)
def test_notify_l3_agent_exception(self):
with mock.patch('os.kill') as kill:
kill.side_effect = Exception("BOOM!")
agent.notify_l3_agent(12345)
kill.assert_called_once_with(12345, signal.SIGHUP)
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
def test_DHCPv6Agent___init__(self, mock_open, mock_listdir,
mock_chmod, mock_remove):
subnet_ids = ["subnet_1", "subnet_2", "subnet_3", "subnet_4"]
mock_listdir.return_value = subnet_ids
dc = agent.DHCPV6Agent()
pd_clients_keys = dc.pd_clients.keys()
pd_clients_keys = sorted(pd_clients_keys)
self.assertEqual(['1', '2', '3', '4'], pd_clients_keys)
self.listener.assert_called_once_with()
self.socket().bind.assert_called_once_with(
self.conf.pd_socket_loc + "/" + constants.CONTROL_PATH)
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
def test_DHCPv6Agent_system_load_listdir_exception(
self, mock_open, mock_listdir, mock_chmod, mock_remove):
mock_listdir.side_effect = OSError()
dc = agent.DHCPV6Agent()
self.assertEqual({}, dc.pd_clients)
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
def test_DHCPv6Agent_system_load_open_exception(
self, mock_open, mock_listdir, mock_chmod, mock_remove):
subnet_ids = ["subnet_1", "subnet_2", "subnet_3", "subnet_4"]
mock_listdir.return_value = subnet_ids
def side_effect(*args, **kwargs):
if "subnet_4" in args[0]:
raise IOError()
return mock.MagicMock()
mock_open.side_effect = side_effect
dc = agent.DHCPV6Agent()
pd_clients_keys = dc.pd_clients.keys()
pd_clients_keys = sorted(pd_clients_keys)
self.assertEqual(['1', '2', '3'], pd_clients_keys)
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
def test_DHCPv6Agent_processor_enable(
self, mock_open, mock_listdir, mock_chmod, mock_remove):
dc = agent.DHCPV6Agent()
dc.processor('enable,1,12345')
pd_clients_keys = dc.pd_clients.keys()
pd_clients_keys = sorted(pd_clients_keys)
self.assertEqual(['1'], pd_clients_keys)
mock_open.assert_called_once_with(
"%s/subnet_%s" % (self.conf.pd_confs, 1), 'w')
mock_open().write.assert_called_once_with('12345')
mock_open().close.assert_called_once_with()
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
@mock.patch('os.path.exists')
def test_DHCPv6Agent_processor_disable(
self, mock_exists, mock_open, mock_listdir, mock_chmod,
mock_remove):
dc = agent.DHCPV6Agent()
subnetpd = mock.Mock()
dc.pd_clients = {'1': subnetpd}
dc.processor('disable,1,12345')
pd_clients_keys = dc.pd_clients.keys()
pd_clients_keys = sorted(pd_clients_keys)
self.assertEqual([], pd_clients_keys)
subnetpd.shutdown.assert_called_once_with()
mock_exists.return_value = True
mock_remove.assert_called_with(
"%s/subnet_%s" % (self.conf.pd_confs, 1))
@mock.patch('os.remove')
@mock.patch('os.chmod')
@mock.patch('os.listdir')
@mock.patch.object(__builtin__, 'open')
def test_DHCPv6Agent_processor_get(
self, mock_open, mock_listdir, mock_chmod, mock_remove):
dc = agent.DHCPV6Agent()
subnetpd = mock.Mock()
subnetpd.get.return_value = "prefix"
dc.pd_clients = {'1': subnetpd}
dc.processor('get,1,12345')
pd_clients_keys = dc.pd_clients.keys()
pd_clients_keys = sorted(pd_clients_keys)
subnetpd.get.assert_called_once_with()
self.socket().connect.assert_called_once_with(
self.conf.pd_socket_loc + "/" + constants.RESP_PATH % '12345')
self.socket().send.assert_called_once_with("prefix")
class TestUtils(base.BaseTestCase):
def setUp(self):
super(TestUtils, self).setUp()
self.socket = mock.patch('socket.socket').start()
self.conf = agent_config.setup_conf()
self.conf.register_opts(base_config.core_opts)
log.register_options(self.conf)
self.conf.register_opts(config.OPTS)
self.conf.register_opts(pd_driver.OPTS)
def test_socket_connect(self):
s = utils.socket_connect("hello")
self.socket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
s.connect.assert_called_once_with(
self.conf.pd_socket_loc + "/" + "hello")
def test_socket_bind(self):
s = utils.socket_bind("hello")
self.socket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
s.bind.assert_called_once_with(
self.conf.pd_socket_loc + "/" + "hello")
def test_control_socket_connect(self):
s = utils.control_socket_connect()
self.socket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
s.connect.assert_called_once_with(
self.conf.pd_socket_loc + "/" + constants.CONTROL_PATH)
@mock.patch('os.remove')
@mock.patch('os.path.exists')
def test_socket_delete(self, mock_exists, mock_remove):
utils.socket_delete("hello")
mock_exists.return_value = True
mock_remove.assert_called_once_with(
self.conf.pd_socket_loc + "/" + "hello")
@mock.patch('threading.Thread')
def test_new_daemon_thread(self, mock_thread):
func = mock.Mock()
utils.new_daemon_thread(func)
mock_thread.assert_called_once_with(target=func, args=())
th = mock_thread()
self.assertTrue(th.daemon)
th.start.assert_called_once_with()
@mock.patch('threading.Thread')
def test_new_daemon_thread_with_args(self, mock_thread):
func = mock.Mock()
arg1 = mock.Mock()
utils.new_daemon_thread(func, (arg1,))
mock_thread.assert_called_once_with(target=func, args=(arg1,))
th = mock_thread()
self.assertTrue(th.daemon)
th.start.assert_called_once_with()
class TestSubnetPD(base.BaseTestCase):
pass
class TestSocketV6(base.BaseTestCase):
def setUp(self):
super(TestSocketV6, self).setUp()
self.socket = mock.patch('socket.socket').start()
self.conf = agent_config.setup_conf()
self.conf.register_opts(base_config.core_opts)
log.register_options(self.conf)
self.conf.register_opts(config.OPTS)
self.conf.register_opts(pd_driver.OPTS)
def test_socket_v6(self):
sock = socketv6.socket_v6()
self.socket.assert_called_once_with(socket.AF_INET6, socket.SOCK_DGRAM)
sock.setsockopt.assert_any_call(socket.SOL_SOCKET,
socketv6.SO_BINDTODEVICE,
self.conf.pd_interface)
sock.setsockopt.assert_any_call(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
sock.setsockopt.assert_any_call(socket.SOL_SOCKET,
socket.SO_BROADCAST, 1)
sock.setsockopt.assert_any_call(socket.IPPROTO_IPV6,
socket.IPV6_MULTICAST_HOPS,
struct.pack('@i', 1))
sock.bind.assert_called_once_with(('', 546))
@mock.patch('python_neutron_pd_driver.socketv6.socket_v6')
def test_send_packet(self, mock_socketv6):
socketv6.send_packet("hello")
mock_socketv6.assert_called_once_with()
mock_socketv6().settimeout.assert_called_once_with(3)
mock_socketv6().sendto.assert_called_once_with(
"hello", ("ff02::1:2", 547))
@mock.patch('python_neutron_pd_driver.socketv6.socket_v6')
def test_send_packet_with_non_string(self, mock_socketv6):
def str(s):
return "hello"
pack = mock.Mock()
pack.__str__ = str
socketv6.send_packet(pack)
mock_socketv6.assert_called_once_with()
mock_socketv6().settimeout.assert_called_once_with(3)
mock_socketv6().sendto.assert_called_once_with(
"hello", ("ff02::1:2", 547))
@mock.patch('python_neutron_pd_driver.socketv6.socket_v6')
def test_send_packet_with_address(self, mock_socketv6):
socketv6.send_packet("hello", address="address")
mock_socketv6.assert_called_once_with()
mock_socketv6().settimeout.assert_called_once_with(3)
mock_socketv6().sendto.assert_called_once_with(
"hello", ("address", 547))
class TestListener(base.BaseTestCase):
def test_processor(self):
def validator1(data):
if data == "good1":
return True
return False
def validator2(data):
if data == "good2":
return True
return False
def validator3(data):
if data == "good3":
return True
return False
queue1 = mock.Mock()
queue2 = mock.Mock()
queue3 = mock.Mock()
queue4 = mock.Mock()
listeners = [(validator1, queue1), (validator1, queue2),
(validator2, queue3), (validator3, queue4)]
listener.processor("good1", "sender1", listeners)
listener.processor("good2", "sender2", listeners)
listener.processor("good2", "sender3", listeners)
queue1.put.assert_called_once_with(("good1", "sender1"))
queue2.put.assert_called_once_with(("good1", "sender1"))
queue3.put.assert_any_call(("good2", "sender2"))
queue3.put.assert_any_call(("good2", "sender3"))
self.assertFalse(queue4.put.called)
@mock.patch('python_neutron_pd_driver.socketv6.socket_v6')
@mock.patch('python_neutron_pd_driver.utils.new_daemon_thread')
@mock.patch('python_neutron_pd_driver.listener.SOCKET')
def test_start_not_running(self, mock_SOC, mock_daemon, mock_sock):
listener.RUNNING = False
listener.start()
self.assertEqual(mock_sock(), listener.SOCKET)
self.assertTrue(listener.RUNNING)
mock_daemon.assert_called_once_with(listener.main_listener_thread)
@mock.patch('python_neutron_pd_driver.socketv6.socket_v6')
@mock.patch('python_neutron_pd_driver.utils.new_daemon_thread')
@mock.patch('python_neutron_pd_driver.listener.SOCKET')
def test_start_already_running(self, mock_SOC, mock_daemon, mock_sock):
listener.RUNNING = True
mock_sock.return_value = "hello"
old = listener.SOCKET
listener.start()
self.assertEqual(listener.SOCKET, old)
self.assertFalse(mock_daemon.called)
def test_stop_running(self):
listener.SOCKET = mock.Mock()
listener.RUNNING = True
listener.stop()
listener.SOCKET.shutdown.assert_called_once_with(socket.SHUT_RDWR)
listener.SOCKET.close.assert_called_once_with()
self.assertFalse(listener.RUNNING)
def test_stop_not_running(self):
listener.SOCKET = mock.Mock()
listener.RUNNING = False
listener.stop()
listener.SOCKET.shutdown.assert_called_once_with(socket.SHUT_RDWR)
listener.SOCKET.close.assert_called_once_with()
self.assertFalse(listener.RUNNING)
def test_stop_no_socket(self):
listener.SOCKET = None
listener.RUNNING = False
listener.stop()
self.assertFalse(listener.RUNNING)

View File

@ -1,28 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
test_python_neutron_pd_driver
----------------------------------
Tests for `python_neutron_pd_driver` module.
"""
from python_neutron_pd_driver.tests import base
class TestPython_neutron_pd_driver(base.TestCase):
def test_something(self):
pass

View File

@ -1,51 +0,0 @@
# Copyright 2015 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import socket
import threading
from oslo_config import cfg
from python_neutron_pd_driver import config # noqa
from python_neutron_pd_driver import constants
def socket_connect(path):
s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
s.connect(cfg.CONF.pd_socket_loc + "/" + path)
return s
def socket_bind(path):
s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
s.bind(cfg.CONF.pd_socket_loc + "/" + path)
return s
def control_socket_connect():
return socket_connect(constants.CONTROL_PATH)
def socket_delete(path):
if os.path.exists(cfg.CONF.pd_socket_loc + "/" + path):
os.remove(cfg.CONF.pd_socket_loc + "/" + path)
def new_daemon_thread(target, params=None):
th = threading.Thread(target=target, args=params or ())
th.daemon = True
th.start()
return th

View File

@ -1,8 +0,0 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
pbr>=0.6,!=0.7,<1.0
Babel>=1.3
oslo.config>=1.11.0 # Apache-2.0
oslo.log>=1.2.0 # Apache-2.0

View File

@ -1,53 +0,0 @@
[metadata]
name = python-neutron-pd-driver
summary = A prefix delegation driver written using pure python with no dependencies for use with OpenStack Neutron.
description-file =
README.rst
author = OpenStack
author-email = openstack-dev@lists.openstack.org
home-page = http://www.openstack.org/
classifier =
Environment :: OpenStack
Intended Audience :: Information Technology
Intended Audience :: System Administrators
License :: OSI Approved :: Apache Software License
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
Programming Language :: Python :: 2.6
Programming Language :: Python :: 3
Programming Language :: Python :: 3.3
Programming Language :: Python :: 3.4
[entry_points]
console_scripts =
neutron-pd-agent = python_neutron_pd_driver.agent:main
neutron.agent.linux.pd_drivers =
neutron_pd_agent = python_neutron_pd_driver.driver:PDDriver
[files]
packages =
python_neutron_pd_driver
[build_sphinx]
source-dir = doc/source
build-dir = doc/build
all_files = 1
[upload_sphinx]
upload-dir = doc/build/html
[compile_catalog]
directory = python_neutron_pd_driver/locale
domain = python-neutron-pd-driver
[update_catalog]
domain = python-neutron-pd-driver
output_dir = python_neutron_pd_driver/locale
input_file = python_neutron_pd_driver/locale/python-neutron-pd-driver.pot
[extract_messages]
keywords = _ gettext ngettext l_ lazy_gettext
mapping_file = babel.cfg
output_file = python_neutron_pd_driver/locale/python-neutron-pd-driver.pot

View File

@ -1,30 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT
import setuptools
# In python < 2.7.4, a lazy loading of package `pbr` will break
# setuptools if some other modules registered functions in `atexit`.
# solution from: http://bugs.python.org/issue15881#msg170215
try:
import multiprocessing # noqa
except ImportError:
pass
setuptools.setup(
setup_requires=['pbr'],
pbr=True)

View File

@ -1,17 +0,0 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
hacking<0.11,>=0.10.0
coverage>=3.6
discover
python-subunit>=0.0.18
sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3
oslosphinx>=2.2.0 # Apache-2.0
oslotest>=1.2.0 # Apache-2.0
testrepository>=0.0.18
testscenarios>=0.4
testtools>=0.9.36,!=1.2.0
-e git+git://git.openstack.org/openstack/neutron.git#egg=neutron

42
tox.ini
View File

@ -1,42 +0,0 @@
[tox]
minversion = 1.6
envlist = docs,py34,py27,pep8
skipsdist = True
[testenv]
usedevelop = True
install_command = pip install -U {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands =
{toxinidir}/install_dependant_neutron_commit {envname}
python setup.py test --slowest --testr-args='{posargs}'
[testenv:py34]
commands = python -m testtools.run \
python_neutron_pd_driver.tests.test_dhcpv6_pd_agent
[testenv:pep8]
commands = flake8
[testenv:venv]
commands = {posargs}
[testenv:cover]
commands = python setup.py test --coverage --testr-args='{posargs}'
[testenv:docs]
commands = python setup.py build_sphinx
[testenv:debug]
commands = oslo_debug_helper {posargs}
[flake8]
# E123, E125 skipped as they are invalid PEP-8.
show-source = True
ignore = E123,E125
builtins = _
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build