Add HPSUM firmware update support

This commit adds the support to perform hpsum
firmware update on the node using SPP (Service
Pack for Proliant) iso.

Closes-Bug: #1657492
Change-Id: If76ae30f4007cb468fabf95d110cd16f272f8bcd
This commit is contained in:
Aparna 2017-01-18 12:46:44 +00:00
parent 2c3ebf9107
commit 819690d093
10 changed files with 830 additions and 0 deletions

View File

@ -172,3 +172,31 @@ class IloSNMPExceptionFailure(IloError):
message = self.message % kwargs
super(IloSNMPExceptionFailure, self).__init__(message)
class ImageRefValidationFailed(ProliantUtilsException):
message = ("Validation of image href %(image_href)s failed, "
"reason: %(reason)s")
def __init__(self, message=None, **kwargs):
if not message:
message = self.message % kwargs
super(ImageRefValidationFailed, self).__init__(message)
class HpsumOperationError(ProliantUtilsException):
"""Hpsum based firmware update operation error.
This exception is used when a problem is encountered in
executing a hpsum operation.
"""
message = ("An error occurred while performing hpsum based firmware "
"update, reason: %(reason)s")
def __init__(self, message=None, **kwargs):
if not message:
message = self.message % kwargs
super(HpsumOperationError, self).__init__(message)

View File

View File

