Merge "Implementation of Mcafee NGFW Driver"

This commit is contained in:
Jenkins 2015-03-19 15:13:09 +00:00 committed by Gerrit Code Review
commit 161449ddea
7 changed files with 1308 additions and 0 deletions

View File

@ -0,0 +1,11 @@
Mcafee NGFW Firewall as a Service Driver
* For more information, refer to:
https://wiki.openstack.org/wiki/Mcafee_NGFW_Firewall_driver
* For information on Intel NGFW CI, refer to:
https://wiki.openstack.org/wiki/ThirdPartySystems/Intel_NGFW_CI
* Intel NGFW CI contact:
- yalei.wang@intel.com
- rui.zang@intel.com

View File

@ -0,0 +1,258 @@
# Copyright 2015 Intel Corporation.
# Copyright 2015 Yalei Wang <yalei.wang at intel com>
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
JSON_PHY_INTERFACES = 'physicalInterfaces'
JSON_NAME = 'name'
JSON_NODES = 'nodes'
JSON_LOG_SERVER_REF = 'log_server_ref'
JSON_NODE_NET_VALUE = 'network_value'
JSON_NODE_NET_ADDR = 'address'
JSON_PHY_INTF = 'physical_interface'
L2_ELEMENT_TEMPLATE = ("""
{
'log_server_ref':
'http://localhost:8082/5.7/elements/log_server/1441',
'name': '@PLACE_HOLDER@ L2 FW',
'nodes':
[
{
'fwlayer2_node': {
'name': '@PLACE_HOLDER@ L2 FW node 1',
'nodeid': 1
}
}
],
'physicalInterfaces':
[
{
'physical_interface': {
'interface_id': '1',
'interfaces':
[
{
'inline_interface': {
'failure_mode': 'normal',
'logical_interface_ref':
'http://localhost:8082/5.7/elements/logical_interface/1',
'nicid': '1-2'
}
}
]
}
},
{
'physical_interface': {
'interface_id': '0',
'interfaces':
[
{
'node_interface': {
'address': '192.168.2.10',
'network_value': '192.168.2.0/24',
'nicid': '0',
'nodeid': 1,
'outgoing': true,
'primary_mgt': true
}
}
]
}
},
{
'physical_interface': {
'interface_id': '3',
'interfaces':
[
{
'capture_interface': {
'logical_interface_ref':
'http://localhost:8082/5.7/elements/logical_interface/1073741835',
'nicid': '3'
}
}
]
}
}
]
}
""")
L3_ELEMENT_TEMPLATE = ("""
{
"alias_value": [
],
"antivirus": {
"antivirus_enabled": false,
"virus_log_level": "none",
"virus_mirror": "database.clamav.net"
},
"auto_reboot_timeout": 10,
"connection_limit": 0,
"connection_timeout": [
{
"protocol": "icmp",
"timeout": 5
},
{
"protocol": "other",
"timeout": 180
},
{
"protocol": "tcp",
"timeout": 1800
},
{
"protocol": "udp",
"timeout": 50
}
],
"contact_timeout": 60000,
"default_nat": false,
"domain_server_address": [
],
"dos_protection": "always_off",
"excluded_interface": -1,
"is_cert_auto_renewal": true,
"is_config_encrypted": true,
"is_fips_compatible_operating_mode": false,
"is_loopback_tunnel_ip_address_enforced": false,
"is_virtual_defrag": true,
"log_moderation": [
{
"burst": 1000,
"log_event": "1",
"rate": 100
},
{
"log_event": "2"
}
],
"log_server_ref": "@PLACE_HOLDER@",
"log_spooling_policy": "discard",
"loopback_cluster_virtual_interface": [
],
"name": "@PLACE_HOLDER@",
"nodes": [
{
"firewall_node": {
"activate_test": true,
"disabled": false,
"loopback_node_dedicated_interface": [
],
"name": "@NODE_NAME_PLACE_HOLDER@",
"nodeid": 1
}
}
],
"passive_discard_mode": false,
"physicalInterfaces": [
],
"read_only": false,
"rollback_timeout": 60,
"scan_detection": {
"scan_detection_icmp_events": 252,
"scan_detection_icmp_timewindow": 60,
"scan_detection_tcp_events": 252,
"scan_detection_tcp_timewindow": 60,
"scan_detection_type": "default off",
"scan_detection_udp_events": 252,
"scan_detection_udp_timewindow": 60
},
"slow_request_blacklist_timeout": 300,
"slow_request_sensitivity": "off",
"strict_tcp_mode": false,
"syn_flood_sensitivity": "off",
"syn_mode": "off",
"system": false,
"tcp_reset_sensitivity": "OFF",
"tester_parameters": {
"alert_interval": 3600,
"auto_recovery": true,
"boot_delay": 30,
"boot_recovery": true,
"restart_delay": 5,
"status_delay": 5
},
"tracking_mode": "normal"
}
""")
PHYSICAL_INTERFACE_TEMPLATE = ("""
{
"physical_interface": {
"aggregate_mode": "none",
"arp_entry": [
],
"cvi_mode": "none",
"dhcp_server_on_interface": {
"dhcp_range_per_node": [
]
},
"interface_id": "@PLACE_HODLER@",
"interfaces": [
{
"single_node_interface": {
"address": "@PLACE_HOLDER_IP@",
"auth_request": false,
"auth_request_source": false,
"backup_heartbeat": false,
"backup_mgt": false,
"dynamic_ip": false,
"igmp_mode": "none",
"key": 200,
"modem": false,
"network_value": "@PLACE_HOLDER_IP_NETWORK@",
"nicid": "0",
"nodeid": 1,
"outgoing": false,
"pppoa": false,
"pppoe": false,
"primary_heartbeat": false,
"primary_mgt": false,
"relayed_by_dhcp": false,
"reverse_connection": false,
"vrrp": false,
"vrrp_id": -1,
"vrrp_priority": -1
}
}
],
"log_moderation": [
{
"burst": 1000,
"log_event": "1",
"rate": 100
},
{
"log_event": "2"
}
],
"managed_address_flag": false,
"mtu": -1,
"other_configuration_flag": false,
"qos_limit": -1,
"qos_mode": "no_qos",
"router_advertisement": false,
"syn_mode": "default",
"virtual_engine_vlan_ok": false,
"vlanInterfaces": [
]
}
}
""")

