Merge "Initial provider driver library checkin"

This commit is contained in:
Zuul 2018-12-11 23:41:12 +00:00 committed by Gerrit Code Review
commit d045062aad
31 changed files with 2826 additions and 39 deletions

97
.pylintrc Normal file
View File

@ -0,0 +1,97 @@
# The format of this file isn't really documented; just use --generate-rcfile
[MASTER]
# Add <file or directory> to the black list. It should be a base name, not a
# path. You may set this option multiple times.
ignore=.git,tests
[MESSAGES CONTROL]
# NOTE: The options which do not need to be suppressed can be removed.
disable=
# "F" Fatal errors that prevent further processing
# "I" Informational noise
locally-disabled,
# "E" Error for important programming issues (likely bugs)
import-error,
not-callable,
no-member,
# "W" Warnings for stylistic problems or minor programming issues
abstract-method,
anomalous-backslash-in-string,
arguments-differ,
attribute-defined-outside-init,
bad-builtin,
broad-except,
fixme,
global-statement,
no-init,
pointless-string-statement,
protected-access,
redefined-builtin,
redefined-outer-name,
signature-differs,
unidiomatic-typecheck,
unused-argument,
unused-variable,
useless-super-delegation,
# "C" Coding convention violations
bad-continuation,
invalid-name,
line-too-long,
missing-docstring,
# "R" Refactor recommendations
duplicate-code,
interface-not-implemented,
no-self-use,
too-few-public-methods,
too-many-ancestors,
too-many-arguments,
too-many-branches,
too-many-instance-attributes,
too-many-lines,
too-many-locals,
too-many-public-methods,
too-many-return-statements,
too-many-statements,
multiple-statements,
duplicate-except,
keyword-arg-before-vararg
[BASIC]
# Variable names can be 1 to 31 characters long, with lowercase and underscores
variable-rgx=[a-z_][a-z0-9_]{0,30}$
# Argument names can be 2 to 31 characters long, with lowercase and underscores
argument-rgx=[a-z_][a-z0-9_]{1,30}$
# Method names should be at least 3 characters long
# and be lowercased with underscores
method-rgx=([a-z_][a-z0-9_]{2,}|setUp|tearDown)$
# Module names matching
module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
# Don't require docstrings on tests.
no-docstring-rgx=((__.*__)|([tT]est.*)|setUp|tearDown)$
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=79
[VARIABLES]
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
additional-builtins=
[CLASSES]
[IMPORTS]
# Deprecated modules which should not be used, separated by a comma
deprecated-modules=
[TYPECHECK]
# List of module names for which member attributes should not be checked
ignored-modules=six.moves,_MovedItems
[REPORTS]
# Tells whether to display a full report or only the messages
reports=no

View File

@ -1,3 +1,3 @@
[DEFAULT]
test_path=./octavia_lib/tests
test_path=${OS_TEST_PATH:-./octavia_lib/tests/unit}
top_dir=./

View File

@ -1,8 +1,84 @@
octavia-lib Style Commandments
Octavia-lib Style Commandments
==============================
This project was ultimately spawned from work done on the Neutron project.
As such, we tend to follow Neutron conventions regarding coding style.
- Read the OpenStack Style Commandments:
https://docs.openstack.org/hacking/latest/
- We follow the OpenStack Style Commandments:
https://docs.openstack.org/hacking/latest
- Read the OpenStack Octavia style guide:
https://docs.openstack.org/octavia/latest/contributor/HACKING.html
-- Read the OpenStack Octavia style guide:
- https://docs.openstack.org/octavia/latest/contributor/HACKING.html
Octavia Specific Commandments
-----------------------------
- [O316] Change assertTrue(isinstance(A, B)) by optimal assert like
assertIsInstance(A, B).
- [O318] Change assert(Not)Equal(A, None) or assert(Not)Equal(None, A)
by optimal assert like assertIs(Not)None(A).
- [O319] Validate that debug level logs are not translated.
- [O321] Validate that jsonutils module is used instead of json
- [O322] Don't use author tags
- [O323] Change assertEqual(True, A) or assertEqual(False, A) to the more
specific assertTrue(A) or assertFalse(A)
- [O324] Method's default argument shouldn't be mutable
- [O338] Change assertEqual(A in B, True), assertEqual(True, A in B),
assertEqual(A in B, False) or assertEqual(False, A in B) to the more
specific assertIn/NotIn(A, B)
- [O339] LOG.warn() is not allowed. Use LOG.warning()
- [O340] Don't use xrange()
- [O341] Don't translate logs.
- [0342] Exception messages should be translated
- [O343] Python 3: do not use basestring.
- [O344] Python 3: do not use dict.iteritems.
- [O345] Usage of Python eventlet module not allowed
- [O346] Don't use backslashes for line continuation.
- [O501] Direct octavia imports not allowed.
Creating Unit Tests
-------------------
For every new feature, unit tests should be created that both test and
(implicitly) document the usage of said feature. If submitting a patch for a
bug that had no unit test, a new passing unit test should be added. If a
submitted bug fix does have a unit test, be sure to add a new one that fails
without the patch and passes with the patch.
Everything is python
--------------------
Although OpenStack apparently allows either python or C++ code, at this time
we don't envision needing anything other than python (and standard, supported
open source modules) for anything we intend to do in Octavia-lib.
Idempotency
-----------
With as much as is going on inside Octavia-lib, its likely that certain
messages and commands will be repeatedly processed. It's important that this
doesn't break the functionality of the load balancing service. Therefore, as
much as possible, algorithms and interfaces should be made as idempotent as
possible.
Avoid premature optimization
----------------------------
Understand that being "high performance" is often not the same thing as being
"scalable." First get the thing to work in an intelligent way. Only worry about
making it fast if speed becomes an issue.
Don't repeat yourself
---------------------
Octavia-lib strives to follow DRY principles. There should be one source of
truth, and repetition of code should be avoided.
Security is not an afterthought
-------------------------------
The load balancer is often both the most visible public interface to a given
user application, but load balancers themselves often have direct access to
sensitive components and data within the application environment. Security bugs
will happen, but in general we should not approve designs which have known
significant security problems, or which could be made more secure by better
design.
Octavia-lib should follow industry standards
--------------------------------------------
By "industry standards" we either mean RFCs or well-established best practices.
We are generally not interested in defining new standards if a prior open
standard already exists. We should also avoid doing things which directly
or indirectly contradict established standards.

View File

@ -1,7 +1,14 @@
bandit==1.4.0
coverage==4.0
doc8==0.6.0
hacking==0.12.0
oslo.i18n==3.15.3
oslo.log==3.36.0
oslo.utils==3.33.0
oslotest==3.2.0
pbr==2.0.0
pylint==1.9.2
python-subunit==1.0.0
six==1.10.0
stestr==2.0.0
testtools==2.2.0

View File

View File

View File

@ -0,0 +1,262 @@
# Copyright (c) 2014 Rackspace
# Copyright (c) 2016 Blue Box, an IBM Company
# Copyright 2018 Rackspace, US Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class BaseDataModel(object):
def to_dict(self, calling_classes=None, recurse=False,
render_unsets=False, **kwargs):
"""Converts a data model to a dictionary."""
calling_classes = calling_classes or []
ret = {}
for attr in self.__dict__:
if attr.startswith('_') or not kwargs.get(attr, True):
continue
value = self.__dict__[attr]
if recurse:
if isinstance(getattr(self, attr), list):
ret[attr] = []
for item in value:
if isinstance(item, BaseDataModel):
if type(self) not in calling_classes:
ret[attr].append(
item.to_dict(calling_classes=(
calling_classes + [type(self)]),
recurse=True,
render_unsets=render_unsets))
else:
ret[attr].append(None)
else:
ret[attr].append(item)
elif isinstance(getattr(self, attr), BaseDataModel):
if type(self) not in calling_classes:
ret[attr] = value.to_dict(
render_unsets=render_unsets,
calling_classes=calling_classes + [type(self)])
else:
ret[attr] = None
elif six.PY2 and isinstance(value, six.text_type):
ret[attr.encode('utf8')] = value.encode('utf8')
elif isinstance(value, UnsetType):
if render_unsets:
ret[attr] = None
else:
continue
else:
ret[attr] = value
else:
if (isinstance(getattr(self, attr), (BaseDataModel, list)) or
isinstance(value, UnsetType)):
if render_unsets:
ret[attr] = None
else:
continue
else:
ret[attr] = value
return ret
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.to_dict() == other.to_dict()
return False
def __ne__(self, other):
return not self.__eq__(other)
@classmethod
def from_dict(cls, dict):
return cls(**dict)
class UnsetType(object):
def __bool__(self):
return False
__nonzero__ = __bool__
def __repr__(self):
return 'Unset'
Unset = UnsetType()
class LoadBalancer(BaseDataModel):
def __init__(self, admin_state_up=Unset, description=Unset, flavor=Unset,
listeners=Unset, loadbalancer_id=Unset, name=Unset,
pools=Unset, project_id=Unset, vip_address=Unset,
vip_network_id=Unset, vip_port_id=Unset, vip_subnet_id=Unset,
vip_qos_policy_id=Unset):
self.admin_state_up = admin_state_up
self.description = description
self.flavor = flavor
self.listeners = listeners
self.loadbalancer_id = loadbalancer_id
self.name = name
self.pools = pools
self.project_id = project_id
self.vip_address = vip_address
self.vip_network_id = vip_network_id
self.vip_port_id = vip_port_id
self.vip_subnet_id = vip_subnet_id
self.vip_qos_policy_id = vip_qos_policy_id
class Listener(BaseDataModel):
def __init__(self, admin_state_up=Unset, connection_limit=Unset,
default_pool=Unset, default_pool_id=Unset,
default_tls_container_ref=Unset,
default_tls_container_data=Unset, description=Unset,
insert_headers=Unset, l7policies=Unset, listener_id=Unset,
loadbalancer_id=Unset, name=Unset, protocol=Unset,
protocol_port=Unset, sni_container_refs=Unset,
sni_container_data=Unset, timeout_client_data=Unset,
timeout_member_connect=Unset, timeout_member_data=Unset,
timeout_tcp_inspect=Unset):
self.admin_state_up = admin_state_up
self.connection_limit = connection_limit
self.default_pool = default_pool
self.default_pool_id = default_pool_id
self.default_tls_container_data = default_tls_container_data
self.default_tls_container_ref = default_tls_container_ref
self.description = description
self.insert_headers = insert_headers
self.l7policies = l7policies
self.listener_id = listener_id
self.loadbalancer_id = loadbalancer_id
self.name = name
self.protocol = protocol
self.protocol_port = protocol_port
self.sni_container_data = sni_container_data
self.sni_container_refs = sni_container_refs
self.timeout_client_data = timeout_client_data
self.timeout_member_connect = timeout_member_connect
self.timeout_member_data = timeout_member_data
self.timeout_tcp_inspect = timeout_tcp_inspect
class Pool(BaseDataModel):
def __init__(self, admin_state_up=Unset, description=Unset,
healthmonitor=Unset, lb_algorithm=Unset,
loadbalancer_id=Unset, members=Unset, name=Unset,
pool_id=Unset, listener_id=Unset, protocol=Unset,
session_persistence=Unset):
self.admin_state_up = admin_state_up
self.description = description
self.healthmonitor = healthmonitor
self.lb_algorithm = lb_algorithm
self.loadbalancer_id = loadbalancer_id
self.members = members
self.name = name
self.pool_id = pool_id
self.listener_id = listener_id
self.protocol = protocol
self.session_persistence = session_persistence
class Member(BaseDataModel):
def __init__(self, address=Unset, admin_state_up=Unset, member_id=Unset,
monitor_address=Unset, monitor_port=Unset, name=Unset,
pool_id=Unset, protocol_port=Unset, subnet_id=Unset,
weight=Unset, backup=Unset):
self.address = address
self.admin_state_up = admin_state_up
self.member_id = member_id
self.monitor_address = monitor_address
self.monitor_port = monitor_port
self.name = name
self.pool_id = pool_id
self.protocol_port = protocol_port
self.subnet_id = subnet_id
self.weight = weight
self.backup = backup
class HealthMonitor(BaseDataModel):
def __init__(self, admin_state_up=Unset, delay=Unset, expected_codes=Unset,
healthmonitor_id=Unset, http_method=Unset, max_retries=Unset,
max_retries_down=Unset, name=Unset, pool_id=Unset,
timeout=Unset, type=Unset, url_path=Unset):
self.admin_state_up = admin_state_up
self.delay = delay
self.expected_codes = expected_codes
self.healthmonitor_id = healthmonitor_id
self.http_method = http_method
self.max_retries = max_retries
self.max_retries_down = max_retries_down
self.name = name
self.pool_id = pool_id
self.timeout = timeout
self.type = type
self.url_path = url_path
class L7Policy(BaseDataModel):
def __init__(self, action=Unset, admin_state_up=Unset, description=Unset,
l7policy_id=Unset, listener_id=Unset, name=Unset,
position=Unset, redirect_pool_id=Unset, redirect_url=Unset,
rules=Unset, redirect_prefix=Unset):
self.action = action
self.admin_state_up = admin_state_up
self.description = description
self.l7policy_id = l7policy_id
self.listener_id = listener_id
self.name = name
self.position = position
self.redirect_pool_id = redirect_pool_id
self.redirect_url = redirect_url
self.rules = rules
self.redirect_prefix = redirect_prefix
class L7Rule(BaseDataModel):
def __init__(self, admin_state_up=Unset, compare_type=Unset, invert=Unset,
key=Unset, l7policy_id=Unset, l7rule_id=Unset, type=Unset,
value=Unset):
self.admin_state_up = admin_state_up
self.compare_type = compare_type
self.invert = invert
self.key = key
self.l7policy_id = l7policy_id
self.l7rule_id = l7rule_id
self.type = type
self.value = value
class VIP(BaseDataModel):
def __init__(self, vip_address=Unset, vip_network_id=Unset,
vip_port_id=Unset, vip_subnet_id=Unset,
vip_qos_policy_id=Unset):
self.vip_address = vip_address
self.vip_network_id = vip_network_id
self.vip_port_id = vip_port_id
self.vip_subnet_id = vip_subnet_id
self.vip_qos_policy_id = vip_qos_policy_id

