Support for iLO based firmware update

Added support for iLO based firmware update for the supported
devices through ribcl and ris, as applicable. Firmware update
can be initiated as part of manual cleaning step in ironic.

Partial-Bug: #1526216
Change-Id: I1eca5a08d808df1c4bda0e91181a8389d053c379
This commit is contained in:
Debayan Ray 2015-07-20 02:34:46 -07:00
parent 5b76c9bd32
commit e25520af18
16 changed files with 1827 additions and 17 deletions

1
.gitignore vendored
View File

@ -1,6 +1,7 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
.eggs
# C extensions
*.so

View File

@ -127,3 +127,13 @@ class HPSSAOperationError(HPSSAException):
message = ("An error was encountered while doing hpssa configuration: "
"%(reason)s.")
class ImageExtractionFailed(ProliantUtilsException):
message = "Failed to extract image %(image_ref)s, reason: %(reason)s"
def __init__(self, message=None, **kwargs):
if not message:
message = self.message % kwargs
super(ImageExtractionFailed, self).__init__(message)

View File

@ -43,6 +43,7 @@ SUPPORTED_RIS_METHODS = [
'get_server_capabilities',
'set_iscsi_boot_info',
'set_vm_status',
'update_firmware',
'update_persistent_boot',
]
@ -373,3 +374,16 @@ class IloClient(operations.IloOperations):
on the server.
"""
return self._call_method('activate_license', key)
def update_firmware(self, firmware_url, component_type):
"""Updates the given firmware on the server
:param firmware_url: location of the firmware
:param component_type: Type of component to be applied to.
:raises: InvalidInputError, if the validation of the input fails
:raises: IloError, on an error from iLO
:raises: IloCommandNotSupportedError, if the command is
not supported on the server
"""
return self._call_method(
'update_firmware', firmware_url, component_type)

View File

@ -14,31 +14,152 @@
"""Common functionalities used by both RIBCL and RIS."""
import os
import stat
import time
from proliantutils import exception
# Max number of times an operation to be retried
RETRY_COUNT = 10
from proliantutils import log
def wait_for_ilo_after_reset(ilo_object):
"""Checks if iLO is up after reset."""
LOG = log.get_logger(__name__)
retry_count = RETRY_COUNT
# Delay for 10 sec, for the reset operation to take effect.
time.sleep(10)
def wait_for_operation_to_complete(
has_operation_completed, retries=10, delay_bw_retries=5,
delay_before_attempts=10, failover_exc=exception.IloError,
failover_msg=("Operation did not complete even after multiple "
"attempts."), is_silent_loop_exit=False):
"""Attempts the provided operation for a specified number of times.
If it runs out of attempts, then it raises an exception. On success,
it breaks out of the loop.
:param has_operation_completed: the method to retry and it needs to return
a boolean to indicate success or failure.
:param retries: number of times the operation to be (re)tried, default 10
:param delay_bw_retries: delay in seconds before attempting after
each failure, default 5.
:param delay_before_attempts: delay in seconds before beginning any
operation attempt, default 10.
:param failover_exc: the exception which gets raised in case of failure
upon exhausting all the attempts, default IloError.
:param failover_msg: the msg with which the exception gets raised in case
of failure upon exhausting all the attempts.
:param is_silent_loop_exit: decides if exception has to be raised (in case
of failure upon exhausting all the attempts)
or not, default False (will be raised).
:raises: failover_exc, if failure happens even after all the attempts,
default IloError.
"""
retry_count = retries
# Delay for ``delay_before_attempts`` secs, before beginning any attempt
time.sleep(delay_before_attempts)
while retry_count:
try:
ilo_object.get_product_name()
break
LOG.debug("Calling '%s', retries left: %d",
has_operation_completed.__name__, retry_count)
if has_operation_completed():
break
except exception.IloError:
retry_count -= 1
time.sleep(5)
pass
time.sleep(delay_bw_retries)
retry_count -= 1
else:
msg = ('iLO is not up after reset.')
raise exception.IloConnectionError(msg)
LOG.debug("Max retries exceeded with: '%s'",
has_operation_completed.__name__)
if not is_silent_loop_exit:
raise failover_exc(failover_msg)
def wait_for_ilo_after_reset(ilo_object):
"""Continuously polls for iLO to come up after reset."""
is_ilo_up_after_reset = lambda: ilo_object.get_product_name() is not None
is_ilo_up_after_reset.__name__ = 'is_ilo_up_after_reset'
wait_for_operation_to_complete(
is_ilo_up_after_reset,
failover_exc=exception.IloConnectionError,
failover_msg='iLO is not up after reset.'
)
def wait_for_ris_firmware_update_to_complete(ris_object):
"""Continuously polls for iLO firmware update to complete."""
p_state = ['IDLE']
c_state = ['IDLE']
def has_firmware_flash_completed():
"""Checks for completion status of firmware update operation
The below table shows the conditions for which the firmware update
will be considered as DONE (be it success or error)::
+---------------------+--------------------+
| Previous state | Current state |
+=====================+====================+
| IDLE | ERROR |
+---------------------+--------------------+
| PROGRESSING | ERROR |
+---------------------+--------------------+
| PROGRESSING | COMPLETED |
+---------------------+--------------------+
| PROGRESSING | UNKNOWN |
+---------------------+--------------------+
| PROGRESSING | IDLE |
+---------------------+--------------------+
"""
curr_state, curr_percent = ris_object.get_firmware_update_progress()
p_state[0] = c_state[0]
c_state[0] = curr_state
if (((p_state[0] == 'PROGRESSING') and (c_state[0] in
['COMPLETED', 'ERROR',
'UNKNOWN', 'IDLE']))
or (p_state[0] == 'IDLE' and c_state[0] == 'ERROR')):
return True
return False
wait_for_operation_to_complete(
has_firmware_flash_completed,
delay_bw_retries=30,
failover_msg='iLO firmware update has failed.'
)
wait_for_ilo_after_reset(ris_object)
def wait_for_ribcl_firmware_update_to_complete(ribcl_object):
"""Continuously checks for iLO firmware update to complete."""
def is_ilo_reset_initiated():
"""Checks for initiation of iLO reset
Invokes the ``get_product_name`` api and returns
i) True, if exception gets raised as that marks the iLO reset
initiation.
ii) False, if the call gets through without any failure, marking
that iLO is yet to be reset.
"""
try:
LOG.debug(ribcl_object._('Checking for iLO reset...'))
ribcl_object.get_product_name()
return False
except exception.IloError:
LOG.debug(ribcl_object._('iLO is being reset...'))
return True
# Note(deray): wait for 5 secs, before checking if iLO reset got triggered
# at every interval of 6 secs. This looping call happens for 10 times.
# Once it comes out of the wait of iLO reset trigger, then it starts
# waiting for iLO to be up again after reset.
wait_for_operation_to_complete(
is_ilo_reset_initiated,
delay_bw_retries=6,
delay_before_attempts=5,
is_silent_loop_exit=True
)
wait_for_ilo_after_reset(ribcl_object)
def isDisk(result):
@ -46,3 +167,23 @@ def isDisk(result):
disk_identifier = ["Logical Drive", "HDD", "Storage", "LogVol"]
return any(e in result for e in disk_identifier)
def get_filename_and_extension_of(target_file):
"""Gets the base filename and extension of the target file.
:param target_file: the complete path of the target file
:returns: base filename and extension
"""
base_target_filename = os.path.basename(target_file)
file_name, file_ext_with_dot = os.path.splitext(base_target_filename)
return file_name, file_ext_with_dot
def add_exec_permission_to(target_file):
"""Add executable permissions to the file
:param target_file: the target file whose permission to be changed
"""
mode = os.stat(target_file).st_mode
os.chmod(target_file, mode | stat.S_IXUSR)

View File

@ -0,0 +1,395 @@
# Copyright 2016 Hewlett Packard Enterprise Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Firmware related utilities and helper functions.
"""
import abc
import os
import random
import re
import shutil
import socket
import ssl
import subprocess
import sys
import tempfile
import types
import uuid
from oslo_concurrency import processutils as utils
import six
from proliantutils import exception
from proliantutils.ilo import common
from proliantutils import log
if six.PY3:
def b(x):
return bytes(x, 'ascii')
else:
def b(x):
return x
LOG = log.get_logger(__name__)
# Supported components for firmware update
SUPPORTED_FIRMWARE_UPDATE_COMPONENTS = ['ilo', 'cpld', 'power_pic', 'bios',
'chassis']
# Supported raw firmware file extensions
RAW_FIRMWARE_EXTNS = ['.hex', '.bin', '.vme']
def check_firmware_update_component(func):
"""Checks the firmware update component."""
@six.wraps(func)
def wrapper(self, filename, component_type):
"""Wrapper around ``update_firmware`` call.
:param filename: location of the raw firmware file.
:param component_type: Type of component to be applied to.
"""
component_type = component_type and component_type.lower()
if (component_type not in SUPPORTED_FIRMWARE_UPDATE_COMPONENTS):
msg = ("Got invalid component type for firmware update: "
"``update_firmware`` is not supported on %(component)s" %
{'component': component_type})
LOG.error(self._(msg)) # noqa
raise exception.InvalidInputError(msg)
return func(self, filename, component_type)
return wrapper
@six.add_metaclass(abc.ABCMeta)
class FirmwareImageControllerBase(object):
"""Base class for firmware file related operations."""
def __init__(self, fw_file):
self.fw_file = fw_file
file_name, file_ext_with_dot = common.get_filename_and_extension_of(
fw_file)
self.fw_filename = file_name
self.fw_file_ext = file_ext_with_dot
class FirmwareImageUploader(FirmwareImageControllerBase):
"""Helper class to upload the firmware image file
This class acts as a helper class in uploading the firmware file to iLO.
"""
HTTP_UPLOAD_HEADER = ("POST /cgi-bin/uploadRibclFiles HTTP/1.1\r\n"
"Host: localhost\r\nConnection: Close\r\n"
"Content-Length: %d\r\n"
"Content-Type: multipart/form-data; "
"boundary=%s\r\n\r\n")
def upload_file_to(self, addressinfo, timeout):
"""Uploads the raw firmware file to iLO
Uploads the raw firmware file (already set as attribute in
FirmwareImageControllerBase constructor) to iLO, whose address
information is passed to this method.
:param addressinfo: tuple of hostname and port of the iLO
:param timeout: timeout in secs, used for connecting to iLO
:raises: IloInvalidInputError, if raw firmware file not found
:raises: IloError, for other internal problems
:returns: the cookie so sent back from iLO on successful upload
"""
self.hostname, self.port = addressinfo
self.timeout = timeout
filename = self.fw_file
firmware = open(filename, 'rb').read()
# generate boundary
boundary = b('------hpiLO3t' +
str(random.randint(100000, 1000000)) + 'z')
while boundary in firmware:
boundary = b('------hpiLO3t' +
str(random.randint(100000, 1000000)) + 'z')
# generate body parts
parts = [
# body1
b("--") + boundary +
b("""\r\nContent-Disposition: form-data; """
"""name="fileType"\r\n\r\n"""),
# body2
b("\r\n--") + boundary +
b('''\r\nContent-Disposition: form-data; name="fwimgfile"; '''
'''filename="''') +
b(filename) +
b('''"\r\nContent-Type: application/octet-stream\r\n\r\n'''),
# firmware image
firmware,
# body3
b("\r\n--") + boundary + b("--\r\n"),
]
total_bytes = sum([len(x) for x in parts])
sock = self._get_socket()
# send the firmware image
sock.write(b(self.HTTP_UPLOAD_HEADER %
(total_bytes, boundary.decode('ascii'))))
for part in parts:
sock.write(part)
data = ''
try:
while True:
d = sock.read()
data += d.decode('latin-1')
if not d:
break
except socket.sslerror: # Connection closed
e = sys.exc_info()[1]
if not data:
raise exception.IloConnectionError(
"Communication with %(hostname)s:%(port)d failed: "
"%(error)s" % {'hostname': self.hostname,
'port': self.port, 'error': str(e)})
# Received len(data) bytes
cookie_match = re.search('Set-Cookie: *(.*)', data)
if not cookie_match:
raise exception.IloError("Uploading of file: %s failed due "
"to unknown reason." % filename)
# return the cookie
return cookie_match.group(1)
def _get_socket(self, sslversion=ssl.PROTOCOL_TLSv1):
"""Sets up an https connection and do an HTTP/raw socket request
:param sslversion: version of ssl session
:raises: IloConnectionError, for connection failures
:returns: ssl wrapped socket object
"""
err = None
sock = None
try:
for res in socket.getaddrinfo(
self.hostname, self.port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
sock = socket.socket(af, socktype, proto)
sock.settimeout(self.timeout)
# Connecting to {self.hostname} at port {self.port}
sock.connect(sa)
except socket.timeout:
if sock is not None:
sock.close()
err = exception.IloConnectionError(
"Timeout connecting to %(hostname)s:%(port)d"
% {'hostname': self.hostname, 'port': self.port})
except socket.error:
if sock is not None:
sock.close()
e = sys.exc_info()[1]
err = exception.IloConnectionError(
"Error connecting to %(hostname)s:%(port)d : %(error)s"
% {'hostname': self.hostname, 'port': self.port,
'error': str(e)})
except Exception:
raise exception.IloConnectionError(
"Unable to resolve %s" % self.hostname)
if err is not None:
raise err
# wrapping the socket over ssl session
try:
return ssl.wrap_socket(sock, ssl_version=sslversion)
except socket.sslerror:
e = sys.exc_info()[1]
msg = (getattr(e, 'reason', None) or
getattr(e, 'message', None))
# Some older iLO s don't support TLSv1, retry with SSLv3
if ('wrong version number' in msg) and (
sslversion == ssl.PROTOCOL_TLSv1):
return self._get_socket(ssl.PROTOCOL_SSLv3)
raise exception.IloConnectionError(
"Cannot establish ssl session with %(hostname)s:%(port)d : "
"%(error)s" % {'hostname': self.hostname, 'port': self.port,
'error': str(e)})
class FirmwareImageExtractor(FirmwareImageControllerBase):
"""Helper class to extract the raw file from compact firmware image file
This class acts as a helper class in extracting the raw firmware file
from the compact firmware file.
"""
def extract(self):
"""Extracts the raw firmware file from its compact format
Extracts the raw firmware file from its compact file format (already
set as attribute in FirmwareImageControllerBase constructor).
:raises: InvalidInputError, if raw firmware file not found
:raises: ImageExtractionFailed, for extraction related issues
:returns: the raw firmware file with the complete path
:returns: boolean(True) to indicate that a new file got generated
after successful extraction.
"""
target_file = self.fw_file
common.add_exec_permission_to(target_file)
# create a temp directory where the extraction will occur
temp_dir = tempfile.mkdtemp()
extract_path = os.path.join(temp_dir, self.fw_filename)
try:
self._do_extract(target_file, extract_path)
except exception.ImageExtractionFailed:
# clean up the partial extracted content, if any,
# along with temp dir and re-raise the exception
shutil.rmtree(temp_dir, ignore_errors=True)
raise
# creating a new hard link to the core firmware file
firmware_file_path = _get_firmware_file_in_new_path(extract_path)
# delete the entire extracted content along with temp dir.
shutil.rmtree(temp_dir, ignore_errors=True)
if not firmware_file_path:
raise exception.InvalidInputError(
"Raw firmware file not found in: '%s'" % target_file)
return firmware_file_path, True
def get_fw_extractor(fw_file):
"""Gets the firmware extractor object fine-tuned for specified type
:param fw_file: compact firmware file to be extracted from
:raises: InvalidInputError, for unsupported file types
:returns: FirmwareImageExtractor object
"""
fw_img_extractor = FirmwareImageExtractor(fw_file)
extension = fw_img_extractor.fw_file_ext.lower()
if extension == '.scexe':
# assign _do_extract attribute to refer to _extract_scexe_file
fw_img_extractor._do_extract = types.MethodType(
_extract_scexe_file, fw_img_extractor)
elif extension == '.rpm':
# assign _do_extract attribute to refer to _extract_rpm_file
fw_img_extractor._do_extract = types.MethodType(
_extract_rpm_file, fw_img_extractor)
elif extension in RAW_FIRMWARE_EXTNS:
# Note(deray): Assigning ``extract`` attribute to return
# 1. the firmware file itself
# 2. boolean (False) to indicate firmware file is not extracted
def dummy_extract(self):
"""Dummy (no-op) extract method
:returns: the same firmware file with the complete path
:returns: boolean(False) to indicate that a new file is not
generated.
"""
return fw_img_extractor.fw_file, False
fw_img_extractor.extract = types.MethodType(
dummy_extract, fw_img_extractor)
else:
raise exception.InvalidInputError(
'Unexpected compact firmware file type: %s' % fw_file)
return fw_img_extractor
def _extract_scexe_file(self, target_file, extract_path):
"""Extracts the scexe file.
:param target_file: the firmware file to be extracted from
:param extract_path: the path where extraction is supposed to happen
"""
# Command to extract the smart component file.
unpack_cmd = '--unpack=' + extract_path
# os.path.isfile(target_file)
cmd = [target_file, unpack_cmd]
out, err = utils.trycmd(*cmd)
def _extract_rpm_file(self, target_file, extract_path):
"""Extracts the rpm file.
:param target_file: the firmware file to be extracted from
:param extract_path: the path where extraction is supposed to happen
:raises: ImageExtractionFailed, if any issue with extraction
"""
if not os.path.exists(extract_path):
os.makedirs(extract_path)
os.chdir(extract_path)
try:
rpm2cpio = subprocess.Popen('rpm2cpio ' + target_file,
shell=True,
stdout=subprocess.PIPE)
cpio = subprocess.Popen('cpio -idm', shell=True,
stdin=rpm2cpio.stdout)
out, err = cpio.communicate()
except Exception:
raise exception.ImageExtractionFailed(
image_ref=target_file,
reason='Unexpected error in extracting file.')
def _get_firmware_file(path):
"""Gets the raw firmware file
Gets the raw firmware file from the extracted directory structure
:param path: the directory structure to search for
:returns: the raw firmware file with the complete path
"""
for dirpath, dirnames, filenames in os.walk(path):
for filename in filenames:
file_name, file_ext = os.path.splitext(os.path.basename(filename))
if file_ext in RAW_FIRMWARE_EXTNS:
# return filename
return os.path.join(dirpath, filename)
def _get_firmware_file_in_new_path(searching_path):
"""Gets the raw firmware file in a new path
Gets the raw firmware file from the extracted directory structure
and creates a hard link to that in a file path and cleans up the
lookup extract path.
:param searching_path: the directory structure to search for
:returns: the raw firmware file with the complete new path
"""
firmware_file_path = _get_firmware_file(searching_path)
if not firmware_file_path:
return None
# Note(deray): the path of the new firmware file will be of the form:
#
# [TEMP_DIR]/xxx-xxx_actual_firmware_filename
#
# e.g. /tmp/77e8f689-f32c-4727-9fc3-a7dacefe67e4_ilo4_210.bin
file_name, file_ext_with_dot = common.get_filename_and_extension_of(
firmware_file_path)
new_firmware_file_path = os.path.join(
tempfile.gettempdir(), str(uuid.uuid4()) + '_' +
file_name + file_ext_with_dot)
# create a hard link to the raw firmware file
os.link(firmware_file_path, new_firmware_file_path)
return new_firmware_file_path

View File

@ -320,3 +320,16 @@ class IloOperations(object):
on the server.
"""
raise exception.IloCommandNotSupportedError(ERRMSG)
def update_firmware(self, firmware_url, component_type):
"""Updates the given firmware on the server
:param firmware_url: location of the firmware file
:param component_type: Type of component to be applied to.
:raises: InvalidInputError, if the validation of the input fails
:raises: IloError, on an error from iLO
:raises: IloConnectionError, if not able to reach iLO.
:raises: IloCommandNotSupportedError, if the command is
not supported on the server
"""
raise exception.IloCommandNotSupportedError(ERRMSG)

View File

@ -18,6 +18,7 @@ over RIBCL scripting language
"""
import copy
import os
import re
import xml.etree.ElementTree as etree
@ -29,6 +30,7 @@ import six
from proliantutils import exception
from proliantutils.ilo import common
from proliantutils.ilo import firmware_controller
from proliantutils.ilo import operations
from proliantutils import log
@ -86,7 +88,7 @@ class RIBCLOperations(operations.IloOperations):
if self.cacert is None:
urllib3.disable_warnings(urllib3_exceptions.InsecureRequestWarning)
def _request_ilo(self, root):
def _request_ilo(self, root, extra_headers=None):
"""Send RIBCL XML data to iLO.
This function sends the XML request to the ILO and
@ -100,6 +102,9 @@ class RIBCLOperations(operations.IloOperations):
urlstr = 'https://%s/ribcl' % (self.host)
xml = self._serialize_xml(root)
headers = {"Content-length": len(xml)}
if extra_headers:
headers.update(extra_headers)
kwargs = {'headers': headers, 'data': xml}
if self.cacert is not None:
kwargs['verify'] = self.cacert
@ -1041,6 +1046,71 @@ class RIBCLOperations(operations.IloOperations):
d = self._request_ilo(root)
self._parse_output(d)
@firmware_controller.check_firmware_update_component
def update_firmware(self, filename, component_type):
"""Updates the given firmware on the server for the given component.
:param filename: location of the raw firmware file. Extraction of the
firmware file (if in compact format) is expected to
happen prior to this invocation.
:param component_type: Type of component to be applied to.
:raises: InvalidInputError, if the validation of the input fails
:raises: IloError, on an error from iLO
:raises: IloConnectionError, if not able to reach iLO.
:raises: IloCommandNotSupportedError, if the command is
not supported on the server
"""
fw_img_processor = firmware_controller.FirmwareImageUploader(filename)
LOG.debug(self._('Uploading firmware file: %s ...'), filename)
cookie = fw_img_processor.upload_file_to((self.host, self.port),
self.timeout)
LOG.debug(self._('Uploading firmware file: %s ... done'), filename)
root = self._get_firmware_update_xml_for_file_and_component(
filename, component_type)
element = root.find('LOGIN/RIB_INFO')
etree.SubElement(element, 'TPM_ENABLED', VALUE='Yes')
extra_headers = {'Cookie': cookie}
LOG.debug(self._('Flashing firmware file: %s ...'), filename)
d = self._request_ilo(root, extra_headers=extra_headers)
# wait till the firmware update completes.
common.wait_for_ribcl_firmware_update_to_complete(self)
self._parse_output(d)
LOG.info(self._('Flashing firmware file: %s ... done'), filename)
def _get_firmware_update_xml_for_file_and_component(
self, filename, component):
"""Creates the dynamic xml for flashing the device firmware via iLO.
This method creates the dynamic xml for flashing the firmware, based
on the component type so passed.
:param filename: location of the raw firmware file.
:param component_type: Type of component to be applied to.
:returns: the etree.Element for the root of the RIBCL XML
for flashing the device (component) firmware.
"""
if component == 'ilo':
cmd_name = 'UPDATE_RIB_FIRMWARE'
else:
# Note(deray): Not explicitly checking for all other supported
# devices (components), as those checks have already happened
# in the invoking methods and may seem redundant here.
cmd_name = 'UPDATE_FIRMWARE'
fwlen = os.path.getsize(filename)
root = self._create_dynamic_xml(cmd_name,
'RIB_INFO',
'write',
subelements={
'IMAGE_LOCATION': filename,
'IMAGE_LENGTH': str(fwlen)
})
return root
# The below block of code is there only for backward-compatibility
# reasons (before commit 47608b6 for ris-support).
IloClient = RIBCLOperations

View File

@ -27,6 +27,7 @@ from six.moves.urllib import parse as urlparse
from proliantutils import exception
from proliantutils.ilo import common
from proliantutils.ilo import firmware_controller
from proliantutils.ilo import operations
from proliantutils import log
@ -1350,3 +1351,96 @@ class RISOperations(operations.IloOperations):
except KeyError as e:
msg = "get_one_time_boot failed with the KeyError:%s"
raise exception.IloError((msg) % e)
def _get_firmware_update_service_resource(self):
"""Gets the firmware update service uri.
:returns: firmware update service uri
:raises: IloError, on an error from iLO.
:raises: IloConnectionError, if not able to reach iLO.
:raises: IloCommandNotSupportedError, for not finding the uri
"""
manager, uri = self._get_ilo_details()
try:
fw_uri = manager['Oem']['Hp']['links']['UpdateService']['href']
except KeyError:
msg = ("Firmware Update Service resource not found.")
raise exception.IloCommandNotSupportedError(msg)
return fw_uri
@firmware_controller.check_firmware_update_component
def update_firmware(self, file_url, component_type):
"""Updates the given firmware on the server for the given component.
:param file_url: location of the raw firmware file. Extraction of the
firmware file (if in compact format) is expected to
happen prior to this invocation.
:param component_type: Type of component to be applied to.
:raises: InvalidInputError, if the validation of the input fails
:raises: IloError, on an error from iLO
:raises: IloConnectionError, if not able to reach iLO.
:raises: IloCommandNotSupportedError, if the command is
not supported on the server
"""
fw_update_uri = self._get_firmware_update_service_resource()
action_data = {
'Action': 'InstallFromURI',
'FirmwareURI': file_url,
}
# perform the POST
LOG.debug(self._('Flashing firmware file: %s ...'), file_url)
status, headers, response = self._rest_post(
fw_update_uri, None, action_data)
if status != 200:
msg = self._get_extended_error(response)
raise exception.IloError(msg)
# wait till the firmware update completes.
common.wait_for_ris_firmware_update_to_complete(self)
try:
state, percent = self.get_firmware_update_progress()
except exception.IloError:
msg = 'Status of firmware update not known'
LOG.debug(self._(msg)) # noqa
return
if state == "ERROR":
msg = 'Error in firmware update'
LOG.error(self._(msg)) # noqa
raise exception.IloError(msg)
elif state == "UNKNOWN":
msg = 'Status of firmware update not known'
LOG.debug(self._(msg)) # noqa
else: # "COMPLETED" | "IDLE"
LOG.info(self._('Flashing firmware file: %s ... done'), file_url)
def get_firmware_update_progress(self):
"""Get the progress of the firmware update.
:returns: firmware update state, one of the following values:
"IDLE", "UPLOADING", "PROGRESSING", "COMPLETED", "ERROR".
If the update resource is not found, then "UNKNOWN".
:returns: firmware update progress percent
:raises: IloError, on an error from iLO.
:raises: IloConnectionError, if not able to reach iLO.
"""
try:
fw_update_uri = self._get_firmware_update_service_resource()
except exception.IloError as e:
LOG.debug(self._('Progress of firmware update not known: %s'),
str(e))
return "UNKNOWN", "UNKNOWN"
# perform the GET
status, headers, response = self._rest_get(fw_update_uri)
if status != 200:
msg = self._get_extended_error(response)
raise exception.IloError(msg)
fw_update_state = response.get('State')
fw_update_progress_percent = response.get('ProgressPercent')
LOG.debug(self._('Flashing firmware file ... in progress %d%%'),
fw_update_progress_percent)
return fw_update_state, fw_update_progress_percent

View File

@ -6808,3 +6808,25 @@ ACTIVATE_LICENSE_FAIL_XML = '''
/>
</RIBCL>
'''
UPDATE_ILO_FIRMWARE_INPUT_XML = '''
<RIBCL VERSION="2.0">
<LOGIN PASSWORD="%s" USER_LOGIN="%s">
<RIB_INFO MODE="write">
<UPDATE_RIB_FIRMWARE IMAGE_LENGTH="%d" IMAGE_LOCATION="%s" />
<TPM_ENABLED VALUE="Yes" />
</RIB_INFO>
</LOGIN>
</RIBCL>
'''
UPDATE_NONILO_FIRMWARE_INPUT_XML = '''
<RIBCL VERSION="2.0">
<LOGIN PASSWORD="%s" USER_LOGIN="%s">
<RIB_INFO MODE="write">
<UPDATE_FIRMWARE IMAGE_LENGTH="%d" IMAGE_LOCATION="%s" />
<TPM_ENABLED VALUE="Yes" />
</RIB_INFO>
</LOGIN>
</RIBCL>
'''

View File

@ -470,3 +470,15 @@ class IloClientTestCase(testtools.TestCase):
self.client.model = 'Gen8'
self.client.get_persistent_boot_device()
get_pers_boot_device_mock.assert_called_once_with()
@mock.patch.object(client.IloClient, '_call_method')
def test_update_firmware(self, _call_method_mock):
# | GIVEN |
some_url = 'some-url'
some_component_type = 'ilo'
# | WHEN |
self.client.update_firmware(some_url, some_component_type)
# | THEN |
_call_method_mock.assert_called_once_with('update_firmware',
some_url,
some_component_type)

View File

@ -17,44 +17,177 @@
import time
import unittest
import ddt
import mock
from proliantutils import exception
from proliantutils.ilo import common
from proliantutils.ilo import ribcl
from proliantutils.ilo import ris
from proliantutils.tests.ilo import ribcl_sample_outputs as ribcl_output
@ddt.ddt
class IloCommonModuleTestCase(unittest.TestCase):
def setUp(self):
# | BEFORE_EACH |
super(IloCommonModuleTestCase, self).setUp()
self.ribcl = ribcl.RIBCLOperations("x.x.x.x", "admin", "Admin",
60, 443)
self.ris = ris.RISOperations("x.x.x.x", "admin", "Admin",
60, 443)
self.any_scexe_file = 'any_file.scexe'
self.any_rpm_file = 'any_file.rpm'
@mock.patch.object(time, 'sleep', lambda x: None)
@mock.patch.object(ribcl.RIBCLOperations, 'get_product_name')
def test_wait_for_ilo_after_reset_ribcl_ok(self, name_mock):
# | GIVEN |
name_mock.return_value = ribcl_output.GET_PRODUCT_NAME
# | WHEN |
common.wait_for_ilo_after_reset(self.ribcl)
# | THEN |
name_mock.assert_called_once_with()
@mock.patch.object(time, 'sleep')
@mock.patch.object(ribcl.RIBCLOperations, 'get_product_name')
def test_wait_for_ilo_after_reset_retry(self, name_mock, sleep_mock):
# | GIVEN |
exc = exception.IloError('error')
name_mock.side_effect = [exc, ribcl_output.GET_PRODUCT_NAME]
# | WHEN |
common.wait_for_ilo_after_reset(self.ribcl)
# | THEN |
self.assertEqual(2, name_mock.call_count)
name_mock.assert_called_with()
@mock.patch.object(time, 'sleep')
@mock.patch.object(ribcl.RIBCLOperations, 'get_product_name')
def test_wait_for_ilo_after_reset_fail(self, name_mock, time_mock):
# | GIVEN |
exc = exception.IloError('error')
name_mock.side_effect = exc
self.assertRaises(exception.IloConnectionError,
# | WHEN | & | THEN |
self.assertRaises(exception.IloError,
common.wait_for_ilo_after_reset,
self.ribcl)
self.assertEqual(common.RETRY_COUNT, name_mock.call_count)
self.assertEqual(10, name_mock.call_count)
name_mock.assert_called_with()
@mock.patch.object(time, 'sleep')
@mock.patch.object(ris.RISOperations, 'get_firmware_update_progress')
@mock.patch.object(common, 'wait_for_ilo_after_reset', lambda x: None)
def test_wait_for_ris_firmware_update_to_complete_ok(
self, get_firmware_update_progress_mock, sleep_mock):
# | GIVEN |
get_firmware_update_progress_mock.side_effect = [('PROGRESSING', 25),
('COMPLETED', 100)]
# | WHEN |
common.wait_for_ris_firmware_update_to_complete(self.ris)
# | THEN |
self.assertEqual(2, get_firmware_update_progress_mock.call_count)
@mock.patch.object(time, 'sleep')
@mock.patch.object(ris.RISOperations, 'get_firmware_update_progress')
@mock.patch.object(common, 'wait_for_ilo_after_reset', lambda x: None)
def test_wait_for_ris_firmware_update_to_complete_retry_on_exception(
self, get_firmware_update_progress_mock, sleep_mock):
# | GIVEN |
exc = exception.IloError('error')
get_firmware_update_progress_mock.side_effect = [('PROGRESSING', 25),
exc,
('COMPLETED', 100)]
# | WHEN |
common.wait_for_ris_firmware_update_to_complete(self.ris)
# | THEN |
self.assertEqual(3, get_firmware_update_progress_mock.call_count)
@mock.patch.object(time, 'sleep')
@mock.patch.object(ris.RISOperations, 'get_firmware_update_progress')
@mock.patch.object(common, 'wait_for_ilo_after_reset', lambda x: None)
def test_wait_for_ris_firmware_update_to_complete_multiple_retries(
self, get_firmware_update_progress_mock, sleep_mock):
# | GIVEN |
get_firmware_update_progress_mock.side_effect = [('IDLE', 0),
('PROGRESSING', 25),
('PROGRESSING', 50),
('PROGRESSING', 75),
('ERROR', 0)]
# | WHEN |
common.wait_for_ris_firmware_update_to_complete(self.ris)
# | THEN |
self.assertEqual(5, get_firmware_update_progress_mock.call_count)
@mock.patch.object(time, 'sleep')
@mock.patch.object(ris.RISOperations, 'get_firmware_update_progress')
def test_wait_for_ris_firmware_update_to_complete_fail(
self, get_firmware_update_progress_mock, sleep_mock):
# | GIVEN |
exc = exception.IloError('error')
get_firmware_update_progress_mock.side_effect = exc
# | WHEN | & | THEN |
self.assertRaises(exception.IloError,
common.wait_for_ris_firmware_update_to_complete,
self.ris)
self.assertEqual(10, get_firmware_update_progress_mock.call_count)
@mock.patch.object(time, 'sleep')
@mock.patch.object(ribcl.RIBCLOperations, 'get_product_name')
@mock.patch.object(common, 'wait_for_ilo_after_reset', lambda x: None)
def test_wait_for_ribcl_firmware_update_to_complete_retries_till_exception(
self, get_product_name_mock, sleep_mock):
# | GIVEN |
exc = exception.IloError('error')
get_product_name_mock.side_effect = ['Rap metal',
'Death metal',
exc]
# | WHEN |
common.wait_for_ribcl_firmware_update_to_complete(self.ribcl)
# | THEN |
self.assertEqual(3, get_product_name_mock.call_count)
@mock.patch.object(time, 'sleep')
@mock.patch.object(ribcl.RIBCLOperations, 'get_product_name')
@mock.patch.object(common, 'wait_for_ilo_after_reset', lambda x: None)
def test_wait_for_ribcl_firmware_update_silent_if_reset_exc_not_captured(
self, get_product_name_mock, sleep_mock):
# | GIVEN |
get_product_name_mock.side_effect = [
'Rap metal', 'Death metal', 'Black metal', 'Extreme metal',
'Folk metal', 'Gothic metal', 'Power metal', 'War metal',
'Thrash metal', 'Groove metal']
# | WHEN |
common.wait_for_ribcl_firmware_update_to_complete(self.ribcl)
# | THEN |
self.assertEqual(10, get_product_name_mock.call_count)
@ddt.data(('/path/to/file.scexe', 'file', '.scexe'),
('/path/to/.hidden', '.hidden', ''),
('filename', 'filename', ''),
('filename.txt.bk', 'filename.txt', '.bk'),
('//filename.txt', 'filename', '.txt'),
('.filename.txt.bk', '.filename.txt', '.bk'),
('/', '', ''),
('.', '.', ''),)
@ddt.unpack
def test_get_filename_and_extension_of(
self, input_file_path, expected_file_name, expected_file_ext):
# | WHEN |
actual_file_name, actual_file_ext = (
common.get_filename_and_extension_of(input_file_path))
# | THEN |
self.assertEqual(actual_file_name, expected_file_name)
self.assertEqual(actual_file_ext, expected_file_ext)
@mock.patch.object(common, 'os', autospec=True)
@mock.patch.object(common, 'stat', autospec=True)
def test_add_exec_permission_to(self, stat_mock, os_mock):
# | GIVEN |
any_file = 'any_file'
# | WHEN |
common.add_exec_permission_to(any_file)
# | THEN |
os_mock.stat.assert_called_once_with(any_file)
os_mock.chmod.assert_called_once_with(
any_file, os_mock.stat().st_mode | stat_mock.S_IXUSR)

View File

@ -0,0 +1,523 @@
# Copyright 2016 Hewlett Packard Enterprise Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Test Class for Firmware controller."""
import os
import shutil
import tempfile
import unittest
import ddt
import mock
from six.moves import builtins as __builtin__
from proliantutils import exception
from proliantutils.ilo import common
from proliantutils.ilo import firmware_controller
@ddt.ddt
class FirmwareControllerModuleTestCase(unittest.TestCase):
def setUp(self):
# | BEFORE_EACH |
super(FirmwareControllerModuleTestCase, self).setUp()
self.any_scexe_file = 'any_file.scexe'
self.any_rpm_file = 'any_file.rpm'
self.any_raw_fw_file = 'any_fw_file.bin'
@ddt.data('ilo', 'cpld', 'power_pic', 'bios', 'chassis')
def test_check_firmware_update_component_passes_for_valid_component(
self, component_type):
# | GIVEN |
ris_or_ribcl_obj_mock = mock.MagicMock()
update_firmware_mock = mock.MagicMock()
# Note(deray): Need to set __name__ attribute explicitly to keep
# ``six.wraps`` happy. Passing this to the `name` argument at the time
# creation of Mock doesn't help.
update_firmware_mock.__name__ = 'update_firmware_mock'
wrapped_func = (firmware_controller.
check_firmware_update_component(update_firmware_mock))
# | WHEN |
wrapped_func(
ris_or_ribcl_obj_mock, self.any_raw_fw_file, component_type)
# | THEN |
update_firmware_mock.assert_called_once_with(
ris_or_ribcl_obj_mock, self.any_raw_fw_file, component_type)
def test_check_firmware_update_component_throws_for_invalid_component(
self):
# | GIVEN |
def func(ris_or_ribcl_obj, filename, component_type):
pass
wrapped_func = (firmware_controller.
check_firmware_update_component(func))
ris_or_ribcl_obj_mock = mock.MagicMock()
# | WHEN | & | THEN |
self.assertRaises(exception.InvalidInputError,
wrapped_func,
ris_or_ribcl_obj_mock,
self.any_raw_fw_file,
'invalid_component')
def test_get_fw_extractor_will_set_the_fw_file_attribute(self):
# | WHEN |
fw_img_extractor = (firmware_controller.
get_fw_extractor(self.any_scexe_file))
# | THEN |
self.assertEqual(self.any_scexe_file, fw_img_extractor.fw_file)
@mock.patch.object(firmware_controller, '_extract_scexe_file',
autospec=True)
def test__extract_scexe_file_gets_invoked_for_scexe_firmware_file(
self, _extract_scexe_file_mock):
# _extract_scexe_file gets invoked when fw_img_extractor is initialized
# with scexe firmware file
# | WHEN |
fw_img_extractor = (firmware_controller.
get_fw_extractor(self.any_scexe_file))
fw_img_extractor._do_extract('some_target_file', 'some_extract_path')
# | THEN |
_extract_scexe_file_mock.assert_called_once_with(
fw_img_extractor, 'some_target_file', 'some_extract_path')
@mock.patch.object(firmware_controller, '_extract_rpm_file', autospec=True)
def test__extract_rpm_file_gets_invoked_for_rpm_firmware_file(
self, _extract_rpm_file_mock):
# _extract_rpm_file gets invoked when fw_img_extractor is initialized
# with rpm firmware file
# | WHEN |
fw_img_extractor = (firmware_controller.
get_fw_extractor(self.any_rpm_file))
fw_img_extractor._do_extract('some_target_file', 'some_extract_path')
# | THEN |
_extract_rpm_file_mock.assert_called_once_with(
fw_img_extractor, 'some_target_file', 'some_extract_path')
def test_no_op_extract_gets_invoked_for_raw_firmware_file(self):
# no_op extract when fw_img_extractor is initialized
# with raw firmware file
# | GIVEN |
any_raw_file = 'any_file.bin'
# | WHEN |
fw_img_extractor = firmware_controller.get_fw_extractor(any_raw_file)
return_result, is_extracted = fw_img_extractor.extract()
# | THEN |
self.assertEqual(any_raw_file, return_result)
self.assertFalse(is_extracted)
def test_get_fw_extractor_raises_exception_with_unknown_firmware_file(
self):
# | GIVEN |
any_invalid_format_firmware_file = 'any_file.abc'
# | WHEN | & | THEN |
self.assertRaises(exception.InvalidInputError,
firmware_controller.get_fw_extractor,
any_invalid_format_firmware_file)
@mock.patch.object(firmware_controller.utils, 'trycmd', autospec=True)
def test__extract_scexe_file_issues_command_as(self, utils_trycmd_mock):
# | GIVEN |
any_scexe_firmware_file = 'any_file.scexe'
any_extract_path = 'any_extract_path'
utils_trycmd_mock.return_value = ('out', 'err')
# | WHEN |
firmware_controller._extract_scexe_file(
None, any_scexe_firmware_file, any_extract_path)
# | THEN |
utils_trycmd_mock.assert_called_once_with(
any_scexe_firmware_file, '--unpack=' + any_extract_path)
@mock.patch.object(firmware_controller, 'os', autospec=True)
@mock.patch.object(firmware_controller, 'subprocess', autospec=True)
def test__extract_rpm_file_creates_dir_if_extract_path_doesnt_exist(
self, subprocess_mock, os_mock):
# | GIVEN |
any_rpm_firmware_file = 'any_file.rpm'
any_extract_path = 'any_extract_path'
os_mock.path.exists.return_value = False
mock_cpio = mock.MagicMock()
mock_cpio.communicate.return_value = ('out', 'err')
subsequent_popen_call_returns = [mock.MagicMock(), mock_cpio]
subprocess_mock.Popen = mock.MagicMock(
side_effect=subsequent_popen_call_returns)
# | WHEN |
firmware_controller._extract_rpm_file(
None, any_rpm_firmware_file, any_extract_path)
# | THEN |
os_mock.makedirs.assert_called_once_with(any_extract_path)
@mock.patch.object(firmware_controller, 'os', autospec=True)
@mock.patch.object(firmware_controller, 'subprocess', autospec=True)
def test__extract_rpm_file_doesnt_create_dir_if_extract_path_present(
self, subprocess_mock, os_mock):
# extract_rpm_file doesn't create dir if extract path
# is already present
# | GIVEN |
any_rpm_firmware_file = 'any_file.rpm'
any_extract_path = 'any_extract_path'
os_mock.path.exists.return_value = True
mock_cpio = mock.MagicMock()
mock_cpio.communicate.return_value = ('out', 'err')
subsequent_popen_call_returns = [mock.MagicMock(), mock_cpio]
subprocess_mock.Popen = mock.MagicMock(
side_effect=subsequent_popen_call_returns)
# | WHEN |
firmware_controller._extract_rpm_file(
None, any_rpm_firmware_file, any_extract_path)
# | THEN |
self.assertFalse(os_mock.makedirs.called)
@mock.patch.object(firmware_controller, 'os', autospec=True)
@mock.patch.object(firmware_controller, 'subprocess', autospec=True)
def test__extract_rpm_file_issues_commands_as(self,
subprocess_mock,
os_mock):
# | GIVEN |
any_rpm_firmware_file = 'any_file.rpm'
any_extract_path = 'any_extract_path'
os_mock.path.exists.return_value = True
rpm2cpio_mock = mock.MagicMock()
cpio_mock = mock.MagicMock()
cpio_mock.communicate.return_value = ('out', 'err')
subsequent_popen_call_returns = [rpm2cpio_mock, cpio_mock]
subprocess_mock.Popen = mock.MagicMock(
side_effect=subsequent_popen_call_returns)
# | WHEN |
firmware_controller._extract_rpm_file(
None, any_rpm_firmware_file, any_extract_path)
# | THEN |
popen_calls_to_assert = [
mock.call.Popen('rpm2cpio ' + any_rpm_firmware_file,
shell=True,
stdout=subprocess_mock.PIPE),
mock.call.Popen('cpio -idm',
shell=True,
stdin=rpm2cpio_mock.stdout),
]
subprocess_mock.assert_has_calls(popen_calls_to_assert)
cpio_mock.communicate.assert_called_once_with()
@mock.patch.object(firmware_controller, 'os', autospec=True)
@mock.patch.object(firmware_controller, 'subprocess', autospec=True)
def test__extract_rpm_file_raises_exception_if_it_fails(self,
subprocess_mock,
os_mock):
# | GIVEN |
any_rpm_firmware_file = 'any_file.rpm'
any_extract_path = 'any_extract_path'
rpm2cpio_mock = mock.MagicMock()
cpio_mock = mock.MagicMock()
cpio_mock.communicate.side_effect = Exception('foo')
subsequent_popen_call_returns = [rpm2cpio_mock, cpio_mock]
subprocess_mock.Popen = mock.MagicMock(
side_effect=subsequent_popen_call_returns)
# | WHEN | & | THEN |
self.assertRaises(exception.ImageExtractionFailed,
firmware_controller._extract_rpm_file,
None, any_rpm_firmware_file, any_extract_path)
def test__get_firmware_file(self):
# | GIVEN |
temp_dir_setup = setup_fixture_create_fw_file_extracts_for('scexe')
# | WHEN |
return_result = firmware_controller._get_firmware_file(temp_dir_setup)
# | THEN |
self.assertTrue(return_result.endswith('.bin'))
teardown_fixture_create_fw_file_extracts_for(temp_dir_setup)
@mock.patch.object(
firmware_controller, '_get_firmware_file', autospec=True)
@mock.patch.object(common, 'get_filename_and_extension_of', autospec=True)
@mock.patch.object(firmware_controller, 'tempfile', autospec=True)
@mock.patch.object(firmware_controller.uuid, 'uuid4', autospec=True)
@mock.patch.object(firmware_controller.os, 'link', autospec=True)
def test__get_firmware_file_in_new_path(
self, os_link_mock, uuid4_mock, tempfile_mock,
get_filename_and_extension_of_mock, _get_firmware_file_mock):
# | GIVEN |
_get_firmware_file_mock.return_value = 'some_raw_fw_file.bin'
get_filename_and_extension_of_mock.return_value = ('some_raw_fw_file',
'.bin')
tempfile_mock.gettempdir.return_value = '/tmp'
uuid4_mock.return_value = 12345
# | WHEN |
new_fw_file_path = (firmware_controller.
_get_firmware_file_in_new_path('any_path'))
# | THEN |
_get_firmware_file_mock.assert_called_once_with('any_path')
# tests the hard linking of the raw fw file so found
os_link_mock.assert_called_once_with(
'some_raw_fw_file.bin', '/tmp/12345_some_raw_fw_file.bin')
self.assertEqual(new_fw_file_path, '/tmp/12345_some_raw_fw_file.bin')
@mock.patch.object(
firmware_controller, '_get_firmware_file', autospec=True)
def test__get_firmware_file_in_new_path_returns_none_for_file_not_found(
self, _get_firmware_file_mock):
# | GIVEN |
_get_firmware_file_mock.return_value = None
# | WHEN |
actual_result = (firmware_controller.
_get_firmware_file_in_new_path('any_path'))
# | THEN |
self.assertIsNone(actual_result)
def teardown_fixture_create_fw_file_extracts_for(temp_dir):
# os.removedirs(temp_dir)
shutil.rmtree(temp_dir)
def setup_fixture_create_fw_file_extracts_for(format):
temp_dir = tempfile.mkdtemp()
fw_file_exts = [
'_ilo', '.bin', '.xml', '.TXT', '.hpsetup', '.cpq_package.inc'
]
if format == 'scexe':
fw_files_dir = temp_dir
elif format == 'rpm':
fw_files_dir = os.path.join(
temp_dir +
'/please_remove_rpm_file_extracts/usr/lib/i386-linux-gnu/' +
'hp-firmware-iloX-xxxx'
)
else:
fw_files_dir = temp_dir
if not os.path.exists(fw_files_dir):
os.makedirs(fw_files_dir)
for fw_file_ext in fw_file_exts:
tempfile.NamedTemporaryFile(suffix=fw_file_ext,
dir=fw_files_dir,
delete=False)
return temp_dir
class FirmwareImageExtractorTestCase(unittest.TestCase):
def setUp(self):
# | BEFORE_EACH |
self.any_scexe_file = 'any_file.scexe'
self.any_rpm_file = 'any_file.rpm'
@mock.patch.object(common, 'add_exec_permission_to', autospec=True)
@mock.patch.object(firmware_controller, 'tempfile', autospec=True)
@mock.patch.object(firmware_controller, 'os', autospec=True)
@mock.patch.object(firmware_controller, '_get_firmware_file_in_new_path',
autospec=True)
@mock.patch.object(firmware_controller, 'shutil', autospec=True)
def test_extract_method_calls__do_extract_in_turn(
self, shutil_mock, _get_firmware_file_in_new_path_mock,
os_mock, tempfile_mock, add_exec_permission_to_mock):
# | GIVEN |
os_mock.path.splitext.return_value = ('any_file', '.scexe')
fw_img_extractor = (firmware_controller.
get_fw_extractor(self.any_scexe_file))
# Now mock the _do_extract method of fw_img_extractor instance
_do_extract_mock = mock.MagicMock()
fw_img_extractor._do_extract = _do_extract_mock
expected_return_result = 'extracted_firmware_file'
_get_firmware_file_in_new_path_mock.return_value = (
expected_return_result)
# | WHEN |
actual_return_result, is_extracted = fw_img_extractor.extract()
# | THEN |
_do_extract_mock.assert_called_once_with(self.any_scexe_file, mock.ANY)
self.assertEqual(expected_return_result, actual_return_result)
@mock.patch.object(common, 'add_exec_permission_to', autospec=True)
@mock.patch.object(firmware_controller, 'tempfile', autospec=True)
@mock.patch.object(firmware_controller, 'os', autospec=True)
@mock.patch.object(firmware_controller, '_get_firmware_file_in_new_path',
autospec=True)
@mock.patch.object(firmware_controller, 'shutil', autospec=True)
def test_extract_deletes_temp_extracted_folder_before_raising_exception(
self, shutil_mock, _get_firmware_file_in_new_path_mock,
os_mock, tempfile_mock, add_exec_permission_to_mock):
# | GIVEN |
os_mock.path.splitext.return_value = ('any_file', '.rpm')
fw_img_extractor = (firmware_controller.
get_fw_extractor(self.any_rpm_file))
# Now mock the _do_extract method of fw_img_extractor instance
exc = exception.ImageExtractionFailed(
image_ref=self.any_rpm_file, reason='God only knows!')
_do_extract_mock = mock.MagicMock(side_effect=exc)
fw_img_extractor._do_extract = _do_extract_mock
# | WHEN | & | THEN |
self.assertRaises(exception.ImageExtractionFailed,
fw_img_extractor.extract)
shutil_mock.rmtree.assert_called_once_with(
tempfile_mock.mkdtemp.return_value, ignore_errors=True)
@mock.patch.object(common, 'add_exec_permission_to', autospec=True)
@mock.patch.object(firmware_controller, 'tempfile', autospec=True)
@mock.patch.object(firmware_controller, 'os', autospec=True)
@mock.patch.object(firmware_controller, '_extract_scexe_file',
autospec=True)
@mock.patch.object(firmware_controller, '_get_firmware_file_in_new_path',
autospec=True)
@mock.patch.object(firmware_controller, 'shutil', autospec=True)
def test_extract_method_raises_exception_if_raw_fw_file_not_found(
self, shutil_mock, _get_firmware_file_in_new_path_mock,
_extract_scexe_mock, os_mock, tempfile_mock,
add_exec_permission_to_mock):
# | GIVEN |
os_mock.path.splitext.return_value = ('any_file', '.scexe')
_get_firmware_file_in_new_path_mock.return_value = None
fw_img_extractor = (firmware_controller.
get_fw_extractor(self.any_scexe_file))
# | WHEN | & | THEN |
self.assertRaises(exception.InvalidInputError,
fw_img_extractor.extract)
@mock.patch.object(common, 'add_exec_permission_to', autospec=True)
@mock.patch.object(firmware_controller, 'tempfile', autospec=True)
@mock.patch.object(firmware_controller, '_extract_scexe_file',
autospec=True)
@mock.patch.object(firmware_controller, '_extract_rpm_file', autospec=True)
# don't use autospec=True here(below one), setting side_effect
# causes issue. refer https://bugs.python.org/issue17826
@mock.patch.object(firmware_controller, '_get_firmware_file_in_new_path')
@mock.patch.object(firmware_controller, 'shutil', autospec=True)
def test_successive_calls_to_extract_method(
self, shutil_mock, _get_firmware_file_in_new_path_mock,
_extract_rpm_mock, _extract_scexe_mock, tempfile_mock,
add_exec_permission_to_mock):
"""This is more of an integration test of the extract method
"""
# | GIVEN |
firmware_files = [
self.any_scexe_file,
'any_file.bin',
self.any_rpm_file,
]
actual_raw_fw_files = []
expected_raw_fw_files = [
('extracted_file_from_scexe', True),
('any_file.bin', False),
('extracted_file_from_rpm', True)
]
_get_firmware_file_in_new_path_mock.side_effect = [
'extracted_file_from_scexe',
'extracted_file_from_rpm',
]
# | WHEN |
for fw_file in firmware_files:
fw_img_extractor = firmware_controller.get_fw_extractor(fw_file)
raw_fw_file, is_extracted = fw_img_extractor.extract()
actual_raw_fw_files.append((raw_fw_file, is_extracted))
# | THEN |
self.assertSequenceEqual(actual_raw_fw_files, expected_raw_fw_files)
@ddt.ddt
class FirmwareImageUploaderTestCase(unittest.TestCase):
def setUp(self):
# | BEFORE_EACH |
self.any_scexe_file = 'any_file.scexe'
self.any_rpm_file = 'any_file.rpm'
@mock.patch.object(firmware_controller.FirmwareImageUploader,
'_get_socket', autospec=True)
@mock.patch.object(firmware_controller, 'socket')
@mock.patch.object(__builtin__, 'open', autospec=True)
def test_upload_file_to_returns_cookie_after_successful_upload(
self, open_mock, socket_mock, _get_socket_mock):
# | GIVEN |
sock_mock = _get_socket_mock.return_value
sock_mock.read.side_effect = [b'data returned from socket with ',
b'Set-Cookie: blah_blah_cookie',
b'']
fw_img_uploader = (firmware_controller.
FirmwareImageUploader('any_raw_file'))
# | WHEN |
cookie = fw_img_uploader.upload_file_to(('host', 'port'), 60)
# | THEN |
self.assertEqual('blah_blah_cookie', cookie)
@mock.patch.object(firmware_controller.FirmwareImageUploader,
'_get_socket', autospec=True)
@mock.patch.object(firmware_controller, 'socket')
@mock.patch.object(__builtin__, 'open', autospec=True)
def test_upload_file_to_throws_exception_when_cookie_not_returned(
self, open_mock, socket_mock, _get_socket_mock):
# | GIVEN |
sock_mock = _get_socket_mock.return_value
sock_mock.read.side_effect = [b'data returned from socket with ',
b'No-Cookie',
b'']
fw_img_uploader = (firmware_controller.
FirmwareImageUploader('any_raw_file'))
# | WHEN | & | THEN |
self.assertRaises(exception.IloError, fw_img_uploader.upload_file_to,
('host', 'port'), 60)
@mock.patch.object(firmware_controller, 'socket')
@mock.patch.object(firmware_controller, 'ssl')
def test__get_socket_returns_ssl_wrapped_socket_if_all_goes_well(
self, ssl_mock, socket_mock):
# | GIVEN |
socket_mock.getaddrinfo().__iter__.return_value = [
# (family, socktype, proto, canonname, sockaddr),
(10, 1, 6, '', ('2606:2800:220:1:248:1893:25c8:1946', 80, 0, 0)),
(2, 1, 6, '', ('0.0.0.0-some-address', 80)),
]
fw_img_uploader = (firmware_controller.
FirmwareImageUploader('any_raw_file'))
fw_img_uploader.hostname = 'host'
fw_img_uploader.port = 443
fw_img_uploader.timeout = 'timeout'
# | WHEN |
returned_sock = fw_img_uploader._get_socket()
# | THEN |
socket_mock.socket.assert_has_calls([
mock.call(10, 1, 6),
mock.call().settimeout('timeout'),
mock.call().connect(
('2606:2800:220:1:248:1893:25c8:1946', 80, 0, 0)),
mock.call(2, 1, 6),
mock.call().settimeout('timeout'),
mock.call().connect(('0.0.0.0-some-address', 80)),
])
self.assertTrue(ssl_mock.wrap_socket.called)
self.assertEqual(returned_sock, ssl_mock.wrap_socket())
@ddt.data(('foo.bar.com', exception.IloConnectionError),
('1.1.1.1', exception.IloConnectionError),
('any_kind_of_address', exception.IloConnectionError),)
@ddt.unpack
def test__get_socket_throws_exception_in_case_of_failed_connection(
self, input_hostname, expected_exception_type):
# | GIVEN |
fw_img_uploader = (firmware_controller.
FirmwareImageUploader('any_raw_file'))
fw_img_uploader.hostname = input_hostname
fw_img_uploader.port = 443
fw_img_uploader.timeout = 1
# | WHEN | & | THEN |
self.assertRaises(expected_exception_type, fw_img_uploader._get_socket)

View File

@ -16,7 +16,9 @@
"""Test class for RIBCL Module."""
import json
import re
import unittest
import xml.etree.ElementTree as ET
import mock
import requests
@ -707,6 +709,80 @@ class IloRibclTestCase(unittest.TestCase):
self.assertRaises(exception.IloError, self.ilo.activate_license, 'key')
self.assertTrue(request_mock.called)
@mock.patch.object(
ribcl.firmware_controller.FirmwareImageUploader, 'upload_file_to')
@mock.patch.object(ribcl, 'os', autospec=True)
@mock.patch.object(ribcl.IloClient, '_request_ilo', autospec=True)
@mock.patch.object(ribcl.IloClient, '_parse_output', autospec=True)
@mock.patch.object(common, 'wait_for_ribcl_firmware_update_to_complete',
lambda x: None)
def test_update_ilo_firmware(self, _parse_output_mock, _request_ilo_mock,
os_mock, upload_file_to_mock):
# | GIVEN |
upload_file_to_mock.return_value = 'hickory-dickory-dock'
os_mock.path.getsize.return_value = 12345
# | WHEN |
self.ilo.update_firmware('raw_fw_file.bin', 'ilo')
# | THEN |
upload_file_to_mock.assert_called_once_with(
(self.ilo.host, self.ilo.port), self.ilo.timeout)
root_xml_string = constants.UPDATE_ILO_FIRMWARE_INPUT_XML % (
self.ilo.password, self.ilo.login, 12345, 'raw_fw_file.bin')
root_xml_string = re.sub('\n\s*', '', root_xml_string)
((ribcl_obj, xml_elem), the_ext_header_dict) = (
_request_ilo_mock.call_args)
self.assertEqual(root_xml_string,
ET.tostring(xml_elem).decode('latin-1'))
self.assertDictEqual(the_ext_header_dict['extra_headers'],
{'Cookie': 'hickory-dickory-dock'})
_parse_output_mock.assert_called_once_with(
self.ilo, _request_ilo_mock.return_value)
@mock.patch.object(
ribcl.firmware_controller.FirmwareImageUploader, 'upload_file_to')
@mock.patch.object(ribcl, 'os', autospec=True)
@mock.patch.object(ribcl.IloClient, '_request_ilo', autospec=True)
@mock.patch.object(ribcl.IloClient, '_parse_output', autospec=True)
@mock.patch.object(common, 'wait_for_ribcl_firmware_update_to_complete',
lambda x: None)
def test_update_other_component_firmware(self, _parse_output_mock,
_request_ilo_mock, os_mock,
upload_file_to_mock):
# | GIVEN |
upload_file_to_mock.return_value = 'hickory-dickory-dock'
os_mock.path.getsize.return_value = 12345
# | WHEN |
self.ilo.update_firmware('raw_fw_file.bin', 'power_pic')
# | THEN |
upload_file_to_mock.assert_called_once_with(
(self.ilo.host, self.ilo.port), self.ilo.timeout)
root_xml_string = constants.UPDATE_NONILO_FIRMWARE_INPUT_XML % (
self.ilo.password, self.ilo.login, 12345, 'raw_fw_file.bin')
root_xml_string = re.sub('\n\s*', '', root_xml_string)
((ribcl_obj, xml_elem), the_ext_header_dict) = (
_request_ilo_mock.call_args)
self.assertEqual(root_xml_string,
ET.tostring(xml_elem).decode('latin-1'))
self.assertDictEqual(the_ext_header_dict['extra_headers'],
{'Cookie': 'hickory-dickory-dock'})
_parse_output_mock.assert_called_once_with(
self.ilo, _request_ilo_mock.return_value)
def test_update_firmware_throws_error_for_invalid_component(self):
# | WHEN | & | THEN |
self.assertRaises(exception.InvalidInputError,
self.ilo.update_firmware,
'raw_fw_file.bin',
'invalid_component')
class IloRibclTestCaseBeforeRisSupport(unittest.TestCase):

View File

@ -806,6 +806,108 @@ class IloRisTestCase(testtools.TestCase):
self.client.update_persistent_boot, ['fake'])
self.assertFalse(update_persistent_boot_mock.called)
def test_update_firmware_throws_error_for_invalid_component(self):
# | WHEN | & | THEN |
self.assertRaises(exception.InvalidInputError,
self.client.update_firmware,
'fw_file_url',
'invalid_component')
@mock.patch.object(ris.RISOperations,
'_get_firmware_update_service_resource',
autospec=True)
@mock.patch.object(ris.RISOperations, '_rest_post', autospec=True)
@mock.patch.object(ris.common, 'wait_for_ris_firmware_update_to_complete',
autospec=True)
@mock.patch.object(ris.RISOperations, 'get_firmware_update_progress',
autospec=True)
def test_update_firmware(
self, get_firmware_update_progress_mock,
wait_for_ris_firmware_update_to_complete_mock, _rest_post_mock,
_get_firmware_update_service_resource_mock):
# | GIVEN |
_rest_post_mock.return_value = 200, 'some-headers', 'response'
get_firmware_update_progress_mock.return_value = 'COMPLETED', 100
# | WHEN |
self.client.update_firmware('fw_file_url', 'ilo')
# | THEN |
_get_firmware_update_service_resource_mock.assert_called_once_with(
self.client)
_rest_post_mock.assert_called_once_with(
self.client, mock.ANY, None, {'Action': 'InstallFromURI',
'FirmwareURI': 'fw_file_url',
})
wait_for_ris_firmware_update_to_complete_mock.assert_called_once_with(
self.client)
get_firmware_update_progress_mock.assert_called_once_with(
self.client)
@mock.patch.object(
ris.RISOperations, '_get_firmware_update_service_resource',
autospec=True)
@mock.patch.object(ris.RISOperations, '_rest_post', autospec=True)
def test_update_firmware_throws_if_post_operation_fails(
self, _rest_post_mock, _get_firmware_update_service_resource_mock):
# | GIVEN |
_rest_post_mock.return_value = 500, 'some-headers', 'response'
# | WHEN | & | THEN |
self.assertRaises(exception.IloError,
self.client.update_firmware,
'fw_file_url',
'cpld')
@mock.patch.object(ris.RISOperations,
'_get_firmware_update_service_resource',
autospec=True)
@mock.patch.object(ris.RISOperations, '_rest_post', autospec=True)
@mock.patch.object(ris.common, 'wait_for_ris_firmware_update_to_complete',
autospec=True)
@mock.patch.object(ris.RISOperations, 'get_firmware_update_progress',
autospec=True)
def test_update_firmware_throws_if_error_occurs_in_update(
self, get_firmware_update_progress_mock,
wait_for_ris_firmware_update_to_complete_mock, _rest_post_mock,
_get_firmware_update_service_resource_mock):
# | GIVEN |
_rest_post_mock.return_value = 200, 'some-headers', 'response'
get_firmware_update_progress_mock.return_value = 'ERROR', 0
# | WHEN | & | THEN |
self.assertRaises(exception.IloError,
self.client.update_firmware,
'fw_file_url',
'ilo')
@mock.patch.object(ris.RISOperations,
'_get_firmware_update_service_resource',
autospec=True)
@mock.patch.object(ris.RISOperations, '_rest_get', autospec=True)
def test_get_firmware_update_progress(
self, _rest_get_mock,
_get_firmware_update_service_resource_mock):
# | GIVEN |
_rest_get_mock.return_value = (200, 'some-headers',
{'State': 'COMPLETED',
'ProgressPercent': 100})
# | WHEN |
state, percent = self.client.get_firmware_update_progress()
# | THEN |
_get_firmware_update_service_resource_mock.assert_called_once_with(
self.client)
_rest_get_mock.assert_called_once_with(self.client, mock.ANY)
self.assertTupleEqual((state, percent), ('COMPLETED', 100))
@mock.patch.object(ris.RISOperations,
'_get_firmware_update_service_resource',
autospec=True)
@mock.patch.object(ris.RISOperations, '_rest_get', autospec=True)
def test_get_firmware_update_progress_throws_if_get_operation_fails(
self, _rest_get_mock, _get_firmware_update_service_resource_mock):
# | GIVEN |
_rest_get_mock.return_value = 500, 'some-headers', 'response'
# | WHEN | & | THEN |
self.assertRaises(exception.IloError,
self.client.get_firmware_update_progress)
class TestRISOperationsPrivateMethods(testtools.TestCase):
@ -1502,3 +1604,32 @@ class TestRISOperationsPrivateMethods(testtools.TestCase):
self.client._get_persistent_boot_devices)
check_bios_mock.assert_called_once_with()
boot_mock.assert_called_once_with(bios_settings)
@mock.patch.object(ris.RISOperations, '_get_ilo_details', autospec=True)
def test__get_firmware_update_service_resource_traverses_manager_as(
self, _get_ilo_details_mock):
# | GIVEN |
manager_mock = mock.MagicMock(spec=dict, autospec=True)
_get_ilo_details_mock.return_value = (manager_mock, 'some_uri')
# | WHEN |
self.client._get_firmware_update_service_resource()
# | THEN |
manager_mock.__getitem__.assert_called_once_with('Oem')
manager_mock.__getitem__().__getitem__.assert_called_once_with('Hp')
(manager_mock.__getitem__().__getitem__().__getitem__.
assert_called_once_with('links'))
(manager_mock.__getitem__().__getitem__().__getitem__().
__getitem__.assert_called_once_with('UpdateService'))
(manager_mock.__getitem__().__getitem__().__getitem__().
__getitem__().__getitem__.assert_called_once_with('href'))
@mock.patch.object(ris.RISOperations, '_get_ilo_details', autospec=True)
def test__get_firmware_update_service_resource_throws_if_not_found(
self, _get_ilo_details_mock):
# | GIVEN |
manager_mock = mock.MagicMock(spec=dict)
_get_ilo_details_mock.return_value = (manager_mock, 'some_uri')
manager_mock.__getitem__.side_effect = KeyError('not found')
# | WHEN | & | THEN |
self.assertRaises(exception.IloCommandNotSupportedError,
self.client._get_firmware_update_service_resource)

