
1790 lines
65 KiB

# Copyright (c) 2022 NetApp, Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import math
from time import time
from oslo_log import log as logging
from oslo_utils import excutils
import six
from cinder import exception
from cinder.i18n import _
from cinder import utils
from cinder.volume.drivers.netapp.dataontap.client import api as netapp_api
from cinder.volume.drivers.netapp.dataontap.client import client_cmode
from cinder.volume.drivers.netapp import utils as na_utils
from cinder.volume import volume_utils
# Module-level logger used by every method below.
LOG = logging.getLogger(__name__)
# Substring matched against a node's model name in _init_features().
ONTAP_C190 = 'C190'
# Name prefix given to QoS policy groups that are soft-deleted (renamed) so
# they can later be removed in bulk by a wildcard delete.
DELETED_PREFIX = 'deleted_cinder_'
# NOTE(review): the assignment that opens the map described below (later code
# iterates `SSC_API_MAP.items()`) and the value lists of its four entries are
# missing from this view. ONTAP_SELECT_MODEL, HTTP_ACCEPTED and
# DEFAULT_TIMEOUT are referenced later but not defined in the visible text.
# Restore them from the upstream file before use.
# Keys in this map are REST API's endpoints that the user shall have permission
# in order to enable extra specs reported to Cinder's scheduler.
# NOTE(sfernand): ONTAP does not retrieve volume efficiency information
# properly when using the pre-created "vsadmin" role (SVM scoped), causing
# dedup and compression extra specs to be reported as disabled despite its
# current configuration.
'/storage/aggregates': [
'/storage/disks': [
'/snapmirror/relationships': [
'/storage/volumes': [
class RestClient(object):
# NOTE(review): the `netapp_api.RestNaServer(` call below is never closed in
# this view (its arguments are missing); restore it before use.
def __init__(self, **kwargs):
# Required connection settings; a missing key raises KeyError.
host = kwargs['hostname']
username = kwargs['username']
password = kwargs['password']
api_trace_pattern = kwargs['api_trace_pattern']
self.connection = netapp_api.RestNaServer(
# Optional settings: async job timeout (defaults to 60) and target vserver.
self.async_rest_timeout = kwargs.get('async_rest_timeout', 60)
self.vserver = kwargs.get('vserver')
# Ask for the version with cached=False and reject anything older than
# 9.11.1.
ontap_version = self.get_ontap_version(cached=False)
if ontap_version < (9, 11, 1):
msg = _('REST Client can be used only with ONTAP 9.11.1 or upper.')
raise na_utils.NetAppDriverException(msg)
self.ssh_client = self._init_ssh_client(host, username, password)
# NOTE(nahimsouza): ZAPI Client is needed to implement the fallback
# when a REST method is not supported.
self.zapi_client = client_cmode.Client(**kwargs)
# Builds the SSH helper stored by __init__ as `self.ssh_client`.
# NOTE(review): the `netapp_api.SSHUtil(` call is never closed in this view;
# its arguments are missing and must be restored.
def _init_ssh_client(self, host, username, password):
return netapp_api.SSHUtil(
# Populates `self.features` with flags derived from the ONTAP version and,
# per cluster node, QoS-minimum support flags.
# NOTE(review): this block is damaged in this view; see the inline notes.
def _init_features(self):
self.features = na_utils.Features()
generation, major, minor = self.get_ontap_version()
# Only (generation, major) takes part in the feature comparisons below.
ontap_version = (generation, major)
ontap_9_0 = ontap_version >= (9, 0)
ontap_9_4 = ontap_version >= (9, 4)
ontap_9_5 = ontap_version >= (9, 5)
ontap_9_6 = ontap_version >= (9, 6)
ontap_9_8 = ontap_version >= (9, 8)
ontap_9_9 = ontap_version >= (9, 9)
nodes_info = self._get_cluster_nodes_info()
for node in nodes_info:
qos_min_block = False
qos_min_nfs = False
if node['model'] == ONTAP_SELECT_MODEL:
qos_min_block = node['is_all_flash_select'] and ontap_9_6
qos_min_nfs = qos_min_block
elif ONTAP_C190 in node['model']:
qos_min_block = node['is_all_flash'] and ontap_9_6
qos_min_nfs = qos_min_block
# NOTE(review): an `else:` presumably preceded the next two lines; as shown
# they would overwrite the results of both branches above. Confirm.
qos_min_block = node['is_all_flash'] and ontap_9_0
qos_min_nfs = node['is_all_flash'] and ontap_9_0
# One feature flag per node and protocol (first argument True, then False).
qos_name = na_utils.qos_min_feature_name(True, node['name'])
self.features.add_feature(qos_name, supported=qos_min_nfs)
qos_name = na_utils.qos_min_feature_name(False, node['name'])
self.features.add_feature(qos_name, supported=qos_min_block)
self.features.add_feature('SNAPMIRROR_V2', supported=ontap_9_0)
self.features.add_feature('SYSTEM_METRICS', supported=ontap_9_0)
self.features.add_feature('CLONE_SPLIT_STATUS', supported=ontap_9_0)
self.features.add_feature('FAST_CLONE_DELETE', supported=ontap_9_0)
self.features.add_feature('BACKUP_CLONE_PARAM', supported=ontap_9_0)
self.features.add_feature('CLUSTER_PEER_POLICY', supported=ontap_9_0)
self.features.add_feature('FLEXVOL_ENCRYPTION', supported=ontap_9_0)
self.features.add_feature('FLEXGROUP', supported=ontap_9_8)
self.features.add_feature('ADAPTIVE_QOS', supported=ontap_9_4)
# NOTE(review): the next line looks like the tail of one add_feature() call
# fused with the arguments of a logging call; both openings are missing.
supported=ontap_9_5)'ONTAP Version: %(generation)s.%(major)s.%(minor)s',
{'generation': ontap_version[0], 'major': ontap_version[1],
'minor': minor})
def __getattr__(self, name):
    """Resolve attributes missing on the REST client via the ZAPI client.

    Python calls this hook only after normal attribute lookup has failed,
    so any method not implemented here is served by ``zapi_client``.
    """
    LOG.debug("The %s call is not supported for REST, falling back to "
              "ZAPI.", name)

    # Fetch the fallback client through object.__getattribute__ so that a
    # missing 'zapi_client' attribute cannot re-enter this hook.
    fallback_client = object.__getattribute__(self, 'zapi_client')
    return getattr(fallback_client, name)
# NOTE(review): damaged in this view. The `self.send_request(job_url, 'get',`
# call is never closed, and the `try:` matching the `except` near the end is
# missing (presumably it wrapped `return _waiter()`).
def _wait_job_result(self, job_url):
"""Waits for a job to finish."""
interval = 2
# NOTE(review): true division gives a float retry count on Python 3; confirm
# that utils.retry accepts a non-integer `retries`.
retries = (self.async_rest_timeout / interval)
@utils.retry(netapp_api.NaRetryableError, interval=interval,
retries=retries, backoff_rate=1)
def _waiter():
response = self.send_request(job_url, 'get',
job_state = response.get('state')
# 'success' returns the job payload; 'failure' raises with the job's own
# error message and code; any other state raises the retryable error.
if job_state == 'success':
return response
elif job_state == 'failure':
message = response['error']['message']
code = response['error']['code']
raise netapp_api.NaApiError(message=message, code=code)
msg_args = {'job': job_url, 'state': job_state}
LOG.debug("Job %(job)s has not finished: %(state)s", msg_args)
raise netapp_api.NaRetryableError(message='Job is running.')
return _waiter()
except netapp_api.NaRetryableError:
# The retryable error escaping the waiter is turned into a driver exception.
msg = _("Job %s did not reach the expected state. Retries "
"exhausted. Aborting.") % job_url
raise na_utils.NetAppDriverException(msg)
# NOTE(review): damaged in this view. The signature is cut after `query=None,`
# (the docstring lists enable_tunneling, max_page_length and wait_on_accepted
# as further parameters), the docstring's closing quotes are missing, an
# `else:` presumably separated the 'get' branch from the invoke_successfully
# call, and that call is never closed.
# Visible behaviour: 'get' requests are delegated to get_records(). When the
# returned code equals HTTP_ACCEPTED and wait_on_accepted is set, the job URL
# is taken from the response (minus its 4-character '/api' prefix) and
# _wait_job_result() supplies the final response.
def send_request(self, action_url, method, body=None, query=None,
"""Sends REST request to ONTAP.
:param action_url: action URL for the request
:param method: HTTP method for the request ('get', 'post', 'put',
'delete' or 'patch')
:param body: dict of arguments to be passed as request body
:param query: dict of arguments to be passed as query string
:param enable_tunneling: enable tunneling to the ONTAP host
:param max_page_length: size of the page during pagination
:param wait_on_accepted: if True, wait until the job finishes when
HTTP code 202 (Accepted) is returned
:returns: parsed REST response
response = None
if method == 'get':
response = self.get_records(
action_url, query, enable_tunneling, max_page_length)
code, response = self.connection.invoke_successfully(
action_url, method, body=body, query=query,
if code == HTTP_ACCEPTED and wait_on_accepted:
# get job URL and discard '/api'
job_url = response['job']['_links']['self']['href'][4:]
response = self._wait_job_result(job_url)
return response
# NOTE(review): damaged in this view. The signature is cut after
# `enable_tunneling=True,`, the docstring's closing quotes are missing, and
# both invoke_successfully calls are never closed. The loop adds each page's
# 'num_records' to the first response; the statement that merges the page's
# records (announced by the "saving data into first page" comment) is not
# visible.
# NOTE(review): when the caller passes a non-empty `query` dict it is mutated
# in place by the 'max_records' assignment.
def get_records(self, action_url, query=None, enable_tunneling=True,
"""Retrieves ONTAP resources using pagination REST request.
:param action_url: action URL for the request
:param query: dict of arguments to be passed as query string
:param enable_tunneling: enable tunneling to the ONTAP host
:param max_page_length: size of the page during pagination
:returns: dict containing records and num_records
# Initialize query variable if it is None
query = query if query else {}
query['max_records'] = max_page_length
_, response = self.connection.invoke_successfully(
action_url, 'get', query=query,
# NOTE(nahimsouza): if all records are returned in the first call,
# 'next_url' will be None.
next_url = response.get('_links', {}).get('next', {}).get('href')
next_url = next_url[4:] if next_url else None # discard '/api'
# Get remaining pages, saving data into first page
while next_url:
# NOTE(nahimsouza): clean the 'query', because the parameters are
# already included in 'next_url'.
_, next_response = self.connection.invoke_successfully(
next_url, 'get', query=None,
response['num_records'] += next_response.get('num_records', 0)
next_url = (
next_response.get('_links', {}).get('next', {}).get('href'))
next_url = next_url[4:] if next_url else None # discard '/api'
return response
# NOTE(review): damaged in this view. The `query` dict is never closed and
# the `version` tuple is cut after its first element; callers unpack three
# values (generation, major, minor).
def get_ontap_version(self, cached=True):
"""Gets the ONTAP version as tuple."""
# The cached path asks the connection object instead of issuing a request.
if cached:
return self.connection.get_ontap_version()
query = {
'fields': 'version'
response = self.send_request('/cluster/', 'get', query=query)
version = (response['version']['generation'],
return version
# NOTE(review): damaged in this view. The body of
# `if not self.check_cluster_api(api):` is missing; presumably it recorded
# the API in `inaccessible_apis` and its extra specs in
# `invalid_extra_specs`. Confirm against upstream.
def check_api_permissions(self):
"""Check which APIs that support SSC functionality are available."""
inaccessible_apis = []
invalid_extra_specs = []
for api, extra_specs in SSC_API_MAP.items():
if not self.check_cluster_api(api):
if inaccessible_apis:
# Lacking access to '/storage/volumes' raises; other gaps are only logged.
if '/storage/volumes' in inaccessible_apis:
msg = _('User not permitted to query Data ONTAP volumes.')
raise exception.VolumeBackendAPIException(data=msg)
LOG.warning('The configured user account does not have '
'sufficient privileges to use all needed '
'APIs. The following extra specs will fail '
'or be ignored: %s.', invalid_extra_specs)
return invalid_extra_specs
# NOTE(review): damaged in this view. The docstring's closing quotes, the
# `try:` matching the `except`, and the tail of the send_request call are
# missing.
def check_cluster_api(self, api):
"""Checks the availability of a cluster API.
Returns True if the specified cluster API exists and may be called by
the current user.
# No need to return any records here since we just want to know if
# the user is allowed to make the request. A "Permission Denied"
# error code is expected in case user does not have the necessary
# permissions.
self.send_request('%s?return_records=false' % api, 'get',
except netapp_api.NaApiError as ex:
# NOTE(nahimsouza): This function only returns false in case user
# is not authorized. If other error is returned, it must be
# handled in the function call that uses the same endpoint.
if ex.code == netapp_api.REST_UNAUTHORIZED:
return False
return True
# NOTE(review): damaged in this view. The `query_args` dict, the send_request
# call and the per-record `node` dict are all cut short, and the `try:`
# matching the `except` is missing. _init_features() reads 'model', 'name',
# 'is_all_flash' and 'is_all_flash_select' from each returned node, so the
# last two keys and the statement appending to `nodes` are presumably among
# the lost lines.
def _get_cluster_nodes_info(self):
"""Return a list of models of the nodes in the cluster."""
query_args = {'fields': 'model,'
nodes = []
result = self.send_request('/cluster/nodes', 'get',
for record in result['records']:
node = {
'model': record['model'],
'name': record['name'],
except netapp_api.NaApiError as e:
# NOTE(review): an `else:` presumably separated the debug message from the
# LOG.exception call below; confirm.
if e.code == netapp_api.REST_UNAUTHORIZED:
LOG.debug('Cluster nodes can only be collected with '
'cluster scoped credentials.')
LOG.exception('Failed to get the cluster nodes.')
return nodes
def list_flexvols(self):
    """Returns the names of the flexvols on the controller.

    Queries '/storage/volumes/' for online, read-write, consistent,
    non-root volumes whose style matches 'flex*'.

    :returns: list of volume names (empty when no record is returned)
    """
    query = {
        'type': 'rw',
        'style': 'flex*',  # Match both 'flexvol' and 'flexgroup'
        'is_svm_root': 'false',
        'error_state.is_inconsistent': 'false',
        'state': 'online',
        'fields': 'name',
    }

    response = self.send_request(
        '/storage/volumes/', 'get', query=query)

    # A response without a 'records' key is treated as "no volumes".
    return [volume['name'] for volume in response.get('records', [])]
def _get_unique_volume(self, records):
"""Get the unique FlexVol or FlexGroup volume from a volume list."""
if len(records) != 1:
msg = _('Could not find unique volume. Volumes found: %(vol)s.')
msg_args = {'vol': records}
raise exception.VolumeBackendAPIException(data=msg % msg_args)
return records[0]
# NOTE(review): damaged in this view. The `query` dict literal is never
# closed, and `query[''] = vserver` uses an empty key; presumably the
# SVM-name field name was lost in extraction. Confirm against upstream.
def _get_volume_by_args(self, vol_name=None, vol_path=None,
vserver=None, fields=None):
"""Get info from a single volume according to the args."""
query = {
'type': 'rw',
'style': 'flex*', # Match both 'flexvol' and 'flexgroup'
'is_svm_root': 'false',
'error_state.is_inconsistent': 'false',
'state': 'online',
'fields': 'name,style'
# Each optional argument narrows the query; `fields` replaces the default
# field list set above.
if vol_name:
query['name'] = vol_name
if vol_path:
query['nas.path'] = vol_path
if vserver:
query[''] = vserver
if fields:
query['fields'] = fields
volumes_response = self.send_request(
'/storage/volumes/', 'get', query=query)
records = volumes_response.get('records', [])
# Raises unless exactly one volume matched.
volume = self._get_unique_volume(records)
return volume
# NOTE(review): damaged in this view. The `fields` string is cut short (it
# has empty entries and no closing parenthesis), an `else:` presumably
# preceded the list comprehension that collects several aggregate names, and
# the `volume` dict literal is never closed.
def get_flexvol(self, flexvol_path=None, flexvol_name=None):
"""Get flexvol attributes needed for the storage service catalog."""
fields = (',name,,nas.path,'
unique_volume = self._get_volume_by_args(
vol_name=flexvol_name, vol_path=flexvol_path, fields=fields)
aggregate = None
if unique_volume['style'] == 'flexvol':
# flexvol has only 1 aggregate
aggregate = unique_volume['aggregates'][0]['name']
aggregate = [aggr["name"]
for aggr in unique_volume.get('aggregates', [])]
# The QoS policy name is None when any level of the nesting is absent.
qos_policy_group = (
unique_volume.get('qos', {}).get('policy', {}).get('name'))
# Result keys use hyphenated names rather than the REST field names.
volume = {
'name': unique_volume['name'],
'vserver': unique_volume['svm']['name'],
'junction-path': unique_volume.get('nas', {}).get('path'),
'aggregate': aggregate,
'type': unique_volume['type'],
'space-guarantee-enabled': unique_volume['guarantee']['honored'],
'space-guarantee': unique_volume['guarantee']['type'],
'size': str(unique_volume['space']['size']),
'qos-policy-group': qos_policy_group,
'snapshot-policy': unique_volume['snapshot_policy']['name'],
'language': unique_volume['language'],
'style-extended': unique_volume['style'],
return volume
# NOTE(review): damaged in this view. The `query` dict is never closed, the
# `try:` matching the `except` is missing, and the LOG.exception call has
# lost its argument and closing parenthesis.
def is_flexvol_mirrored(self, flexvol_name, vserver_name):
"""Check if flexvol is a SnapMirror source."""
# Only a record count is needed, so records themselves are not requested.
query = {
'source.path': vserver_name + ':' + flexvol_name,
'state': 'snapmirrored',
'return_records': 'false',
response = self.send_request('/snapmirror/relationships/',
'get', query=query)
return response['num_records'] > 0
except netapp_api.NaApiError:
# API errors are logged and reported as "not mirrored".
LOG.exception('Failed to get SnapMirror info for volume %s.',
return False
# NOTE(review): damaged in this view. The `query` dict is never closed, one
# of its keys is an empty string (presumably the SVM-name field was lost),
# the `try:` matching the `except` is missing, and the LOG.exception call is
# cut short.
def is_flexvol_encrypted(self, flexvol_name, vserver_name):
"""Check if a flexvol is encrypted."""
# Skip the request entirely when the feature flag is off.
if not self.features.FLEXVOL_ENCRYPTION:
return False
query = {
'encryption.enabled': 'true',
'name': flexvol_name,
'': vserver_name,
'return_records': 'false',
response = self.send_request(
'/storage/volumes/', 'get', query=query)
return response['num_records'] > 0
except netapp_api.NaApiError:
# API errors are logged and reported as "not encrypted".
LOG.exception('Failed to get Encryption info for volume %s.',
return False
def get_aggregate_disk_types(self, aggregate_name):
    """Return the disk type(s) of an aggregate as a list.

    :param aggregate_name: name of the aggregate to inspect
    :returns: list of disk types, or None when none were found
    """
    found_types = self._get_aggregate_disk_types(aggregate_name)
    if not found_types:
        return None
    return list(found_types)
# NOTE(review): damaged in this view. The `query` dict is never closed and
# has an empty key (the aggregate-name field was presumably lost), the
# `try:` matching the `except` is missing, the LOG.exception call is cut
# short, and the body of the final `for` loop (which presumably adds each
# disk's type to `disk_types`) is missing.
def _get_aggregate_disk_types(self, aggregate_name):
"""Get the disk type(s) of an aggregate"""
disk_types = set()
query = {
'': aggregate_name,
'fields': 'effective_type'
response = self.send_request(
'/storage/disks', 'get', query=query, enable_tunneling=False)
except netapp_api.NaApiError:
# On API failure the (still empty) set is returned.
LOG.exception('Failed to get disk info for aggregate %s.',
return disk_types
for storage_disk_info in response['records']:
return disk_types
def _get_aggregates(self, aggregate_names=None, fields=None):
query = {}
if aggregate_names:
query['name'] = ','.join(aggregate_names)
if fields:
query['fields'] = fields
response = self.send_request(
'/storage/aggregates', 'get', query=query, enable_tunneling=False)
return response['records']
# NOTE(review): damaged in this view. The `fields` string and the
# _get_aggregates call are cut short, the `try:` matching the `except` is
# missing, the LOG.exception call is incomplete, and the `aggregate` dict has
# lost at least one key (a bare value expression remains) and its closing
# brace.
def get_aggregate(self, aggregate_name):
"""Get aggregate attributes needed for the storage service catalog."""
# An empty name, an API error, or no matching record all yield {}.
if not aggregate_name:
return {}
fields = ('name,block_storage.primary.raid_type,'
aggrs = self._get_aggregates(aggregate_names=[aggregate_name],
except netapp_api.NaApiError:
LOG.exception('Failed to get info for aggregate %s.',
return {}
if len(aggrs) < 1:
return {}
aggr_attributes = aggrs[0]
aggregate = {
'name': aggr_attributes['name'],
aggr_attributes['block_storage']['storage_type'] == 'hybrid',
'node-name': aggr_attributes['home_node']['name'],
return aggregate
def is_qos_min_supported(self, is_nfs, node_name):
    """Check if the node supports QoS minimum.

    :param is_nfs: passed to na_utils.qos_min_feature_name() to select the
        per-protocol feature name
    :param node_name: name of the node, or None when it is unknown
    :returns: True if the matching feature flag is truthy, False otherwise
    """
    if node_name is None:
        # Without access to the node name (SVM account or error), QoS
        # minimum support cannot be determined and is reported as absent.
        return False

    qos_min_name = na_utils.qos_min_feature_name(is_nfs, node_name)
    # bool() is the idiomatic truth test; it also copes with a feature
    # object that defines no __bool__ of its own.
    return bool(getattr(self.features, qos_min_name, False))
# NOTE(review): damaged in this view. The `query`, `no_dedupe_response` and
# `dedupe_info` dict literals are never closed, the `try:` matching the
# `except` is missing, and the LOG.exception call is cut short.
def get_flexvol_dedupe_info(self, flexvol_name):
"""Get dedupe attributes needed for the storage service catalog."""
query = {
'efficiency.volume_path': '/vol/%s' % flexvol_name,
'fields': 'efficiency.state,efficiency.compression'
# Set default values for the case there is no response.
no_dedupe_response = {
'compression': False,
'dedupe': False,
'logical-data-size': 0,
'logical-data-limit': 1,
response = self.send_request('/storage/volumes',
'get', query=query)
except netapp_api.NaApiError:
LOG.exception('Failed to get dedupe info for volume %s.',
return no_dedupe_response
# Anything other than exactly one matching volume yields the defaults.
if response["num_records"] != 1:
return no_dedupe_response
state = response["records"][0]["efficiency"]["state"]
compression = response["records"][0]["efficiency"]["compression"]
# TODO(nahimsouza): as soon as REST API supports the fields
# 'logical-data-size and 'logical-data-limit', we should include
# them in the query and set them correctly.
# NOTE(nahimsouza): these fields are only used by the client function
# `get_flexvol_dedupe_used_percent`, since the function is not
# implemented on REST yet, the below hard-coded fields are not
# affecting the driver in anyway.
logical_data_size = 0
logical_data_limit = 1
# Compression is on unless reported as "none"; dedupe is on unless the
# efficiency state is "disabled".
dedupe_info = {
'compression': False if compression == "none" else True,
'dedupe': False if state == "disabled" else True,
'logical-data-size': logical_data_size,
'logical-data-limit': logical_data_limit,
return dedupe_info
# NOTE(review): damaged in this view. The docstring's closing quotes are
# missing, the `query` dict is never closed and has an empty key, its
# 'fields' value has empty entries, and no statement appends `lun_info` to
# `lun_list`.
# NOTE(review): `response['num_records'] == '0'` compares against a string,
# while get_records() treats 'num_records' as a number (it uses `+=` on it);
# if it is numeric this guard never matches. Confirm.
def get_lun_list(self):
"""Gets the list of LUNs on filer.
Gets the LUNs from cluster with vserver.
query = {
'': self.vserver,
'fields': ',,space.size,'
response = self.send_request(
'/storage/luns/', 'get', query=query)
if response['num_records'] == '0':
return []
lun_list = []
for lun in response['records']:
lun_info = {}
lun_info['Vserver'] = lun['svm']['name']
lun_info['Volume'] = lun['location']['volume']['name']
lun_info['Size'] = lun['space']['size']
lun_info['Qtree'] = \
lun['location'].get('qtree', {}).get('name', '')
lun_info['Path'] = lun['name']
lun_info['OsType'] = lun['os_type']
lun_info['SpaceReserved'] = lun['space']['guarantee']['requested']
lun_info['UUID'] = lun['uuid']
return lun_list
# NOTE(review): damaged in this view. The `query` dict is never closed, its
# 'fields' value has empty entries, the vserver filter uses an empty key, and
# no statement appends `lun_info` to `lun_list`.
# NOTE(review): `response['num_records'] == '0'` compares against a string,
# while get_records() treats 'num_records' as a number; confirm the guard.
def get_lun_by_args(self, **lun_info_args):
"""Retrieves LUN with specified args."""
query = {
'fields': ',,space.size,'
# Recognised filters: 'vserver', 'path' (sent as 'name') and 'uuid'.
if lun_info_args:
if 'vserver' in lun_info_args:
query[''] = lun_info_args['vserver']
if 'path' in lun_info_args:
query['name'] = lun_info_args['path']
if 'uuid' in lun_info_args:
query['uuid'] = lun_info_args['uuid']
response = self.send_request(
'/storage/luns/', 'get', query=query)
if response['num_records'] == '0':
return []
lun_list = []
for lun in response['records']:
lun_info = {}
lun_info['Vserver'] = lun['svm']['name']
lun_info['Volume'] = lun['location']['volume']['name']
lun_info['Size'] = lun['space']['size']
lun_info['Qtree'] = \
lun['location'].get('qtree', {}).get('name', '')
lun_info['Path'] = lun['name']
lun_info['OsType'] = lun['os_type']
lun_info['SpaceReserved'] = lun['space']['guarantee']['requested']
lun_info['UUID'] = lun['uuid']
# NOTE(nahimsouza): Currently, ONTAP REST API does not have the
# 'block-size' in the response. By default, we are setting its
# value to 512, since traditional block size advertised by hard
# disks is 512 bytes.
lun_info['BlockSize'] = 512
return lun_list
# NOTE(review): damaged in this view. The `query` dict is never closed and
# has an empty key (presumably the volume-name field was lost), and the
# statement that opens the per-LUN dict and appends it to `luns` is missing;
# only its two entries remain.
# NOTE(review): the docstring opens with four quote characters, so its text
# begins with a stray '"'.
# NOTE(review): `response['num_records'] == '0'` compares against a string,
# while get_records() treats 'num_records' as a number; confirm the guard.
def get_lun_sizes_by_volume(self, volume_name):
""""Gets the list of LUNs and their sizes from a given volume name"""
query = {
'': volume_name,
'fields': 'space.size,name'
response = self.send_request('/storage/luns/', 'get', query=query)
if response['num_records'] == '0':
return []
luns = []
for lun_info in response['records']:
'path': lun_info.get('name', ''),
'size': float(lun_info.get('space', {}).get('size', 0))
return luns
# NOTE(review): damaged in this view. The `query` dict is never closed, the
# `try:` matching the `except` is missing, the send_request call has lost
# its URL argument (presumably built from `vol_uuid`), and the statement
# that opens the per-file dict and appends it to `files` is missing.
def get_file_sizes_by_dir(self, dir_path):
"""Gets the list of files and their sizes from a given directory."""
# 'dir_path' will always be a FlexVol name
volume = self._get_volume_by_args(vol_name=dir_path)
query = {
'type': 'file',
'fields': 'size,name'
vol_uuid = volume['uuid']
response = self.send_request(
'get', query=query)
except netapp_api.NaApiError as e:
# A "no such file" error yields an empty list; anything else is re-raised.
if e.code == netapp_api.REST_NO_SUCH_FILE:
return []
raise e
files = []
for file_info in response['records']:
'name': file_info.get('name', ''),
'file-size': float(file_info.get('size', 0))
return files
def get_volume_state(self, junction_path=None, name=None):
    """Returns volume state for a given name or junction path.

    :param junction_path: optional junction path, sent as 'nas.path'
    :param name: optional volume name
    :returns: the 'state' of the single matching volume, or None when the
        query does not match exactly one volume
    """
    query_args = {}

    if name:
        query_args['name'] = name
    if junction_path:
        query_args['nas.path'] = junction_path

    query_args['fields'] = 'state'

    response = self.send_request('/storage/volumes/',
                                 'get', query=query_args)

    # _get_unique_volume() raises when zero or several volumes match; that
    # case is reported to the caller as "state unknown" (None).
    try:
        records = response.get('records', [])
        unique_volume = self._get_unique_volume(records)
    except exception.VolumeBackendAPIException:
        return None

    return unique_volume['state']
# NOTE(review): damaged in this view. The opening of the request call is
# missing; only the tail of its URL expression (a `?name=` filter on the
# snapshot name) and the 'delete' method remain. Presumably the URL was built
# from the looked-up volume; confirm against upstream.
def delete_snapshot(self, volume_name, snapshot_name):
"""Deletes a volume snapshot."""
volume = self._get_volume_by_args(vol_name=volume_name)
f'?name={snapshot_name}', 'delete')
def get_operational_lif_addresses(self):
    """Gets the IP addresses of operational LIFs on the vserver.

    Queries '/network/ip/interfaces/' for interfaces whose state is 'up'.

    :returns: list with the 'ip.address' value of every returned record
    """
    query = {
        'state': 'up',
        'fields': 'ip.address',
    }

    response = self.send_request(
        '/network/ip/interfaces/', 'get', query=query)

    return [lif_info['ip']['address']
            for lif_info in response['records']]
# NOTE(review): damaged in this view. The `query` dict is never closed and
# the send_request call is cut after `query=query,`.
def _list_vservers(self):
"""Get the names of vservers present"""
query = {
'fields': 'name',
response = self.send_request('/svm/svms', 'get', query=query,
# A response without 'records' yields an empty list.
return [svm['name'] for svm in response.get('records', [])]
def _get_ems_log_destination_vserver(self):
"""Returns the best vserver destination for EMS messages."""
# NOTE(nahimsouza): Differently from ZAPI, only 'data' SVMs can be
# managed by the SVM REST APIs - that's why the vserver type is not
# specified.
vservers = self._list_vservers()
if vservers:
return vservers[0]
raise exception.NotFound("No Vserver found to receive EMS messages.")
# NOTE(review): damaged in this view. The `body` dict is never closed, the
# `try:` matching the `except` is missing, the request call has lost its
# opening (only `'post', body=body)` remains), and the statements that change
# and later restore the vserver/timeout are not visible; `bkp_vserver` and
# `timeout` are assigned but never used in the visible text.
def send_ems_log_message(self, message_dict):
"""Sends a message to the Data ONTAP EMS log."""
# Keys of `message_dict` are hyphenated; the REST body uses underscores.
body = {
'computer_name': message_dict['computer-name'],
'event_source': message_dict['event-source'],
'app_version': message_dict['app-version'],
'category': message_dict['category'],
'severity': 'notice',
'autosupport_required': message_dict['auto-support'] == 'true',
'event_id': message_dict['event-id'],
'event_description': message_dict['event-description'],
# Snapshot the connection state so it can be put back afterwards.
bkp_connection = copy.copy(self.connection)
bkp_timeout = self.connection.get_timeout()
bkp_vserver = self.vserver
# TODO(nahimsouza): Vserver is being set to replicate the ZAPI
# behavior, but need to check if this could be removed in REST API
'post', body=body)
LOG.debug('EMS executed successfully.')
except netapp_api.NaApiError as e:
# A failed EMS call is only logged.
LOG.warning('Failed to invoke EMS. %s', e)
# Restores the data
timeout = (
bkp_timeout if bkp_timeout is not None else DEFAULT_TIMEOUT)
self.connection = copy.copy(bkp_connection)
# NOTE(review): damaged in this view. The `rest_counter_names` mapping, both
# `query` dicts and the returned dict are never closed; both queries use an
# empty key (the counter-name field was presumably lost); both send_request
# calls have lost their URL argument; and the `try:` matching the final
# `except` is missing.
def get_performance_counter_info(self, object_name, counter_name):
"""Gets info about one or more Data ONTAP performance counters."""
# NOTE(nahimsouza): This conversion is needed because different names
# are used in ZAPI and we want to avoid changes in the driver for now.
rest_counter_names = {
'domain_busy': 'domain_busy_percent',
'processor_elapsed_time': 'elapsed_time',
'avg_processor_busy': 'average_processor_busy_percent',
rest_counter_name = counter_name
if counter_name in rest_counter_names:
rest_counter_name = rest_counter_names[counter_name]
# Get counter table info
query = {
'': rest_counter_name,
'fields': 'counter_schemas.*'
table = self.send_request(
'get', query=query, enable_tunneling=False)
name = counter_name # use the original name (ZAPI compatible)
base_counter = table['counter_schemas'][0]['denominator']['name']
query = {
'': rest_counter_name,
'fields': 'counters.*'
response = self.send_request(
'get', query=query, enable_tunneling=False)
table_rows = response.get('records', [])
labels = []
# Labels are read from the first counter of the first row, when present.
if len(table_rows) != 0:
labels = table_rows[0]['counters'][0].get('labels', [])
# NOTE(nahimsouza): Values have a different format on REST API
# and we want to keep compatibility with ZAPI for a while
if object_name == 'wafl' and counter_name == 'cp_phase_times':
# discard the prefix 'cp_'
labels = [label[3:] for label in labels]
return {
'name': name,
'labels': labels,
'base-counter': base_counter,
except netapp_api.NaApiError:
# Any API error is reported to the caller as "counter not found".
raise exception.NotFound(_('Counter %s not found') % counter_name)
# NOTE(review): damaged in this view. The `query` dict is never closed, the
# send_request call has lost its URL argument (`object_name` is unused in
# the visible text), and the body of the `for` loop that fills `uuids` is
# missing.
def get_performance_instance_uuids(self, object_name, node_name):
"""Get UUIDs of performance instances for a cluster node."""
# Matches every instance whose id starts with "<node_name>:".
query = {
'id': node_name + ':*',
response = self.send_request(
'get', query=query, enable_tunneling=False)
records = response.get('records', [])
uuids = []
for record in records:
return uuids
# NOTE(review): damaged in this view. The signature is cut after
# `instance_uuids,` (`counter_names` is presumably the missing parameter),
# both name-mapping dicts and the `query` dict are never closed, `query` has
# an empty key and an empty entry in 'fields', the send_request call has
# lost its URL argument, and the statement that opens the per-counter dict
# and appends it to `counter_data` is missing.
# NOTE(review): the index loop rewrites `counter_names` in place, so the
# caller's list is modified; confirm that this is intended.
def get_performance_counters(self, object_name, instance_uuids,
"""Gets more cDOT performance counters."""
# NOTE(nahimsouza): This conversion is needed because different names
# are used in ZAPI and we want to avoid changes in the driver for now.
rest_counter_names = {
'domain_busy': 'domain_busy_percent',
'processor_elapsed_time': 'elapsed_time',
'avg_processor_busy': 'average_processor_busy_percent',
zapi_counter_names = {
'domain_busy_percent': 'domain_busy',
'elapsed_time': 'processor_elapsed_time',
'average_processor_busy_percent': 'avg_processor_busy',
for i in range(len(counter_names)):
if counter_names[i] in rest_counter_names:
counter_names[i] = rest_counter_names[counter_names[i]]
# Several ids / names are combined into one filter with '|'.
query = {
'id': '|'.join(instance_uuids),
'': '|'.join(counter_names),
'fields': 'id,,counters.*',
response = self.send_request(
'get', query=query, enable_tunneling=False)
counter_data = []
for record in response.get('records', []):
for counter in record['counters']:
counter_name = counter['name']
# Reverts the name conversion
if counter_name in zapi_counter_names:
counter_name = zapi_counter_names[counter_name]
counter_value = ''
if counter.get('value'):
counter_value = counter.get('value')
elif counter.get('values'):
# NOTE(nahimsouza): Conversion made to keep compatibility
# with old ZAPI format
values = counter.get('values')
counter_value = ','.join([str(v) for v in values])
'instance-name': record['counter_table']['name'],
'instance-uuid': record['id'],
'node-name': record['id'].split(':')[0],
'timestamp': int(time()),
counter_name: counter_value,
return counter_data
# NOTE(review): damaged in this view. The `_get_aggregate_capacity(` call
# inside the loop is never closed (its argument is missing).
def get_aggregate_capacities(self, aggregate_names):
"""Gets capacity info for multiple aggregates."""
# Anything that is not a list (including tuples) yields an empty result.
if not isinstance(aggregate_names, list):
return {}
aggregates = {}
for aggregate_name in aggregate_names:
aggregates[aggregate_name] = self._get_aggregate_capacity(
return aggregates
# NOTE(review): damaged in this view. The `fields` string and the
# _get_aggregates call are cut short, the `try:` matching the `except` is
# missing, the `result` dict is never closed, an `else:` presumably
# separated the debug message from LOG.exception, and the LOG.exception call
# is incomplete.
# NOTE(review): `(used * 100) // total` raises ZeroDivisionError when the
# reported size is 0; confirm whether that can happen.
def _get_aggregate_capacity(self, aggregate_name):
"""Gets capacity info for an aggregate."""
fields = ('space.block_storage.available,space.block_storage.size,'
aggrs = self._get_aggregates(aggregate_names=[aggregate_name],
result = {}
if len(aggrs) > 0:
aggr = aggrs[0]
available = float(aggr['space']['block_storage']['available'])
total = float(aggr['space']['block_storage']['size'])
used = float(aggr['space']['block_storage']['used'])
# Floor division, then truncation to an integer percentage.
percent_used = int((used * 100) // total)
result = {
'percent-used': percent_used,
'size-available': available,
'size-total': total,
return result
except netapp_api.NaApiError as e:
if (e.code == netapp_api.REST_API_NOT_FOUND or
e.code == netapp_api.REST_UNAUTHORIZED):
LOG.debug('Aggregate capacity can only be collected with '
'cluster scoped credentials.')
LOG.exception('Failed to get info for aggregate %s.',
return {}
# NOTE(review): damaged in this view. The docstring's closing quotes are
# missing, `fields` is an empty string (its value was presumably lost; the
# code reads 'home_node' -> 'name' from the result), the _get_aggregates call
# is cut short, and the `try:` matching the `except` is missing.
# Visible behaviour: returns None for an empty name, for no match, or for a
# REST_API_NOT_FOUND error; other API errors are re-raised.
def get_node_for_aggregate(self, aggregate_name):
"""Get home node for the specified aggregate.
This API could return None, most notably if it was sent
to a Vserver LIF, so the caller must be able to handle that case.
if not aggregate_name:
return None
fields = ''
aggrs = self._get_aggregates(aggregate_names=[aggregate_name],
node = None
if len(aggrs) > 0:
aggr = aggrs[0]
node = aggr['home_node']['name']
return node
except netapp_api.NaApiError as e:
if e.code == netapp_api.REST_API_NOT_FOUND:
return None
raise e
# NOTE(review): damaged in this view. The signature is cut after
# `qos_policy_group_info,`, the bodies of the three guard `if`s are missing
# (presumably early returns), four calls are cut short, and an `else:`
# presumably separated the create call from the modify call.
# Visible intent: look up the policy group by name, create it when absent,
# otherwise modify the existing one.
def provision_qos_policy_group(self, qos_policy_group_info,
"""Create QoS policy group on the backend if appropriate."""
if qos_policy_group_info is None:
# Legacy QoS uses externally provisioned QoS policy group,
# so we don't need to create one on the backend.
legacy = qos_policy_group_info.get('legacy')
if legacy:
spec = qos_policy_group_info.get('spec')
if not spec:
is_adaptive = na_utils.is_qos_policy_group_spec_adaptive(
self._validate_qos_policy_group(is_adaptive, spec=spec,
qos_policy_group = self._get_qos_first_policy_group_by_name(
if not qos_policy_group:
self._create_qos_policy_group(spec, is_adaptive)
self._modify_qos_policy_group(spec, is_adaptive,
def _get_qos_first_policy_group_by_name(self, qos_policy_group_name):
records = self._get_qos_policy_group_by_name(qos_policy_group_name)
if len(records) == 0:
return None
return records[0]
def _get_qos_policy_group_by_name(self, qos_policy_group_name):
query = {'name': qos_policy_group_name}
response = self.send_request('/storage/qos/policies/',
'get', query=query)
records = response.get('records')
if not records:
return []
return records
# NOTE(review): damaged in this view. Every assignment that opens with `= (`
# has lost its right-hand side, and an `else:` presumably separated the
# adaptive branch from the `rest_args['fixed']` branch. Confirm against
# upstream.
def _qos_spec_to_api_args(self, spec, is_adaptive, vserver=None):
"""Convert a QoS spec to REST args."""
rest_args = {}
if is_adaptive:
rest_args['adaptive'] = {}
if spec.get('absolute_min_iops'):
rest_args['adaptive']['absolute_min_iops'] = (
if spec.get('expected_iops'):
rest_args['adaptive']['expected_iops'] = (
if spec.get('expected_iops_allocation'):
rest_args['adaptive']['expected_iops_allocation'] = (
if spec.get('peak_iops'):
rest_args['adaptive']['peak_iops'] = (
if spec.get('peak_iops_allocation'):
rest_args['adaptive']['peak_iops_allocation'] = (
if spec.get('block_size'):
rest_args['adaptive']['block_size'] = (
rest_args['fixed'] = {}
# A max throughput containing 'iops' is sent as IOPS; any other value is
# treated as bytes per second and converted below.
qos_max = spec.get('max_throughput')
if qos_max and 'iops' in qos_max:
rest_args['fixed']['max_throughput_iops'] = (
elif qos_max:
# Convert from B/s to MB/s
value = math.ceil(
self._sanitize_qos_spec_value(qos_max) / (10**6))
rest_args['fixed']['max_throughput_mbps'] = value
qos_min = spec.get('min_throughput')
if qos_min and 'iops' in qos_min:
rest_args['fixed']['min_throughput_iops'] = (
if spec.get('policy_name'):
rest_args['name'] = spec.get('policy_name')
# The spec key is singular ('return_record'); the REST arg is plural.
if spec.get('return_record'):
rest_args['return_records'] = spec.get('return_record')
if vserver:
rest_args['svm'] = {}
rest_args['svm']['name'] = vserver
return rest_args
def _sanitize_qos_spec_value(self, value):
value = value.lower()
value = value.replace('iops', '').replace('b/s', '')
value = int(value)
return value
# NOTE(review): damaged in this view. The send_request call is cut after
# `body=body,`; its remaining arguments are missing.
def _create_qos_policy_group(self, spec, is_adaptive):
"""Creates a QoS policy group."""
# The request body is the converted spec, scoped to this client's vserver.
body = self._qos_spec_to_api_args(
spec, is_adaptive, vserver=self.vserver)
self.send_request('/storage/qos/policies/', 'post', body=body,
# NOTE(review): damaged in this view. The body of the `if` comparing names is
# missing (presumably it drops 'name' from the body when unchanged), and the
# request call has lost its opening; only its arguments remain.
def _modify_qos_policy_group(self, spec, is_adaptive, qos_policy_group):
"""Modifies a QoS policy group."""
body = self._qos_spec_to_api_args(spec, is_adaptive)
if qos_policy_group['name'] == body['name']:
f'/storage/qos/policies/{qos_policy_group["uuid"]}', 'patch',
body=body, enable_tunneling=False)
def get_vol_by_junc_vserver(self, vserver, junction):
    """Return the name of the volume mounted at a junction path.

    :param vserver: vserver used to filter the lookup
    :param junction: junction path of the volume
    :returns: the 'name' of the single matching volume
    """
    match = self._get_volume_by_args(vol_path=junction, vserver=vserver)
    volume_name = match['name']
    return volume_name
# NOTE(review): damaged in this view. The `body` dict is never closed and
# has an empty key (the QoS policy field name was presumably lost), and the
# request call has lost its opening and URL; only `'patch', body=body,
# enable_tunneling=False)` remains. `qos_policy_group_is_adaptive` and
# `file_path` are unused in the visible text.
def file_assign_qos(self, flex_vol, qos_policy_group_name,
qos_policy_group_is_adaptive, file_path):
"""Assigns the named QoS policy-group to a file."""
# `flex_vol` is passed positionally, i.e. as `vol_name`.
volume = self._get_volume_by_args(flex_vol)
body = {
'': qos_policy_group_name
'patch', body=body, enable_tunneling=False)
# NOTE(review): damaged in this view. The signature is cut after
# `qos_policy_group_info,`, the body of the `is None` guard is missing
# (presumably an early return), the `try:` matching the `except` is missing,
# and the statement announced by the final comment (presumably a call to
# remove_unused_qos_policy_groups()) is not visible.
def mark_qos_policy_group_for_deletion(self, qos_policy_group_info,
"""Soft delete a QoS policy group backing a cinder volume."""
if qos_policy_group_info is None:
spec = qos_policy_group_info.get('spec')
# For cDOT we want to delete the QoS policy group that we created for
# this cinder volume. Because the QoS policy may still be "in use"
# after the zapi call to delete the volume itself returns successfully,
# we instead rename the QoS policy group using a specific pattern and
# later attempt on a best effort basis to delete any QoS policy groups
# matching that pattern.
if spec:
current_name = spec['policy_name']
new_name = DELETED_PREFIX + current_name
self._rename_qos_policy_group(current_name, new_name)
except netapp_api.NaApiError as ex:
# A failed rename is only logged.
LOG.warning('Rename failure in cleanup of cDOT QoS policy '
'group %(current_name)s: %(ex)s',
{'current_name': current_name, 'ex': ex})
# Attempt to delete any QoS policies named "delete-openstack-*".
# NOTE(review): damaged in this view. The request call has lost its opening
# and the first part of its URL; only the `+ f'/files/{relative_path}',
# 'delete')` tail remains. Presumably the URL starts from the looked-up
# volume; confirm against upstream.
def delete_file(self, path_to_file):
"""Delete file at path."""
LOG.debug('Deleting file: %s', path_to_file)
# The third '/'-separated component is the volume name (the code indexes
# [2]); everything after it is the path inside the volume.
volume_name = path_to_file.split('/')[2]
relative_path = '/'.join(path_to_file.split('/')[3:])
volume = self._get_volume_by_args(volume_name)
# Path requires "%2E" to represent "." and "%2F" to represent "/".
relative_path = relative_path.replace('.', '%2E').replace('/', '%2F')
+ f'/files/{relative_path}', 'delete')
def _rename_qos_policy_group(self, qos_policy_group_name, new_name):
    """Rename a QoS policy group.

    :param qos_policy_group_name: current name, used as the query filter.
    :param new_name: name sent in the request body.
    """
    self.send_request(
        '/storage/qos/policies/',
        'patch',
        body={'name': new_name},
        query={'name': qos_policy_group_name},
        enable_tunneling=False,
    )
def remove_unused_qos_policy_groups(self):
    """Delete every QoS policy group whose name carries DELETED_PREFIX."""
    name_pattern = f'{DELETED_PREFIX}*'
    self.send_request('/storage/qos/policies', 'delete',
                      query={'name': name_pattern})
def create_lun(self, volume_name, lun_name, size, metadata,
               qos_policy_group_name=None,
               qos_policy_group_is_adaptive=False):
    """Issues API request for creating LUN on volume.

    :param volume_name: name of the volume that will hold the LUN.
    :param lun_name: name of the LUN; the LUN path becomes
        ``/vol/<volume_name>/<lun_name>``.
    :param size: LUN size; sent as a string in ``space.size``.
    :param metadata: dict providing the ``OsType`` and ``SpaceReserved``
        entries.
    :param qos_policy_group_name: optional QoS policy group to attach.
    :param qos_policy_group_is_adaptive: whether that policy group is
        adaptive; checked by ``_validate_qos_policy_group``.
    :raises netapp_api.NaApiError: re-raised after logging when the
        request fails.
    """
    # Reject an adaptive policy up front when the back end lacks support.
    self._validate_qos_policy_group(qos_policy_group_is_adaptive)

    path = f'/vol/{volume_name}/{lun_name}'
    space_reservation = metadata['SpaceReserved']

    body = {
        'name': path,
        'space.size': str(size),
        'os_type': metadata['OsType'],
        'space.guarantee.requested': space_reservation,
    }

    if qos_policy_group_name:
        body['qos_policy.name'] = qos_policy_group_name

    try:
        self.send_request('/storage/luns', 'post', body=body)
    except netapp_api.NaApiError as ex:
        with excutils.save_and_reraise_exception():
            LOG.error('Error provisioning volume %(lun_name)s on '
                      '%(volume_name)s. Details: %(ex)s',
                      {
                          'lun_name': lun_name,
                          'volume_name': volume_name,
                          'ex': ex,
                      })
def do_direct_resize(self, path, new_size_bytes, force=True):
    """Resize the LUN.

    :param path: path of the LUN to resize.
    :param new_size_bytes: value sent as the LUN's ``space.size``.
    :param force: accepted for interface compatibility; not used by this
        method.
    """
    seg = path.split("/")
    LOG.info('Resizing LUN %s directly to new size.', seg[-1])

    body = {'name': path, 'space.size': new_size_bytes}

    self._lun_update_by_path(path, body)
def _get_lun_by_path(self, path):
    """Return the LUN records whose name equals ``path``.

    :returns: the ``records`` entry of the response, or an empty list when
        the response has no such entry.
    """
    reply = self.send_request('/storage/luns', 'get', query={'name': path})
    return reply.get('records', [])
def _get_first_lun_by_path(self, path):
    """Return the first LUN record found for ``path``, or None if none."""
    matches = self._get_lun_by_path(path)
    return matches[0] if len(matches) != 0 else None
def _lun_update_by_path(self, path, body):
    """Patch the LUN found at ``path`` with ``body``.

    :raises netapp_api.NaApiError: with code ``EOBJECTNOTFOUND`` when no
        LUN record is found for the path.
    """
    target = self._get_first_lun_by_path(path)
    if target:
        lun_url = f'/storage/luns/{target["uuid"]}'
        self.send_request(lun_url, 'patch', body=body)
    else:
        raise netapp_api.NaApiError(code=netapp_api.EOBJECTNOTFOUND)
def _validate_qos_policy_group(self, is_adaptive, spec=None,
                               qos_min_support=False):
    """Check that the requested QoS settings can be honoured.

    :param is_adaptive: whether an adaptive QoS policy is requested.
    :param spec: optional QoS spec dict; only the presence of the
        ``min_throughput`` key is inspected.
    :param qos_min_support: whether ``min_throughput`` is supported.
    :raises na_utils.NetAppDriverException: when adaptive QoS is requested
        but ``self.features.ADAPTIVE_QOS`` is falsy, or when the spec asks
        for ``min_throughput`` without ``qos_min_support``.
    """
    if is_adaptive and not self.features.ADAPTIVE_QOS:
        msg = _("Adaptive QoS feature requires ONTAP 9.4 or later.")
        raise na_utils.NetAppDriverException(msg)

    # Nothing further to check without a spec; this also keeps the
    # membership test below from running against None.
    if not spec:
        return

    if 'min_throughput' in spec and not qos_min_support:
        msg = 'min_throughput is not supported by this back end.'
        raise na_utils.NetAppDriverException(msg)
def get_if_info_by_ip(self, ip):
    """Look up the network interfaces that carry the given IP.

    :param ip: address or hostname; passed through
        ``volume_utils.resolve_hostname`` before querying.
    :returns: one ``{'vserver': <svm name>}`` dict per returned record.
    :raises exception.NotFound: when the response reports zero records.
    """
    query_args = {
        'ip.address': volume_utils.resolve_hostname(ip),
        'fields': 'svm',
    }
    result = self.send_request('/network/ip/interfaces/', 'get',
                               query=query_args, enable_tunneling=False)

    found = result['num_records']
    interfaces = result.get('records', [])
    if found == 0:
        raise exception.NotFound(
            _('No interface found on cluster for ip %s') % ip)

    vservers = []
    for interface in interfaces:
        vservers.append({'vserver': interface['svm']['name']})
    return vservers
def get_igroup_by_initiators(self, initiator_list):
    """Get the igroups on this vserver for a set of initiators.

    :param initiator_list: initiator names; joined with spaces to form
        the ``initiators.name`` query value.
    :returns: a list of dicts with the ``initiator-group-os-type``,
        ``initiator-group-type`` and ``initiator-group-name`` keys; empty
        when ``initiator_list`` is empty or nothing is returned.
    """
    if not initiator_list:
        return []

    # Each filter needs its own key; identical keys would collapse into
    # a single entry and silently drop the vserver filter.
    query = {
        'svm.name': self.vserver,
        'initiators.name': ' '.join(initiator_list),
        'fields': 'name,protocol,os_type',
    }

    response = self.send_request('/protocols/san/igroups',
                                 'get', query=query)
    return [
        {
            'initiator-group-os-type': igroup_item['os_type'],
            'initiator-group-type': igroup_item['protocol'],
            'initiator-group-name': igroup_item['name'],
        }
        for igroup_item in response.get('records', [])
    ]
def add_igroup_initiator(self, igroup, initiator):
    """Adds an initiator to the specified igroup.

    :param igroup: name of the igroup; resolved to its UUID first.
    :param initiator: initiator name to add.
    :raises exception.VolumeBackendAPIException: when no igroup record is
        returned for the name.
    """
    query_initiator_uuid = {
        'name': igroup,
        'fields': 'uuid',
    }

    response_initiator_uuid = self.send_request(
        '/protocols/san/igroups/', 'get', query=query_initiator_uuid)

    records = response_initiator_uuid.get('records', [])
    if not records:
        msg = _('Could not find igroup initiator.')
        raise exception.VolumeBackendAPIException(data=msg)

    igroup_uuid = records[0]['uuid']

    body = {
        'name': initiator,
    }

    self.send_request('/protocols/san/igroups/' +
                      igroup_uuid + '/initiators',
                      'post', body=body)
def create_igroup(self, igroup, igroup_type='iscsi', os_type='default'):
    """Creates igroup with specified args.

    :param igroup: name of the igroup to create.
    :param igroup_type: value sent as the igroup ``protocol``.
    :param os_type: value sent as the igroup ``os_type``.
    """
    body = {
        'name': igroup,
        'protocol': igroup_type,
        'os_type': os_type,
    }
    self.send_request('/protocols/san/igroups', 'post', body=body)
def map_lun(self, path, igroup_name, lun_id=None):
    """Maps LUN to the initiator and returns LUN id assigned.

    :param path: path of the LUN to map.
    :param igroup_name: name of the igroup to map the LUN to.
    :param lun_id: optional logical unit number to request.
    :returns: the ``logical_unit_number`` of the first returned record.
    :raises netapp_api.NaApiError: re-raised after logging when the
        request fails.
    """
    body_post = {
        'lun.name': path,
        'igroup.name': igroup_name,
    }

    if lun_id is not None:
        body_post['logical_unit_number'] = lun_id

    try:
        # return_records makes the response carry the created mapping,
        # from which the assigned LUN id is read.
        result = self.send_request('/protocols/san/lun-maps', 'post',
                                   body=body_post,
                                   query={'return_records': 'true'})
        records = result.get('records')
        lun_id_assigned = records[0].get('logical_unit_number')
        return lun_id_assigned
    except netapp_api.NaApiError as e:
        code = e.code
        message = e.message
        LOG.warning('Error mapping LUN. Code :%(code)s, Message: '
                    '%(message)s', {'code': code, 'message': message})
        raise
def get_lun_map(self, path):
    """Gets the LUN map by LUN path.

    :param path: path of the LUN whose mappings are requested.
    :returns: a list of dicts with the ``initiator-group``, ``lun-id`` and
        ``vserver`` keys; empty when the response lacks ``records`` or
        ``num_records``.
    """
    map_list = []

    query = {
        'lun.name': path,
        # These three fields are exactly the ones read from each record.
        'fields': 'igroup.name,logical_unit_number,svm.name',
    }

    response = self.send_request('/protocols/san/lun-maps',
                                 'get',
                                 query=query)
    num_records = response.get('num_records')
    records = response.get('records', None)
    if records is None or num_records is None:
        return map_list

    for element in records:
        map_list.append({
            'initiator-group': element['igroup']['name'],
            'lun-id': element['logical_unit_number'],
            'vserver': element['svm']['name'],
        })

    return map_list
def get_fc_target_wwpns(self):
    """Gets the FC target details.

    :returns: the lower-cased ``wwpn`` of every returned FC interface
        record; empty when the response carries no records.
    """
    query = {
        'fields': 'wwpn',
    }
    response = self.send_request('/network/fc/interfaces',
                                 'get', query=query)

    # Tolerate a response without "records" instead of iterating None.
    records = response.get('records') or []
    return [record.get('wwpn').lower() for record in records]
def unmap_lun(self, path, igroup_name):
    """Unmaps a LUN from given initiator.

    :param path: path of the mapped LUN.
    :param igroup_name: name of the igroup the LUN is mapped to.
    :raises netapp_api.NaApiError: re-raised when the delete request
        fails with any code other than ``REST_NO_SUCH_LUN_MAP``.
    """
    # Get the LUN and igroup UUIDs that identify the mapping.
    query_uuid = {
        'igroup.name': igroup_name,
        'lun.name': path,
        'fields': 'lun.uuid,igroup.uuid',
    }

    response_uuid = self.send_request(
        '/protocols/san/lun-maps', 'get', query=query_uuid)

    if response_uuid['num_records'] > 0:
        lun_uuid = response_uuid['records'][0]['lun']['uuid']
        igroup_uuid = response_uuid['records'][0]['igroup']['uuid']

        try:
            self.send_request(
                f'/protocols/san/lun-maps/{lun_uuid}/{igroup_uuid}',
                'delete')
        except netapp_api.NaApiError as e:
            LOG.warning("Error unmapping LUN. Code: %(code)s, Message: "
                        "%(message)s", {'code': e.code,
                                        'message': e.message})
            # An already-unmapped LUN is the desired end state, so only
            # other failures are propagated.
            if e.code != netapp_api.REST_NO_SUCH_LUN_MAP:
                raise
    else:
        # Input is invalid or LUN may already be unmapped
        LOG.warning("Error unmapping LUN. Invalid input.")
def has_luns_mapped_to_initiators(self, initiator_list):
    """Checks whether any LUNs are mapped to the given initiator(s).

    :param initiator_list: initiator names; joined with spaces to form
        the ``initiators.name`` query value.
    :returns: True when at least one returned igroup record has a
        non-empty ``lun_maps`` entry, otherwise False.
    """
    query = {
        'initiators.name': ' '.join(initiator_list),
        'fields': 'lun_maps',
    }

    response = self.send_request('/protocols/san/igroups',
                                 'get', query=query)

    records = response.get('records', [])
    return any(len(record.get('lun_maps', [])) > 0 for record in records)
def get_iscsi_service_details(self):
    """Returns iscsi iqn.

    :returns: the ``target.name`` of the first iSCSI service record, or
        None when no record is returned.
    """
    query = {
        # The only field read from the record below.
        'fields': 'target.name',
    }
    response = self.send_request(
        '/protocols/san/iscsi/services', 'get', query=query)
    records = response.get('records')
    if records:
        return records[0]['target']['name']

    LOG.debug('No iSCSI service found for vserver %s', self.vserver)
    return None
def check_iscsi_initiator_exists(self, iqn):
    """Returns True if initiator exists.

    :param iqn: initiator name used as the ``initiator`` query filter.
    :returns: True when the credentials endpoint returns at least one
        record; False when it returns none or the request raises
        ``netapp_api.NaApiError``.
    """
    endpoint_url = '/protocols/san/iscsi/credentials'
    query = {
        'initiator': iqn,
    }
    try:
        response = self.send_request(endpoint_url, 'get', query=query)
    except netapp_api.NaApiError:
        # A failed lookup is reported as "initiator not present".
        return False

    return bool(response.get('records'))
def set_iscsi_chap_authentication(self, iqn, username, password):
    """Provides NetApp host's CHAP credentials to the backend.

    :param iqn: initiator name the credentials apply to.
    :param username: CHAP user name placed on the command line.
    :param password: CHAP password, supplied at the command's prompt.
    :raises exception.VolumeBackendAPIException: when running the command
        fails for any reason.
    """
    initiator_exists = self.check_iscsi_initiator_exists(iqn)

    command_template = ('iscsi security %(mode)s -vserver %(vserver)s '
                        '-initiator-name %(iqn)s -auth-type CHAP '
                        '-user-name %(username)s')

    # An existing initiator entry is modified; otherwise one is created.
    if initiator_exists:
        mode = 'modify'
        LOG.debug('Updating CHAP authentication for %(iqn)s.',
                  {'iqn': iqn})
    else:
        mode = 'create'
        LOG.debug('Adding initiator %(iqn)s with CHAP authentication.',
                  {'iqn': iqn})

    command = command_template % {
        'mode': mode,
        'vserver': self.vserver,
        'iqn': iqn,
        'username': username,
    }

    try:
        with self.ssh_client.ssh_connect_semaphore:
            ssh_pool = self.ssh_client.ssh_pool
            with ssh_pool.item() as ssh:
                # NOTE(review): assumes the SSH client exposes
                # execute_command_with_prompt(ssh, command, prompt,
                # answer) and that the CLI prompts with 'Password:' -
                # confirm against the ssh_client implementation.
                self.ssh_client.execute_command_with_prompt(ssh,
                                                            command,
                                                            'Password:',
                                                            password)
    except Exception as e:
        msg = _('Failed to set CHAP authentication for target IQN %(iqn)s.'
                ' Details: %(ex)s') % {
            'iqn': iqn,
            'ex': e,
        }
        LOG.error(msg)
        raise exception.VolumeBackendAPIException(data=msg)
def get_iscsi_target_details(self):
    """Gets the iSCSI target portal details.

    :returns: one dict per returned interface record with the ``address``,
        ``tpgroup-tag`` (always None), ``interface-enabled`` and ``port``
        keys.
    """
    query = {
        'services': 'data_iscsi',
        'fields': 'ip.address,enabled',
    }

    response = self.send_request('/network/ip/interfaces',
                                 'get', query=query)

    target_list = []
    for record in response.get('records', []):
        target_list.append({
            'address': record['ip']['address'],
            'tpgroup-tag': None,
            'interface-enabled': record['enabled'],
            # NOTE(nahimsouza): from ONTAP documentation:
            # ONTAP does not support changing the port number for iSCSI.
            # Port number 3260 is registered as part of the iSCSI
            # specification and cannot be used by any other application
            # or service.
            'port': 3260,
        })

    return target_list
def move_lun(self, path, new_path):
    """Moves the LUN at path to new path.

    :param path: current LUN path, used as the ``name`` query filter.
    :param new_path: path sent as the LUN's new ``name``.
    """
    seg = path.split("/")
    new_seg = new_path.split("/")
    LOG.debug("Moving LUN %(name)s to %(new_name)s.",
              {'name': seg[-1], 'new_name': new_seg[-1]})
    query = {
        'svm.name': self.vserver,
        'name': path,
    }
    body = {
        'name': new_path,
    }
    self.send_request('/storage/luns/', 'patch', query=query, body=body)
def clone_file(self, flex_vol, src_path, dest_path, vserver,
               dest_exists=False, source_snapshot=None, is_snapshot=False):
    """Clones file on vserver.

    :param flex_vol: name of the volume holding the file; resolved to a
        volume record through ``_get_volume_by_args``.
    :param src_path: path of the file to clone.
    :param dest_path: path of the clone to create.
    :param vserver: only logged by this method.
    :param dest_exists: when True, ``overwrite_destination`` is set.
    :param source_snapshot: only logged by this method.
    :param is_snapshot: when True and ``self.features.BACKUP_CLONE_PARAM``
        is truthy, ``is_backup`` is set.
    """
    LOG.debug('Cloning file - volume %(flex_vol)s, src %(src_path)s, '
              'dest %(dest_path)s, vserver %(vserver)s, '
              'source_snapshot %(source_snapshot)s',
              {
                  'flex_vol': flex_vol,
                  'src_path': src_path,
                  'dest_path': dest_path,
                  'vserver': vserver,
                  'source_snapshot': source_snapshot,
              })

    volume = self._get_volume_by_args(flex_vol)
    body = {
        'volume': {
            'uuid': volume['uuid'],
            'name': volume['name'],
        },
        'source_path': src_path,
        'destination_path': dest_path,
    }
    if is_snapshot and self.features.BACKUP_CLONE_PARAM:
        body['is_backup'] = True

    if dest_exists:
        body['overwrite_destination'] = True

    self.send_request('/storage/file/clone', 'post', body=body)
def clone_lun(self, volume, name, new_name, space_reserved='true',
              qos_policy_group_name=None, src_block=0, dest_block=0,
              block_count=0, source_snapshot=None, is_snapshot=False,
              qos_policy_group_is_adaptive=False):
    """Clones lun on vserver.

    :param volume: name of the volume holding the source LUN; the clone
        is created in the same volume.
    :param name: name of the source LUN.
    :param new_name: name of the clone.
    :param space_reserved: the string ``'true'`` requests a space
        guarantee; any other value requests none.
    :param qos_policy_group_name: optional QoS policy group to attach.
    :param source_snapshot: optional snapshot to clone the LUN from.
    :param qos_policy_group_is_adaptive: whether that policy group is
        adaptive; checked by ``_validate_qos_policy_group``.
    """
    LOG.debug('Cloning lun - volume: %(volume)s, name: %(name)s, '
              'new_name: %(new_name)s, space_reserved: %(space_reserved)s,'
              ' qos_policy_group_name: %(qos_policy_group_name)s',
              {
                  'volume': volume,
                  'name': name,
                  'new_name': new_name,
                  'space_reserved': space_reserved,
                  'qos_policy_group_name': qos_policy_group_name,
              })

    # NOTE(nahimsouza): some parameters are not available on REST API,
    # but they are in the header just to keep compatibility with ZAPI:
    # src_block, dest_block, block_count, is_snapshot

    # Reject an adaptive policy up front when the back end lacks support.
    self._validate_qos_policy_group(qos_policy_group_is_adaptive)

    source_path = f'/vol/{volume}'
    if source_snapshot:
        source_path += f'/.snapshot/{source_snapshot}'
    source_path += f'/{name}'

    body = {
        'svm': {
            'name': self.vserver,
        },
        'name': f'/vol/{volume}/{new_name}',
        'clone': {
            'source': {
                'name': source_path,
            },
        },
        'space': {
            'guarantee': {
                'requested': space_reserved == 'true',
            },
        },
    }

    if qos_policy_group_name:
        body['qos_policy'] = {'name': qos_policy_group_name}

    self.send_request('/storage/luns', 'post', body=body)
def destroy_lun(self, path, force=True):
    """Delete the LUN found at ``path`` on this vserver.

    :param path: LUN path, used as the ``name`` query filter.
    :param force: when truthy, ``allow_delete_while_mapped`` is set to
        ``'true'`` in the query.
    """
    query = {'name': path, 'svm': self.vserver}
    if force:
        query.update(allow_delete_while_mapped='true')

    self.send_request('/storage/luns/', 'delete', query=query)
def get_flexvol_capacity(self, flexvol_path=None, flexvol_name=None):
    """Gets total capacity and free capacity, in bytes, of the flexvol.

    :param flexvol_path: optional path used to look the volume up.
    :param flexvol_name: optional name used to look the volume up.
    :returns: dict with the ``size-total`` and ``size-available`` keys,
        both floats.
    :raises na_utils.NetAppDriverException: when the volume lookup raises
        ``exception.VolumeBackendAPIException``.
    """
    fields = 'name,space.available,space.afs_total'
    try:
        volume = self._get_volume_by_args(
            vol_name=flexvol_name, vol_path=flexvol_path, fields=fields)
    except exception.VolumeBackendAPIException:
        msg = _('Volume %s not found.')
        msg_args = flexvol_path or flexvol_name
        raise na_utils.NetAppDriverException(msg % msg_args)

    return {
        'size-total': float(volume['space']['afs_total']),
        'size-available': float(volume['space']['available']),
    }