View File

@ -0,0 +1,115 @@
# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
from oslo_serialization import jsonutils
from octavia_lib.api.drivers import exceptions as driver_exceptions
from octavia_lib.common import constants
DEFAULT_STATUS_SOCKET = '/var/run/octavia/status.sock'
DEFAULT_STATS_SOCKET = '/var/run/octavia/stats.sock'
SOCKET_TIMEOUT = 5
class DriverLibrary(object):
def __init__(self, status_socket=DEFAULT_STATUS_SOCKET,
stats_socket=DEFAULT_STATS_SOCKET, **kwargs):
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.settimeout(SOCKET_TIMEOUT)
self.status_socket = status_socket
self.stats_socket = stats_socket
super(DriverLibrary, self).__init__(**kwargs)
def _recv(self):
size_str = ''
char = self.sock.recv(1)
while char != '\n':
size_str += char
char = self.sock.recv(1)
payload_size = int(size_str)
mv_buffer = memoryview(bytearray(payload_size))
next_offset = 0
while payload_size - next_offset > 0:
recv_size = self.sock.recv_into(mv_buffer[next_offset:],
payload_size - next_offset)
next_offset += recv_size
return jsonutils.loads(mv_buffer.tobytes())
def _send(self, socket_path, data):
self.sock.connect(socket_path)
try:
json_data = jsonutils.dumps(data)
self.sock.send('%d\n' % len(json_data))
self.sock.sendall(json_data)
response = self._recv()
finally:
self.sock.close()
return response
def update_loadbalancer_status(self, status):
"""Update load balancer status.
:param status: dictionary defining the provisioning status and
operating status for load balancer objects, including pools,
members, listeners, L7 policies, and L7 rules.
iod (string): ID for the object.
provisioning_status (string): Provisioning status for the object.
operating_status (string): Operating status for the object.
:type status: dict
:raises: UpdateStatusError
:returns: None
"""
try:
response = self._send(self.status_socket, status)
except Exception as e:
raise driver_exceptions.UpdateStatusError(fault_string=str(e))
if response[constants.STATUS_CODE] != constants.DRVR_STATUS_CODE_OK:
raise driver_exceptions.UpdateStatusError(
fault_string=response.pop(constants.FAULT_STRING, None),
status_object=response.pop(constants.STATUS_OBJECT, None),
status_object_id=response.pop(constants.STATUS_OBJECT_ID,
None),
status_record=response.pop(constants.STATUS_RECORD, None))
def update_listener_statistics(self, statistics):
"""Update listener statistics.
:param statistics: Statistics for listeners:
id (string): ID for listener.
active_connections (int): Number of currently active connections.
bytes_in (int): Total bytes received.
bytes_out (int): Total bytes sent.
request_errors (int): Total requests not fulfilled.
total_connections (int): The total connections handled.
:type statistics: dict
:raises: UpdateStatisticsError
:returns: None
"""
try:
response = self._send(self.stats_socket, statistics)
except Exception as e:
raise driver_exceptions.UpdateStatisticsError(
fault_string=str(e), stats_object=constants.LISTENERS)
if response[constants.STATUS_CODE] != constants.DRVR_STATUS_CODE_OK:
raise driver_exceptions.UpdateStatisticsError(
fault_string=response.pop(constants.FAULT_STRING, None),
stats_object=response.pop(constants.STATS_OBJECT, None),
stats_object_id=response.pop(constants.STATS_OBJECT_ID, None),
stats_record=response.pop(constants.STATS_RECORD, None))

View File

@ -0,0 +1,148 @@
# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from octavia_lib.i18n import _
class DriverError(Exception):
"""Catch all exception that drivers can raise.
This exception includes two strings: The user fault string and the
optional operator fault string. The user fault string,
"user_fault_string", will be provided to the API requester. The operator
fault string, "operator_fault_string", will be logged in the Octavia API
log file for the operator to use when debugging.
:param user_fault_string: String provided to the API requester.
:type user_fault_string: string
:param operator_fault_string: Optional string logged by the Octavia API
for the operator to use when debugging.
:type operator_fault_string: string
"""
user_fault_string = _("An unknown driver error occurred.")
operator_fault_string = _("An unknown driver error occurred.")
def __init__(self, *args, **kwargs):
self.user_fault_string = kwargs.pop('user_fault_string',
self.user_fault_string)
self.operator_fault_string = kwargs.pop('operator_fault_string',
self.operator_fault_string)
super(DriverError, self).__init__(*args, **kwargs)
class NotImplementedError(Exception):
"""Exception raised when a driver does not implement an API function.
:param user_fault_string: String provided to the API requester.
:type user_fault_string: string
:param operator_fault_string: Optional string logged by the Octavia API
for the operator to use when debugging.
:type operator_fault_string: string
"""
user_fault_string = _("This feature is not implemented by the provider.")
operator_fault_string = _("This feature is not implemented by this "
"provider.")
def __init__(self, *args, **kwargs):
self.user_fault_string = kwargs.pop('user_fault_string',
self.user_fault_string)
self.operator_fault_string = kwargs.pop('operator_fault_string',
self.operator_fault_string)
super(NotImplementedError, self).__init__(*args, **kwargs)
class UnsupportedOptionError(Exception):
"""Exception raised when a driver does not support an option.
Provider drivers will validate that they can complete the request -- that
all options are supported by the driver. If the request fails validation,
drivers will raise an UnsupportedOptionError exception. For example, if a
driver does not support a flavor passed as an option to load balancer
create(), the driver will raise an UnsupportedOptionError and include a
message parameter providing an explanation of the failure.
:param user_fault_string: String provided to the API requester.
:type user_fault_string: string
:param operator_fault_string: Optional string logged by the Octavia API
for the operator to use when debugging.
:type operator_fault_string: string
"""
user_fault_string = _("A specified option is not supported by this "
"provider.")
operator_fault_string = _("A specified option is not supported by this "
"provider.")
def __init__(self, *args, **kwargs):
self.user_fault_string = kwargs.pop('user_fault_string',
self.user_fault_string)
self.operator_fault_string = kwargs.pop('operator_fault_string',
self.operator_fault_string)
super(UnsupportedOptionError, self).__init__(*args, **kwargs)
class UpdateStatusError(Exception):
"""Exception raised when a status update fails.
Each exception will include a message field that describes the
error and references to the failed record if available.
:param fault_string: String describing the fault.
:type fault_string: string
:param status_object: The object the fault occurred on.
:type status_object: string
:param status_object_id: The ID of the object that failed status update.
:type status_object_id: string
:param status_record: The status update record that caused the fault.
:type status_record: string
"""
fault_string = _("The status update had an unknown error.")
status_object = None
status_object_id = None
status_record = None
def __init__(self, *args, **kwargs):
self.fault_string = kwargs.pop('fault_string', self.fault_string)
self.status_object = kwargs.pop('status_object', None)
self.status_object_id = kwargs.pop('status_object_id', None)
self.status_record = kwargs.pop('status_record', None)
super(UpdateStatusError, self).__init__(*args, **kwargs)
class UpdateStatisticsError(Exception):
"""Exception raised when a statistics update fails.
Each exception will include a message field that describes the
error and references to the failed record if available.
:param fault_string: String describing the fault.
:type fault_string: string
:param status_object: The object the fault occurred on.
:type status_object: string
:param status_object_id: The ID of the object that failed stats update.
:type status_object_id: string
:param status_record: The stats update record that caused the fault.
:type status_record: string
"""
fault_string = _("The statistics update had an unknown error.")
stats_object = None
stats_object_id = None
stats_record = None
def __init__(self, *args, **kwargs):
self.fault_string = kwargs.pop('fault_string',
self.fault_string)
self.stats_object = kwargs.pop('stats_object', None)
self.stats_object_id = kwargs.pop('stats_object_id', None)
self.stats_record = kwargs.pop('stats_record', None)
super(UpdateStatisticsError, self).__init__(*args, **kwargs)