@ -0,0 +1,197 @@
# Copyright 2017 Hewlett Packard Enterprise Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fnmatch
import os
import re
import shutil
import tempfile
import time
from oslo_concurrency import processutils
from proliantutils import exception
from proliantutils.ilo import client
from proliantutils import utils
OUTPUT_FILE = '/var/hp/log/localhost/hpsum_log.txt'
HPSUM_LOCATION = 'hp/swpackages/hpsum'
EXIT_CODE_TO_STRING = {
0: "The smart component was installed successfully.",
1: ("The smart component was installed successfully, but the system "
"must be restarted."),
3: ("The smart component was not installed. Node is already "
"up-to-date."),
253: "The installation of the component failed."
}
def _execute_hpsum(hpsum_file_path, components=None):
"""Executes the hpsum firmware update command.
This method executes the hpsum firmware update command to update the
components specified, if not, it performs update on all the firmware
components on th server.
:param hpsum_file_path: A string with the path to the hpsum binary to be
executed
:param components: A list of components to be updated. If it is None, all
the firmware components are updated.
:returns: A string with the statistics of the updated/failed components.
:raises: HpsumOperationError, when the hpsum firmware update operation on
the node fails.
"""
cmd = ' --c ' + ' --c '.join(components) if components else ''
try:
processutils.execute(hpsum_file_path, "--s", "--romonly", cmd)
except processutils.ProcessExecutionError as e:
result = _parse_hpsum_ouput(e.exit_code)
if result:
return result
else:
msg = ("Unable to perform hpsum firmware update on the node. "
"Error: " + str(e))
raise exception.HpsumOperationError(reason=msg)
def _parse_hpsum_ouput(exit_code):
"""Parse the hpsum output log file.
This method parses through the hpsum log file in the
default location to return the hpsum update status. Sample return
string:
"Summary: The installation of the component failed. Status of updated
components: Total: 5 Success: 4 Failed: 1"
:param exit_code: A integer returned by the hpsum after command execution.
:returns: A string with the statistics of the updated/failed
components and 'None' when the exit_code is not 0, 1, 3 or 253.
"""
if exit_code == 3:
return "Summary: %s" % EXIT_CODE_TO_STRING.get(exit_code)
if exit_code in (0, 1, 253):
if os.path.exists(OUTPUT_FILE):
with open(OUTPUT_FILE, 'r') as f:
output_data = f.read()
ret_data = output_data[(output_data.find('Deployed Components:') +
len('Deployed Components:')):
output_data.find('Exit status:')]
failed = 0
success = 0
for line in re.split('\n\n', ret_data):
if line:
if 'Success' not in line:
failed += 1
else:
success += 1
return ("Summary: %(return_string)s Status of updated components:"
" Total: %(total)s Success: %(success)s Failed: "
"%(failed)s." %
{'return_string': EXIT_CODE_TO_STRING.get(exit_code),
'total': (success + failed), 'success': success,
'failed': failed})
return "UPDATE STATUS: UNKNOWN"
def update_firmware(node):
"""Performs hpsum firmware update on the node.
This method performs hpsum firmware update by mounting the
SPP ISO on the node. It performs firmware update on all or
some of the firmware components.
:param node: A node object of type dict.
:returns: Operation Status string.
:raises: HpsumOperationError, when the vmedia device is not found or
when the mount operation fails or when the image validation fails.
:raises: IloConnectionError, when the iLO connection fails.
:raises: IloError, when vmedia eject or insert operation fails.
"""
hpsum_update_iso = node['clean_step']['args']['firmware_images'][0].get(
'url')
# Validates the http image reference for hpsum update ISO.
try:
utils.validate_href(hpsum_update_iso)
except exception.ImageRefValidationFailed as e:
raise exception.HpsumOperationError(reason=e)
# Ejects the CDROM device in the iLO and inserts the hpsum update ISO
# to the CDROM device.
info = node.get('driver_info')
ilo_object = client.IloClient(info.get('ilo_address'),
info.get('ilo_username'),
info.get('ilo_password'))
ilo_object.eject_virtual_media('CDROM')
ilo_object.insert_virtual_media(hpsum_update_iso, 'CDROM')
# Waits for the OS to detect the disk and update the label file. SPP ISO
# is identified by matching its label.
time.sleep(5)
vmedia_device_dir = "/dev/disk/by-label/"
for file in os.listdir(vmedia_device_dir):
if fnmatch.fnmatch(file, 'SPP*'):
vmedia_device_file = os.path.join(vmedia_device_dir, file)
if not os.path.exists(vmedia_device_file):
msg = "Unable to find the virtual media device for HPSUM"
raise exception.HpsumOperationError(reason=msg)
# Validates the SPP ISO image for any file corruption using the checksum
# of the ISO file.
expected_checksum = node['clean_step']['args']['firmware_images'][0].get(
'checksum')
try:
utils.verify_image_checksum(vmedia_device_file, expected_checksum)
except exception.ImageRefValidationFailed as e:
raise exception.HpsumOperationError(reason=e)
# Mounts SPP ISO on a temporary directory.
vmedia_mount_point = tempfile.mkdtemp()
try:
try:
processutils.execute("mount", vmedia_device_file,
vmedia_mount_point)
except processutils.ProcessExecutionError as e:
msg = ("Unable to mount virtual media device %(device)s: "
"%(error)s" % {'device': vmedia_device_file, 'error': e})
raise exception.HpsumOperationError(reason=msg)
# Executes the hpsum based firmware update by passing the default hpsum
# executable path and the components specified, if any.
hpsum_file_path = os.path.join(vmedia_mount_point, HPSUM_LOCATION)
components = node['clean_step']['args']['firmware_images'][0].get(
'component')
if components:
components = components.strip().split(',')
result = _execute_hpsum(hpsum_file_path, components=components)
processutils.trycmd("umount", vmedia_mount_point)
finally:
shutil.rmtree(vmedia_mount_point, ignore_errors=True)
return result

View File