View File

@ -0,0 +1,341 @@
# Copyright 2015 Intel Corporation.
# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# Copyright 2015 Yalei Wang <yalei.wang at intel com>
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import netaddr
from neutron.common import constants
from neutron.common import log
from neutron_fwaas.services.firewall.drivers import fwaas_base
import neutron_fwaas.services.firewall.drivers.mcafee.smc_api as smc_api
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
NGFWOpts = [
cfg.StrOpt(
'smc_url',
default='',
help=_("URL to contact SMC server")
),
cfg.StrOpt(
'smc_api_auth_key',
default='',
help=_("Authentication key to SMC API")
),
cfg.StrOpt(
'smc_api_version',
default='',
help=_("verion of SMC API")
),
]
cfg.CONF.register_opts(NGFWOpts, 'ngfw')
LOG = logging.getLogger(__name__)
class NgfwFwaasDriver(fwaas_base.FwaasDriverBase):
"""Firewall driver for NGFW Fwaas of Mcafee """
def __init__(self):
LOG.debug("Initializing FWaas Mcafee NGFW driver")
super(NgfwFwaasDriver, self).__init__()
self._host_list = []
self._network_list = []
self._smc_url = cfg.CONF.ngfw.smc_url
self.fw_ips_template_ref = None
self.fw_template_ref = None
self.connection = smc_api.SMCAPIConnection(
self._smc_url,
cfg.CONF.ngfw.smc_api_version,
cfg.CONF.ngfw.smc_api_auth_key)
@log.log
def create_firewall(self, agent_mode, apply_list, firewall):
# call update_firewall, because one tenant only support
# one firewall
return self.update_firewall(agent_mode, apply_list, firewall)
@log.log
def delete_firewall(self, agent_mode, apply_list, firewall):
# tell SMC server to remove the ngfw policy
return self._delete_policy(apply_list, firewall)
@log.log
def update_firewall(self, agent_mode, apply_list, firewall):
for router_info in apply_list:
rt = router_info.router
# only update the policy when the router is active
if (rt['tenant_id'] == firewall['tenant_id'] and
rt['status'] == 'ACTIVE'):
self._update_policy(rt, firewall)
def _delete_policy(self, apply_list, firewall):
for router_info in apply_list:
rt = router_info.router
self._clear_policy(rt, firewall)
@log.log
def apply_default_policy(self, apply_list, firewall):
return self._delete_policy(apply_list, firewall)
def _update_policy(self, router, firewall):
# clear all the policy first
self._clear_policy(router, firewall)
if firewall['admin_state_up']:
self._setup_policy(router, firewall)
def _is_ips_policy(self, policy_name):
return policy_name[len(policy_name) - 4:].lower() == '-ips'
def _get_policy_ref(self, policy_name):
# get the template ref at the first time
if not self.fw_ips_template_ref or not self.fw_template_ref:
r = self.connection.get('elements/fw_template_policy')
fw_template_list = r[0]['result']
for tplt in fw_template_list:
if tplt['name'] == "Firewall Inspection Template":
self.fw_ips_template_ref = tplt['href'].replace(
self._smc_url +
"/%s/" % cfg.CONF.ngfw.smc_api_version,
'')
elif tplt['name'] == "Firewall Template":
self.fw_template_ref = tplt['href'].replace(
self._smc_url +
"/%s/" % cfg.CONF.ngfw.smc_api_version,
'')
# use different template base on the policy name
if self._is_ips_policy(policy_name):
template = self.fw_ips_template_ref
else:
template = self.fw_template_ref
# create the policy in SMC server
fw_policy = {
"name": policy_name,
"template": template
}
ref = self._get_ref_from_service_data('fw_policy', fw_policy)
return ref
def _parse_port(self, source_port):
min_port = ''
max_port = ''
if source_port is None:
min_port = 0
max_port = 65535
elif ':' in source_port:
ports = source_port.split(':')
min_port = int(ports[0])
max_port = int(ports[1])
else:
min_port = int(source_port)
max_port = ''
return min_port, max_port
def _get_ref_from_addr(self, addr):
if addr == 'None':
return addr
ip = netaddr.IPNetwork(addr)
if str(ip.netmask) != "255.255.255.255":
# create network objects
ref = self._create_network(addr)
else:
# create host objects
ref = self._create_host(str(ip.ip))
return ref
def _get_ref_from_service_data(self, service_path, service_data):
json_data = jsonutils.dumps(service_data)
r = self.connection.post_element(service_path, json_data)
srv_ref = r.headers['location']
return srv_ref
def _convert_ipv4_to_ngfw_rule(self, rule):
# convert the ipv4 rule into ngfw rules
# create src/dst of hosts or networks
src_ref = self._get_ref_from_addr(str(rule['source_ip_address']))
dst_ref = self._get_ref_from_addr(str(rule['destination_ip_address']))
# create service
srv_ref = ''
service_dict = {}
service = "%s_service" % rule['protocol']
if rule['protocol'] in (constants.PROTO_NAME_TCP,
constants.PROTO_NAME_UDP):
source_port = rule['source_port']
dest_port = rule['destination_port']
min_src_port, max_src_port = self._parse_port(source_port)
min_dst_port, max_dst_port = self._parse_port(dest_port)
service_data = {
"name": "service-%s" % rule['name'],
"min_src_port": min_src_port,
"max_src_port": (min_src_port if max_src_port == ''
else max_src_port),
"min_dst_port": min_dst_port,
"max_dst_port": (min_dst_port if max_dst_port == ''
else max_dst_port)
}
srv_ref = self._get_ref_from_service_data(service,
service_data)
service_dict = {"service": [srv_ref]}
elif rule['protocol'] == constants.PROTO_NAME_ICMP:
# only ping is supported
service_data = {
"name": "service%s" % "22",
"icmp_type": 0,
"icmp_code": 0
}
srv_ref = self._get_ref_from_service_data(service,
service_data)
service_dict = {"service": [srv_ref]}
elif rule['protocol'] is None:
# protocal "ANY" is translated to accept all, no service create
# here
# TODO(yalie): add rules for different protocol, not ignore the
# other value like ports.
service_dict = {"any": True}
else:
raise NotImplementedError(
_("not support %s protocol now") % rule['protocol'])
# create fw rule
action = "discard" if rule["action"] == "deny" else "allow"
payload = {
"name": rule['name'],
"action": {
"action": action,
"connection_tracking_options": {}
},
"destinations": {"dst": [dst_ref]},
"services": service_dict,
"sources": {"src": [src_ref]}
}
json_data = jsonutils.dumps(payload)
return json_data
def _get_policy_name(self, router, fw):
# SMC server would bind the different NGFW policy with different
# routers(sg-engine) in a tenant
return "%s_%s_%s" % (
fw['id'][0:7], fw['firewall_policy_id'][0:7], router['id'][0:7])
def _setup_policy(self, router, fw):
# one tenant should use only one policy
with self.connection.login_server():
# create policy ref
policy_name = self._get_policy_name(router, fw)
policy_ref = self._get_policy_ref(policy_name)
# post service
for rule in fw['firewall_rule_list']:
if not rule['enabled']:
continue
if rule['ip_version'] == 4:
json_data = self._convert_ipv4_to_ngfw_rule(rule)
self.connection.post(policy_ref +
"/fw_ipv4_access_rule",
json_data, raw=True)
else:
msg = (_('Unsupported IP version rule. %(version)') %
{'version': rule['ip_version']})
raise ValueError(msg)
# upload the policy
self.connection.post(policy_ref + "/upload", '', raw=True)
def _clear_policy(self, router, fw):
# find the policy used by the tenant and firewall
policy_name = self._get_policy_name(router, fw)
path_policy_filter = 'elements/fw_policy?filter=%s' % policy_name
with self.connection.login_server():
r = self.connection.get(path=path_policy_filter)
fw_list = r[0]['result']
for f in fw_list:
if f['name'] == policy_name:
self.connection.delete(f['href'], raw=True)
# Warning, find unused elements and delete them.
r = self.connection.get(path='elements/search_unused')
element_list = r[0]['result']
for element in element_list:
self.connection.delete(element['href'], raw=True)
def _create_host(self, ip):
ref = None
host_json_def = {
"name": "host-%s" % str(ip),
"address": ip
}
with self.connection.login_server():
ref = self._get_ref_from_service_data('host', host_json_def)
return ref
def _create_network(self, cidr):
ref = None
net_json_def = {
"name": "network-%s" % str(cidr),
"ipv4_network": cidr
}
with self.connection.login_server():
# some network maybe pre-created by router-plugin
r = self.connection.get("elements/network")
networks = r[0]['result']
for net in networks:
if net['name'] == "network-%s" % cidr:
ref = net['href']
return ref
ref = self._get_ref_from_service_data('network', net_json_def)
return ref