View File

@ -0,0 +1,481 @@
# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from octavia_lib.api.drivers import exceptions
# This class describes the abstraction of a provider driver interface.
# Load balancing provider drivers will implement this interface.
class ProviderDriver(object):
# name is for internal Octavia use and should not be used by drivers
name = None
# Load Balancer
def create_vip_port(self, loadbalancer_id, project_id, vip_dictionary):
"""Creates a port for a load balancer VIP.
If the driver supports creating VIP ports, the driver will create a
VIP port and return the vip_dictionary populated with the vip_port_id.
If the driver does not support port creation, the driver will raise
a NotImplementedError.
:param loadbalancer_id: ID of loadbalancer.
:type loadbalancer_id: string
:param project_id: The project ID to create the VIP under.
:type project_id: string
:param: vip_dictionary: The VIP dictionary.
:type vip_dictionary: dict
:returns: VIP dictionary with vip_port_id.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: The driver does not support creating
VIP ports.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support creating VIP '
'ports.',
operator_fault_string='This provider does not support creating '
'VIP ports. Octavia will create it.')
def loadbalancer_create(self, loadbalancer):
"""Creates a new load balancer.
:param loadbalancer: The load balancer object.
:type loadbalancer: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: The driver does not support create.
:raises UnsupportedOptionError: The driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support creating '
'load balancers.',
operator_fault_string='This provider does not support creating '
'load balancers. What?')
def loadbalancer_delete(self, loadbalancer, cascade=False):
"""Deletes a load balancer.
:param loadbalancer: The load balancer to delete.
:type loadbalancer: object
:param cascade: If True, deletes all child objects (listeners,
pools, etc.) in addition to the load balancer.
:type cascade: bool
:return: Nothing if the delete request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support deleting '
'load balancers.',
operator_fault_string='This provider does not support deleting '
'load balancers.')
def loadbalancer_failover(self, loadbalancer_id):
"""Performs a fail over of a load balancer.
:param loadbalancer_id: ID of the load balancer to failover.
:type loadbalancer_id: string
:return: Nothing if the failover request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises: NotImplementedError if driver does not support request.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support failing over '
'load balancers.',
operator_fault_string='This provider does not support failing '
'over load balancers.')
def loadbalancer_update(self, old_loadbalancer, new_loadbalncer):
"""Updates a load balancer.
:param old_loadbalancer: The baseline load balancer object.
:type old_loadbalancer: object
:param new_loadbalancer: The updated load balancer object.
:type new_loadbalancer: object
:return: Nothing if the update request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: The driver does not support request.
:raises UnsupportedOptionError: The driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support updating '
'load balancers.',
operator_fault_string='This provider does not support updating '
'load balancers.')
# Listener
def listener_create(self, listener):
"""Creates a new listener.
:param listener: The listener object.
:type listener: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support creating '
'listeners.',
operator_fault_string='This provider does not support creating '
'listeners.')
def listener_delete(self, listener):
"""Deletes a listener.
:param listener: The listener to delete.
:type listener: object
:return: Nothing if the delete request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support deleting '
'listeners.',
operator_fault_string='This provider does not support deleting '
'listeners.')
def listener_update(self, old_listener, new_listener):
"""Updates a listener.
:param old_listener: The baseline listener object.
:type old_listener: object
:param new_listener: The updated listener object.
:type new_listener: object
:return: Nothing if the update request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support updating '
'listeners.',
operator_fault_string='This provider does not support updating '
'listeners.')
# Pool
def pool_create(self, pool):
"""Creates a new pool.
:param pool: The pool object.
:type pool: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support creating '
'pools.',
operator_fault_string='This provider does not support creating '
'pools.')
def pool_delete(self, pool):
"""Deletes a pool and its members.
:param pool: The pool to delete.
:type pool: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support deleting '
'pools.',
operator_fault_string='This provider does not support deleting '
'pools.')
def pool_update(self, old_pool, new_pool):
"""Updates a pool.
:param pool: The baseline pool object.
:type pool: object
:param pool: The updated pool object.
:type pool: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support updating '
'pools.',
operator_fault_string='This provider does not support updating '
'pools.')
# Member
def member_create(self, member):
"""Creates a new member for a pool.
:param member: The member object.
:type member: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support creating '
'members.',
operator_fault_string='This provider does not support creating '
'members.')
def member_delete(self, member):
"""Deletes a pool member.
:param member: The member to delete.
:type member: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support deleting '
'members.',
operator_fault_string='This provider does not support deleting '
'members.')
def member_update(self, old_member, new_member):
"""Updates a pool member.
:param old_member: The baseline member object.
:type old_member: object
:param new_member: The updated member object.
:type new_member: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support updating '
'members.',
operator_fault_string='This provider does not support updating '
'members.')
def member_batch_update(self, members):
"""Creates, updates, or deletes a set of pool members.
:param members: List of member objects.
:type members: list
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support batch '
'updating members.',
operator_fault_string='This provider does not support batch '
'updating members.')
# Health Monitor
def health_monitor_create(self, healthmonitor):
"""Creates a new health monitor.
:param healthmonitor: The health monitor object.
:type healthmonitor: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support creating '
'health monitors.',
operator_fault_string='This provider does not support creating '
'health monitors.')
def health_monitor_delete(self, healthmonitor):
"""Deletes a healthmonitor_id.
:param healthmonitor: The monitor to delete.
:type healthmonitor: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support deleting '
'health monitors.',
operator_fault_string='This provider does not support deleting '
'health monitors.')
def health_monitor_update(self, old_healthmonitor, new_healthmonitor):
"""Updates a health monitor.
:param old_healthmonitor: The baseline health monitor object.
:type old_healthmonitor: object
:param new_healthmonitor: The updated health monitor object.
:type new_healthmonitor: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support updating '
'health monitors.',
operator_fault_string='This provider does not support updating '
'health monitors.')
# L7 Policy
def l7policy_create(self, l7policy):
"""Creates a new L7 policy.
:param l7policy: The L7 policy object.
:type l7policy: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support creating '
'l7policies.',
operator_fault_string='This provider does not support creating '
'l7policies.')
def l7policy_delete(self, l7policy):
"""Deletes an L7 policy.
:param l7policy: The L7 policy to delete.
:type l7policy: object
:return: Nothing if the delete request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support deleting '
'l7policies.',
operator_fault_string='This provider does not support deleting '
'l7policies.')
def l7policy_update(self, old_l7policy, new_l7policy):
"""Updates an L7 policy.
:param old_l7policy: The baseline L7 policy object.
:type old_l7policy: object
:param new_l7policy: The updated L7 policy object.
:type new_l7policy: object
:return: Nothing if the update request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support updating '
'l7policies.',
operator_fault_string='This provider does not support updating '
'l7policies.')
# L7 Rule
def l7rule_create(self, l7rule):
"""Creates a new L7 rule.
:param l7rule: The L7 rule object.
:type l7rule: object
:return: Nothing if the create request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support creating '
'l7rules.',
operator_fault_string='This provider does not support creating '
'l7rules.')
def l7rule_delete(self, l7rule):
"""Deletes an L7 rule.
:param l7rule: The L7 rule to delete.
:type l7rule: object
:return: Nothing if the delete request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support deleting '
'l7rules.',
operator_fault_string='This provider does not support deleting '
'l7rules.')
def l7rule_update(self, old_l7rule, new_l7rule):
"""Updates an L7 rule.
:param old_l7rule: The baseline L7 rule object.
:type old_l7rule: object
:param new_l7rule: The updated L7 rule object.
:type new_l7rule: object
:return: Nothing if the update request was accepted.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: if driver does not support request.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support updating '
'l7rules.',
operator_fault_string='This provider does not support updating '
'l7rules.')
# Flavor
def get_supported_flavor_metadata(self):
"""Returns a dict of flavor metadata keys supported by this driver.
The returned dictionary will include key/value pairs, 'name' and
'description.'
:returns: The flavor metadata dictionary
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: The driver does not support flavors.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support getting the '
'supported flavor metadata.',
operator_fault_string='This provider does not support getting '
'the supported flavor metadata.')
def validate_flavor(self, flavor_metadata):
"""Validates if driver can support the flavor.
:param flavor_metadata: Dictionary with flavor metadata.
:type flavor_metadata: dict
:return: Nothing if the flavor is valid and supported.
:raises DriverError: An unexpected error occurred in the driver.
:raises NotImplementedError: The driver does not support flavors.
:raises UnsupportedOptionError: if driver does not
support one of the configuration options.
"""
raise exceptions.NotImplementedError(
user_fault_string='This provider does not support validating '
'flavors.',
operator_fault_string='This provider does not support validating '
'the supported flavor metadata.')

View File

View File

@ -0,0 +1,158 @@
# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Codes the driver_agent can return to the octavia-lib driver_lib
DRVR_STATUS_CODE_FAILED = 500
DRVR_STATUS_CODE_OK = 200
STATUS_CODE = 'status_code'
FAULT_STRING = 'fault_string'
STATS_OBJECT = 'stats_object'
STATS_OBJECT_ID = 'stats_object_id'
STATS_RECORD = 'stats_record'
STATUS_OBJECT = 'status_object'
STATUS_OBJECT_ID = 'status_object_id'
STATUS_RECORD = 'status_record'
# Octavia objects
LOADBALANCERS = 'loadbalancers'
LISTENERS = 'listeners'
POOLS = 'pools'
HEALTHMONITORS = 'healthmonitors'
MEMBERS = 'members'
L7POLICIES = 'l7policies'
L7RULES = 'l7rules'
# ID fields
ID = 'id'
# Octavia statistics fields
ACTIVE_CONNECTIONS = 'active_connections'
BYTES_IN = 'bytes_in'
BYTES_OUT = 'bytes_out'
REQUEST_ERRORS = 'request_errors'
TOTAL_CONNECTIONS = 'total_connections'
# Constants common to all Octavia provider drivers
HEALTH_MONITOR_PING = 'PING'
HEALTH_MONITOR_TCP = 'TCP'
HEALTH_MONITOR_HTTP = 'HTTP'
HEALTH_MONITOR_HTTPS = 'HTTPS'
HEALTH_MONITOR_TLS_HELLO = 'TLS-HELLO'
HEALTH_MONITOR_UDP_CONNECT = 'UDP-CONNECT'
SUPPORTED_HEALTH_MONITOR_TYPES = (HEALTH_MONITOR_HTTP, HEALTH_MONITOR_HTTPS,
HEALTH_MONITOR_PING, HEALTH_MONITOR_TCP,
HEALTH_MONITOR_TLS_HELLO,
HEALTH_MONITOR_UDP_CONNECT)
HEALTH_MONITOR_HTTP_METHOD_GET = 'GET'
HEALTH_MONITOR_HTTP_METHOD_HEAD = 'HEAD'
HEALTH_MONITOR_HTTP_METHOD_POST = 'POST'
HEALTH_MONITOR_HTTP_METHOD_PUT = 'PUT'
HEALTH_MONITOR_HTTP_METHOD_DELETE = 'DELETE'
HEALTH_MONITOR_HTTP_METHOD_TRACE = 'TRACE'
HEALTH_MONITOR_HTTP_METHOD_OPTIONS = 'OPTIONS'
HEALTH_MONITOR_HTTP_METHOD_CONNECT = 'CONNECT'
HEALTH_MONITOR_HTTP_METHOD_PATCH = 'PATCH'
HEALTH_MONITOR_HTTP_DEFAULT_METHOD = HEALTH_MONITOR_HTTP_METHOD_GET
SUPPORTED_HEALTH_MONITOR_HTTP_METHODS = (
HEALTH_MONITOR_HTTP_METHOD_GET, HEALTH_MONITOR_HTTP_METHOD_HEAD,
HEALTH_MONITOR_HTTP_METHOD_POST, HEALTH_MONITOR_HTTP_METHOD_PUT,
HEALTH_MONITOR_HTTP_METHOD_DELETE, HEALTH_MONITOR_HTTP_METHOD_TRACE,
HEALTH_MONITOR_HTTP_METHOD_OPTIONS, HEALTH_MONITOR_HTTP_METHOD_CONNECT,
HEALTH_MONITOR_HTTP_METHOD_PATCH)
L7POLICY_ACTION_REJECT = 'REJECT'
L7POLICY_ACTION_REDIRECT_TO_URL = 'REDIRECT_TO_URL'
L7POLICY_ACTION_REDIRECT_TO_POOL = 'REDIRECT_TO_POOL'
L7POLICY_ACTION_REDIRECT_PREFIX = 'REDIRECT_PREFIX'
SUPPORTED_L7POLICY_ACTIONS = (L7POLICY_ACTION_REJECT,
L7POLICY_ACTION_REDIRECT_TO_URL,
L7POLICY_ACTION_REDIRECT_TO_POOL,
L7POLICY_ACTION_REDIRECT_PREFIX)
L7RULE_COMPARE_TYPE_REGEX = 'REGEX'
L7RULE_COMPARE_TYPE_STARTS_WITH = 'STARTS_WITH'
L7RULE_COMPARE_TYPE_ENDS_WITH = 'ENDS_WITH'
L7RULE_COMPARE_TYPE_CONTAINS = 'CONTAINS'
L7RULE_COMPARE_TYPE_EQUAL_TO = 'EQUAL_TO'
SUPPORTED_L7RULE_COMPARE_TYPES = (L7RULE_COMPARE_TYPE_REGEX,
L7RULE_COMPARE_TYPE_STARTS_WITH,
L7RULE_COMPARE_TYPE_ENDS_WITH,
L7RULE_COMPARE_TYPE_CONTAINS,
L7RULE_COMPARE_TYPE_EQUAL_TO)
L7RULE_TYPE_HOST_NAME = 'HOST_NAME'
L7RULE_TYPE_PATH = 'PATH'
L7RULE_TYPE_FILE_TYPE = 'FILE_TYPE'
L7RULE_TYPE_HEADER = 'HEADER'
L7RULE_TYPE_COOKIE = 'COOKIE'
SUPPORTED_L7RULE_TYPES = (L7RULE_TYPE_HOST_NAME, L7RULE_TYPE_PATH,
L7RULE_TYPE_FILE_TYPE, L7RULE_TYPE_HEADER,
L7RULE_TYPE_COOKIE)
LB_ALGORITHM_ROUND_ROBIN = 'ROUND_ROBIN'
LB_ALGORITHM_LEAST_CONNECTIONS = 'LEAST_CONNECTIONS'
LB_ALGORITHM_SOURCE_IP = 'SOURCE_IP'
SUPPORTED_LB_ALGORITHMS = (LB_ALGORITHM_LEAST_CONNECTIONS,
LB_ALGORITHM_ROUND_ROBIN,
LB_ALGORITHM_SOURCE_IP)
OPERATING_STATUS = 'operating_status'
ONLINE = 'ONLINE'
OFFLINE = 'OFFLINE'
DEGRADED = 'DEGRADED'
ERROR = 'ERROR'
DRAINING = 'DRAINING'
NO_MONITOR = 'NO_MONITOR'
SUPPORTED_OPERATING_STATUSES = (ONLINE, OFFLINE, DEGRADED, ERROR, DRAINING,
NO_MONITOR)
PROTOCOL_TCP = 'TCP'
PROTOCOL_UDP = 'UDP'
PROTOCOL_HTTP = 'HTTP'
PROTOCOL_HTTPS = 'HTTPS'
PROTOCOL_TERMINATED_HTTPS = 'TERMINATED_HTTPS'
PROTOCOL_PROXY = 'PROXY'
SUPPORTED_PROTOCOLS = (PROTOCOL_TCP, PROTOCOL_HTTPS, PROTOCOL_HTTP,
PROTOCOL_TERMINATED_HTTPS, PROTOCOL_PROXY, PROTOCOL_UDP)
PROVISIONING_STATUS = 'provisioning_status'
# Amphora has been allocated to a load balancer
AMPHORA_ALLOCATED = 'ALLOCATED'
# Amphora is being built
AMPHORA_BOOTING = 'BOOTING'
# Amphora is ready to be allocated to a load balancer
AMPHORA_READY = 'READY'
ACTIVE = 'ACTIVE'
PENDING_DELETE = 'PENDING_DELETE'
PENDING_UPDATE = 'PENDING_UPDATE'
PENDING_CREATE = 'PENDING_CREATE'
DELETED = 'DELETED'
SUPPORTED_PROVISIONING_STATUSES = (ACTIVE, AMPHORA_ALLOCATED,
AMPHORA_BOOTING, AMPHORA_READY,
PENDING_DELETE, PENDING_CREATE,
PENDING_UPDATE, DELETED, ERROR)
SESSION_PERSISTENCE_SOURCE_IP = 'SOURCE_IP'
SESSION_PERSISTENCE_HTTP_COOKIE = 'HTTP_COOKIE'
SESSION_PERSISTENCE_APP_COOKIE = 'APP_COOKIE'
SUPPORTED_SP_TYPES = (SESSION_PERSISTENCE_SOURCE_IP,
SESSION_PERSISTENCE_HTTP_COOKIE,
SESSION_PERSISTENCE_APP_COOKIE)
# List of HTTP headers which are supported for insertion
SUPPORTED_HTTP_HEADERS = ['X-Forwarded-For',
'X-Forwarded-Port',
'X-Forwarded-Proto']

View File

View File

@ -0,0 +1,348 @@
# Copyright (c) 2014 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
"""
Guidelines for writing new hacking checks
- Use only for Octavia specific tests. OpenStack general tests
should be submitted to the common 'hacking' module.
- Pick numbers in the range O3xx. Find the current test with
the highest allocated number and then pick the next value.
- Keep the test method code in the source file ordered based
on the O3xx value.
- List the new rule in the top level HACKING.rst file
- Add test cases for each new rule to
octavia_lib/tests/unit/test_hacking.py
"""
_all_log_levels = {'critical', 'error', 'exception', 'info', 'warning'}
_all_hints = {'_LC', '_LE', '_LI', '_', '_LW'}
_log_translation_hint = re.compile(
r".*LOG\.(%(levels)s)\(\s*(%(hints)s)\(" % {
'levels': '|'.join(_all_log_levels),
'hints': '|'.join(_all_hints),
})
assert_trueinst_re = re.compile(
r"(.)*assertTrue\(isinstance\((\w|\.|\'|\"|\[|\])+, "
"(\w|\.|\'|\"|\[|\])+\)\)")
assert_equal_in_end_with_true_or_false_re = re.compile(
r"assertEqual\((\w|[][.'\"])+ in (\w|[][.'\", ])+, (True|False)\)")
assert_equal_in_start_with_true_or_false_re = re.compile(
r"assertEqual\((True|False), (\w|[][.'\"])+ in (\w|[][.'\", ])+\)")
assert_equal_with_true_re = re.compile(
r"assertEqual\(True,")
assert_equal_with_false_re = re.compile(
r"assertEqual\(False,")
mutable_default_args = re.compile(r"^\s*def .+\((.+=\{\}|.+=\[\])")
assert_equal_end_with_none_re = re.compile(r"(.)*assertEqual\(.+, None\)")
assert_equal_start_with_none_re = re.compile(r".*assertEqual\(None, .+\)")
assert_not_equal_end_with_none_re = re.compile(
r"(.)*assertNotEqual\(.+, None\)")
assert_not_equal_start_with_none_re = re.compile(
r"(.)*assertNotEqual\(None, .+\)")
assert_no_xrange_re = re.compile(
r"\s*xrange\s*\(")
revert_must_have_kwargs_re = re.compile(
r'[ ]*def revert\(.+,[ ](?!\*\*kwargs)\w+\):')
untranslated_exception_re = re.compile(r"raise (?:\w*)\((.*)\)")
no_basestring_re = re.compile(r"\bbasestring\b")
no_iteritems_re = re.compile(r".*\.iteritems\(\)")
no_eventlet_re = re.compile(r'(import|from)\s+[(]?eventlet')
no_line_continuation_backslash_re = re.compile(r'.*(\\)\n')
no_logging_re = re.compile(r'(import|from)\s+[(]?logging')
namespace_imports_dot = re.compile(r"import[\s]+([\w]+)[.][^\s]+")
namespace_imports_from_dot = re.compile(r"from[\s]+([\w]+)[.]")
namespace_imports_from_root = re.compile(r"from[\s]+([\w]+)[\s]+import[\s]+")
def _check_imports(regex, submatch, logical_line):
m = re.match(regex, logical_line)
if m and m.group(1) == submatch:
return True
return False
def _check_namespace_imports(failure_code, namespace, new_ns, logical_line,
message_override=None):
if message_override is not None:
msg_o = "%s: %s" % (failure_code, message_override)
else:
msg_o = None
if _check_imports(namespace_imports_from_dot, namespace, logical_line):
msg = ("%s: '%s' must be used instead of '%s'.") % (
failure_code,
logical_line.replace('%s.' % namespace, new_ns),
logical_line)
return (0, msg_o or msg)
elif _check_imports(namespace_imports_from_root, namespace, logical_line):
msg = ("%s: '%s' must be used instead of '%s'.") % (
failure_code,
logical_line.replace(
'from %s import ' % namespace, 'import %s' % new_ns),
logical_line)
return (0, msg_o or msg)
elif _check_imports(namespace_imports_dot, namespace, logical_line):
msg = ("%s: '%s' must be used instead of '%s'.") % (
failure_code,
logical_line.replace('import', 'from').replace('.', ' import '),
logical_line)
return (0, msg_o or msg)
return None
def _translation_checks_not_enforced(filename):
# Do not do these validations on tests
return any(pat in filename for pat in ["/tests/", "rally-jobs/plugins/"])
def assert_true_instance(logical_line):
"""Check for assertTrue(isinstance(a, b)) sentences
O316
"""
if assert_trueinst_re.match(logical_line):
yield (0, "O316: assertTrue(isinstance(a, b)) sentences not allowed. "
"Use assertIsInstance instead.")
def assert_equal_or_not_none(logical_line):
"""Check for assertEqual(A, None) or assertEqual(None, A) sentences,
assertNotEqual(A, None) or assertNotEqual(None, A) sentences
O318
"""
msg = ("O318: assertEqual/assertNotEqual(A, None) or "
"assertEqual/assertNotEqual(None, A) sentences not allowed")
res = (assert_equal_start_with_none_re.match(logical_line) or
assert_equal_end_with_none_re.match(logical_line) or
assert_not_equal_start_with_none_re.match(logical_line) or
assert_not_equal_end_with_none_re.match(logical_line))
if res:
yield (0, msg)
def assert_equal_true_or_false(logical_line):
"""Check for assertEqual(True, A) or assertEqual(False, A) sentences
O323
"""
res = (assert_equal_with_true_re.search(logical_line) or
assert_equal_with_false_re.search(logical_line))
if res:
yield (0, "O323: assertEqual(True, A) or assertEqual(False, A) "
"sentences not allowed")
def no_mutable_default_args(logical_line):
msg = "O324: Method's default argument shouldn't be mutable!"
if mutable_default_args.match(logical_line):
yield (0, msg)
def assert_equal_in(logical_line):
"""Check for assertEqual(A in B, True), assertEqual(True, A in B),
assertEqual(A in B, False) or assertEqual(False, A in B) sentences
O338
"""
res = (assert_equal_in_start_with_true_or_false_re.search(logical_line) or
assert_equal_in_end_with_true_or_false_re.search(logical_line))
if res:
yield (0, "O338: Use assertIn/NotIn(A, B) rather than "
"assertEqual(A in B, True/False) when checking collection "
"contents.")
def no_log_warn(logical_line):
"""Disallow 'LOG.warn('
O339
"""
if logical_line.startswith('LOG.warn('):
yield(0, "O339:Use LOG.warning() rather than LOG.warn()")
def no_xrange(logical_line):
"""Disallow 'xrange()'
O340
"""
if assert_no_xrange_re.match(logical_line):
yield(0, "O340: Do not use xrange().")
def no_translate_logs(logical_line, filename):
"""O341 - Don't translate logs.
Check for 'LOG.*(_(' and 'LOG.*(_Lx('
Translators don't provide translations for log messages, and operators
asked not to translate them.
* This check assumes that 'LOG' is a logger.
:param logical_line: The logical line to check.
:param filename: The file name where the logical line exists.
:returns: None if the logical line passes the check, otherwise a tuple
is yielded that contains the offending index in logical line
and a message describe the check validation failure.
"""
if _translation_checks_not_enforced(filename):
return
msg = "O341: Log messages should not be translated!"
match = _log_translation_hint.match(logical_line)
if match:
yield (logical_line.index(match.group()), msg)
def check_raised_localized_exceptions(logical_line, filename):
"""O342 - Untranslated exception message.
:param logical_line: The logical line to check.
:param filename: The file name where the logical line exists.
:returns: None if the logical line passes the check, otherwise a tuple
is yielded that contains the offending index in logical line
and a message describe the check validation failure.
"""
if _translation_checks_not_enforced(filename):
return
logical_line = logical_line.strip()
raised_search = untranslated_exception_re.match(logical_line)
if raised_search:
exception_msg = raised_search.groups()[0]
if exception_msg.startswith("\"") or exception_msg.startswith("\'"):
msg = "O342: Untranslated exception message."
yield (logical_line.index(exception_msg), msg)
def check_no_basestring(logical_line):
"""O343 - basestring is not Python3-compatible.
:param logical_line: The logical line to check.
:returns: None if the logical line passes the check, otherwise a tuple
is yielded that contains the offending index in logical line
and a message describe the check validation failure.
"""
if no_basestring_re.search(logical_line):
msg = ("O343: basestring is not Python3-compatible, use "
"six.string_types instead.")
yield(0, msg)
def check_python3_no_iteritems(logical_line):
"""O344 - Use dict.items() instead of dict.iteritems().
:param logical_line: The logical line to check.
:returns: None if the logical line passes the check, otherwise a tuple
is yielded that contains the offending index in logical line
and a message describe the check validation failure.
"""
if no_iteritems_re.search(logical_line):
msg = ("O344: Use dict.items() instead of dict.iteritems() to be "
"compatible with both Python 2 and Python 3. In Python 2, "
"dict.items() may be inefficient for very large dictionaries. "
"If you can prove that you need the optimization of an "
"iterator for Python 2, then you can use six.iteritems(dict).")
yield(0, msg)
def check_no_eventlet_imports(logical_line):
"""O345 - Usage of Python eventlet module not allowed.
:param logical_line: The logical line to check.
:returns: None if the logical line passes the check, otherwise a tuple
is yielded that contains the offending index in logical line
and a message describe the check validation failure.
"""
if no_eventlet_re.match(logical_line):
msg = 'O345 Usage of Python eventlet module not allowed'
yield logical_line.index('eventlet'), msg
def check_line_continuation_no_backslash(logical_line, tokens):
"""O346 - Don't use backslashes for line continuation.
:param logical_line: The logical line to check. Not actually used.
:param tokens: List of tokens to check.
:returns: None if the tokens don't contain any issues, otherwise a tuple
is yielded that contains the offending index in the logical
line and a message describe the check validation failure.
"""
backslash = None
for token_type, text, start, end, orig_line in tokens:
m = no_line_continuation_backslash_re.match(orig_line)
if m:
backslash = (start[0], m.start(1))
break
if backslash is not None:
msg = 'O346 Backslash line continuations not allowed'
yield backslash, msg
def check_no_logging_imports(logical_line):
"""O348 - Usage of Python logging module not allowed.
:param logical_line: The logical line to check.
:returns: None if the logical line passes the check, otherwise a tuple
is yielded that contains the offending index in logical line
and a message describe the check validation failure.
"""
if no_logging_re.match(logical_line):
msg = 'O348 Usage of Python logging module not allowed, use oslo_log'
yield logical_line.index('logging'), msg
def check_no_octavia_namespace_imports(logical_line):
"""O501 - Direct octavia imports not allowed.
:param logical_line: The logical line to check.
:returns: None if the logical line passes the check, otherwise a tuple
is yielded that contains the offending index in logical line and a
message describe the check validation failure.
"""
x = _check_namespace_imports(
'O501', 'octavia', 'octavia_lib.', logical_line,
message_override="O501 Direct octavia imports not allowed")
if x is not None:
yield x
def factory(register):
register(assert_true_instance)
register(assert_equal_or_not_none)
register(no_translate_logs)
register(assert_equal_true_or_false)
register(no_mutable_default_args)
register(assert_equal_in)
register(no_log_warn)
register(no_xrange)
register(check_raised_localized_exceptions)
register(check_no_basestring)
register(check_python3_no_iteritems)
register(check_no_eventlet_imports)
register(check_line_continuation_no_backslash)
register(check_no_logging_imports)
register(check_no_octavia_namespace_imports)

20
octavia_lib/i18n.py Normal file
View File

@ -0,0 +1,20 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_i18n as i18n
_translators = i18n.TranslatorFactory(domain='octavia-lib')
# The primary translation function using the well-known name "_"
_ = _translators.primary

View File

@ -1,28 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
test_octavia_lib
----------------------------------
Tests for `octavia_lib` module.
"""
from octavia_lib.tests import base
class TestOctavia_lib(base.TestCase):
def test_something(self):
pass