@ -16,6 +16,7 @@ from ironic_python_agent import hardware
from proliantutils import exception
from proliantutils.hpssa import manager as hpssa_manager
from proliantutils.hpsum import hpsum_controller
class ProliantHardwareManager(hardware.GenericHardwareManager):
@ -44,6 +45,9 @@ class ProliantHardwareManager(hardware.GenericHardwareManager):
'priority': 0},
{'step': 'erase_devices',
'interface': 'deploy',
'priority': 0},
{'step': 'update_firmware',
'interface': 'management',
'priority': 0}]
def evaluate_hardware_support(cls):
@ -102,3 +106,16 @@ class ProliantHardwareManager(hardware.GenericHardwareManager):
self).erase_devices(node, port))
return result
def update_firmware(self, node, port):
"""Performs HPSUM based firmware update on the bare metal node.
This method performs firmware update on all or some of the firmware
components on the bare metal node.
:returns: A string with return code and the statistics of
updated/failed components.
:raises: HpsumOperationError, when the hpsum firmware update operation
on the node fails.
"""
return hpsum_controller.update_firmware(node)

View File

View File

@ -0,0 +1,103 @@
# Copyright 2017 Hewlett Packard Enterprise Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
MODULE = "HPSUM"
HPSUM_OUTPUT_DATA = """
Scouting completed, node type:LINUX
Inventory started
Inventory completed
Analysis started
Analysis completed
Analysis started
Analysis completed
Deployment started
Deploying component: hpsmh-7.6.0-11.x86_64.rpm
Component Filename: hpsmh-7.6.0-11.x86_64.rpm
Component Name: HPE System Management Homepage for Linux (AMD64/EM64T)
Version: 7.6.0-11
Deployment Result: Success
Deploying component: ssaducli-2.60-18.0.x86_64.rpm
Component Filename: ssaducli-2.60-18.0.x86_64.rpm
Component Name: HPE Smart Storage Administrator Diagnostic Utility
Version: 2.60-18.0
Deployment Result: Success
Deployment completed
Deployed Components:
Component Filename: hpsmh-7.6.0-11.x86_64.rpm
Component Name: HPE System Management Homepage for Linux (AMD64/EM64T)
Original Version:
New Version: 7.6.0-11
Deployment Result: Success
Component Filename: ssaducli-2.60-18.0.x86_64.rpm
Component Name: HPE Smart Storage Administrator Diagnostic Utility
Original Version:
New Version: 2.60-18.0
Deployment Result: Success
Exit status: 0
"""
HPSUM_OUTPUT_DATA_FAILURE = """
Scouting completed, node type:LINUX
Inventory started
Inventory completed
Analysis started
Analysis completed
Analysis started
Analysis completed
Deployment started
Deploying component: hpsmh-7.6.0-11.x86_64.rpm
Component Filename: hpsmh-7.6.0-11.x86_64.rpm
Component Name: HPE System Management Homepage for Linux (AMD64/EM64T)
Version: 7.6.0-11
Deployment Result: Success
Deploying component: ssaducli-2.60-18.0.x86_64.rpm
Component Filename: ssaducli-2.60-18.0.x86_64.rpm
Component Name: HPE Smart Storage Administrator Diagnostic Utility
Version: 2.60-18.0
Deployment Result: Success
Deployment completed
Deployed Components:
Component Filename: hpsmh-7.6.0-11.x86_64.rpm
Component Name: HPE System Management Homepage for Linux (AMD64/EM64T)
Original Version:
New Version: 7.6.0-11
Deployment Result: Success
Component Filename: ssaducli-2.60-18.0.x86_64.rpm
Component Name: HPE Smart Storage Administrator Diagnostic Utility
Original Version:
New Version: 2.60-18.0
Deployment Result: Update returned an error
Exit status: 0
"""

View File

