Add NetApp Active IQ scheduler weigher

Add the new scheduler weigher NetAppAIQWeigher that relies on
the NetApp Active IQ software to weigh the hosts. It only
works when all hosts belong to NetApp backends.

It also adds a new NetApp-specific pool capability,
``netapp_cluster_name``, which contains the name of the
cluster where the pool is located.

Implements: netapp-active-iq-scheduler-weigher
Signed-off-by: Felipe Rodrigues <felipefuty01@gmail.com>

Change-Id: I36b08066545afdaa37e053eee319bc9cd489efdc
Felipe Rodrigues 2023-05-15 14:36:42 -03:00
parent 44014e6827
commit 6c9d990a8f
10 changed files with 862 additions and 2 deletions


@ -1177,3 +1177,8 @@ class ShareBackupSizeExceedsAvailableQuota(QuotaError):
message = _("Requested backup exceeds allowed Backup gigabytes "
"quota. Requested %(requested)sG, quota is %(quota)sG and "
"%(consumed)sG has been consumed.")
class NetappActiveIQWeigherRequiredParameter(ManilaException):
message = _("%(config)s configuration of the NetAppActiveIQ weigher "
"must be set.")


@ -0,0 +1,355 @@
# Copyright 2023 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
import requests
from requests.adapters import HTTPAdapter
from requests import auth
from urllib3.util import retry
from manila import exception
from manila.scheduler.weighers import base_host
ACTIVE_IQ_WEIGHER_GROUP = 'netapp_active_iq'
active_iq_weight_opts = [
cfg.HostAddressOpt('aiq_hostname',
help='The hostname (or IP address) for the Active IQ.'),
cfg.PortOpt('aiq_port',
help=('The TCP port to use for communication with the Active '
'IQ. If not specified, the weigher driver will use 80 '
'for HTTP and 443 for HTTPS.')),
cfg.StrOpt('aiq_transport_type',
default='https',
choices=['http', 'https'],
help=('The transport protocol used when communicating with '
'the Active IQ. Valid values are '
'http or https.')),
cfg.BoolOpt('aiq_ssl_verify',
default=False,
help='Verify the SSL certificate. Default is False.'),
cfg.StrOpt('aiq_ssl_cert_path',
help=("The path to a CA_BUNDLE file or directory with "
"certificates of trusted CA. If set to a directory, it "
"must have been processed using the c_rehash utility "
"supplied with OpenSSL. If not informed, it will use the "
"Mozilla's carefully curated collection of Root "
"Certificates for validating the trustworthiness of SSL "
"certificates.")),
cfg.StrOpt('aiq_username',
help=('Administrative user account name used to access the '
'Active IQ.')),
cfg.StrOpt('aiq_password',
help=('Password for the administrative user account '
'specified in the aiq_username option.'),
secret=True),
cfg.IntOpt('aiq_eval_method',
default=0,
help='Integer indicating which evaluation method to use; '
'defaults to 0 (0 - by index, 1 - normalized value, '
'2 - by literal value).'),
cfg.ListOpt('aiq_priority_order',
default=[
'ops',
'latency',
'volume_count',
'size'
],
help='Permutation of the list ["volume_count", "size", '
'"latency", "ops"]. Note that for volume_count and '
'latency, the higher the values, the less optimal the '
'resources. For size and ops, the higher the value, '
'the more desirable the resources. If metrics are to be '
'considered with equal weights, concatenate the strings, '
'separated by ":". '
'An example is ["volume_count", "size", "latency:ops"] '
'if latency and ops should have equal but minimum '
'weights, or ["volume_count:size", "latency", "ops"] '
'if volume_count and size have equal maximum weights. '
'If not provided, the default order is '
'["volume_count", "size", "latency", "ops"].'),
]
CONF = cfg.CONF
CONF.register_opts(active_iq_weight_opts, ACTIVE_IQ_WEIGHER_GROUP)
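# Hedged illustration (not part of this change): in manila.conf these options
# would live under the [netapp_active_iq] section registered above, e.g.:
#     [netapp_active_iq]
#     aiq_hostname = aiq.example.com          # hypothetical address
#     aiq_username = admin                    # hypothetical credentials
#     aiq_password = secret
#     aiq_transport_type = https
#     aiq_priority_order = volume_count,size,latency:ops
# The values are read back through CONF[ACTIVE_IQ_WEIGHER_GROUP] in the
# weigher constructor below.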
LOG = logging.getLogger(__name__)
class NetAppAIQWeigher(base_host.BaseHostWeigher):
"""AIQ Weigher. Assign weights based on NetApp Active IQ tool."""
def __init__(self, *args, **kwargs):
super(NetAppAIQWeigher, self).__init__(*args, **kwargs)
self.configuration = CONF[ACTIVE_IQ_WEIGHER_GROUP]
self.host = self.configuration.aiq_hostname
if not self.host:
raise exception.NetappActiveIQWeigherRequiredParameter(
config="aiq_hostname")
self.username = self.configuration.aiq_username
if not self.username:
raise exception.NetappActiveIQWeigherRequiredParameter(
config="aiq_username")
self.password = self.configuration.aiq_password
if not self.password:
raise exception.NetappActiveIQWeigherRequiredParameter(
config="aiq_password")
self.protocol = self.configuration.aiq_transport_type
self.port = self.configuration.aiq_port
if not self.port:
self.port = "80" if self.protocol == "http" else "443"
self.ssl_verify = self.configuration.aiq_ssl_verify
if self.ssl_verify and self.configuration.aiq_ssl_cert_path:
self.ssl_verify = self.configuration.aiq_ssl_cert_path
self.eval_method = self.configuration.aiq_eval_method
self.priority_order = self.configuration.aiq_priority_order
def _weigh_object(self, host_state, weight_properties):
"""Weight for a specific object from parent abstract class"""
# NOTE(felipe_rodrigues): this abstract class method is not called for
# the AIQ weigher, since it does not weigh one single object.
raise NotImplementedError()
def _weigh_active_iq(self, netapp_aggregates_location, weight_properties):
"""Determine host's rating based on a Active IQ."""
size = weight_properties.get('size')
share_type = weight_properties.get('share_type', {})
performance_level_name = share_type.get('extra_specs', {}).get(
'netapp:performance_service_level_name')
# retrieves the performance service level key if a PSL name is given.
performance_level_id = None
if performance_level_name:
performance_level_id = self._get_performance_level_id(
performance_level_name)
if not performance_level_id:
return []
# retrieves the equivalent active IQ keys of the pools.
resource_keys = self._get_resource_keys(netapp_aggregates_location)
if len(resource_keys) == 0:
return []
result = self._balance_aggregates(resource_keys, size,
performance_level_id)
return result
def _get_url(self):
"""Get the base URL for REST requests."""
host = self.host
if ':' in host:
host = '[%s]' % host
return f'{self.protocol}://{host}:{self.port}/api/'
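# Example mirroring the unit tests: host "2001:db8::" with protocol
# "https" and port 443 yields "https://[2001:db8::]:443/api/", while an
# IPv4 host such as "1.1.1.1" is used unbracketed.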
def _get_request_method(self, method, session):
"""Returns the request method to be used in the REST call."""
request_methods = {
'post': session.post,
'get': session.get,
'put': session.put,
'delete': session.delete,
'patch': session.patch,
}
return request_methods[method]
def _get_session_method(self, method):
"""Get the REST method from the session."""
# NOTE(felipe_rodrigues): make the request resilient to temporary network
# failures (like name resolution failures), retrying up to 5 times.
_session = requests.Session()
max_retries = retry.Retry(total=5, connect=5, read=2, backoff_factor=1)
adapter = HTTPAdapter(max_retries=max_retries)
_session.mount('%s://' % self.protocol, adapter)
_session.auth = auth.HTTPBasicAuth(self.username, self.password)
_session.verify = self.ssl_verify
_session.headers = {}
return self._get_request_method(method, _session)
def _call_active_iq(self, action_path, method, body=None):
"""Call the Active IQ REST API."""
rest_method = self._get_session_method(method)
url = self._get_url() + action_path
msg_args = {
"method": method.upper(),
"url": url,
"body": body,
}
LOG.debug("REQUEST: %(method)s %(url)s Body=%(body)s", msg_args)
response = rest_method(url, json=body)
code = response.status_code
response_body = response.content
msg_args = {
"code": code,
"body": response_body,
}
LOG.debug("RESPONSE: %(code)s Body=%(body)s", msg_args)
return code, response_body
def _get_performance_level_id(self, performance_level_name):
"""Gets the ID of a performance level name."""
psl_endpoint = (f'storage-provider/performance-service-levels?'
f'name={performance_level_name}')
try:
code, res = self._call_active_iq(psl_endpoint, "get")
except Exception as e:
LOG.error("Could not retrieve the key of the performance service "
"level named as '%(psl)s'. Skipping the weigher. "
"Error: %(error)s",
{'psl': performance_level_name, 'error': e})
LOG.error(e)
return None
if code != 200:
LOG.error("Could not retrieve the key of the performance service "
"level named as '%(psl)s'. Skipping the weigher.",
{'psl': performance_level_name})
return None
res = jsonutils.loads(res) if res else {}
psl_list = res.get('records', [])
if len(psl_list) == 0:
LOG.error("Could not found any performance service level named "
"as '%s'. Skipping the weigher.", performance_level_name)
return None
return psl_list[0].get("key", None)
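# Sketch of the expected exchange (assumed from the code above, not from
# Active IQ documentation): a GET on
# storage-provider/performance-service-levels?name=<psl_name> returning
# {"records": [{"key": "<uuid>", ...}]} resolves the PSL name to the key
# that is later sent as "ssl_key" in the balance request body.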
def _get_aggregate_identifier(self, aggr_name, cluster_name):
"""Returns the string identifier of an aggregate on a cluster."""
return f'{aggr_name}:{cluster_name}'
def _get_resource_keys(self, netapp_aggregates_location):
"""Map the aggregates names to the AIQ resource keys."""
aggregate_endpoint = 'datacenter/storage/aggregates'
try:
code, res = self._call_active_iq(aggregate_endpoint, "get")
except Exception as e:
LOG.error("Could not retrieve the aggregates resource keys. "
"Skipping the weigher. Error: %s", e)
LOG.error(e)
return []
if code != 200:
LOG.error("Could not retrieve the aggregates resource keys. "
"Skipping the weigher.")
return []
res = jsonutils.loads(res) if res else {}
aggr_map = {}
for aggr in res.get('records', []):
identifier = self._get_aggregate_identifier(
aggr["name"], aggr["cluster"]["name"])
aggr_map[identifier] = aggr["key"]
# We must keep both lists in the same order.
resource_keys = []
found_pool_keys = []
for identifier in netapp_aggregates_location:
if identifier in aggr_map:
found_pool_keys.append(identifier)
# If a pool could not be found, it is marked as resource key 0.
resource_keys.append(aggr_map.get(identifier, 0))
LOG.debug("The following pools will be evaluated by Active IQ: %s",
found_pool_keys)
return resource_keys
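# Illustrative sketch (hypothetical identifiers): given pools located at
# ["aggr1:clusterA", "aggr2:clusterB"] and an Active IQ inventory that only
# knows "aggr1:clusterA", this returns ["<key of aggr1>", 0]. The input
# order is preserved so that the weights computed later line up with the
# weighed hosts, and 0 marks pools unknown to Active IQ.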
def _balance_aggregates(self, resource_keys, size, performance_level_uuid):
"""Call AIQ to generate the weights of each aggregate."""
balance_endpoint = 'storage-provider/data-placement/balance'
body = {
"capacity": f'{size}GB',
"eval_method": self.eval_method,
# NOTE(felipe_rodrigues): from Active IQ documentation, the
# opt_method only works as 0.
"opt_method": 0,
"priority_order": self.priority_order,
"separate_flag": False,
# NOTE(felipe_rodrigues): remove the keys marked with 0, since they
# correspond to pool keys that were not found.
"resource_keys": [key for key in resource_keys if key != 0],
}
if performance_level_uuid:
body["ssl_key"] = performance_level_uuid
try:
code, res = self._call_active_iq(
balance_endpoint, "post", body=body)
except Exception as e:
LOG.error("Could not balance the aggregates. Skipping the "
"weigher. Error: %s", e)
LOG.error(e)
return []
if code != 200:
LOG.error("Could not balance the aggregates. Skipping the "
"weigher.")
return []
res = jsonutils.loads(res) if res else []
weight_map = {}
for aggr in res:
weight_map[aggr["key"]] = aggr["scores"]["total_weighted_score"]
# Keep the weights in the same order as resource_keys.
weights = []
for key in resource_keys:
weights.append(weight_map.get(key, 0.0))
return weights
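# Sketch based on the unit tests: a request body such as
#     {"capacity": "10GB", "eval_method": 1, "opt_method": 0,
#      "priority_order": ["ops"], "separate_flag": False,
#      "resource_keys": ["fake_key_1", "fake_key_2", "fake_key_3"],
#      "ssl_key": "fake_uuid"}
# yields a per-key "total_weighted_score", which is mapped back onto the
# original resource_keys order, with 0.0 for keys Active IQ did not score.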
def weigh_objects(self, weighed_obj_list, weight_properties):
"""Weigh multiple objects using Active IQ."""
netapp_aggregates_location = []
for obj in weighed_obj_list:
# if at least one host is not from NetApp, the entire weigher is
# skipped.
if obj.obj.vendor_name != "NetApp":
LOG.debug(
"Skipping Active IQ weigher given that some backends "
"are not from NetApp.")
return []
else:
cluster_name = obj.obj.capabilities.get("netapp_cluster_name")
aggr_name = obj.obj.pool_name
netapp_aggregates_location.append(
self._get_aggregate_identifier(aggr_name, cluster_name))
result = self._weigh_active_iq(
netapp_aggregates_location, weight_properties)
LOG.debug("Active IQ weight result: %s", result)
return result
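# Behavioural note drawn from the unit tests: when every weighed backend
# reports vendor_name == "NetApp", the raw Active IQ scores are normalized
# by the host weight handler (the best-scoring pool ends up with weight
# 1.0); when any backend comes from another vendor this method returns []
# and every host keeps a neutral weight.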


@ -176,6 +176,7 @@ class NetAppCmodeFileStorageLibrary(object):
self.message_api = message_api.API()
self._snapmirror_schedule = self._convert_schedule_to_seconds(
schedule=self.configuration.netapp_snapmirror_schedule)
self._cluster_name = self.configuration.netapp_cluster_name
@na_utils.trace
def do_setup(self, context):
@ -429,9 +430,12 @@ class NetAppCmodeFileStorageLibrary(object):
flexgroup_aggr = self._get_flexgroup_aggr_set()
aggr_space = self._get_aggregate_space(aggr_pool.union(flexgroup_aggr))
if self._have_cluster_creds:
cluster_name = self._cluster_name
if self._have_cluster_creds and not cluster_name:
# Get up-to-date node utilization metrics just once.
self._perf_library.update_performance_cache({}, self._ssc_stats)
cluster_name = self._client.get_cluster_name()
self._cluster_name = cluster_name
# Add FlexVol pools.
filter_function = (get_filter_function() if get_filter_function
@ -446,6 +450,7 @@ class NetAppCmodeFileStorageLibrary(object):
pool_with_func = copy.deepcopy(pool)
pool_with_func['filter_function'] = filter_function
pool_with_func['goodness_function'] = goodness_function
pool_with_func['netapp_cluster_name'] = self._cluster_name
pools.append(pool_with_func)
@ -462,6 +467,7 @@ class NetAppCmodeFileStorageLibrary(object):
pool_with_func = copy.deepcopy(pool)
pool_with_func['filter_function'] = filter_function
pool_with_func['goodness_function'] = goodness_function
pool_with_func['netapp_cluster_name'] = self._cluster_name
pools.append(pool_with_func)
@ -497,6 +503,7 @@ class NetAppCmodeFileStorageLibrary(object):
'pool_name': pool_name,
'filter_function': None,
'goodness_function': None,
'netapp_cluster_name': '',
'total_capacity_gb': total_capacity_gb,
'free_capacity_gb': free_capacity_gb,
'allocated_capacity_gb': allocated_capacity_gb,


@ -200,7 +200,12 @@ netapp_cluster_opts = [
'option should only be specified when the option '
'driver_handles_share_servers is set to False (i.e. the '
'driver is managing shares on a single pre-configured '
'Vserver).')), ]
'Vserver).')),
cfg.StrOpt('netapp_cluster_name',
help=('This option specifies the cluster name on which '
'provisioning of file storage shares should occur. '
'If not set, the driver will try to discover it '
'via an API call.')), ]
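# Hedged example: setting netapp_cluster_name = cluster-01 (hypothetical
# name) in the backend stanza lets the driver skip the get_cluster_name()
# API call when reporting pool stats, as shown in the library change above.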
netapp_support_opts = [
cfg.StrOpt('netapp_trace_flags',


@ -244,6 +244,166 @@ SHARE_SERVICE_STATES_WITH_POOLS = {
thin_provisioning=False)]),
}
FAKE_ACTIVE_IQ_WEIGHER_LIST = [
"fake_aggregate_1:fake_cluster_name1",
"fake_aggregate_2:fake_cluster_name2",
"fake_aggregate_3:fake_cluster_name3"
]
FAKE_ACTIVE_IQ_WEIGHER_AGGREGATES_RESPONSE = {
"records": [
{
"name": "fake_aggregate_1",
"key": "fake_key_1",
"cluster": {
"name": "fake_cluster_name1"
}
},
{
"name": "fake_aggregate_2",
"key": "fake_key_2",
"cluster": {
"name": "fake_cluster_name2"
}
},
{
"name": "fake_aggregate_3",
"key": "fake_key_3",
"cluster": {
"name": "fake_cluster_name3"
}
}
]
}
FAKE_ACTIVE_IQ_WEIGHER_BALANCE_RESPONSE = [
{
"key": "fake_key_1",
"scores": {
"total_weighted_score": 10.0
}
},
{
"key": "fake_key_2",
"scores": {
"total_weighted_score": 20.0
}
}
]
class FakeHostManagerNetAppOnly(host_manager.HostManager):
def __init__(self):
super(FakeHostManagerNetAppOnly, self).__init__()
self.service_states = {
'host1': {
'total_capacity_gb': 1024,
'free_capacity_gb': 1024,
'allocated_capacity_gb': 0,
'thin_provisioning': False,
'reserved_percentage': 10,
'reserved_snapshot_percentage': 5,
'reserved_share_extend_percentage': 15,
'timestamp': None,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'replication_type': 'writable',
'replication_domain': 'endor',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster1',
},
'host2': {
'total_capacity_gb': 2048,
'free_capacity_gb': 300,
'allocated_capacity_gb': 1748,
'provisioned_capacity_gb': 1748,
'max_over_subscription_ratio': 2.0,
'thin_provisioning': True,
'reserved_percentage': 10,
'reserved_snapshot_percentage': 5,
'reserved_share_extend_percentage': 15,
'timestamp': None,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'replication_type': 'readable',
'replication_domain': 'kashyyyk',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster2',
},
'host3': {
'total_capacity_gb': 512,
'free_capacity_gb': 256,
'allocated_capacity_gb': 256,
'provisioned_capacity_gb': 256,
'max_over_subscription_ratio': 2.0,
'thin_provisioning': [False],
'reserved_percentage': 0,
'reserved_snapshot_percentage': 0,
'reserved_share_extend_percentage': 0,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'timestamp': None,
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster3',
},
'host4': {
'total_capacity_gb': 2048,
'free_capacity_gb': 200,
'allocated_capacity_gb': 1848,
'provisioned_capacity_gb': 1848,
'max_over_subscription_ratio': 1.0,
'thin_provisioning': [True],
'reserved_percentage': 5,
'reserved_snapshot_percentage': 2,
'reserved_share_extend_percentage': 5,
'timestamp': None,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'replication_type': 'dr',
'replication_domain': 'naboo',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster4',
},
'host5': {
'total_capacity_gb': 2048,
'free_capacity_gb': 500,
'allocated_capacity_gb': 1548,
'provisioned_capacity_gb': 1548,
'max_over_subscription_ratio': 1.5,
'thin_provisioning': [True, False],
'reserved_percentage': 5,
'reserved_snapshot_percentage': 2,
'reserved_share_extend_percentage': 5,
'timestamp': None,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'replication_type': None,
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster5',
},
'host6': {
'total_capacity_gb': 'unknown',
'free_capacity_gb': 'unknown',
'allocated_capacity_gb': 1548,
'thin_provisioning': False,
'reserved_percentage': 5,
'reserved_snapshot_percentage': 2,
'reserved_share_extend_percentage': 5,
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'timestamp': None,
'storage_protocol': 'GLUSTERFS',
'vendor_name': 'NetApp',
'netapp_cluster_name': 'cluster6',
},
}
class FakeFilterScheduler(filter.FilterScheduler):
def __init__(self, *args, **kwargs):
@ -269,6 +429,7 @@ class FakeHostManager(host_manager.HostManager):
'replication_type': 'writable',
'replication_domain': 'endor',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'Dummy',
},
'host2': {'total_capacity_gb': 2048,
'free_capacity_gb': 300,
@ -285,6 +446,7 @@ class FakeHostManager(host_manager.HostManager):
'replication_type': 'readable',
'replication_domain': 'kashyyyk',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'Dummy',
},
'host3': {'total_capacity_gb': 512,
'free_capacity_gb': 256,
@ -299,6 +461,7 @@ class FakeHostManager(host_manager.HostManager):
'create_share_from_snapshot_support': True,
'timestamp': None,
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'Dummy',
},
'host4': {'total_capacity_gb': 2048,
'free_capacity_gb': 200,
@ -315,6 +478,7 @@ class FakeHostManager(host_manager.HostManager):
'replication_type': 'dr',
'replication_domain': 'naboo',
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'Dummy',
},
'host5': {'total_capacity_gb': 2048,
'free_capacity_gb': 500,
@ -330,6 +494,7 @@ class FakeHostManager(host_manager.HostManager):
'create_share_from_snapshot_support': True,
'replication_type': None,
'storage_protocol': 'NFS_CIFS',
'vendor_name': 'Dummy',
},
'host6': {'total_capacity_gb': 'unknown',
'free_capacity_gb': 'unknown',
@ -342,6 +507,7 @@ class FakeHostManager(host_manager.HostManager):
'create_share_from_snapshot_support': True,
'timestamp': None,
'storage_protocol': 'GLUSTERFS',
'vendor_name': 'Dummy',
},
}


@ -0,0 +1,302 @@
# Copyright 2023 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For NetApp Active IQ Weigher.
"""
from unittest import mock
import ddt
from oslo_config import cfg
from oslo_serialization import jsonutils
import requests
from manila import context
from manila import exception
from manila.scheduler.weighers import base_host
from manila.scheduler.weighers import netapp_aiq
from manila.share import utils
from manila import test
from manila.tests.scheduler import fakes
from manila.tests import utils as test_utils
CONF = cfg.CONF
@ddt.ddt
class NetAppAIQWeigherTestCase(test.TestCase):
def setUp(self):
super(NetAppAIQWeigherTestCase, self).setUp()
self.weight_handler = base_host.HostWeightHandler(
'manila.scheduler.weighers')
netapp_aiq.LOG.debug = mock.Mock()
netapp_aiq.LOG.error = mock.Mock()
self.mock_session = mock.Mock()
self.mock_session.get = mock.Mock()
self.mock_session.post = mock.Mock()
self.mock_session.delete = mock.Mock()
self.mock_session.patch = mock.Mock()
self.mock_session.put = mock.Mock()
data = {
'netapp_active_iq': {
'aiq_hostname': "10.10.10.10",
'aiq_transport_type': 'https',
'aiq_ssl_verify': True,
'aiq_ssl_cert_path': 'fake_cert',
'aiq_username': 'fake_user',
'aiq_password': 'fake_password',
'aiq_eval_method': 1,
'aiq_priority_order': 'ops'
}
}
self.netapp_aiq_weigher = None
with test_utils.create_temp_config_with_opts(data):
self.netapp_aiq_weigher = netapp_aiq.NetAppAIQWeigher()
def test__weigh_object(self):
self.assertRaises(NotImplementedError,
self.netapp_aiq_weigher._weigh_object,
"fake", "fake")
@ddt.data(
{'resource_keys': ["fake_resource_key"], 'performance_level': None},
{'resource_keys': ["fake_resource_key"],
'performance_level': "fake_psl"},
{'resource_keys': [], 'performance_level': 'fake_psl'})
@ddt.unpack
def test__weigh_active_iq(self, resource_keys, performance_level):
weight_properties = {
'size': 1,
'share_type': {
'extra_specs': {
"netapp:performance_service_level_name": "fake_name",
}
}
}
mock_get_psl_id = self.mock_object(
self.netapp_aiq_weigher, '_get_performance_level_id',
mock.Mock(return_value=performance_level))
mock_get_resource_keys = self.mock_object(
self.netapp_aiq_weigher, '_get_resource_keys',
mock.Mock(return_value=resource_keys))
mock_balance_aggregates = self.mock_object(
self.netapp_aiq_weigher, '_balance_aggregates',
mock.Mock(return_value=["1.0", "1.0"]))
res = self.netapp_aiq_weigher._weigh_active_iq(
fakes.FAKE_ACTIVE_IQ_WEIGHER_LIST, weight_properties)
mock_get_psl_id.assert_called_once_with("fake_name")
if not resource_keys or not performance_level:
self.assertEqual([], res)
else:
self.assertEqual(["1.0", "1.0"], res)
if performance_level:
mock_get_resource_keys.assert_called_once_with(
fakes.FAKE_ACTIVE_IQ_WEIGHER_LIST)
else:
mock_get_resource_keys.assert_not_called()
if not resource_keys or not performance_level:
mock_balance_aggregates.assert_not_called()
else:
mock_balance_aggregates.assert_called_once_with(
resource_keys, 1, performance_level)
@ddt.data(True, False)
def test__get_url(self, ipv6):
if ipv6:
self.netapp_aiq_weigher.host = "2001:db8::"
else:
self.netapp_aiq_weigher.host = "1.1.1.1"
self.netapp_aiq_weigher.port = "fake_port"
self.netapp_aiq_weigher.protocol = "fake_protocol"
res = self.netapp_aiq_weigher._get_url()
if ipv6:
self.assertEqual('fake_protocol://[2001:db8::]:fake_port/api/',
res)
else:
self.assertEqual('fake_protocol://1.1.1.1:fake_port/api/',
res)
@ddt.data('get', 'post', 'delete', 'patch', 'put')
def test__get_request_method(self, method):
res = self.netapp_aiq_weigher._get_request_method(
method, self.mock_session)
if method == 'get':
self.assertEqual(self.mock_session.get, res)
elif method == 'post':
self.assertEqual(self.mock_session.post, res)
elif method == 'delete':
self.assertEqual(self.mock_session.delete, res)
elif method == 'put':
self.assertEqual(self.mock_session.put, res)
elif method == 'patch':
self.assertEqual(self.mock_session.patch, res)
def test__get_session_method(self):
mock_session_builder = self.mock_object(
requests, 'Session', mock.Mock(return_value=self.mock_session))
mock__get_request_method = self.mock_object(
self.netapp_aiq_weigher, '_get_request_method',
mock.Mock(return_value=self.mock_session.post))
res = self.netapp_aiq_weigher._get_session_method('post')
self.assertEqual(self.mock_session.post, res)
mock_session_builder.assert_called_once_with()
mock__get_request_method.assert_called_once_with(
'post', self.mock_session)
def test__call_active_iq(self):
response = mock.Mock()
response.content = "fake_response"
response.status_code = "fake_code"
mock_post = mock.Mock(return_value=response)
mock__get_session_method = self.mock_object(
self.netapp_aiq_weigher, '_get_session_method',
mock.Mock(return_value=mock_post))
fake_url = "fake_url"
fake_path = "/fake_path"
mock__get_url = self.mock_object(
self.netapp_aiq_weigher, '_get_url',
mock.Mock(return_value=fake_url))
self.netapp_aiq_weigher._call_active_iq(fake_path, "post",
body="fake_body")
mock_post.assert_called_once_with(fake_url + fake_path,
json="fake_body")
self.assertTrue(netapp_aiq.LOG.debug.called)
mock__get_session_method.assert_called_once_with("post")
mock__get_url.assert_called_once_with()
@ddt.data({}, jsonutils.dumps(
fakes.FAKE_ACTIVE_IQ_WEIGHER_AGGREGATES_RESPONSE))
def test__get_resource_keys(self, api_res):
mock__call_active_iq = self.mock_object(
self.netapp_aiq_weigher, '_call_active_iq',
mock.Mock(return_value=(200, api_res)))
res = self.netapp_aiq_weigher._get_resource_keys(
fakes.FAKE_ACTIVE_IQ_WEIGHER_LIST)
if api_res:
self.assertEqual(['fake_key_1', 'fake_key_2', 'fake_key_3'], res)
else:
self.assertEqual([0, 0, 0], res)
mock__call_active_iq.assert_called_once_with(
'datacenter/storage/aggregates', 'get')
@ddt.data(mock.Mock(side_effect=exception.NotFound),
mock.Mock(return_value=(400, "fake_res")))
def test__get_resource_keys_error(self, mock_cal):
self.mock_object(
self.netapp_aiq_weigher, '_call_active_iq', mock_cal)
res = self.netapp_aiq_weigher._get_resource_keys(
fakes.FAKE_ACTIVE_IQ_WEIGHER_LIST)
self.assertEqual([], res)
self.assertTrue(netapp_aiq.LOG.error.called)
@ddt.data([], jsonutils.dumps(
fakes.FAKE_ACTIVE_IQ_WEIGHER_BALANCE_RESPONSE))
def test__balance_aggregates(self, api_res):
mock__call_active_iq = self.mock_object(
self.netapp_aiq_weigher, '_call_active_iq',
mock.Mock(return_value=(200, api_res)))
res = self.netapp_aiq_weigher._balance_aggregates(
['fake_key_1', 'fake_key_2', 0, 'fake_key_3'], 10, 'fake_uuid')
if not api_res:
self.assertEqual([0.0, 0.0, 0.0, 0.0], res)
else:
self.assertEqual([10.0, 20.0, 0.0, 0.0], res)
fake_body = {
"capacity": '10GB',
"eval_method": 1,
"opt_method": 0,
"priority_order": ['ops'],
"separate_flag": False,
"resource_keys": ['fake_key_1', 'fake_key_2', 'fake_key_3'],
"ssl_key": 'fake_uuid'
}
mock__call_active_iq.assert_called_once_with(
'storage-provider/data-placement/balance', 'post', body=fake_body)
@ddt.data(mock.Mock(side_effect=exception.NotFound),
mock.Mock(return_value=(400, "fake_res")))
def test__balance_aggregates_error(self, mock_cal):
self.mock_object(
self.netapp_aiq_weigher, '_call_active_iq', mock_cal)
res = self.netapp_aiq_weigher._balance_aggregates(
['fake_key_1', 'fake_key_2', 0, 'fake_key_3'], 10, 'fake_uuid')
self.assertEqual([], res)
self.assertTrue(netapp_aiq.LOG.error.called)
@mock.patch('manila.db.api.IMPL.service_get_all_by_topic')
def _get_all_hosts(self, _mock_service_get_all_by_topic, disabled=False):
ctxt = context.get_admin_context()
fakes.mock_host_manager_db_calls(_mock_service_get_all_by_topic,
disabled=disabled)
host_states = self.host_manager.get_all_host_states_share(ctxt)
_mock_service_get_all_by_topic.assert_called_once_with(
ctxt, CONF.share_topic)
return host_states
def test_weigh_objects_netapp_only(self):
self.host_manager = fakes.FakeHostManagerNetAppOnly()
hosts = self._get_all_hosts() # pylint: disable=no-value-for-parameter
weight_properties = "fake_properties"
mock_weigh_active_iq = self.mock_object(
netapp_aiq.NetAppAIQWeigher, '_weigh_active_iq',
# third host wins
mock.Mock(return_value=[0.0, 0.0, 10.0, 0.0, 0.0, 0.0]))
weighed_host = self.weight_handler.get_weighed_objects(
[netapp_aiq.NetAppAIQWeigher],
hosts,
weight_properties)[0]
mock_weigh_active_iq.assert_called()
self.assertEqual(1.0, weighed_host.weight)
self.assertEqual(
'host3', utils.extract_host(weighed_host.obj.host))
def test_weigh_objects_non_netapp_backends(self):
self.host_manager = fakes.FakeHostManager()
hosts = self._get_all_hosts() # pylint: disable=no-value-for-parameter
weight_properties = "fake_properties"
mock_weigh_active_iq = self.mock_object(
netapp_aiq.NetAppAIQWeigher, '_weigh_active_iq')
weighed_host = self.weight_handler.get_weighed_objects(
[netapp_aiq.NetAppAIQWeigher],
hosts,
weight_properties)[0]
mock_weigh_active_iq.assert_not_called()
self.assertEqual(0.0, weighed_host.weight)
self.assertEqual(
'host1', utils.extract_host(weighed_host.obj.host))


@ -502,6 +502,9 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
mock_get_flexgroup_space = self.mock_object(
self.library, '_get_flexgroup_pool_space',
mock.Mock(return_value=(fake_total, fake_free, fake_used)))
mock_get_cluster_name = self.mock_object(
self.library._client, 'get_cluster_name',
mock.Mock(return_value='fake_cluster_name'))
self.library._cache_pool_status = na_utils.DataCache(60)
self.library._have_cluster_creds = True
@ -520,6 +523,7 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
mock_get_flexgroup_space.assert_has_calls([
mock.call(fake.AGGREGATE_CAPACITIES,
fake.FLEXGROUP_POOL_OPT[fake.FLEXGROUP_POOL_NAME])])
mock_get_cluster_name.assert_called_once_with()
mock_get_pool.assert_has_calls([
mock.call(fake.AGGREGATES[0], fake_total, fake_free, fake_used),
mock.call(fake.AGGREGATES[1], fake_total, fake_free, fake_used),
@ -547,6 +551,7 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
fake_pool = copy.deepcopy(fake.POOLS[0])
fake_pool['filter_function'] = None
fake_pool['goodness_function'] = None
fake_pool['netapp_cluster_name'] = ''
self.library._have_cluster_creds = True
self.library._revert_to_snapshot_support = True
self.library._cluster_info = fake.CLUSTER_INFO


@ -956,6 +956,7 @@ FLEXGROUP_POOL = {
'qos': True,
'security_service_update_support': True,
'netapp_flexgroup': True,
'netapp_cluster_name': 'fake_cluster_name',
}
FLEXGROUP_AGGR_SET = set(FLEXGROUP_POOL_OPT[FLEXGROUP_POOL_NAME])
@ -1038,6 +1039,7 @@ POOLS = [
'security_service_update_support': True,
'share_server_multiple_subnet_support': True,
'netapp_flexgroup': False,
'netapp_cluster_name': 'fake_cluster_name',
},
{
'pool_name': AGGREGATES[1],
@ -1066,6 +1068,7 @@ POOLS = [
'security_service_update_support': True,
'share_server_multiple_subnet_support': True,
'netapp_flexgroup': False,
'netapp_cluster_name': 'fake_cluster_name',
},
]
@ -1074,6 +1077,7 @@ POOLS_VSERVER_CREDS = [
'pool_name': AGGREGATES[0],
'filter_function': None,
'goodness_function': None,
'netapp_cluster_name': '',
'netapp_aggregate': AGGREGATES[0],
'total_capacity_gb': 'unknown',
'free_capacity_gb': 1.1,


@ -0,0 +1,10 @@
---
features:
- |
Added the ``NetAppAIQWeigher`` scheduler weigher that relies on the
`NetApp Active IQ <https://www.netapp.com/services/support/active-iq/>`_
    to weigh the hosts. It only works with NetApp backends; when backends
    from other vendors exist, the weigher is skipped. Also added a new
    NetApp-specific pool capability, ``netapp_cluster_name``, that contains
    the name of the cluster where the pool is located; it can be set
    through a new NetApp configuration option.


@ -65,6 +65,7 @@ manila.scheduler.weighers =
GoodnessWeigher = manila.scheduler.weighers.goodness:GoodnessWeigher
PoolWeigher = manila.scheduler.weighers.pool:PoolWeigher
HostAffinityWeigher = manila.scheduler.weighers.host_affinity:HostAffinityWeigher
NetAppAIQWeigher = manila.scheduler.weighers.netapp_aiq:NetAppAIQWeigher
oslo.config.opts =
manila = manila.opts:list_opts