View File

View File

View File

@ -0,0 +1,412 @@
# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from copy import deepcopy
from oslo_utils import uuidutils
from octavia_lib.api.drivers import data_models
from octavia_lib.tests.unit import base
class TestProviderDataModels(base.TestCase):
def setUp(self):
super(TestProviderDataModels, self).setUp()
self.loadbalancer_id = uuidutils.generate_uuid()
self.project_id = uuidutils.generate_uuid()
self.vip_address = '192.0.2.83'
self.vip_network_id = uuidutils.generate_uuid()
self.vip_port_id = uuidutils.generate_uuid()
self.vip_subnet_id = uuidutils.generate_uuid()
self.listener_id = uuidutils.generate_uuid()
self.vip_qos_policy_id = uuidutils.generate_uuid()
self.default_tls_container_ref = uuidutils.generate_uuid()
self.sni_container_ref_1 = uuidutils.generate_uuid()
self.sni_container_ref_2 = uuidutils.generate_uuid()
self.pool_id = uuidutils.generate_uuid()
self.session_persistence = {"cookie_name": "sugar",
"type": "APP_COOKIE"}
self.member_id = uuidutils.generate_uuid()
self.mem_subnet_id = uuidutils.generate_uuid()
self.healthmonitor_id = uuidutils.generate_uuid()
self.l7policy_id = uuidutils.generate_uuid()
self.l7rule_id = uuidutils.generate_uuid()
self.ref_l7rule = data_models.L7Rule(
admin_state_up=True,
compare_type='STARTS_WITH',
invert=True,
key='cookie',
l7policy_id=self.l7policy_id,
l7rule_id=self.l7rule_id,
type='COOKIE',
value='chocolate')
self.ref_l7policy = data_models.L7Policy(
action='REJECT',
admin_state_up=False,
description='A L7 Policy',
l7policy_id=self.l7policy_id,
listener_id=self.listener_id,
name='l7policy',
position=1,
redirect_pool_id=self.pool_id,
redirect_url='/test',
rules=[self.ref_l7rule],
redirect_prefix='http://example.com')
self.ref_listener = data_models.Listener(
admin_state_up=True,
connection_limit=5000,
default_pool=None,
default_pool_id=None,
default_tls_container_data='default_cert_data',
default_tls_container_ref=self.default_tls_container_ref,
description=data_models.Unset,
insert_headers={'X-Forwarded-For': 'true'},
l7policies=[self.ref_l7policy],
listener_id=self.listener_id,
loadbalancer_id=self.loadbalancer_id,
name='super_listener',
protocol='avian',
protocol_port=42,
sni_container_data=['sni_cert_data_1', 'sni_cert_data_2'],
sni_container_refs=[self.sni_container_ref_1,
self.sni_container_ref_2],
timeout_client_data=3,
timeout_member_connect=4,
timeout_member_data=5,
timeout_tcp_inspect=6)
self.ref_lb = data_models.LoadBalancer(
admin_state_up=False,
description='One great load balancer',
flavor={'cake': 'chocolate'},
listeners=[self.ref_listener],
loadbalancer_id=self.loadbalancer_id,
name='favorite_lb',
project_id=self.project_id,
vip_address=self.vip_address,
vip_network_id=self.vip_network_id,
vip_port_id=self.vip_port_id,
vip_subnet_id=self.vip_subnet_id,
vip_qos_policy_id=self.vip_qos_policy_id)
self.ref_vip = data_models.VIP(
vip_address=self.vip_address,
vip_network_id=self.vip_network_id,
vip_port_id=self.vip_port_id,
vip_subnet_id=self.vip_subnet_id,
vip_qos_policy_id=self.vip_qos_policy_id)
self.ref_member = data_models.Member(
address='192.0.2.10',
admin_state_up=True,
member_id=self.member_id,
monitor_address='192.0.2.11',
monitor_port=8888,
name='member',
pool_id=self.pool_id,
protocol_port=80,
subnet_id=self.mem_subnet_id,
weight=1,
backup=False)
self.ref_healthmonitor = data_models.HealthMonitor(
admin_state_up=False,
delay=1,
expected_codes='200,202',
healthmonitor_id=self.healthmonitor_id,
http_method='GET',
max_retries=2,
max_retries_down=3,
name='member',
pool_id=self.pool_id,
timeout=4,
type='HTTP',
url_path='/test')
self.ref_pool = data_models.Pool(
admin_state_up=True,
description='A pool',
healthmonitor=None,
lb_algorithm='fast',
loadbalancer_id=self.loadbalancer_id,
members=[self.ref_member],
name='pool',
pool_id=self.pool_id,
listener_id=self.listener_id,
protocol='avian',
session_persistence=self.session_persistence)
self.ref_l7rule_dict = {'admin_state_up': True,
'compare_type': 'STARTS_WITH',
'invert': True,
'key': 'cookie',
'l7policy_id': self.l7policy_id,
'l7rule_id': self.l7rule_id,
'type': 'COOKIE',
'value': 'chocolate'}
self.ref_l7policy_dict = {'action': 'REJECT',
'admin_state_up': False,
'description': 'A L7 Policy',
'l7policy_id': self.l7policy_id,
'listener_id': self.listener_id,
'name': 'l7policy',
'position': 1,
'redirect_pool_id': self.pool_id,
'redirect_url': '/test',
'rules': [self.ref_l7rule_dict],
'redirect_prefix': 'http://example.com'}
self.ref_lb_dict = {'project_id': self.project_id,
'flavor': {'cake': 'chocolate'},
'vip_network_id': self.vip_network_id,
'admin_state_up': False,
'loadbalancer_id': self.loadbalancer_id,
'vip_port_id': self.vip_port_id,
'vip_address': self.vip_address,
'description': 'One great load balancer',
'vip_subnet_id': self.vip_subnet_id,
'name': 'favorite_lb',
'vip_qos_policy_id': self.vip_qos_policy_id}
self.ref_listener_dict = {
'admin_state_up': True,
'connection_limit': 5000,
'default_pool': None,
'default_pool_id': None,
'default_tls_container_data': 'default_cert_data',
'default_tls_container_ref': self.default_tls_container_ref,
'description': None,
'insert_headers': {'X-Forwarded-For': 'true'},
'listener_id': self.listener_id,
'l7policies': [self.ref_l7policy_dict],
'loadbalancer_id': self.loadbalancer_id,
'name': 'super_listener',
'protocol': 'avian',
'protocol_port': 42,
'sni_container_data': ['sni_cert_data_1', 'sni_cert_data_2'],
'sni_container_refs': [self.sni_container_ref_1,
self.sni_container_ref_2],
'timeout_client_data': 3,
'timeout_member_connect': 4,
'timeout_member_data': 5,
'timeout_tcp_inspect': 6}
self.ref_lb_dict_with_listener = {
'admin_state_up': False,
'description': 'One great load balancer',
'flavor': {'cake': 'chocolate'},
'listeners': [self.ref_listener_dict],
'loadbalancer_id': self.loadbalancer_id,
'name': 'favorite_lb',
'project_id': self.project_id,
'vip_address': self.vip_address,
'vip_network_id': self.vip_network_id,
'vip_port_id': self.vip_port_id,
'vip_subnet_id': self.vip_subnet_id,
'vip_qos_policy_id': self.vip_qos_policy_id}
self.ref_vip_dict = {
'vip_address': self.vip_address,
'vip_network_id': self.vip_network_id,
'vip_port_id': self.vip_port_id,
'vip_subnet_id': self.vip_subnet_id,
'vip_qos_policy_id': self.vip_qos_policy_id}
self.ref_member_dict = {
'address': '192.0.2.10',
'admin_state_up': True,
'member_id': self.member_id,
'monitor_address': '192.0.2.11',
'monitor_port': 8888,
'name': 'member',
'pool_id': self.pool_id,
'protocol_port': 80,
'subnet_id': self.mem_subnet_id,
'weight': 1,
'backup': False}
self.ref_healthmonitor_dict = {
'admin_state_up': False,
'delay': 1,
'expected_codes': '200,202',
'healthmonitor_id': self.healthmonitor_id,
'http_method': 'GET',
'max_retries': 2,
'max_retries_down': 3,
'name': 'member',
'pool_id': self.pool_id,
'timeout': 4,
'type': 'HTTP',
'url_path': '/test'}
self.ref_pool_dict = {
'admin_state_up': True,
'description': 'A pool',
'healthmonitor': self.ref_healthmonitor_dict,
'lb_algorithm': 'fast',
'loadbalancer_id': self.loadbalancer_id,
'members': [self.ref_member_dict],
'name': 'pool',
'pool_id': self.pool_id,
'listener_id': self.listener_id,
'protocol': 'avian',
'session_persistence': self.session_persistence}
def test_equality(self):
second_ref_lb = deepcopy(self.ref_lb)
self.assertTrue(self.ref_lb == second_ref_lb)
second_ref_lb.admin_state_up = True
self.assertFalse(self.ref_lb == second_ref_lb)
self.assertFalse(self.ref_lb == self.loadbalancer_id)
def test_inequality(self):
second_ref_lb = deepcopy(self.ref_lb)
self.assertFalse(self.ref_lb != second_ref_lb)
second_ref_lb.admin_state_up = True
self.assertTrue(self.ref_lb != second_ref_lb)
self.assertTrue(self.ref_lb != self.loadbalancer_id)
def test_to_dict(self):
ref_lb_converted_to_dict = self.ref_lb.to_dict()
ref_listener_converted_to_dict = self.ref_listener.to_dict()
ref_pool_converted_to_dict = self.ref_pool.to_dict()
ref_member_converted_to_dict = self.ref_member.to_dict()
ref_healthmon_converted_to_dict = self.ref_healthmonitor.to_dict()
ref_l7policy_converted_to_dict = self.ref_l7policy.to_dict()
ref_l7rule_converted_to_dict = self.ref_l7rule.to_dict()
ref_vip_converted_to_dict = self.ref_vip.to_dict()
# This test does not recurse, so remove items for the reference
# that should not be rendered
ref_list_dict = deepcopy(self.ref_listener_dict)
ref_list_dict.pop('l7policies', None)
ref_list_dict.pop('sni_container_data', None)
ref_list_dict.pop('sni_container_refs', None)
ref_pool_dict = deepcopy(self.ref_pool_dict)
ref_pool_dict['healthmonitor'] = None
ref_pool_dict.pop('members', None)
ref_l7policy_dict = deepcopy(self.ref_l7policy_dict)
ref_l7policy_dict.pop('rules', None)
# This test does not render unsets, so remove those from the reference
ref_list_dict.pop('description', None)
self.assertEqual(self.ref_lb_dict, ref_lb_converted_to_dict)
self.assertEqual(ref_list_dict, ref_listener_converted_to_dict)
self.assertEqual(ref_pool_dict, ref_pool_converted_to_dict)
self.assertEqual(self.ref_member_dict, ref_member_converted_to_dict)
self.assertEqual(self.ref_healthmonitor_dict,
ref_healthmon_converted_to_dict)
self.assertEqual(ref_l7policy_dict, ref_l7policy_converted_to_dict)
self.assertEqual(self.ref_l7rule_dict, ref_l7rule_converted_to_dict)
self.assertEqual(self.ref_vip_dict, ref_vip_converted_to_dict)
def test_to_dict_private_attrs(self):
private_dict = {'_test': 'foo'}
ref_lb_converted_to_dict = self.ref_lb.to_dict(**private_dict)
self.assertEqual(self.ref_lb_dict, ref_lb_converted_to_dict)
def test_to_dict_partial(self):
ref_lb = data_models.LoadBalancer(loadbalancer_id=self.loadbalancer_id)
ref_lb_dict = {'loadbalancer_id': self.loadbalancer_id}
ref_lb_converted_to_dict = ref_lb.to_dict()
self.assertEqual(ref_lb_dict, ref_lb_converted_to_dict)
def test_to_dict_render_unsets(self):
ref_lb_converted_to_dict = self.ref_lb.to_dict(render_unsets=True)
new_ref_lib_dict = deepcopy(self.ref_lb_dict)
new_ref_lib_dict['pools'] = None
new_ref_lib_dict['listeners'] = None
self.assertEqual(new_ref_lib_dict, ref_lb_converted_to_dict)
def test_to_dict_recursive(self):
# Render with unsets is not set, so remove the Unset description
ref_lb_dict_with_listener = deepcopy(self.ref_lb_dict_with_listener)
ref_lb_dict_with_listener['listeners'][0].pop('description', None)
ref_lb_converted_to_dict = self.ref_lb.to_dict(recurse=True)
self.assertEqual(ref_lb_dict_with_listener,
ref_lb_converted_to_dict)
def test_to_dict_recursive_partial(self):
ref_lb = data_models.LoadBalancer(
loadbalancer_id=self.loadbalancer_id,
listeners=[self.ref_listener])
ref_lb_dict_with_listener = {
'loadbalancer_id': self.loadbalancer_id,
'listeners': [self.ref_listener_dict]}
# Render with unsets is not set, so remove the Unset description
ref_lb_dict_with_listener = deepcopy(ref_lb_dict_with_listener)
ref_lb_dict_with_listener['listeners'][0].pop('description', None)
ref_lb_converted_to_dict = ref_lb.to_dict(recurse=True)
self.assertEqual(ref_lb_dict_with_listener, ref_lb_converted_to_dict)
def test_to_dict_recursive_render_unset(self):
ref_lb = data_models.LoadBalancer(
admin_state_up=False,
description='One great load balancer',
flavor={'cake': 'chocolate'},
listeners=[self.ref_listener],
loadbalancer_id=self.loadbalancer_id,
project_id=self.project_id,
vip_address=self.vip_address,
vip_network_id=self.vip_network_id,
vip_port_id=self.vip_port_id,
vip_subnet_id=self.vip_subnet_id,
vip_qos_policy_id=self.vip_qos_policy_id)
ref_lb_dict_with_listener = deepcopy(self.ref_lb_dict_with_listener)
ref_lb_dict_with_listener['pools'] = None
ref_lb_dict_with_listener['name'] = None
ref_lb_converted_to_dict = ref_lb.to_dict(recurse=True,
render_unsets=True)
self.assertEqual(ref_lb_dict_with_listener,
ref_lb_converted_to_dict)
def test_from_dict(self):
lb_object = data_models.LoadBalancer.from_dict(self.ref_lb_dict)
self.assertEqual(self.ref_lb, lb_object)
def test_unset_bool(self):
self.assertFalse(data_models.Unset)
def test_unset_repr(self):
self.assertEqual('Unset', repr(data_models.Unset))