@ -0,0 +1,259 @@
# Copyright 2017 Hewlett Packard Enterprise Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import shutil
import tempfile
import mock
from oslo_concurrency import processutils
import testtools
from proliantutils import exception
from proliantutils.hpsum import hpsum_controller
from proliantutils.ilo import client as ilo_client
from proliantutils.tests.hpsum import hpsum_sample_output as constants
from proliantutils import utils
class HpsumFirmwareUpdateTest(testtools.TestCase):
def setUp(self):
super(HpsumFirmwareUpdateTest, self).setUp()
self.info = {'ilo_address': '1.2.3.4',
'ilo_password': '12345678',
'ilo_username': 'admin'}
clean_step = {
'interface': 'management',
'step': 'update_firmware',
'args': {'firmware_update_mode': u'hpsum',
'firmware_images': [{'url': 'http://1.2.3.4/SPP.iso',
'checksum': '1234567890'}]}}
self.node = {'driver_info': self.info,
'clean_step': clean_step}
@mock.patch.object(hpsum_controller, 'open',
mock.mock_open(read_data=constants.HPSUM_OUTPUT_DATA))
@mock.patch.object(os.path, 'exists')
@mock.patch.object(processutils, 'execute')
def test_execute_hpsum(self, execute_mock, exists_mock):
exists_mock.return_value = True
value = ("hpsum_service_x64 started successfully. Sending Shutdown "
"request to engine. Successfully shutdown the service.")
execute_mock.side_effect = processutils.ProcessExecutionError(
stdout=value, stderr=None, exit_code=0)
ret_value = ("Summary: The smart component was installed successfully."
" Status of updated components: Total: 2 Success: 2 "
"Failed: 0.")
stdout = hpsum_controller._execute_hpsum("hpsum", components=None)
self.assertEqual(ret_value, stdout)
execute_mock.assert_called_once_with("hpsum", "--s", "--romonly", "")
@mock.patch.object(processutils, 'execute')
def test_execute_hpsum_with_args(self, execute_mock):
value = ("hpsum_service_x64 started successfully. Sending Shutdown "
"request to engine. Successfully shutdown the service.")
execute_mock.side_effect = processutils.ProcessExecutionError(
stdout=value, stderr=None, exit_code=3)
ret_value = ("Summary: The smart component was not installed. Node is "
"already up-to-date.")
stdout = hpsum_controller._execute_hpsum("hpsum",
components=["foo", "bar"])
execute_mock.assert_called_once_with(
"hpsum", "--s", "--romonly", " --c foo --c bar")
self.assertEqual(ret_value, stdout)
@mock.patch.object(
hpsum_controller, 'open',
mock.mock_open(read_data=constants.HPSUM_OUTPUT_DATA_FAILURE))
@mock.patch.object(os.path, 'exists')
@mock.patch.object(processutils, 'execute')
def test_execute_hpsum_update_fails(self, execute_mock, exists_mock):
exists_mock.return_value = True
ret = ("Summary: The installation of the component failed. Status "
"of updated components: Total: 2 Success: 1 Failed: 1.")
value = ("hpsum_service_x64 started successfully. Sending Shutdown "
"request to engine. Successfully shutdown the service.")
execute_mock.side_effect = processutils.ProcessExecutionError(
stdout=value, stderr=None, exit_code=253)
stdout = hpsum_controller._execute_hpsum("hpsum", components=None)
self.assertEqual(ret, stdout)
execute_mock.assert_called_once_with("hpsum", "--s", "--romonly", "")
@mock.patch.object(os.path, 'exists')
@mock.patch.object(processutils, 'execute')
def test_execute_hpsum_fails(self, execute_mock, exists_mock):
exists_mock.return_value = False
value = ("Error: Cannot launch hpsum_service_x64 locally. Reason: "
"General failure.")
execute_mock.side_effect = processutils.ProcessExecutionError(
stdout=value, stderr=None, exit_code=255)
ex = self.assertRaises(exception.HpsumOperationError,
hpsum_controller._execute_hpsum, "hpsum",
None)
self.assertIn(value, str(ex))
@mock.patch.object(utils, 'validate_href')
@mock.patch.object(utils, 'verify_image_checksum')
@mock.patch.object(hpsum_controller, '_execute_hpsum')
@mock.patch.object(os, 'listdir')
@mock.patch.object(shutil, 'rmtree', autospec=True)
@mock.patch.object(tempfile, 'mkdtemp', autospec=True)
@mock.patch.object(os.path, 'exists')
@mock.patch.object(os, 'mkdir')
@mock.patch.object(processutils, 'execute')
@mock.patch.object(ilo_client, 'IloClient', spec_set=True, autospec=True)
def test_update_firmware(self, client_mock, execute_mock, mkdir_mock,
exists_mock, mkdtemp_mock, rmtree_mock,
listdir_mock, execute_hpsum_mock,
verify_image_mock, validate_mock):
ilo_mock_object = client_mock.return_value
eject_media_mock = ilo_mock_object.eject_virtual_media
insert_media_mock = ilo_mock_object.insert_virtual_media
execute_hpsum_mock.return_value = 'SUCCESS'
listdir_mock.return_value = ['SPP_LABEL']
mkdtemp_mock.return_value = "/tempdir"
null_output = ["", ""]
exists_mock.side_effect = [True, False]
execute_mock.side_effect = [null_output, null_output]
ret_val = hpsum_controller.update_firmware(self.node)
eject_media_mock.assert_called_once_with('CDROM')
insert_media_mock.assert_called_once_with('http://1.2.3.4/SPP.iso',
'CDROM')
execute_mock.assert_any_call('mount', "/dev/disk/by-label/SPP_LABEL",
"/tempdir")
execute_hpsum_mock.assert_any_call('/tempdir/hp/swpackages/hpsum',
components=None)
exists_mock.assert_called_once_with("/dev/disk/by-label/SPP_LABEL")
execute_mock.assert_any_call('umount', "/tempdir")
mkdtemp_mock.assert_called_once_with()
rmtree_mock.assert_called_once_with("/tempdir", ignore_errors=True)
self.assertEqual('SUCCESS', ret_val)
@mock.patch.object(utils, 'validate_href')
@mock.patch.object(ilo_client, 'IloClient', spec_set=True, autospec=True)
def test_update_firmware_vmedia_attach_fails(self, client_mock,
validate_mock):
ilo_mock_object = client_mock.return_value
eject_media_mock = ilo_mock_object.eject_virtual_media
value = ("Unable to attach hpsum SPP iso http://1.2.3.4/SPP.iso "
"to the iLO")
eject_media_mock.side_effect = exception.IloError(value)
exc = self.assertRaises(exception.IloError,
hpsum_controller.update_firmware, self.node)
self.assertEqual(value, str(exc))
@mock.patch.object(utils, 'validate_href')
@mock.patch.object(os.path, 'exists')
@mock.patch.object(os, 'listdir')
@mock.patch.object(ilo_client, 'IloClient', spec_set=True, autospec=True)
def test_update_firmware_device_file_not_found(self, client_mock,
listdir_mock, exists_mock,
validate_mock):
ilo_mock_object = client_mock.return_value
eject_media_mock = ilo_mock_object.eject_virtual_media
insert_media_mock = ilo_mock_object.insert_virtual_media
listdir_mock.return_value = ['SPP_LABEL']
exists_mock.return_value = False
msg = ("An error occurred while performing hpsum based firmware "
"update, reason: Unable to find the virtual media device "
"for HPSUM")
exc = self.assertRaises(exception.HpsumOperationError,
hpsum_controller.update_firmware, self.node)
self.assertEqual(msg, str(exc))
eject_media_mock.assert_called_once_with('CDROM')
insert_media_mock.assert_called_once_with('http://1.2.3.4/SPP.iso',
'CDROM')
exists_mock.assert_called_once_with("/dev/disk/by-label/SPP_LABEL")
@mock.patch.object(utils, 'validate_href')
@mock.patch.object(utils, 'verify_image_checksum')
@mock.patch.object(processutils, 'execute')
@mock.patch.object(tempfile, 'mkdtemp', autospec=True)
@mock.patch.object(os, 'mkdir')
@mock.patch.object(os.path, 'exists')
@mock.patch.object(os, 'listdir')
@mock.patch.object(ilo_client, 'IloClient', spec_set=True, autospec=True)
def test_update_firmware_mount_fails(self, client_mock, listdir_mock,
exists_mock, mkdir_mock,
mkdtemp_mock, execute_mock,
verify_image_mock, validate_mock):
ilo_mock_object = client_mock.return_value
eject_media_mock = ilo_mock_object.eject_virtual_media
insert_media_mock = ilo_mock_object.insert_virtual_media
listdir_mock.return_value = ['SPP_LABEL']
exists_mock.return_value = True
mkdtemp_mock.return_value = "/tempdir"
execute_mock.side_effect = processutils.ProcessExecutionError
msg = ("Unable to mount virtual media device "
"/dev/disk/by-label/SPP_LABEL")
exc = self.assertRaises(exception.HpsumOperationError,
hpsum_controller.update_firmware, self.node)
self.assertIn(msg, str(exc))
eject_media_mock.assert_called_once_with('CDROM')
insert_media_mock.assert_called_once_with('http://1.2.3.4/SPP.iso',
'CDROM')
exists_mock.assert_called_once_with("/dev/disk/by-label/SPP_LABEL")
@mock.patch.object(hpsum_controller, 'open',
mock.mock_open(read_data=constants.HPSUM_OUTPUT_DATA))
@mock.patch.object(os.path, 'exists')
def test__parse_hpsum_ouput(self, exists_mock):
exists_mock.return_value = True
expt_ret = ("Summary: The smart component was installed successfully. "
"Status of updated components: Total: 2 Success: 2 "
"Failed: 0.")
ret = hpsum_controller._parse_hpsum_ouput(0)
exists_mock.assert_called_once_with(hpsum_controller.OUTPUT_FILE)
self.assertEqual(expt_ret, ret)
@mock.patch.object(
hpsum_controller, 'open',
mock.mock_open(read_data=constants.HPSUM_OUTPUT_DATA_FAILURE))
@mock.patch.object(os.path, 'exists')
def test__parse_hpsum_ouput_some_failed(self, exists_mock):
exists_mock.return_value = True
expt_ret = ("Summary: The installation of the component failed. "
"Status of updated components: Total: 2 Success: 1 "
"Failed: 1.")
ret = hpsum_controller._parse_hpsum_ouput(253)
exists_mock.assert_called_once_with(hpsum_controller.OUTPUT_FILE)
self.assertEqual(expt_ret, ret)
@mock.patch.object(os.path, 'exists')
def test__parse_hpsum_ouput_fails(self, exists_mock):
exists_mock.return_value = False
expt_ret = ("UPDATE STATUS: UNKNOWN")
ret = hpsum_controller._parse_hpsum_ouput(1)
exists_mock.assert_called_once_with(hpsum_controller.OUTPUT_FILE)
self.assertEqual(expt_ret, ret)