View File

@ -0,0 +1,476 @@
# Copyright 2015 Intel Corporation.
# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# Copyright 2015 Yalei Wang <yalei.wang at intel com>
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# This script uses SMC-API to get/post elements from SMC server
#
import abc
import netaddr
import requests
import six
from contextlib import contextmanager
from neutron.common import exceptions as n_exc
from neutron.i18n import _LE, _LI, _LW
from oslo_log import log as logging
from oslo_serialization import jsonutils
from neutron_fwaas.services.firewall.drivers.mcafee import constants as const
LOG = logging.getLogger(__name__)
class SMCAPIResult(object):
"""Class for returning result to API caller"""
def __init__(self, tp):
self.type = tp
self.result = "N/A"
self.code = "200"
self.headers = None
def is_json(self):
return self.type == "json"
def is_text(self):
return self.type == "text"
def __str__(self):
return self.result
class SMCAPIConnection(object):
"""Provide the REST API method to connect to the SMC server.
For login/logout operation, users should set server IP, API version and
auth key first. For get/put/delete operation, users should provide the
target element'path, and special json format data section followed "SMC
API User's Guide".
"""
def __init__(self, host, api_version, authentication_key):
self.cookies = {}
self.host = host
self.api_version = api_version
self.host_api_url = self.host + "/" + self.api_version
self.auth_key = authentication_key
self.session = None
@contextmanager
def login_server(self):
if self.session:
yield
else:
ret = self.login()
LOG.debug("SMC server LOGIN successfully.")
if ret:
try:
yield
except Exception:
LOG.exception(_LE("exception while connect to server!"))
raise n_exc.ServiceUnavailable(resource='SMC server',
msg=_("OPERATION failed"))
finally:
self.logout()
else:
raise n_exc.BadRequest(resource='SMC server',
msg=_("LOGIN failed!"))
def login(self):
self.session = requests.session()
post_addr = ("%s/login?authenticationkey=%s&beta=true" %
(self.host_api_url, self.auth_key))
res = self.session.post(post_addr)
if res.status_code == 200:
return True
LOG.error(_LE("connect to %(host)s failed"
" (%(msg)s/ code %(code)s)"),
{'host': post_addr,
'msg': res.reason,
'code': res.status_code})
return False
def logout(self):
result = self.session.put("%s/logout" % (self.host_api_url))
self.session = None
LOG.debug("LOGOUT from SMC server result %s", result)
def session_op(self, attr, path, raw=False, data=None, headers=None):
op = getattr(self.session, attr)
if raw:
result = op(path, headers=headers, data=data)
else:
result = op("%s/%s" %
(self.host_api_url, path), headers=headers, data=data)
if result.status_code == "404":
LOG.error(_LE("SMC Error 404 %s"), result.reason)
return result
def get(self, path, etag=None, raw=False):
json_result = None
etag_out = None
headers = {'accept': 'application/json',
'content-type': 'application/json'}
if etag:
headers['ETag'] = etag
try:
result = self.session_op("get", path, raw, headers=headers)
if 'etag' in result.headers:
etag_out = result.headers['etag']
json_result = result.json()
if result.status_code == "404":
LOG.error(_LE("%(msg)s %(detail)s"),
{'msg': json_result["message"],
'detail': json_result["details"]})
except Exception:
LOG.error(_LE("exception when GET operation"))
raise
r = [json_result]
if etag_out:
r.append(etag_out)
return [json_result]
def check_ret(self, string, path, ret, updated_result):
if ret.status_code != 200:
LOG.info(_LI("%(str)s ELEMENT result code: %(stat)d "
"%(path)s %(reason)s text=%(text)s"),
{'str': string, 'path': path,
'stat': ret.status_code,
'reason': ret.reason,
'text': ret.text})
updated_result.type = "text"
updated_result.result = ret.text
else:
if ret.headers.get('content-type') == "application/json":
updated_result.type = "json"
updated_result.result = ret.json
else:
updated_result.type = "text"
updated_result.result = ret.content
updated_result.code = ret.status_code
def delete(self, path, raw=False):
del_result = SMCAPIResult("text")
try:
result = self.session_op("delete", path, raw)
self.check_ret("DELETE", path, result, del_result)
except Exception:
LOG.error(_LE("exception when DELETE operation"))
raise
return del_result
def post(self, path, json_element, raw=False):
headers = {'accept': '*/*',
'content-type': 'application/json'}
post_result = SMCAPIResult("text")
try:
result = self.session_op(
"post", path, raw, headers=headers, data=json_element)
self.check_ret("POST", path, result, post_result)
post_result.headers = result.headers
except Exception:
LOG.error(_LE("exception when POST operation"))
raise
return post_result
def post_element(self, element_type, json_element):
return self.post("elements/%s" % (element_type), json_element)
@six.add_metaclass(abc.ABCMeta)
class SMCAPIElement(object):
"""
Base class of elements, used by L2/L3 single firewall class
"""
element_type = "N/A"
@staticmethod
def usage(extra_info=None):
if extra_info:
LOG.error(_LE("Error -> %s"), extra_info)
raise ValueError(_('Wrong initial data!'))
def __init__(self, name, smc_api_connection, control_ip=None):
if not name:
self.usage("name of element missing.")
self.name = name
self.element_id = 0
self.json_element = None
self.element_template = None
self.smc_api_connection = smc_api_connection
self.keyboard = None
self.timezone = None
if control_ip:
self.control_ip = netaddr.IPNetwork(control_ip)
if self.control_ip.prefixlen == 32:
self.usage(
"Control_ip %s needs to netmask bits e.g x.x.x.x/yy"
% (self.control_ip))
else:
self.control_ip = None
def to_json(self):
return jsonutils.dumps(self.json_element)
@abc.abstractmethod
def create(self):
raise NotImplementedError(
"not support SMCAPIElement create")
@abc.abstractmethod
def update(self):
raise NotImplementedError(
"not support SMCAPIElement update")
@abc.abstractmethod
def delete(self):
raise NotImplementedError(
"not support SMCAPIElement delete")
def get_element(self, path):
LOG.debug("Getting path: %s", path)
return self.smc_api_connection.get("elements/%s" % (path))
def get_elements(self, element_type=None):
if not element_type:
element_type = self.element_type
return self.smc_api_connection.get("elements/%s" % (element_type))
def fetch_element_id(self):
json_result = self.get_elements()
if not json_result[0]['result']:
LOG.warn(_LW("No #{element_type} defined in SMC"))
else:
for element in json_result[0]['result']:
href = element['href']
self.element_id = int(href.split('/')[-1])
if element['name'] == self.name:
LOG.debug("%(type)s element with name %(name)s FOUND "
"%(href)s",
{'type': self.element_type,
'name': self.name,
'href': href})
break
LOG.debug("Got ID %s", self.element_id)
return self.element_id
def get_initial_contact_data(self):
"""Get the element's configuration data used to contact to SMC server.
Contact data is a configuration string including the SMC server's IP,
interfaces defined and special one-time password.
eg. first create the L3 element on behalf of sg-engine in SMC server
and generate the contact data, then boot the sg-engine with it and
engine will init properly and connect to SMC server finally.
"""
data = None
result = self.get_element("%s/%s/node" %
(self.element_type, self.element_id))
LOG.debug("resule = %s", result)
node_ref = result[0]['result'][0]['href'].replace(
self.smc_api_connection.host_api_url + "/elements/", "")
LOG.debug("Node ref is %s", node_ref)
extra_options = []
if self.keyboard:
extra_options.append("keyboard=%s" % (self.keyboard))
if self.timezone:
extra_options.append("time_zone=%s" % (self.timezone))
if extra_options:
extra_options = "&" + extra_options
else:
extra_options = ""
result = self.smc_api_connection.post_element(
"%s/initial_contact?enable_ssh=true%s" %
(node_ref, extra_options), "")
if result.is_text():
d1 = str(result).split("\n")
idx = 0
for l in d1:
if l.find("ssh/enabled") != -1:
l = l.replace("false", "true")
d1[idx] = l
idx += 1
result.result = "\n".join(d1)
data = result
result = self.smc_api_connection.post_element(
"%s/bind_license" % (node_ref), "")
if result.code != 200:
LOG.error(_LE("Could not bind license. "
"Maybe SMC license pool is empty. "
"SMC API details: %s"), result)
return data
class SMCAPIElementL2FWSingle(SMCAPIElement):
"""L2 single firewall element."""
element_type = "single_layer2"
def __init__(self, name, smc_api_connection, control_ip):
SMCAPIElement.__init__(self, name, smc_api_connection, control_ip)
self.element_id = 0
self.json_element = None
def create(self):
json_result = self.get_elements("log_server")
log_server_ref = json_result[0]['result'][0]['href']
LOG.debug("Using log server '%(name)s', ref %(ref)s",
{'name': json_result[0]['result'][0]['name'],
'ref': log_server_ref})
json_result = self.get_elements("logical_interface")
logical_interfaces = dict((logical_iface['name'],
logical_iface['href']) for logical_iface in
json_result[0]['result'] if logical_iface['name']
in ('default_eth', 'capture'))
for name, ref in logical_interfaces.iteritems():
LOG.debug("Using logical interface %(name)s ref %(href)s",
{'name': name, 'href': ref})
json_data = jsonutils.loads(const.L2_ELEMENT_TEMPLATE)
json_data[const.JSON_LOG_SERVER_REF] = log_server_ref
json_data[const.JSON_NAME] = self.name
json_data[const.JSON_NODES][0]['fwlayer2_node']['name'] = (self.name +
" node 1")
physical_ifaces = json_data[const.JSON_PHY_INTERFACES]
for phys_iface in physical_ifaces:
for iface in phys_iface[const.JSON_PHY_INTF]['interfaces']:
if 'inline_interface' in iface:
inline_iface = iface['inline_interface']
inline_iface['logical_interface_ref'] = (
logical_interfaces['default_eth']['href'])
elif 'capture_interface' in iface:
capture_iface = iface['capture_interface']
capture_iface['logical_interface_ref'] = (
logical_interfaces['capture']['href'])
elif 'node_interface' in iface:
node_iface = iface['node_interface']
if not node_iface['primary_mgt']:
continue
node_iface[const.JSON_NODE_NET_ADDR] = (
str(self.control_ip.ip))
node_iface[const.JSON_NODE_NET_VALUE] = (
str(self.control_ip.cidr))
self.json_element = json_data
self.smc_api_connection.post_element(self.element_type, self.to_json())
self.fetch_element_id()
def update(self):
"""Update element """
pass
def delete(self):
"""Delete element """
pass
class SMCAPIElementL3FWSingle(SMCAPIElement):
"""L3 single firewall element."""
element_type = "single_fw"
def __init__(self, name, smc_api_connection, control_ip):
super(SMCAPIElementL3FWSingle, self).__init__(self, name,
smc_api_connection, control_ip)
self.element_id = 0
self.json_element = None
self.physical_interfaces = []
def modify_interface_property(self, physical_interface, name, value):
iface = physical_interface[const.JSON_PHY_INTF]
iface = iface['interfaces'][0]['single_node_interface']
iface[name] = value
def add_physical_interface(self, ip_and_network, interface_id):
ip = netaddr.IPNetwork(ip_and_network)
json_data = jsonutils.loads(const.PHYSICAL_INTERFACE_TEMPLATE)
phys_iface = json_data[const.JSON_PHY_INTF]
phys_iface['interface_id'] = interface_id
iface = json_data[const.JSON_PHY_INTF]['interfaces'][0]
iface = iface['single_node_interface']
iface[const.JSON_NODE_NET_ADDR] = str(ip.ip)
iface[const.JSON_NODE_NET_VALUE] = str(ip.cidr)
self.physical_interfaces.append(json_data)
return json_data
def create(self):
json_result = self.get_elements("log_server")
log_server_ref = json_result[0]['result'][0]['href']
LOG.debug(
"Using log server '%(name)s' ref %(ref)s",
{'name': json_result[0]['result'][0]['name'],
'ref': log_server_ref})
json_data = jsonutils.loads(const.L3_ELEMENT_TEMPLATE)
json_data[const.JSON_LOG_SERVER_REF] = log_server_ref
json_data[const.JSON_NAME] = self.name
json_data[const.JSON_NODES][0]['firewall_node']['name'] = (self.name +
" node 1")
iface = self.add_physical_interface(self.control_ip, 0)
self.modify_interface_property(iface, "primary_mgt", True)
for phys_iface in self.physical_interfaces:
json_data[const.JSON_PHY_INTERFACES].append(phys_iface)
LOG.debug("%s",
jsonutils.dumps(json_data, sort_keys=False,
indent=2, separators=(',', ': ')))
self.json_element = json_data
self.smc_api_connection.post_element(self.element_type, self.to_json())
self.fetch_element_id()