View File

@ -0,0 +1,106 @@
# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from octavia_lib.api.drivers import driver_lib
from octavia_lib.api.drivers import exceptions as driver_exceptions
from octavia_lib.tests.unit import base
class TestDriverLib(base.TestCase):
def setUp(self):
self.mock_socket = mock.MagicMock()
with mock.patch('socket.socket') as socket_mock:
socket_mock.return_value = self.mock_socket
self.driver_lib = driver_lib.DriverLibrary()
super(TestDriverLib, self).setUp()
@mock.patch('six.moves.builtins.memoryview')
def test_recv(self, mock_memoryview):
self.mock_socket.recv.side_effect = ['1', '\n']
self.mock_socket.recv_into.return_value = 1
mv_mock = mock.MagicMock()
mock_memoryview.return_value = mv_mock
mv_mock.tobytes.return_value = '"test data"'
response = self.driver_lib._recv()
calls = [mock.call(1), mock.call(1)]
self.mock_socket.recv.assert_has_calls(calls)
self.mock_socket.recv_into.assert_called_once_with(
mv_mock.__getitem__(), 1)
self.assertEqual('test data', response)
@mock.patch('octavia_lib.api.drivers.driver_lib.DriverLibrary._recv')
def test_send(self, mock_recv):
mock_recv.return_value = 'fake_response'
response = self.driver_lib._send('fake_path', 'test data')
self.mock_socket.connect.assert_called_once_with('fake_path')
self.mock_socket.send.assert_called_once_with('11\n')
self.mock_socket.sendall.assert_called_once_with('"test data"')
self.mock_socket.close.assert_called_once()
self.assertEqual(mock_recv.return_value, response)
@mock.patch('octavia_lib.api.drivers.driver_lib.DriverLibrary._send')
def test_update_loadbalancer_status(self, mock_send):
error_dict = {'status_code': 500, 'fault_string': 'boom',
'status_object': 'balloon', 'status_object_id': '1',
'status_record': 'tunes'}
mock_send.side_effect = [{'status_code': 200}, Exception('boom'),
error_dict]
# Happy path
self.driver_lib.update_loadbalancer_status('fake_status')
mock_send.assert_called_once_with('/var/run/octavia/status.sock',
'fake_status')
# Test general exception
self.assertRaises(driver_exceptions.UpdateStatusError,
self.driver_lib.update_loadbalancer_status,
'fake_status')
# Test bad status code returned
self.assertRaises(driver_exceptions.UpdateStatusError,
self.driver_lib.update_loadbalancer_status,
'fake_status')
@mock.patch('octavia_lib.api.drivers.driver_lib.DriverLibrary._send')
def test_update_listener_statistics(self, mock_send):
error_dict = {'status_code': 500, 'fault_string': 'boom',
'status_object': 'balloon', 'status_object_id': '1',
'status_record': 'tunes'}
mock_send.side_effect = [{'status_code': 200}, Exception('boom'),
error_dict]
# Happy path
self.driver_lib.update_listener_statistics('fake_stats')
mock_send.assert_called_once_with('/var/run/octavia/stats.sock',
'fake_stats')
# Test general exception
self.assertRaises(driver_exceptions.UpdateStatisticsError,
self.driver_lib.update_listener_statistics,
'fake_stats')
# Test bad status code returned
self.assertRaises(driver_exceptions.UpdateStatisticsError,
self.driver_lib.update_listener_statistics,
'fake_stats')