View File

@ -39,6 +39,9 @@ class ProliantHardwareManagerTestCase(testtools.TestCase):
'priority': 0},
{'step': 'erase_devices',
'interface': 'deploy',
'priority': 0},
{'step': 'update_firmware',
'interface': 'management',
'priority': 0}],
self.hardware_manager.get_clean_steps("", ""))

View File

@ -14,7 +14,13 @@
# under the License.
"""Test class for Utils Module."""
import hashlib
import mock
import requests
import six
import six.moves.builtins as __builtin__
from six.moves import http_client
import testtools
from proliantutils import exception
@ -108,3 +114,131 @@ class UtilsTestCase(testtools.TestCase):
# | THEN |
self.assertEqual('core_fw_file.bin', raw_fw_file)
self.assertTrue(to_upload)
@mock.patch.object(utils, 'hashlib', autospec=True)
def test__get_hash_object(self, hashlib_mock):
algorithms_available = ('md5', 'sha1', 'sha224',
'sha256', 'sha384', 'sha512')
hashlib_mock.algorithms_guaranteed = algorithms_available
hashlib_mock.algorithms = algorithms_available
# | WHEN |
utils._get_hash_object('md5')
utils._get_hash_object('sha1')
utils._get_hash_object('sha224')
utils._get_hash_object('sha256')
utils._get_hash_object('sha384')
utils._get_hash_object('sha512')
# | THEN |
calls = [mock.call.md5(), mock.call.sha1(), mock.call.sha224(),
mock.call.sha256(), mock.call.sha384(), mock.call.sha512()]
hashlib_mock.assert_has_calls(calls)
def test__get_hash_object_throws_for_invalid_or_unsupported_hash_name(
self):
# | WHEN | & | THEN |
self.assertRaises(exception.InvalidInputError,
utils._get_hash_object,
'hickory-dickory-dock')
def test_hash_file_for_md5(self):
# | GIVEN |
data = b'Mary had a little lamb, its fleece as white as snow'
file_like_object = six.BytesIO(data)
expected = hashlib.md5(data).hexdigest()
# | WHEN |
actual = utils.hash_file(file_like_object) # using default, 'md5'
# | THEN |
self.assertEqual(expected, actual)
def test_hash_file_for_sha1(self):
# | GIVEN |
data = b'Mary had a little lamb, its fleece as white as snow'
file_like_object = six.BytesIO(data)
expected = hashlib.sha1(data).hexdigest()
# | WHEN |
actual = utils.hash_file(file_like_object, 'sha1')
# | THEN |
self.assertEqual(expected, actual)
def test_hash_file_for_sha512(self):
# | GIVEN |
data = b'Mary had a little lamb, its fleece as white as snow'
file_like_object = six.BytesIO(data)
expected = hashlib.sha512(data).hexdigest()
# | WHEN |
actual = utils.hash_file(file_like_object, 'sha512')
# | THEN |
self.assertEqual(expected, actual)
def test_hash_file_throws_for_invalid_or_unsupported_hash(self):
# | GIVEN |
data = b'Mary had a little lamb, its fleece as white as snow'
file_like_object = six.BytesIO(data)
# | WHEN | & | THEN |
self.assertRaises(exception.InvalidInputError, utils.hash_file,
file_like_object, 'hickory-dickory-dock')
@mock.patch.object(__builtin__, 'open', autospec=True)
def test_verify_image_checksum(self, open_mock):
# | GIVEN |
data = b'Yankee Doodle went to town riding on a pony;'
file_like_object = six.BytesIO(data)
open_mock().__enter__.return_value = file_like_object
actual_hash = hashlib.md5(data).hexdigest()
# | WHEN |
utils.verify_image_checksum(file_like_object, actual_hash)
# | THEN |
# no any exception thrown
def test_verify_image_checksum_throws_for_nonexistent_file(self):
# | GIVEN |
invalid_file_path = '/some/invalid/file/path'
# | WHEN | & | THEN |
self.assertRaises(exception.ImageRefValidationFailed,
utils.verify_image_checksum,
invalid_file_path, 'hash_xxx')
@mock.patch.object(__builtin__, 'open', autospec=True)
def test_verify_image_checksum_throws_for_failed_validation(self,
open_mock):
# | GIVEN |
data = b'Yankee Doodle went to town riding on a pony;'
file_like_object = six.BytesIO(data)
open_mock().__enter__.return_value = file_like_object
invalid_hash = 'invalid_hash_value'
# | WHEN | & | THEN |
self.assertRaises(exception.ImageRefValidationFailed,
utils.verify_image_checksum,
file_like_object,
invalid_hash)
@mock.patch.object(requests, 'head', autospec=True)
def test_validate_href(self, head_mock):
href = 'http://1.2.3.4/abc.iso'
response = head_mock.return_value
response.status_code = http_client.OK
utils.validate_href(href)
head_mock.assert_called_once_with(href)
response.status_code = http_client.NO_CONTENT
self.assertRaises(exception.ImageRefValidationFailed,
utils.validate_href,
href)
response.status_code = http_client.BAD_REQUEST
self.assertRaises(exception.ImageRefValidationFailed,
utils.validate_href, href)
@mock.patch.object(requests, 'head', autospec=True)
def test_validate_href_error_code(self, head_mock):
href = 'http://1.2.3.4/abc.iso'
head_mock.return_value.status_code = http_client.BAD_REQUEST
self.assertRaises(exception.ImageRefValidationFailed,
utils.validate_href, href)
head_mock.assert_called_once_with(href)
@mock.patch.object(requests, 'head', autospec=True)
def test_validate_href_error(self, head_mock):
href = 'http://1.2.3.4/abc.iso'
head_mock.side_effect = requests.ConnectionError()
self.assertRaises(exception.ImageRefValidationFailed,
utils.validate_href, href)
head_mock.assert_called_once_with(href)