View File

@ -0,0 +1,110 @@
# Copyright 2016 Hewlett Packard Enterprise Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Test class for Utils Module."""
import mock
import testtools
from proliantutils import exception
from proliantutils.ilo import client
from proliantutils.ilo import firmware_controller
from proliantutils.ilo import ribcl
from proliantutils import utils
class UtilsTestCase(testtools.TestCase):
@mock.patch.object(ribcl.RIBCLOperations, 'get_product_name')
def setUp(self, product_mock):
super(UtilsTestCase, self).setUp()
product_mock.return_value = 'Gen8'
self.some_compact_fw_file = 'some_compact_fw_file.scexe'
self.client = client.IloClient("1.2.3.4", "admin", "Admin")
@mock.patch.object(firmware_controller, 'get_fw_extractor',
spec_set=True, autospec=True)
def test_process_firmware_image_throws_for_unknown_firmware_file_format(
self, get_extractor_mock):
# | GIVEN |
get_extractor_mock.side_effect = exception.InvalidInputError
# | WHEN | & | THEN |
self.assertRaises(exception.InvalidInputError,
utils.process_firmware_image,
'invalid_compact_fw_file',
self.client)
@mock.patch.object(firmware_controller, 'get_fw_extractor',
spec_set=True, autospec=True)
def test_process_firmware_image_throws_for_failed_extraction(
self, get_extractor_mock):
# | GIVEN |
exc = exception.ImageExtractionFailed(
image_ref='some_file', reason='God only knows!')
get_extractor_mock.return_value.extract.side_effect = exc
# | WHEN | & | THEN |
self.assertRaises(exception.ImageExtractionFailed,
utils.process_firmware_image,
self.some_compact_fw_file,
self.client)
@mock.patch.object(firmware_controller, 'get_fw_extractor',
spec_set=True, autospec=True)
def test_process_firmware_image_calls_extract_of_fw_extractor_object(
self, get_extractor_mock):
# process_firmware_image calls extract on the firmware_extractor
# instance
# | GIVEN |
get_extractor_mock.return_value.extract.return_value = (
'core_fw_file.bin', True)
# | WHEN |
raw_fw_file, to_upload, is_extracted = (
utils.process_firmware_image(self.some_compact_fw_file,
self.client))
# | THEN |
get_extractor_mock.assert_called_once_with(self.some_compact_fw_file)
get_extractor_mock.return_value.extract.assert_called_once_with()
@mock.patch.object(firmware_controller, 'get_fw_extractor',
spec_set=True, autospec=True)
def test_process_firmware_image_asks_not_to_upload_firmware_file(
self, get_extractor_mock):
# | GIVEN |
get_extractor_mock.return_value.extract.return_value = (
'core_fw_file.bin', True)
self.client.model = 'Gen8'
# | WHEN |
raw_fw_file, to_upload, is_extracted = (
utils.process_firmware_image(self.some_compact_fw_file,
self.client))
# | THEN |
self.assertEqual('core_fw_file.bin', raw_fw_file)
self.assertFalse(to_upload)
@mock.patch.object(firmware_controller, 'get_fw_extractor',
spec_set=True, autospec=True)
def test_process_firmware_image_asks_to_upload_firmware_file(
self, get_extractor_mock):
# if fw_version is greater than or equal to 2.0
# | GIVEN |
get_extractor_mock.return_value.extract.return_value = (
'core_fw_file.bin', True)
self.client.model = 'Gen9'
# | WHEN |
raw_fw_file, to_upload, is_extracted = (
utils.process_firmware_image(self.some_compact_fw_file,
self.client))
# | THEN |
self.assertEqual('core_fw_file.bin', raw_fw_file)
self.assertTrue(to_upload)