View File

@ -0,0 +1,88 @@
# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from octavia_lib.api.drivers import exceptions
from octavia_lib.tests.unit import base
class TestProviderExceptions(base.TestCase):
def setUp(self):
super(TestProviderExceptions, self).setUp()
self.user_fault_string = 'Bad driver'
self.operator_fault_string = 'Fix bad driver.'
self.fault_object = 'MCP'
self.fault_object_id = '-1'
self.fault_record = 'skip'
def test_DriverError(self):
driver_error = exceptions.DriverError(
user_fault_string=self.user_fault_string,
operator_fault_string=self.operator_fault_string)
self.assertEqual(self.user_fault_string,
driver_error.user_fault_string)
self.assertEqual(self.operator_fault_string,
driver_error.operator_fault_string)
self.assertIsInstance(driver_error, Exception)
def test_NotImplementedError(self):
not_implemented_error = exceptions.NotImplementedError(
user_fault_string=self.user_fault_string,
operator_fault_string=self.operator_fault_string)
self.assertEqual(self.user_fault_string,
not_implemented_error.user_fault_string)
self.assertEqual(self.operator_fault_string,
not_implemented_error.operator_fault_string)
self.assertIsInstance(not_implemented_error, Exception)
def test_UnsupportedOptionError(self):
unsupported_option_error = exceptions.UnsupportedOptionError(
user_fault_string=self.user_fault_string,
operator_fault_string=self.operator_fault_string)
self.assertEqual(self.user_fault_string,
unsupported_option_error.user_fault_string)
self.assertEqual(self.operator_fault_string,
unsupported_option_error.operator_fault_string)
self.assertIsInstance(unsupported_option_error, Exception)
def test_UpdateStatusError(self):
update_status_error = exceptions.UpdateStatusError(
fault_string=self.user_fault_string,
status_object=self.fault_object,
status_object_id=self.fault_object_id,
status_record=self.fault_record)
self.assertEqual(self.user_fault_string,
update_status_error.fault_string)
self.assertEqual(self.fault_object, update_status_error.status_object)
self.assertEqual(self.fault_object_id,
update_status_error.status_object_id)
self.assertEqual(self.fault_record, update_status_error.status_record)
def test_UpdateStatisticsError(self):
update_stats_error = exceptions.UpdateStatisticsError(
fault_string=self.user_fault_string,
stats_object=self.fault_object,
stats_object_id=self.fault_object_id,
stats_record=self.fault_record)
self.assertEqual(self.user_fault_string,
update_stats_error.fault_string)
self.assertEqual(self.fault_object, update_stats_error.stats_object)
self.assertEqual(self.fault_object_id,
update_stats_error.stats_object_id)
self.assertEqual(self.fault_record, update_stats_error.stats_record)

