Merge "use is_port_trusted from neutron-lib"

This commit is contained in:
Jenkins 2017-05-24 21:26:16 +00:00 committed by Gerrit Code Review
commit bf5df0256b
7 changed files with 13 additions and 21 deletions

View File

@ -13,6 +13,7 @@
# under the License.
from neutron_lib.plugins import directory
from neutron_lib.utils import net
from oslo_log import log as logging
import oslo_messaging
@ -20,7 +21,6 @@ from neutron._i18n import _LW
from neutron.common import constants
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
LOG = logging.getLogger(__name__)
@ -80,7 +80,7 @@ class SecurityGroupServerRpcCallback(object):
return dict(
(port['id'], port)
for port in self.plugin.get_ports_from_devices(context, devices)
if port and not utils.is_port_trusted(port)
if port and not net.is_port_trusted(port)
)
def security_group_rules_for_devices(self, context, **kwargs):

View File

@ -277,15 +277,6 @@ def ip_version_from_int(ip_version_int):
raise ValueError(_('Illegal IP version number'))
def is_port_trusted(port):
"""Used to determine if port can be trusted not to attack network.
Trust is currently based on the device_owner field starting with 'network:'
since we restrict who can use that in the default policy.json file.
"""
return port['device_owner'].startswith(n_const.DEVICE_OWNER_NETWORK_PREFIX)
class DelayedStringRenderer(object):
"""Takes a callable and its args and calls when __str__ is called

View File

@ -22,6 +22,7 @@ from neutron_lib.api.definitions import portbindings
from neutron_lib.api import validators
from neutron_lib import constants as const
from neutron_lib import exceptions as exc
from neutron_lib.utils import net
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
@ -320,7 +321,7 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
subnet_cidr=subnet_cidr)
def _validate_max_ips_per_port(self, fixed_ip_list, device_owner):
if common_utils.is_port_trusted({'device_owner': device_owner}):
if net.is_port_trusted({'device_owner': device_owner}):
return
if len(fixed_ip_list) > cfg.CONF.max_fixed_ips_per_port:

View File

@ -15,9 +15,9 @@
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api import validators
from neutron_lib.plugins import directory
from neutron_lib.utils import net
from neutron.api.v2 import attributes as attrs
from neutron.common import utils
from neutron.db import _resource_extend as resource_extend
from neutron.db import portsecurity_db_common
@ -43,7 +43,7 @@ class PortSecurityDbMixin(portsecurity_db_common.PortSecurityDbCommon):
"""
has_ip = self._ip_on_port(port)
# we don't apply security groups for dhcp, router
if port.get('device_owner') and utils.is_port_trusted(port):
if port.get('device_owner') and net.is_port_trusted(port):
return (False, has_ip)
if validators.is_attr_set(port.get(psec.PORTSECURITY)):

View File

@ -20,6 +20,7 @@ from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib.utils import helpers
from neutron_lib.utils import net
from oslo_utils import uuidutils
from sqlalchemy.orm import exc
from sqlalchemy.orm import scoped_session
@ -27,7 +28,6 @@ from sqlalchemy.orm import scoped_session
from neutron._i18n import _
from neutron.api.v2 import attributes
from neutron.common import constants as n_const
from neutron.common import utils
from neutron.db import _model_query as model_query
from neutron.db import _resource_extend as resource_extend
from neutron.db import _utils as db_utils
@ -730,7 +730,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
port = port['port']
if not validators.is_attr_set(port.get(ext_sg.SECURITYGROUPS)):
return
if port.get('device_owner') and utils.is_port_trusted(port):
if port.get('device_owner') and net.is_port_trusted(port):
return
port_sg = port.get(ext_sg.SECURITYGROUPS, [])
@ -752,7 +752,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
def _ensure_default_security_group_on_port(self, context, port):
# we don't apply security groups for dhcp, router
port = port['port']
if port.get('device_owner') and utils.is_port_trusted(port):
if port.get('device_owner') and net.is_port_trusted(port):
return
default_sg = self._ensure_default_security_group(context,
port['tenant_id'])

View File

@ -14,12 +14,12 @@
# under the License.
import netaddr
from neutron_lib.utils import net
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron._i18n import _LI
from neutron.agent.linux import ip_lib
from neutron.common import utils
LOG = logging.getLogger(__name__)
SPOOF_CHAIN_PREFIX = 'neutronARP-'
@ -34,7 +34,7 @@ def setup_arp_spoofing_protection(vif, port_details):
LOG.info(_LI("Skipping ARP spoofing rules for port '%s' because "
"it has port security disabled"), vif)
return
if utils.is_port_trusted(port_details):
if net.is_port_trusted(port_details):
# clear any previous entries related to this port
delete_arp_spoofing_protection([vif], current_rules)
LOG.debug("Skipping ARP spoofing rules for network owned port "

View File

@ -15,10 +15,10 @@
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api import validators
from neutron_lib.utils import net
from oslo_log import log as logging
from neutron._i18n import _LI
from neutron.common import utils
from neutron.db import common_db_mixin
from neutron.db import portsecurity_db_common as ps_db_common
from neutron.plugins.ml2 import driver_api as api
@ -72,7 +72,7 @@ class PortSecurityExtensionDriver(api.ExtensionDriver,
otherwise the value associated with the network is returned.
"""
# we don't apply security groups for dhcp, router
if port.get('device_owner') and utils.is_port_trusted(port):
if port.get('device_owner') and net.is_port_trusted(port):
return False
if validators.is_attr_set(port.get(psec.PORTSECURITY)):