View File

@ -0,0 +1,222 @@
# Copyright 2015 Intel Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
import neutron_fwaas.services.firewall.drivers.mcafee as mcafee
import neutron_fwaas.services.firewall.drivers.mcafee.ngfw_fwaas as fwaas
from neutron.tests import base
FAKE_FIREWALL_ID = 'firewall_id'
FAKE_POLICY_ID = 'policy_id'
FAKE_TENANT_ID = 'tenant_id'
FAKE_ROUTER_ID = 'router_id'
FAKE_FW_NAME = 'fw_name'
class NGFWFwaasTestCase(base.BaseTestCase):
def setUp(self):
super(NGFWFwaasTestCase, self).setUp()
self.firewall = fwaas.NgfwFwaasDriver()
self.rule_list = self._fake_ipv4_rules()
self.apply_list = self._fake_apply_list()
self.post_return = mock.MagicMock()
self.tmp_ref = 'temp_ref'
self.post_return.headers = {'location': self.tmp_ref}
# we generate the policy name by formatting the ids of firewall,
# policy, router
self.policy_name = "%s_%s_%s" % (
FAKE_FIREWALL_ID[0:7], FAKE_POLICY_ID[0:7],
FAKE_ROUTER_ID[0:7])
def _fake_ipv4_rules(self):
rule1 = {'action': 'deny',
'description': '',
'destination_ip_address': None,
'destination_port': '23',
'enabled': True,
'firewall_policy_id': FAKE_POLICY_ID,
'id': '1',
'ip_version': 4,
'name': 'a2',
'position': 1,
'protocol': 'udp',
'shared': False,
'source_ip_address': None,
'source_port': '23',
'tenant_id': FAKE_TENANT_ID}
rule2 = {'action': 'deny',
'description': '',
'destination_ip_address': None,
'destination_port': None,
'enabled': True,
'firewall_policy_id': FAKE_POLICY_ID,
'id': '2',
'ip_version': 4,
'name': 'a3',
'position': 2,
'protocol': 'icmp',
'shared': False,
'source_ip_address': '192.168.100.0/24',
'source_port': None,
'tenant_id': FAKE_TENANT_ID}
rule3 = {'action': 'allow',
'description': '',
'destination_ip_address': None,
'destination_port': None,
'enabled': True,
'firewall_policy_id': FAKE_POLICY_ID,
'id': '3',
'ip_version': 4,
'name': 'a4',
'position': 3,
'protocol': 'tcp',
'shared': False,
'source_ip_address': None,
'source_port': None,
'tenant_id': FAKE_TENANT_ID}
return [rule1, rule2, rule3]
def _fake_firewall(self, rule_list):
fw = {
'admin_state_up': True,
'description': '',
'firewall_policy_id': FAKE_POLICY_ID,
'id': FAKE_FIREWALL_ID,
'name': FAKE_FW_NAME,
'shared': None,
'status': 'PENDING_CREATE',
'tenant_id': FAKE_TENANT_ID,
'firewall_rule_list': rule_list}
return fw
def _fake_apply_list(self):
apply_list = []
router_info_inst = mock.Mock()
fake_interface = mock.Mock()
router_inst = (
{'_interfaces': fake_interface,
'admin_state_up': True,
'distributed': False,
'external_gateway_info': None,
'gw_port_id': None,
'ha': False,
'ha_vr_id': 0,
'id': FAKE_ROUTER_ID,
'name': 'rrr1',
'routes': [],
'status': 'ACTIVE',
'tenant_id': FAKE_TENANT_ID})
router_info_inst.router = router_inst
apply_list.append(router_info_inst)
return apply_list
def test_update_firewall(self):
firewall = self._fake_firewall(self.rule_list)
ref_v4rule = self.tmp_ref + "/fw_ipv4_access_rule"
ref_upload = self.tmp_ref + "/upload"
with contextlib.nested(
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'login'),
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'get'),
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'logout'),
mock.patch.object(
mcafee.smc_api.SMCAPIConnection, 'post',
return_value=self.post_return),
) as (lg, get, logout, post):
expected = [mock.call(
'elements/fw_policy',
'{"name": "%s", "template": null}' % self.policy_name),
mock.call(
'elements/udp_service',
'{"min_dst_port": 23, "max_dst_port": 23, '
'"name": "service-a2", "max_src_port": 23, '
'"min_src_port": 23}'),
mock.call(
ref_v4rule,
'{"action": {"action": "discard", '
'"connection_tracking_options": {}}, '
'"services": {"service": ["%s"]}, "sources": '
'{"src": ["None"]}, "name": "a2", "destinations": '
'{"dst": ["None"]}}' % self.tmp_ref, raw=True),
mock.call(
'elements/network',
'{"ipv4_network": "192.168.100.0/24", '
'"name": "network-192.168.100.0/24"}'),
mock.call(
'elements/icmp_service',
'{"icmp_code": 0, "icmp_type": 0, "name": "service22"}'),
mock.call(ref_v4rule,
'{"action": {"action": "discard", '
'"connection_tracking_options": {}}, '
'"services": {"service": ["%s"]}, '
'"sources": {"src": ["%s"]}, "name": "a3", '
'"destinations": {"dst": ["None"]}}' % (
self.tmp_ref, self.tmp_ref), raw=True),
mock.call(
'elements/tcp_service',
'{"min_dst_port": 0, "max_dst_port": 65535, '
'"name": "service-a4", "max_src_port": 65535, '
'"min_src_port": 0}'),
mock.call(
ref_v4rule,
'{"action": {"action": "allow", '
'"connection_tracking_options": {}}, '
'"services": {"service": ["%s"]}, '
'"sources": {"src": ["None"]}, "name": "a4", '
'"destinations": {"dst": ["None"]}}' %
self.tmp_ref, raw=True),
mock.call(ref_upload, '', raw=True)]
self.firewall.update_firewall('legacy', self.apply_list, firewall)
self.assertEqual(expected, post.call_args_list)
def test_create_firewall(self):
self.test_update_firewall()
def test_delete_firewall(self):
firewall = self._fake_firewall(self.rule_list)
get_value = [{'result': [{'name': self.policy_name,
'href': self.tmp_ref}, ]}, ]
with contextlib.nested(
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'login'),
mock.patch.object(
mcafee.smc_api.SMCAPIConnection, 'get',
return_value=get_value),
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'logout'),
mock.patch.object(
mcafee.smc_api.SMCAPIConnection, 'post',
return_value=self.post_return),
mock.patch.object(mcafee.smc_api.SMCAPIConnection, 'delete'),
) as (lg, get, logout, post, delete):
self.firewall.delete_firewall('legacy', self.apply_list, firewall)
expected = [
mock.call(self.tmp_ref, raw=True),
mock.call(self.tmp_ref, raw=True)
]
self.assertEqual(expected, delete.call_args_list)