65
proliantutils/utils.py Normal file
View File

@ -0,0 +1,65 @@
# Copyright 2016 Hewlett Packard Enterprise Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Non-iLO related utilities and helper functions.
"""
from proliantutils.ilo import firmware_controller
from proliantutils import log
LOG = log.get_logger(__name__)
def process_firmware_image(compact_firmware_file, ilo_object):
"""Processes the firmware file.
Processing the firmware file entails extracting the firmware file from its
compact format. Along with the raw (extracted) firmware file, this method
also sends out information of whether or not the extracted firmware file
a) needs to be uploaded to http store
b) is extracted in reality or the file was already in raw format
:param compact_firmware_file: firmware file to extract from
:param ilo_object: ilo client object (ribcl/ris object)
:raises: InvalidInputError, for unsupported file types or raw firmware
file not found from compact format.
:raises: ImageExtractionFailed, for extraction related issues
:returns: core(raw) firmware file
:returns: to_upload, boolean to indicate whether to upload or not
:returns: is_extracted, boolean to indicate firmware image is actually
extracted or not.
"""
fw_img_extractor = firmware_controller.get_fw_extractor(
compact_firmware_file)
LOG.debug('Extracting firmware file: %s ...', compact_firmware_file)
raw_fw_file_path, is_extracted = fw_img_extractor.extract()
# Note(deray): Need to check if this processing is for RIS or RIBCL
# based systems. For Gen9 machines (RIS based) the firmware file needs
# to be on a http store, and hence requires the upload to happen for the
# firmware file.
to_upload = False
if 'Gen9' in ilo_object.model:
to_upload = True
LOG.debug('Extracting firmware file: %s ... done', compact_firmware_file)
msg = ('Firmware file %(fw_file)s is %(msg)s. Need hosting (on an http '
'store): %(yes_or_no)s' %
{'fw_file': compact_firmware_file,
'msg': ('extracted. Extracted file: %s' % raw_fw_file_path
if is_extracted else 'already in raw format'),
'yes_or_no': 'Yes' if to_upload else 'No'})
LOG.info(msg)
return raw_fw_file_path, to_upload, is_extracted