View File

@ -0,0 +1,157 @@
# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from octavia_lib.api.drivers import exceptions
from octavia_lib.api.drivers import provider_base as driver_base
from octavia_lib.tests.unit import base
class TestProviderBase(base.TestCase):
"""Test base methods.
Tests that methods not implemented by the drivers raise
NotImplementedError.
"""
def setUp(self):
super(TestProviderBase, self).setUp()
self.driver = driver_base.ProviderDriver()
def test_create_vip_port(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.create_vip_port,
False, False, False)
def test_loadbalancer_create(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.loadbalancer_create,
False)
def test_loadbalancer_delete(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.loadbalancer_delete,
False)
def test_loadbalancer_failover(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.loadbalancer_failover,
False)
def test_loadbalancer_update(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.loadbalancer_update,
False, False)
def test_listener_create(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.listener_create,
False)
def test_listener_delete(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.listener_delete,
False)
def test_listener_update(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.listener_update,
False, False)
def test_pool_create(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.pool_create,
False)
def test_pool_delete(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.pool_delete,
False)
def test_pool_update(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.pool_update,
False, False)
def test_member_create(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.member_create,
False)
def test_member_delete(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.member_delete,
False)
def test_member_update(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.member_update,
False, False)
def test_member_batch_update(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.member_batch_update,
False)
def test_health_monitor_create(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.health_monitor_create,
False)
def test_health_monitor_delete(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.health_monitor_delete,
False)
def test_health_monitor_update(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.health_monitor_update,
False, False)
def test_l7policy_create(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.l7policy_create,
False)
def test_l7policy_delete(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.l7policy_delete,
False)
def test_l7policy_update(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.l7policy_update,
False, False)
def test_l7rule_create(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.l7rule_create,
False)
def test_l7rule_delete(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.l7rule_delete,
False)
def test_l7rule_update(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.l7rule_update,
False, False)
def test_get_supported_flavor_metadata(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.get_supported_flavor_metadata)
def test_validate_flavor(self):
self.assertRaises(exceptions.NotImplementedError,
self.driver.validate_flavor,
False)

View File

@ -15,9 +15,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslotest import base
class TestCase(base.BaseTestCase):
"""Test case base class for all unit tests."""
def setUp(self):
super(TestCase, self).setUp()
self.addCleanup(mock.patch.stopall)

View File

