Merge "Implementation of Mcafee NGFW Driver"
This commit is contained in:
commit
161449ddea
|
@ -0,0 +1,11 @@
|
|||
Mcafee NGFW Firewall as a Service Driver
|
||||
|
||||
* For more information, refer to:
|
||||
https://wiki.openstack.org/wiki/Mcafee_NGFW_Firewall_driver
|
||||
|
||||
* For information on Intel NGFW CI, refer to:
|
||||
https://wiki.openstack.org/wiki/ThirdPartySystems/Intel_NGFW_CI
|
||||
|
||||
* Intel NGFW CI contact:
|
||||
- yalei.wang@intel.com
|
||||
- rui.zang@intel.com
|
|
@ -0,0 +1,258 @@
|
|||
# Copyright 2015 Intel Corporation.
|
||||
# Copyright 2015 Yalei Wang <yalei.wang at intel com>
|
||||
#
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
JSON_PHY_INTERFACES = 'physicalInterfaces'
|
||||
JSON_NAME = 'name'
|
||||
JSON_NODES = 'nodes'
|
||||
JSON_LOG_SERVER_REF = 'log_server_ref'
|
||||
JSON_NODE_NET_VALUE = 'network_value'
|
||||
JSON_NODE_NET_ADDR = 'address'
|
||||
JSON_PHY_INTF = 'physical_interface'
|
||||
|
||||
L2_ELEMENT_TEMPLATE = ("""
|
||||
{
|
||||
'log_server_ref':
|
||||
'http://localhost:8082/5.7/elements/log_server/1441',
|
||||
'name': '@PLACE_HOLDER@ L2 FW',
|
||||
'nodes':
|
||||
[
|
||||
{
|
||||
'fwlayer2_node': {
|
||||
'name': '@PLACE_HOLDER@ L2 FW node 1',
|
||||
'nodeid': 1
|
||||
}
|
||||
}
|
||||
],
|
||||
'physicalInterfaces':
|
||||
[
|
||||
{
|
||||
'physical_interface': {
|
||||
'interface_id': '1',
|
||||
'interfaces':
|
||||
[
|
||||
{
|
||||
'inline_interface': {
|
||||
'failure_mode': 'normal',
|
||||
'logical_interface_ref':
|
||||
'http://localhost:8082/5.7/elements/logical_interface/1',
|
||||
'nicid': '1-2'
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
'physical_interface': {
|
||||
'interface_id': '0',
|
||||
'interfaces':
|
||||
[
|
||||
{
|
||||
'node_interface': {
|
||||
'address': '192.168.2.10',
|
||||
'network_value': '192.168.2.0/24',
|
||||
'nicid': '0',
|
||||
'nodeid': 1,
|
||||
'outgoing': true,
|
||||
'primary_mgt': true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
'physical_interface': {
|
||||
'interface_id': '3',
|
||||
'interfaces':
|
||||
[
|
||||
{
|
||||
'capture_interface': {
|
||||
'logical_interface_ref':
|
||||
'http://localhost:8082/5.7/elements/logical_interface/1073741835',
|
||||
'nicid': '3'
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
""")
|
||||
|
||||
L3_ELEMENT_TEMPLATE = ("""
|
||||
{
|
||||
"alias_value": [
|
||||
],
|
||||
"antivirus": {
|
||||
"antivirus_enabled": false,
|
||||
"virus_log_level": "none",
|
||||
"virus_mirror": "database.clamav.net"
|
||||
},
|
||||
"auto_reboot_timeout": 10,
|
||||
"connection_limit": 0,
|
||||
"connection_timeout": [
|
||||
{
|
||||
"protocol": "icmp",
|
||||
"timeout": 5
|
||||
},
|
||||
{
|
||||
"protocol": "other",
|
||||
"timeout": 180
|
||||
},
|
||||
{
|
||||
"protocol": "tcp",
|
||||
"timeout": 1800
|
||||
},
|
||||
{
|
||||
"protocol": "udp",
|
||||
"timeout": 50
|
||||
}
|
||||
],
|
||||
"contact_timeout": 60000,
|
||||
"default_nat": false,
|
||||
"domain_server_address": [
|
||||
],
|
||||
"dos_protection": "always_off",
|
||||
"excluded_interface": -1,
|
||||
"is_cert_auto_renewal": true,
|
||||
"is_config_encrypted": true,
|
||||
"is_fips_compatible_operating_mode": false,
|
||||
"is_loopback_tunnel_ip_address_enforced": false,
|
||||
"is_virtual_defrag": true,
|
||||
"log_moderation": [
|
||||
{
|
||||
"burst": 1000,
|
||||
"log_event": "1",
|
||||
"rate": 100
|
||||
},
|
||||
{
|
||||
"log_event": "2"
|
||||
}
|
||||
],
|
||||
"log_server_ref": "@PLACE_HOLDER@",
|
||||
"log_spooling_policy": "discard",
|
||||
"loopback_cluster_virtual_interface": [
|
||||
],
|
||||
"name": "@PLACE_HOLDER@",
|
||||
"nodes": [
|
||||
{
|
||||
"firewall_node": {
|
||||
"activate_test": true,
|
||||
"disabled": false,
|
||||
"loopback_node_dedicated_interface": [
|
||||
],
|
||||
"name": "@NODE_NAME_PLACE_HOLDER@",
|
||||
"nodeid": 1
|
||||
}
|
||||
}
|
||||
],
|
||||
"passive_discard_mode": false,
|
||||
"physicalInterfaces": [
|
||||
],
|
||||
"read_only": false,
|
||||
"rollback_timeout": 60,
|
||||
"scan_detection": {
|
||||
"scan_detection_icmp_events": 252,
|
||||
"scan_detection_icmp_timewindow": 60,
|
||||
"scan_detection_tcp_events": 252,
|
||||
"scan_detection_tcp_timewindow": 60,
|
||||
"scan_detection_type": "default off",
|
||||
"scan_detection_udp_events": 252,
|
||||
"scan_detection_udp_timewindow": 60
|
||||
},
|
||||
"slow_request_blacklist_timeout": 300,
|
||||
"slow_request_sensitivity": "off",
|
||||
"strict_tcp_mode": false,
|
||||
"syn_flood_sensitivity": "off",
|
||||
"syn_mode": "off",
|
||||
"system": false,
|
||||
"tcp_reset_sensitivity": "OFF",
|
||||
"tester_parameters": {
|
||||
"alert_interval": 3600,
|
||||
"auto_recovery": true,
|
||||
"boot_delay": 30,
|
||||
"boot_recovery": true,
|
||||
"restart_delay": 5,
|
||||
"status_delay": 5
|
||||
},
|
||||
"tracking_mode": "normal"
|
||||
}
|
||||
""")
|
||||
|
||||
PHYSICAL_INTERFACE_TEMPLATE = ("""
|
||||
{
|
||||
"physical_interface": {
|
||||
"aggregate_mode": "none",
|
||||
"arp_entry": [
|
||||
],
|
||||
"cvi_mode": "none",
|
||||
"dhcp_server_on_interface": {
|
||||
"dhcp_range_per_node": [
|
||||
]
|
||||
},
|
||||
"interface_id": "@PLACE_HODLER@",
|
||||
"interfaces": [
|
||||
{
|
||||
"single_node_interface": {
|
||||
"address": "@PLACE_HOLDER_IP@",
|
||||
"auth_request": false,
|
||||
"auth_request_source": false,
|
||||
"backup_heartbeat": false,
|
||||
"backup_mgt": false,
|
||||
"dynamic_ip": false,
|
||||
"igmp_mode": "none",
|
||||
"key": 200,
|
||||
"modem": false,
|
||||
"network_value": "@PLACE_HOLDER_IP_NETWORK@",
|
||||
"nicid": "0",
|
||||
"nodeid": 1,
|
||||
"outgoing": false,
|
||||
"pppoa": false,
|
||||
"pppoe": false,
|
||||
"primary_heartbeat": false,
|
||||
"primary_mgt": false,
|
||||
"relayed_by_dhcp": false,
|
||||
"reverse_connection": false,
|
||||
"vrrp": false,
|
||||
"vrrp_id": -1,
|
||||
"vrrp_priority": -1
|
||||
}
|
||||
}
|
||||
],
|
||||
"log_moderation": [
|
||||
{
|
||||
"burst": 1000,
|
||||
"log_event": "1",
|
||||
"rate": 100
|
||||
},
|
||||
{
|
||||
"log_event": "2"
|
||||
}
|
||||
],
|
||||
"managed_address_flag": false,
|
||||
"mtu": -1,
|
||||
"other_configuration_flag": false,
|
||||
"qos_limit": -1,
|
||||
"qos_mode": "no_qos",
|
||||
"router_advertisement": false,
|
||||
"syn_mode": "default",
|
||||
"virtual_engine_vlan_ok": false,
|
||||
"vlanInterfaces": [
|
||||
]
|
||||
}
|
||||
}
|
||||
""")
|
|
@ -0,0 +1,341 @@
|
|||
# Copyright 2015 Intel Corporation.
|
||||
# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
|
||||
# <isaku.yamahata at gmail com>
|
||||
# Copyright 2015 Yalei Wang <yalei.wang at intel com>
|
||||
#
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
|
||||
import netaddr
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import log
|
||||
from neutron_fwaas.services.firewall.drivers import fwaas_base
|
||||
import neutron_fwaas.services.firewall.drivers.mcafee.smc_api as smc_api
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
|
||||
NGFWOpts = [
|
||||
cfg.StrOpt(
|
||||
'smc_url',
|
||||
default='',
|
||||
help=_("URL to contact SMC server")
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'smc_api_auth_key',
|
||||
default='',
|
||||
help=_("Authentication key to SMC API")
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'smc_api_version',
|
||||
default='',
|
||||
help=_("verion of SMC API")
|
||||
),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(NGFWOpts, 'ngfw')
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NgfwFwaasDriver(fwaas_base.FwaasDriverBase):
|
||||
"""Firewall driver for NGFW Fwaas of Mcafee """
|
||||
def __init__(self):
|
||||
LOG.debug("Initializing FWaas Mcafee NGFW driver")
|
||||
super(NgfwFwaasDriver, self).__init__()
|
||||
self._host_list = []
|
||||
self._network_list = []
|
||||
self._smc_url = cfg.CONF.ngfw.smc_url
|
||||
self.fw_ips_template_ref = None
|
||||
self.fw_template_ref = None
|
||||
self.connection = smc_api.SMCAPIConnection(
|
||||
self._smc_url,
|
||||
cfg.CONF.ngfw.smc_api_version,
|
||||
cfg.CONF.ngfw.smc_api_auth_key)
|
||||
|
||||
@log.log
|
||||
def create_firewall(self, agent_mode, apply_list, firewall):
|
||||
# call update_firewall, because one tenant only support
|
||||
# one firewall
|
||||
return self.update_firewall(agent_mode, apply_list, firewall)
|
||||
|
||||
@log.log
|
||||
def delete_firewall(self, agent_mode, apply_list, firewall):
|
||||
# tell SMC server to remove the ngfw policy
|
||||
return self._delete_policy(apply_list, firewall)
|
||||
|
||||
@log.log
|
||||
def update_firewall(self, agent_mode, apply_list, firewall):
|
||||
for router_info in apply_list:
|
||||
rt = router_info.router
|
||||
|
||||
# only update the policy when the router is active
|
||||
if (rt['tenant_id'] == firewall['tenant_id'] and
|
||||
rt['status'] == 'ACTIVE'):
|
||||
self._update_policy(rt, firewall)
|
||||
|
||||
def _delete_policy(self, apply_list, firewall):
|
||||
for router_info in apply_list:
|
||||
rt = router_info.router
|
||||
|
||||
self._clear_policy(rt, firewall)
|
||||
|
||||
@log.log
|
||||
def apply_default_policy(self, apply_list, firewall):
|
||||
return self._delete_policy(apply_list, firewall)
|
||||
|
||||
def _update_policy(self, router, firewall):
|
||||
# clear all the policy first
|
||||
self._clear_policy(router, firewall)
|
||||
|
||||
if firewall['admin_state_up']:
|
||||
self._setup_policy(router, firewall)
|
||||
|
||||
def _is_ips_policy(self, policy_name):
|
||||
return policy_name[len(policy_name) - 4:].lower() == '-ips'
|
||||
|
||||
def _get_policy_ref(self, policy_name):
|
||||
# get the template ref at the first time
|
||||
if not self.fw_ips_template_ref or not self.fw_template_ref:
|
||||
r = self.connection.get('elements/fw_template_policy')
|
||||
fw_template_list = r[0]['result']
|
||||
for tplt in fw_template_list:
|
||||
if tplt['name'] == "Firewall Inspection Template":
|
||||
self.fw_ips_template_ref = tplt['href'].replace(
|
||||
self._smc_url +
|
||||
"/%s/" % cfg.CONF.ngfw.smc_api_version,
|
||||
'')
|
||||
elif tplt['name'] == "Firewall Template":
|
||||
self.fw_template_ref = tplt['href'].replace(
|
||||
self._smc_url +
|
||||
"/%s/" % cfg.CONF.ngfw.smc_api_version,
|
||||
'')
|
||||
|
||||
# use different template base on the policy name
|
||||
if self._is_ips_policy(policy_name):
|
||||
template = self.fw_ips_template_ref
|
||||
else:
|
||||
template = self.fw_template_ref
|
||||
|
||||
# create the policy in SMC server
|
||||
fw_policy = {
|
||||
"name": policy_name,
|
||||
"template": template
|
||||
}
|
||||
|
||||
ref = self._get_ref_from_service_data('fw_policy', fw_policy)
|
||||
|
||||
return ref
|
||||
|
||||
def _parse_port(self, source_port):
|
||||
min_port = ''
|
||||
max_port = ''
|
||||
|
||||
if source_port is None:
|
||||
min_port = 0
|
||||
max_port = 65535
|
||||
elif ':' in source_port:
|
||||
ports = source_port.split(':')
|
||||
min_port = int(ports[0])
|
||||
max_port = int(ports[1])
|
||||
else:
|
||||
min_port = int(source_port)
|
||||
max_port = ''
|
||||
|
||||
return min_port, max_port
|
||||
|
||||
def _get_ref_from_addr(self, addr):
|
||||
if addr == 'None':
|
||||
return addr
|
||||
|
||||
ip = netaddr.IPNetwork(addr)
|
||||
|
||||
if str(ip.netmask) != "255.255.255.255":
|
||||
# create network objects
|
||||
ref = self._create_network(addr)
|
||||
else:
|
||||
# create host objects
|
||||
ref = self._create_host(str(ip.ip))
|
||||
|
||||
return ref
|
||||
|
||||
def _get_ref_from_service_data(self, service_path, service_data):
|
||||
json_data = jsonutils.dumps(service_data)
|
||||
r = self.connection.post_element(service_path, json_data)
|
||||
srv_ref = r.headers['location']
|
||||
return srv_ref
|
||||
|
||||
def _convert_ipv4_to_ngfw_rule(self, rule):
|
||||
# convert the ipv4 rule into ngfw rules
|
||||
|
||||
# create src/dst of hosts or networks
|
||||
src_ref = self._get_ref_from_addr(str(rule['source_ip_address']))
|
||||
dst_ref = self._get_ref_from_addr(str(rule['destination_ip_address']))
|
||||
|
||||
# create service
|
||||
srv_ref = ''
|
||||
service_dict = {}
|
||||
|
||||
service = "%s_service" % rule['protocol']
|
||||
if rule['protocol'] in (constants.PROTO_NAME_TCP,
|
||||
constants.PROTO_NAME_UDP):
|
||||
|
||||
source_port = rule['source_port']
|
||||
dest_port = rule['destination_port']
|
||||
|
||||
min_src_port, max_src_port = self._parse_port(source_port)
|
||||
min_dst_port, max_dst_port = self._parse_port(dest_port)
|
||||
|
||||
service_data = {
|
||||
"name": "service-%s" % rule['name'],
|
||||
"min_src_port": min_src_port,
|
||||
"max_src_port": (min_src_port if max_src_port == ''
|
||||
else max_src_port),
|
||||
"min_dst_port": min_dst_port,
|
||||
"max_dst_port": (min_dst_port if max_dst_port == ''
|
||||
else max_dst_port)
|
||||
}
|
||||
|
||||
srv_ref = self._get_ref_from_service_data(service,
|
||||
service_data)
|
||||
service_dict = {"service": [srv_ref]}
|
||||
|
||||
elif rule['protocol'] == constants.PROTO_NAME_ICMP:
|
||||
# only ping is supported
|
||||
service_data = {
|
||||
"name": "service%s" % "22",
|
||||
"icmp_type": 0,
|
||||
"icmp_code": 0
|
||||
}
|
||||
|
||||
srv_ref = self._get_ref_from_service_data(service,
|
||||
service_data)
|
||||
service_dict = {"service": [srv_ref]}
|
||||
|
||||
elif rule['protocol'] is None:
|
||||
# protocal "ANY" is translated to accept all, no service create
|
||||
# here
|
||||
# TODO(yalie): add rules for different protocol, not ignore the
|
||||
# other value like ports.
|
||||
service_dict = {"any": True}
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
_("not support %s protocol now") % rule['protocol'])
|
||||
|
||||
# create fw rule
|
||||
action = "discard" if rule["action"] == "deny" else "allow"
|
||||
|
||||
payload = {
|
||||
"name": rule['name'],
|
||||
"action": {
|
||||
"action": action,
|
||||
"connection_tracking_options": {}
|
||||
},
|
||||
"destinations": {"dst": [dst_ref]},
|
||||
"services": service_dict,
|
||||
"sources": {"src": [src_ref]}
|
||||
}
|
||||
|
||||
json_data = jsonutils.dumps(payload)
|
||||
|
||||
return json_data
|
||||
|
||||
def _get_policy_name(self, router, fw):
|
||||
# SMC server would bind the different NGFW policy with different
|
||||
# routers(sg-engine) in a tenant
|
||||
return "%s_%s_%s" % (
|
||||
fw['id'][0:7], fw['firewall_policy_id'][0:7], router['id'][0:7])
|
||||
|
||||
def _setup_policy(self, router, fw):
|
||||
# one tenant should use only one policy
|
||||
with self.connection.login_server():
|
||||
# create policy ref
|
||||
policy_name = self._get_policy_name(router, fw)
|
||||
policy_ref = self._get_policy_ref(policy_name)
|
||||
|
||||
# post service
|
||||
for rule in fw['firewall_rule_list']:
|
||||
if not rule['enabled']:
|
||||
continue
|
||||
|
||||
if rule['ip_version'] == 4:
|
||||
json_data = self._convert_ipv4_to_ngfw_rule(rule)
|
||||
self.connection.post(policy_ref +
|
||||
"/fw_ipv4_access_rule",
|
||||
json_data, raw=True)
|
||||
else:
|
||||
msg = (_('Unsupported IP version rule. %(version)') %
|
||||
{'version': rule['ip_version']})
|
||||
raise ValueError(msg)
|
||||
|
||||
# upload the policy
|
||||
self.connection.post(policy_ref + "/upload", '', raw=True)
|
||||
|
||||
def _clear_policy(self, router, fw):
|
||||
# find the policy used by the tenant and firewall
|
||||
policy_name = self._get_policy_name(router, fw)
|
||||
|
||||
path_policy_filter = 'elements/fw_policy?filter=%s' % policy_name
|
||||
|
||||
with self.connection.login_server():
|
||||
r = self.connection.get(path=path_policy_filter)
|
||||
|
||||
fw_list = r[0]['result']
|
||||
for f in fw_list:
|
||||
if f['name'] == policy_name:
|
||||
self.connection.delete(f['href'], raw=True)
|
||||
|
||||
# Warning, find unused elements and delete them.
|
||||
r = self.connection.get(path='elements/search_unused')
|
||||
|
||||
element_list = r[0]['result']
|
||||
|
||||
for element in element_list:
|
||||
self.connection.delete(element['href'], raw=True)
|
||||
|
||||
def _create_host(self, ip):
|
||||
ref = None
|
||||
host_json_def = {
|
||||
"name": "host-%s" % str(ip),
|
||||
"address": ip
|
||||
}
|
||||
|
||||
with self.connection.login_server():
|
||||
ref = self._get_ref_from_service_data('host', host_json_def)
|
||||
|
||||
return ref
|
||||
|
||||
def _create_network(self, cidr):
|
||||
ref = None
|
||||
net_json_def = {
|
||||
"name": "network-%s" % str(cidr),
|
||||
"ipv4_network": cidr
|
||||
}
|
||||
|
||||
with self.connection.login_server():
|
||||
# some network maybe pre-created by router-plugin
|
||||
r = self.connection.get("elements/network")
|
||||
networks = r[0]['result']
|
||||
for net in networks:
|
||||
if net['name'] == "network-%s" % cidr:
|
||||
ref = net['href']
|
||||
return ref
|
||||
|
||||
ref = self._get_ref_from_service_data('network', net_json_def)
|
||||
|
||||
return ref
|
|
@ -0,0 +1,476 @@
|
|||
# Copyright 2015 Intel Corporation.
|
||||
# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
|
||||
# <isaku.yamahata at gmail com>
|
||||
# Copyright 2015 Yalei Wang <yalei.wang at intel com>
|
||||
#
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# This script uses SMC-API to get/post elements from SMC server
|
||||
#
|
||||
|
||||
import abc
|
||||
import netaddr
|
||||
import requests
|
||||
import six
|
||||
|
||||
from contextlib import contextmanager
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron.i18n import _LE, _LI, _LW
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from neutron_fwaas.services.firewall.drivers.mcafee import constants as const
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SMCAPIResult(object):
|
||||
"""Class for returning result to API caller"""
|
||||
|
||||
def __init__(self, tp):
|
||||
self.type = tp
|
||||
self.result = "N/A"
|
||||
self.code = "200"
|
||||
self.headers = None
|
||||
|
||||
def is_json(self):
|
||||
return self.type == "json"
|
||||
|
||||
def is_text(self):
|
||||
return self.type == "text"
|
||||
|
||||
def __str__(self):
|
||||
return self.result
|
||||
|
||||
|
||||
class SMCAPIConnection(object):
|
||||
"""Provide the REST API method to connect to the SMC server.
|
||||
|
||||
For login/logout operation, users should set server IP, API version and
|
||||
auth key first. For get/put/delete operation, users should provide the
|
||||
target element'path, and special json format data section followed "SMC
|
||||
API User's Guide".
|
||||
"""
|
||||
def __init__(self, host, api_version, authentication_key):
|
||||
self.cookies = {}
|
||||
self.host = host
|
||||
self.api_version = api_version
|
||||
self.host_api_url = self.host + "/" + self.api_version
|
||||
self.auth_key = authentication_key
|
||||
self.session = None
|
||||
|
||||
@contextmanager
|
||||
def login_server(self):
|
||||
if self.session:
|
||||
yield
|
||||
else:
|
||||
ret = self.login()
|
||||
LOG.debug("SMC server LOGIN successfully.")
|
||||
|
||||
if ret:
|
||||
try:
|
||||
yield
|
||||
except Exception:
|
||||
LOG.exception(_LE("exception while connect to server!"))
|
||||
raise n_exc.ServiceUnavailable(resource='SMC server',
|
||||
msg=_("OPERATION failed"))
|
||||
|
||||
finally:
|
||||
self.logout()
|
||||
|
||||
else:
|
||||
raise n_exc.BadRequest(resource='SMC server',
|
||||
msg=_("LOGIN failed!"))
|
||||
|
||||
def login(self):
|
||||
self.session = requests.session()
|
||||
|
||||
post_addr = ("%s/login?authenticationkey=%s&beta=true" %
|
||||
(self.host_api_url, self.auth_key))
|
||||
res = self.session.post(post_addr)
|
||||
|
||||
if res.status_code == 200:
|
||||
return True
|
||||
|
||||
LOG.error(_LE("connect to %(host)s failed"
|
||||
" (%(msg)s/ code %(code)s)"),
|
||||
{'host': post_addr,
|
||||
'msg': res.reason,
|
||||
'code': res.status_code})
|
||||
|
||||
return False
|
||||
|
||||
def logout(self):
|
||||
result = self.session.put("%s/logout" % (self.host_api_url))
|
||||
self.session = None
|
||||
LOG.debug("LOGOUT from SMC server result %s", result)
|
||||
|
||||
def session_op(self, attr, path, raw=False, data=None, headers=None):
|
||||
op = getattr(self.session, attr)
|
||||
|
||||
if raw:
|
||||
result = op(path, headers=headers, data=data)
|
||||
else:
|
||||
result = op("%s/%s" %
|
||||
(self.host_api_url, path), headers=headers, data=data)
|
||||
|
||||
if result.status_code == "404":
|
||||
LOG.error(_LE("SMC Error 404 %s"), result.reason)
|
||||
|
||||
return result
|
||||
|
||||
def get(self, path, etag=None, raw=False):
|
||||
json_result = None
|
||||
etag_out = None
|
||||
headers = {'accept': 'application/json',
|
||||
'content-type': 'application/json'}
|
||||
if etag:
|
||||
headers['ETag'] = etag
|
||||
|
||||
try:
|
||||
result = self.session_op("get", path, raw, headers=headers)
|
||||
|
||||
if 'etag' in result.headers:
|
||||
etag_out = result.headers['etag']
|
||||
|
||||
json_result = result.json()
|
||||
|
||||
if result.status_code == "404":
|
||||
LOG.error(_LE("%(msg)s %(detail)s"),
|
||||
{'msg': json_result["message"],
|
||||
'detail': json_result["details"]})
|
||||
|
||||
except Exception:
|
||||
LOG.error(_LE("exception when GET operation"))
|
||||
raise
|
||||
|
||||
r = [json_result]
|
||||
if etag_out:
|
||||
r.append(etag_out)
|
||||
|
||||
return [json_result]
|
||||
|
||||
def check_ret(self, string, path, ret, updated_result):
|
||||
if ret.status_code != 200:
|
||||
LOG.info(_LI("%(str)s ELEMENT result code: %(stat)d "
|
||||
"%(path)s %(reason)s text=%(text)s"),
|
||||
{'str': string, 'path': path,
|
||||
'stat': ret.status_code,
|
||||
'reason': ret.reason,
|
||||
'text': ret.text})
|
||||
|
||||
updated_result.type = "text"
|
||||
updated_result.result = ret.text
|
||||
else:
|
||||
if ret.headers.get('content-type') == "application/json":
|
||||
updated_result.type = "json"
|
||||
updated_result.result = ret.json
|
||||
else:
|
||||
updated_result.type = "text"
|
||||
updated_result.result = ret.content
|
||||
updated_result.code = ret.status_code
|
||||
|
||||
def delete(self, path, raw=False):
|
||||
del_result = SMCAPIResult("text")
|
||||
|
||||
try:
|
||||
result = self.session_op("delete", path, raw)
|
||||
self.check_ret("DELETE", path, result, del_result)
|
||||
|
||||
except Exception:
|
||||
LOG.error(_LE("exception when DELETE operation"))
|
||||
raise
|
||||
|
||||
return del_result
|
||||
|
||||
def post(self, path, json_element, raw=False):
|
||||
headers = {'accept': '*/*',
|
||||
'content-type': 'application/json'}
|
||||
post_result = SMCAPIResult("text")
|
||||
|
||||
try:
|
||||
result = self.session_op(
|
||||
"post", path, raw, headers=headers, data=json_element)
|
||||
self.check_ret("POST", path, result, post_result)
|
||||
post_result.headers = result.headers
|
||||
except Exception:
|
||||
LOG.error(_LE("exception when POST operation"))
|
||||
raise
|
||||
|
||||
return post_result
|
||||
|
||||
def post_element(self, element_type, json_element):
|
||||
return self.post("elements/%s" % (element_type), json_element)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class SMCAPIElement(object):
|
||||
"""
|
||||
Base class of elements, used by L2/L3 single firewall class
|
||||
"""
|
||||
element_type = "N/A"
|
||||
|
||||
@staticmethod
|
||||
def usage(extra_info=None):
|
||||
if extra_info:
|
||||
LOG.error(_LE("Error -> %s"), extra_info)
|
||||
raise ValueError(_('Wrong initial data!'))
|
||||
|
||||
def __init__(self, name, smc_api_connection, control_ip=None):
|
||||
|
||||
if not name:
|
||||
self.usage("name of element missing.")
|
||||
|
||||
self.name = name
|
||||
self.element_id = 0
|
||||
self.json_element = None
|
||||
self.element_template = None
|
||||
self.smc_api_connection = smc_api_connection
|
||||
self.keyboard = None
|
||||
self.timezone = None
|
||||
if control_ip:
|
||||
self.control_ip = netaddr.IPNetwork(control_ip)
|
||||
if self.control_ip.prefixlen == 32:
|
||||
self.usage(
|
||||
"Control_ip %s needs to netmask bits e.g x.x.x.x/yy"
|
||||
% (self.control_ip))
|
||||
else:
|
||||
self.control_ip = None
|
||||
|
||||
def to_json(self):
|
||||
return jsonutils.dumps(self.json_element)
|
||||
|
||||
@abc.abstractmethod
|
||||
def create(self):
|
||||
raise NotImplementedError(
|
||||
"not support SMCAPIElement create")
|
||||
|
||||
@abc.abstractmethod
|
||||
def update(self):
|
||||
raise NotImplementedError(
|
||||
"not support SMCAPIElement update")
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete(self):
|
||||
raise NotImplementedError(
|
||||
"not support SMCAPIElement delete")
|
||||
|
||||
def get_element(self, path):
|
||||
LOG.debug("Getting path: %s", path)
|
||||
return self.smc_api_connection.get("elements/%s" % (path))
|
||||
|
||||
def get_elements(self, element_type=None):
|
||||
if not element_type:
|
||||
element_type = self.element_type
|
||||
|
||||
return self.smc_api_connection.get("elements/%s" % (element_type))
|
||||
|
||||
def fetch_element_id(self):
|
||||
json_result = self.get_elements()
|
||||
|
||||
if not json_result[0]['result']:
|
||||
LOG.warn(_LW("No #{element_type} defined in SMC"))
|
||||
else:
|
||||
for element in json_result[0]['result']:
|
||||
href = element['href']
|
||||
self.element_id = int(href.split('/')[-1])
|
||||
if element['name'] == self.name:
|
||||
LOG.debug("%(type)s element with name %(name)s FOUND "
|
||||
"%(href)s",
|
||||
{'type': self.element_type,
|
||||
'name': self.name,
|
||||
'href': href})
|
||||
break
|
||||
|
||||
LOG.debug("Got ID %s", self.element_id)
|
||||
return self.element_id
|
||||
|
||||
def get_initial_contact_data(self):
|
||||
"""Get the element's configuration data used to contact to SMC server.
|
||||
|
||||
Contact data is a configuration string including the SMC server's IP,
|
||||
interfaces defined and special one-time password.
|
||||
eg. first create the L3 element on behalf of sg-engine in SMC server
|
||||
and generate the contact data, then boot the sg-engine with it and
|
||||
engine will init properly and connect to SMC server finally.
|
||||
"""
|
||||
|
||||
data = None
|
||||
result = self.get_element("%s/%s/node" %
|
||||
(self.element_type, self.element_id))
|
||||
LOG.debug("resule = %s", result)
|
||||
|
||||
node_ref = result[0]['result'][0]['href'].replace(
|
||||
self.smc_api_connection.host_api_url + "/elements/", "")
|
||||
|
||||
LOG.debug("Node ref is %s", node_ref)
|
||||
|
||||
extra_options = []
|
||||
if self.keyboard:
|
||||
extra_options.append("keyboard=%s" % (self.keyboard))
|
||||
if self.timezone:
|
||||
extra_options.append("time_zone=%s" % (self.timezone))
|
||||
|
||||
if extra_options:
|
||||
extra_options = "&" + extra_options
|
||||
else:
|
||||
extra_options = ""
|
||||
|
||||
result = self.smc_api_connection.post_element(
|
||||
"%s/initial_contact?enable_ssh=true%s" %
|
||||
(node_ref, extra_options), "")
|
||||
if result.is_text():
|
||||
d1 = str(result).split("\n")
|
||||
idx = 0
|
||||
for l in d1:
|
||||
if l.find("ssh/enabled") != -1:
|
||||
l = l.replace("false", "true")
|
||||
d1[idx] = l
|
||||
idx += 1
|
||||
result.result = "\n".join(d1)
|
||||
data = result
|
||||
|
||||
result = self.smc_api_connection.post_element(
|
||||
"%s/bind_license" % (node_ref), "")
|
||||
|
||||
if result.code != 200:
|
||||
LOG.error(_LE("Could not bind license. "
|
||||
"Maybe SMC license pool is empty. "
|
||||
"SMC API details: %s"), result)
|
||||
return data
|
||||
|
||||
|
||||
class SMCAPIElementL2FWSingle(SMCAPIElement):
|
||||
"""L2 single firewall element."""
|
||||
element_type = "single_layer2"
|
||||
|
||||
def __init__(self, name, smc_api_connection, control_ip):
|
||||
SMCAPIElement.__init__(self, name, smc_api_connection, control_ip)
|
||||
self.element_id = 0
|
||||
self.json_element = None
|
||||
|
||||
def create(self):
|
||||
json_result = self.get_elements("log_server")
|
||||
log_server_ref = json_result[0]['result'][0]['href']
|
||||
LOG.debug("Using log server '%(name)s', ref %(ref)s",
|
||||
{'name': json_result[0]['result'][0]['name'],
|
||||
'ref': log_server_ref})
|
||||
|
||||
json_result = self.get_elements("logical_interface")
|
||||
|
||||
logical_interfaces = dict((logical_iface['name'],
|
||||
logical_iface['href']) for logical_iface in
|
||||
json_result[0]['result'] if logical_iface['name']
|
||||
in ('default_eth', 'capture'))
|
||||
|
||||
for name, ref in logical_interfaces.iteritems():
|
||||
LOG.debug("Using logical interface %(name)s ref %(href)s",
|
||||
{'name': name, 'href': ref})
|
||||
|
||||
json_data = jsonutils.loads(const.L2_ELEMENT_TEMPLATE)
|
||||
|
||||
json_data[const.JSON_LOG_SERVER_REF] = log_server_ref
|
||||
json_data[const.JSON_NAME] = self.name
|
||||
json_data[const.JSON_NODES][0]['fwlayer2_node']['name'] = (self.name +
|
||||
" node 1")
|
||||
|
||||
physical_ifaces = json_data[const.JSON_PHY_INTERFACES]
|
||||
for phys_iface in physical_ifaces:
|
||||
for iface in phys_iface[const.JSON_PHY_INTF]['interfaces']:
|
||||
if 'inline_interface' in iface:
|
||||
inline_iface = iface['inline_interface']
|
||||
inline_iface['logical_interface_ref'] = (
|
||||
logical_interfaces['default_eth']['href'])
|
||||
elif 'capture_interface' in iface:
|
||||
capture_iface = iface['capture_interface']
|
||||
capture_iface['logical_interface_ref'] = (
|
||||
logical_interfaces['capture']['href'])
|
||||
elif 'node_interface' in iface:
|
||||
node_iface = iface['node_interface']
|
||||
if not node_iface['primary_mgt']:
|
||||
continue
|
||||
node_iface[const.JSON_NODE_NET_ADDR] = (
|
||||
str(self.control_ip.ip))
|
||||
node_iface[const.JSON_NODE_NET_VALUE] = (
|
||||
str(self.control_ip.cidr))
|
||||
|
||||
self.json_element = json_data
|
||||
self.smc_api_connection.post_element(self.element_type, self.to_json())
|
||||
self.fetch_element_id()
|
||||
|
||||
def update(self):
|
||||
"""Update element """
|
||||
pass
|
||||
|
||||
def delete(self):
|
||||
"""Delete element """
|
||||
pass
|
||||
|
||||
|
||||
class SMCAPIElementL3FWSingle(SMCAPIElement):
|
||||
"""L3 single firewall element."""
|
||||
element_type = "single_fw"
|
||||
|
||||
def __init__(self, name, smc_api_connection, control_ip):
|
||||
super(SMCAPIElementL3FWSingle, self).__init__(self, name,
|
||||
smc_api_connection, control_ip)
|
||||
self.element_id = 0
|
||||
self.json_element = None
|
||||
self.physical_interfaces = []
|
||||
|
||||
def modify_interface_property(self, physical_interface, name, value):
|
||||
iface = physical_interface[const.JSON_PHY_INTF]
|
||||
iface = iface['interfaces'][0]['single_node_interface']
|
||||
iface[name] = value
|
||||
|
||||
def add_physical_interface(self, ip_and_network, interface_id):
|
||||
ip = netaddr.IPNetwork(ip_and_network)
|
||||
|
||||
json_data = jsonutils.loads(const.PHYSICAL_INTERFACE_TEMPLATE)
|
||||
phys_iface = json_data[const.JSON_PHY_INTF]
|
||||
phys_iface['interface_id'] = interface_id
|
||||
iface = json_data[const.JSON_PHY_INTF]['interfaces'][0]
|
||||
iface = iface['single_node_interface']
|
||||
iface[const.JSON_NODE_NET_ADDR] = str(ip.ip)
|
||||
iface[const.JSON_NODE_NET_VALUE] = str(ip.cidr)
|
||||
self.physical_interfaces.append(json_data)
|
||||
return json_data
|
||||
|
||||
def create(self):
|
||||
json_result = self.get_elements("log_server")
|
||||
log_server_ref = json_result[0]['result'][0]['href']
|
||||
|
||||
LOG.debug(
|
||||
"Using log server '%(name)s' ref %(ref)s",
|
||||
{'name': json_result[0]['result'][0]['name'],
|
||||
'ref': log_server_ref})
|
||||
|
||||
json_data = jsonutils.loads(const.L3_ELEMENT_TEMPLATE)
|
||||
json_data[const.JSON_LOG_SERVER_REF] = log_server_ref
|
||||
json_data[const.JSON_NAME] = self.name
|
||||
json_data[const.JSON_NODES][0]['firewall_node']['name'] = (self.name +
|
||||
" node 1")
|
||||
iface = self.add_physical_interface(self.control_ip, 0)
|
||||
self.modify_interface_property(iface, "primary_mgt", True)
|
||||
for phys_iface in self.physical_interfaces:
|
||||
json_data[const.JSON_PHY_INTERFACES].append(phys_iface)
|
||||
|
||||
LOG.debug("%s",
|
||||
jsonutils.dumps(json_data, sort_keys=False,
|
||||
indent=2, separators=(',', ': ')))
|
||||
|
||||
self.json_element = json_data
|
||||
self.smc_api_connection.post_element(self.element_type, self.to_json())
|
||||
self.fetch_element_id()
|
|
@ -0,0 +1,222 @@
|
|||
# Copyright 2015 Intel Corporation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import contextlib
|
||||
import mock
|
||||
import neutron_fwaas.services.firewall.drivers.mcafee as mcafee
|
||||
import neutron_fwaas.services.firewall.drivers.mcafee.ngfw_fwaas as fwaas
|
||||
|
||||
from neutron.tests import base
|
||||
|
||||
FAKE_FIREWALL_ID = 'firewall_id'
|
||||
FAKE_POLICY_ID = 'policy_id'
|
||||
FAKE_TENANT_ID = 'tenant_id'
|
||||
FAKE_ROUTER_ID = 'router_id'
|
||||
FAKE_FW_NAME = 'fw_name'
|
||||
|
||||
|
||||
class NGFWFwaasTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(NGFWFwaasTestCase, self).setUp()
|
||||
self.firewall = fwaas.NgfwFwaasDriver()
|
||||
|
||||
self.rule_list = self._fake_ipv4_rules()
|
||||
self.apply_list = self._fake_apply_list()
|
||||
self.post_return = mock.MagicMock()
|
||||
self.tmp_ref = 'temp_ref'
|
||||
self.post_return.headers = {'location': self.tmp_ref}
|
||||
# we generate the policy name by formatting the ids of firewall,
|
||||
# policy, router
|
||||
self.policy_name = "%s_%s_%s" % (
|
||||
FAKE_FIREWALL_ID[0:7], FAKE_POLICY_ID[0:7],
|
||||
FAKE_ROUTER_ID[0:7])
|
||||
|
||||
def _fake_ipv4_rules(self):
|
||||
rule1 = {'action': 'deny',
|
||||
'description': '',
|
||||
'destination_ip_address': None,
|
||||
'destination_port': '23',
|
||||
'enabled': True,
|
||||
'firewall_policy_id': FAKE_POLICY_ID,
|
||||
'id': '1',
|
||||
'ip_version': 4,
|
||||
'name': 'a2',
|
||||
'position': 1,
|
||||
'protocol': 'udp',
|
||||
'shared': False,
|
||||
'source_ip_address': None,
|
||||
'source_port': '23',
|
||||
'tenant_id': FAKE_TENANT_ID}
|
||||
rule2 = {'action': 'deny',
|
||||
'description': '',
|
||||
'destination_ip_address': None,
|
||||
'destination_port': None,
|
||||
'enabled': True,
|
||||
'firewall_policy_id': FAKE_POLICY_ID,
|
||||
'id': '2',
|
||||
'ip_version': 4,
|
||||
'name': 'a3',
|
||||
'position': 2,
|
||||
'protocol': 'icmp',
|
||||
'shared': False,
|
||||
'source_ip_address': '192.168.100.0/24',
|
||||
'source_port': None,
|
||||
'tenant_id': FAKE_TENANT_ID}
|
||||
rule3 = {'action': 'allow',
|
||||
'description': '',
|
||||
'destination_ip_address': None,
|
||||
'destination_port': None,
|
||||
'enabled': True,
|
||||
'firewall_policy_id': FAKE_POLICY_ID,
|
||||
'id': '3',
|
||||
'ip_version': 4,
|
||||
'name': 'a4',
|
||||
'position': 3,
|
||||
'protocol': 'tcp',
|
||||
'shared': False,
|
||||
'source_ip_address': None,
|
||||
'source_port': None,
|
||||
'tenant_id': FAKE_TENANT_ID}
|
||||
return [rule1, rule2, rule3]
|
||||
|
||||
def _fake_firewall(self, rule_list):
|
||||
fw = {
|
||||
'admin_state_up': True,
|
||||
'description': '',
|
||||
'firewall_policy_id': FAKE_POLICY_ID,
|
||||
|
||||
'id': FAKE_FIREWALL_ID,
|
||||
'name': FAKE_FW_NAME,
|
||||
'shared': None,
|
||||
'status': 'PENDING_CREATE',
|
||||
|
||||
'tenant_id': FAKE_TENANT_ID,
|
||||
'firewall_rule_list': rule_list}
|
||||
return fw
|
||||
|
||||
def _fake_apply_list(self):
|
||||
apply_list = []
|
||||
|
||||
router_info_inst = mock.Mock()
|
||||
fake_interface = mock.Mock()
|
||||
router_inst = (
|
||||
{'_interfaces': fake_interface,
|
||||
'admin_state_up': True,
|
||||
'distributed': False,
|
||||
'external_gateway_info': None,
|
||||
'gw_port_id': None,
|
||||
'ha': False,
|
||||
'ha_vr_id': 0,
|
||||
'id': FAKE_ROUTER_ID,
|
||||
'name': 'rrr1',
|
||||
'routes': [],
|
||||
'status': 'ACTIVE',
|
||||
'tenant_id': FAKE_TENANT_ID})
|
||||
|
||||
router_info_inst.router = router_inst
|
||||
apply_list.append(router_info_inst)
|
||||
return apply_list
|
||||
|
||||
def test_update_firewall(self):
|
||||
|
||||
firewall = self._fake_firewall(self.rule_list)
|
||||
|
||||
ref_v4rule = self.tmp_ref + "/fw_ipv4_access_rule"
|
||||
ref_upload = self.tmp_ref + "/upload"
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'login'),
|
||||
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'get'),
|
||||
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'logout'),
|
||||
mock.patch.object(
|
||||
mcafee.smc_api.SMCAPIConnection, 'post',
|
||||
return_value=self.post_return),
|
||||
) as (lg, get, logout, post):
|
||||
|
||||
expected = [mock.call(
|
||||
'elements/fw_policy',
|
||||
'{"name": "%s", "template": null}' % self.policy_name),
|
||||
mock.call(
|
||||
'elements/udp_service',
|
||||
'{"min_dst_port": 23, "max_dst_port": 23, '
|
||||
'"name": "service-a2", "max_src_port": 23, '
|
||||
'"min_src_port": 23}'),
|
||||
mock.call(
|
||||
ref_v4rule,
|
||||
'{"action": {"action": "discard", '
|
||||
'"connection_tracking_options": {}}, '
|
||||
'"services": {"service": ["%s"]}, "sources": '
|
||||
'{"src": ["None"]}, "name": "a2", "destinations": '
|
||||
'{"dst": ["None"]}}' % self.tmp_ref, raw=True),
|
||||
mock.call(
|
||||
'elements/network',
|
||||
'{"ipv4_network": "192.168.100.0/24", '
|
||||
'"name": "network-192.168.100.0/24"}'),
|
||||
mock.call(
|
||||
'elements/icmp_service',
|
||||
'{"icmp_code": 0, "icmp_type": 0, "name": "service22"}'),
|
||||
mock.call(ref_v4rule,
|
||||
'{"action": {"action": "discard", '
|
||||
'"connection_tracking_options": {}}, '
|
||||
'"services": {"service": ["%s"]}, '
|
||||
'"sources": {"src": ["%s"]}, "name": "a3", '
|
||||
'"destinations": {"dst": ["None"]}}' % (
|
||||
self.tmp_ref, self.tmp_ref), raw=True),
|
||||
mock.call(
|
||||
'elements/tcp_service',
|
||||
'{"min_dst_port": 0, "max_dst_port": 65535, '
|
||||
'"name": "service-a4", "max_src_port": 65535, '
|
||||
'"min_src_port": 0}'),
|
||||
mock.call(
|
||||
ref_v4rule,
|
||||
'{"action": {"action": "allow", '
|
||||
'"connection_tracking_options": {}}, '
|
||||
'"services": {"service": ["%s"]}, '
|
||||
'"sources": {"src": ["None"]}, "name": "a4", '
|
||||
'"destinations": {"dst": ["None"]}}' %
|
||||
self.tmp_ref, raw=True),
|
||||
mock.call(ref_upload, '', raw=True)]
|
||||
|
||||
self.firewall.update_firewall('legacy', self.apply_list, firewall)
|
||||
self.assertEqual(expected, post.call_args_list)
|
||||
|
||||
def test_create_firewall(self):
|
||||
self.test_update_firewall()
|
||||
|
||||
def test_delete_firewall(self):
|
||||
firewall = self._fake_firewall(self.rule_list)
|
||||
|
||||
get_value = [{'result': [{'name': self.policy_name,
|
||||
'href': self.tmp_ref}, ]}, ]
|
||||
with contextlib.nested(
|
||||
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'login'),
|
||||
mock.patch.object(
|
||||
mcafee.smc_api.SMCAPIConnection, 'get',
|
||||
return_value=get_value),
|
||||
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'logout'),
|
||||
mock.patch.object(
|
||||
mcafee.smc_api.SMCAPIConnection, 'post',
|
||||
return_value=self.post_return),
|
||||
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'delete'),
|
||||
) as (lg, get, logout, post, delete):
|
||||
self.firewall.delete_firewall('legacy', self.apply_list, firewall)
|
||||
|
||||
expected = [
|
||||
mock.call(self.tmp_ref, raw=True),
|
||||
mock.call(self.tmp_ref, raw=True)
|
||||
]
|
||||
self.assertEqual(expected, delete.call_args_list)
|
Loading…
Reference in New Issue