View File

@ -15,7 +15,13 @@
"""
Non-iLO related utilities and helper functions.
"""
import hashlib
import requests
import six
from six.moves import http_client
from proliantutils import exception
from proliantutils.ilo import firmware_controller
from proliantutils import log
@ -63,3 +69,86 @@ def process_firmware_image(compact_firmware_file, ilo_object):
'yes_or_no': 'Yes' if to_upload else 'No'})
LOG.info(msg)
return raw_fw_file_path, to_upload, is_extracted
def _get_hash_object(hash_algo_name):
"""Create a hash object based on given algorithm.
:param hash_algo_name: name of the hashing algorithm.
:raises: InvalidInputError, on unsupported or invalid input.
:returns: a hash object based on the given named algorithm.
"""
algorithms = (hashlib.algorithms_guaranteed if six.PY3
else hashlib.algorithms)
if hash_algo_name not in algorithms:
msg = ("Unsupported/Invalid hash name '%s' provided."
% hash_algo_name)
raise exception.InvalidInputError(msg)
return getattr(hashlib, hash_algo_name)()
def hash_file(file_like_object, hash_algo='md5'):
"""Generate a hash for the contents of a file.
It returns a hash of the file object as a string of double length,
containing only hexadecimal digits. It supports all the algorithms
hashlib does.
:param file_like_object: file like object whose hash to be calculated.
:param hash_algo: name of the hashing strategy, default being 'md5'.
:raises: InvalidInputError, on unsupported or invalid input.
:returns: a condensed digest of the bytes of contents.
"""
checksum = _get_hash_object(hash_algo)
for chunk in iter(lambda: file_like_object.read(32768), b''):
checksum.update(chunk)
return checksum.hexdigest()
def verify_image_checksum(image_location, expected_checksum):
"""Verifies checksum (md5) of image file against the expected one.
This method generates the checksum of the image file on the fly and
verifies it against the expected checksum provided as argument.
:param image_location: location of image file whose checksum is verified.
:param expected_checksum: checksum to be checked against
:raises: ImageRefValidationFailed, if invalid file path or
verification fails.
"""
try:
with open(image_location, 'rb') as fd:
actual_checksum = hash_file(fd)
except IOError as e:
raise exception.ImageRefValidationFailed(image_href=image_location,
reason=e)
if actual_checksum != expected_checksum:
msg = ('Error verifying image checksum. Image %(image)s failed to '
'verify against checksum %(checksum)s. Actual checksum is: '
'%(actual_checksum)s' %
{'image': image_location, 'checksum': expected_checksum,
'actual_checksum': actual_checksum})
raise exception.ImageRefValidationFailed(image_href=image_location,
reason=msg)
def validate_href(image_href):
"""Validate HTTP image reference.
:param image_href: Image reference.
:raises: exception.ImageRefValidationFailed if HEAD request failed or
returned response code not equal to 200.
:returns: Response to HEAD request.
"""
try:
response = requests.head(image_href)
if response.status_code != http_client.OK:
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason=("Got HTTP code %s instead of 200 in response to "
"HEAD request." % response.status_code))
except requests.RequestException as e:
raise exception.ImageRefValidationFailed(image_href=image_href,
reason=e)
return response