@ -0,0 +1,250 @@
# Copyright 2015
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from oslotest import base
from octavia_lib.hacking import checks
class HackingTestCase(base.BaseTestCase):
"""Hacking test class.
This class tests the hacking checks in octavia_lib.hacking.checks by
passing strings to the check methods like the pep8/flake8 parser would.
The parser loops over each line in the file and then passes the parameters
to the check method. The parameter names in the check method dictate what
type of object is passed to the check method. The parameter types are::
logical_line: A processed line with the following modifications:
- Multi-line statements converted to a single line.
- Stripped left and right.
- Contents of strings replaced with "xxx" of same length.
- Comments removed.
physical_line: Raw line of text from the input file.
lines: a list of the raw lines from the input file
tokens: the tokens that contribute to this logical line
line_number: line number in the input file
total_lines: number of lines in the input file
blank_lines: blank lines before this one
indent_char: indentation character in this file (" " or "\t")
indent_level: indentation (with tabs expanded to multiples of 8)
previous_indent_level: indentation on previous line
previous_logical: previous logical line
filename: Path of the file being run through pep8
When running a test on a check method the return will be False/None if
there is no violation in the sample input. If there is an error a tuple is
returned with a position in the line, and a message. So to check the result
just assertTrue if the check is expected to fail and assertFalse if it
should pass.
"""
def assertLinePasses(self, func, *args):
with testtools.ExpectedException(StopIteration):
next(func(*args))
def assertLineFails(self, func, *args):
self.assertIsInstance(next(func(*args)), tuple)
def _get_factory_checks(self, factory):
check_fns = []
def _reg(check_fn):
self.assertTrue(hasattr(check_fn, '__call__'))
self.assertFalse(check_fn in check_fns)
check_fns.append(check_fn)
factory(_reg)
return check_fns
def test_factory(self):
self.assertTrue(len(self._get_factory_checks(checks.factory)) > 0)
def test_assert_true_instance(self):
self.assertEqual(1, len(list(checks.assert_true_instance(
"self.assertTrue(isinstance(e, "
"exception.BuildAbortException))"))))
self.assertEqual(0, len(list(checks.assert_true_instance(
"self.assertTrue()"))))
def test_assert_equal_or_not_none(self):
self.assertEqual(1, len(list(checks.assert_equal_or_not_none(
"self.assertEqual(A, None)"))))
self.assertEqual(1, len(list(checks.assert_equal_or_not_none(
"self.assertEqual(None, A)"))))
self.assertEqual(1, len(list(checks.assert_equal_or_not_none(
"self.assertNotEqual(A, None)"))))
self.assertEqual(1, len(list(checks.assert_equal_or_not_none(
"self.assertNotEqual(None, A)"))))
self.assertEqual(0,
len(list(checks.assert_equal_or_not_none(
"self.assertIsNone()"))))
self.assertEqual(0,
len(list(checks.assert_equal_or_not_none(
"self.assertIsNotNone()"))))
def test_no_mutable_default_args(self):
self.assertEqual(0, len(list(checks.no_mutable_default_args(
"def foo (bar):"))))
self.assertEqual(1, len(list(checks.no_mutable_default_args(
"def foo (bar=[]):"))))
self.assertEqual(1, len(list(checks.no_mutable_default_args(
"def foo (bar={}):"))))
def test_assert_equal_in(self):
self.assertEqual(1, len(list(checks.assert_equal_in(
"self.assertEqual(a in b, True)"))))
self.assertEqual(1, len(list(checks.assert_equal_in(
"self.assertEqual('str' in 'string', True)"))))
self.assertEqual(0, len(list(checks.assert_equal_in(
"self.assertEqual(any(a==1 for a in b), True)"))))
self.assertEqual(1, len(list(checks.assert_equal_in(
"self.assertEqual(True, a in b)"))))
self.assertEqual(1, len(list(checks.assert_equal_in(
"self.assertEqual(True, 'str' in 'string')"))))
self.assertEqual(0, len(list(checks.assert_equal_in(
"self.assertEqual(True, any(a==1 for a in b))"))))
self.assertEqual(1, len(list(checks.assert_equal_in(
"self.assertEqual(a in b, False)"))))
self.assertEqual(1, len(list(checks.assert_equal_in(
"self.assertEqual('str' in 'string', False)"))))
self.assertEqual(0, len(list(checks.assert_equal_in(
"self.assertEqual(any(a==1 for a in b), False)"))))
self.assertEqual(1, len(list(checks.assert_equal_in(
"self.assertEqual(False, a in b)"))))
self.assertEqual(1, len(list(checks.assert_equal_in(
"self.assertEqual(False, 'str' in 'string')"))))
self.assertEqual(0, len(list(checks.assert_equal_in(
"self.assertEqual(False, any(a==1 for a in b))"))))
def test_assert_equal_true_or_false(self):
self.assertEqual(1, len(list(checks.assert_equal_true_or_false(
"self.assertEqual(True, A)"))))
self.assertEqual(1, len(list(checks.assert_equal_true_or_false(
"self.assertEqual(False, A)"))))
self.assertEqual(0, len(list(checks.assert_equal_true_or_false(
"self.assertTrue()"))))
self.assertEqual(0, len(list(checks.assert_equal_true_or_false(
"self.assertFalse()"))))
def test_no_log_warn(self):
self.assertEqual(1, len(list(checks.no_log_warn(
"LOG.warn()"))))
self.assertEqual(0, len(list(checks.no_log_warn(
"LOG.warning()"))))
def test_no_xrange(self):
self.assertEqual(1, len(list(checks.no_xrange(
"xrange(45)"))))
self.assertEqual(0, len(list(checks.no_xrange(
"range(45)"))))
def test_no_log_translations(self):
for log in checks._all_log_levels:
for hint in checks._all_hints:
bad = 'LOG.%s(%s("Bad"))' % (log, hint)
self.assertEqual(
1, len(list(checks.no_translate_logs(bad, 'f'))))
# Catch abuses when used with a variable and not a literal
bad = 'LOG.%s(%s(msg))' % (log, hint)
self.assertEqual(
1, len(list(checks.no_translate_logs(bad, 'f'))))
# Do not do validations in tests
ok = 'LOG.%s(_("OK - unit tests"))' % log
self.assertEqual(
0, len(list(checks.no_translate_logs(ok, 'f/tests/f'))))
def test_check_localized_exception_messages(self):
f = checks.check_raised_localized_exceptions
self.assertLineFails(f, " raise KeyError('Error text')", '')
self.assertLineFails(f, ' raise KeyError("Error text")', '')
self.assertLinePasses(f, ' raise KeyError(_("Error text"))', '')
self.assertLinePasses(f, ' raise KeyError(_ERR("Error text"))', '')
self.assertLinePasses(f, " raise KeyError(translated_msg)", '')
self.assertLinePasses(f, '# raise KeyError("Not translated")', '')
self.assertLinePasses(f, 'print("raise KeyError("Not '
'translated")")', '')
def test_check_localized_exception_message_skip_tests(self):
f = checks.check_raised_localized_exceptions
self.assertLinePasses(f, "raise KeyError('Error text')",
'neutron_lib/tests/unit/mytest.py')
def test_check_no_basestring(self):
self.assertEqual(1, len(list(checks.check_no_basestring(
"isinstance('foo', basestring)"))))
self.assertEqual(0, len(list(checks.check_no_basestring(
"isinstance('foo', six.string_types)"))))
def test_dict_iteritems(self):
self.assertEqual(1, len(list(checks.check_python3_no_iteritems(
"obj.iteritems()"))))
self.assertEqual(0, len(list(checks.check_python3_no_iteritems(
"six.iteritems(obj)"))))
self.assertEqual(0, len(list(checks.check_python3_no_iteritems(
"obj.items()"))))
def test_check_no_eventlet_imports(self):
f = checks.check_no_eventlet_imports
self.assertLinePasses(f, 'from not_eventlet import greenthread')
self.assertLineFails(f, 'from eventlet import greenthread')
self.assertLineFails(f, 'import eventlet')
def test_line_continuation_no_backslash(self):
results = list(checks.check_line_continuation_no_backslash(
'', [(1, 'import', (2, 0), (2, 6), 'import \\\n'),
(1, 'os', (3, 4), (3, 6), ' os\n')]))
self.assertEqual(1, len(results))
self.assertEqual((2, 7), results[0][0])
def test_check_no_logging_imports(self):
f = checks.check_no_logging_imports
self.assertLinePasses(f, 'from oslo_log import log')
self.assertLineFails(f, 'from logging import log')
self.assertLineFails(f, 'import logging')
def test_check_no_octavia_namespace_imports(self):
f = checks.check_no_octavia_namespace_imports
self.assertLinePasses(f, 'from octavia_lib import constants')
self.assertLinePasses(f, 'import octavia_lib.constants')
self.assertLineFails(f, 'from octavia.common import rpc')
self.assertLineFails(f, 'from octavia import context')
self.assertLineFails(f, 'import octavia.common.config')

View File

@ -209,8 +209,8 @@ latex_elements = {
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
# (source start file, target name, title, author,
# documentclass [howto, manual, or own class]).
latex_documents = [
('index', 'octavia_libReleaseNotes.tex',
u'octavia_lib Release Notes Documentation',

View File

@ -2,4 +2,7 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
oslo.i18n>=3.15.3 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0
pbr!=2.1.0,>=2.0.0 # Apache-2.0
six>=1.10.0 # MIT

View File

@ -4,8 +4,12 @@
hacking!=0.13.0,<0.14,>=0.12.0 # Apache-2.0
bandit>=1.1.0 # Apache-2.0
coverage>=4.0,!=4.4 # Apache-2.0
doc8>=0.6.0 # Apache-2.0
pylint==1.9.2 # GPLv2
python-subunit>=1.0.0 # Apache-2.0/BSD
oslo.utils>=3.33.0 # Apache-2.0
oslotest>=3.2.0 # Apache-2.0
stestr>=2.0.0 # Apache-2.0
testtools>=2.2.0 # MIT

66
tools/coding-checks.sh Executable file
View File

@ -0,0 +1,66 @@
#!/bin/sh
# This script is copied from neutron and adapted for octavia-lib.
set -eu
usage () {
echo "Usage: $0 [OPTION]..."
echo "Run octavia-lib's coding check(s)"
echo ""
echo " -Y, --pylint [<basecommit>] Run pylint check on the entire octavia-lib module or just files changed in basecommit (e.g. HEAD~1)"
echo " -h, --help Print this usage message"
echo
exit 0
}
join_args() {
if [ -z "$scriptargs" ]; then
scriptargs="$opt"
else
scriptargs="$scriptargs $opt"
fi
}
process_options () {
i=1
while [ $i -le $# ]; do
eval opt=\$$i
case $opt in
-h|--help) usage;;
-Y|--pylint) pylint=1;;
*) join_args;;
esac
i=$((i+1))
done
}
run_pylint () {
local target="${scriptargs:-all}"
if [ "$target" = "all" ]; then
files="octavia_lib"
else
case "$target" in
*HEAD~[0-9]*) files=$(git diff --diff-filter=AM --name-only $target -- "*.py");;
*) echo "$target is an unrecognized basecommit"; exit 1;;
esac
fi
echo "Running pylint..."
echo "You can speed this up by running it on 'HEAD~[0-9]' (e.g. HEAD~1, this change only)..."
if [ -n "${files}" ]; then
pylint --max-nested-blocks 7 --extension-pkg-whitelist netifaces --rcfile=.pylintrc --output-format=colorized ${files}
else
echo "No python changes in this commit, pylint check not required."
exit 0
fi
}
scriptargs=
pylint=1
process_options $@
if [ $pylint -eq 1 ]; then
run_pylint
exit 0
fi

16
tox.ini
View File

@ -18,6 +18,11 @@ commands = stestr run {posargs}
[testenv:pep8]
basepython = python3
commands = flake8 {posargs}
doc8 --ignore-path doc/source/contributor/modules \
doc/source octavia_lib HACKING.rst README.rst
# Run security linter
bandit -r octavia_lib -ll -ii -x octavia_lib/tests
{toxinidir}/tools/coding-checks.sh --pylint '{posargs}'
[testenv:venv]
basepython = python3
@ -33,6 +38,7 @@ commands =
coverage combine
coverage html -d cover
coverage xml -o cover/coverage.xml
coverage report --fail-under=95 --skip-covered
[testenv:docs]
basepython = python3
@ -57,6 +63,13 @@ ignore = E123,E125
builtins = _
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build
[hacking]
import_exceptions = octavia_lib.i18n
local-check-factory = octavia_lib.hacking.checks.factory
[doc8]
max-line-length = 79
[testenv:lower-constraints]
basepython = python3
deps =
@ -65,5 +78,4 @@ deps =
-r{toxinidir}/requirements.txt
whitelist_externals = sh
commands =
sh -c 'OS_TEST_PATH={toxinidir}/octavia/tests/unit stestr run {posargs}'
sh -c 'OS_TEST_PATH={toxinidir}/octavia/tests/functional stestr run {posargs}'
sh -c 'OS_TEST_PATH={toxinidir}/octavia_lib/tests/unit stestr run {posargs}'