Implement Redfish Sessions

Implement Redfish Sessions because some vendor implementations
have disabled basic auth and require a Redfish Session to access
resources.

This was done by creating the following objects:
SessionService, SessionCollection and Session,
BasicAuth, SessionAuth, SessionOrBasicAuth

Session state is managed internally by ourselves.
Unit tests have been updated accordingly.

Closes-Bug: 1695972
Co-Authored-By: Debayan Ray <debayan.ray@gmail.com>

Change-Id: I6623538383388caf1115a7c100b0f97e73df69d9
This commit is contained in:
Yusef Shaban 2017-06-06 15:56:48 -07:00
parent 92f79530b9
commit 9610c06a17
21 changed files with 1426 additions and 21 deletions

View File

@ -10,6 +10,7 @@ Features
* Systems power management (Both soft and hard; Including NMI injection)
* Changing systems boot device, frequency (Once or permanently) and mode
(UEFI or BIOS)
* SessionManagement
.. toctree::
:maxdepth: 2

View File

@ -5,6 +5,53 @@ Using Sushy
To use sushy in a project:
-----------------------------------------
Specifying an authentication type
-----------------------------------------
There are three authentication objects. By default we use SessionOrBasicAuth.
Authentication Modes:
auth.SessionOrBasicAuth: Use session based authentication. If we are unable
to create a session we will fallback to basic authentication.
auth.BasicAuth: Use basic authentication only.
auth.SessionAuth: Use session based authentication only.
.. code-block:: python
import logging
import sushy
from sushy import auth
# Enable logging at DEBUG level
LOG = logging.getLogger('sushy')
LOG.setLevel(logging.DEBUG)
LOG.addHandler(logging.StreamHandler())
basic_auth = auth.BasicAuth(username='foo', password='bar')
session_auth = auth.SessionAuth(username='foo', password='bar')
session_or_basic_auth = auth.SessionOrBasicAuth(username='foo',
password='bar')
s = sushy.Sushy('http://localhost:8000/redfish/v1',
auth=basic_auth)
s = sushy.Sushy('http://localhost:8000/redfish/v1',
auth=session_auth)
s = sushy.Sushy('http://localhost:8000/redfish/v1',
auth=session_or_basic_auth)
# It is important to note that you can
# call sushy without supplying an
# authentication object. In that case we
# will use the SessionOrBasicAuth authentication
# object in an attempt to connect to all different
# types of redfish servers.
s = sushy.Sushy('http://localhost:8000/redfish/v1',
username='foo',
password='bar')
----------------------------------------
Creating and using a sushy system object
----------------------------------------
@ -150,6 +197,51 @@ Creating and using a sushy manager object
# Refresh the manager object
mgr_inst.refresh()
-------------------------------------------------
Creating and using a sushy session service object
-------------------------------------------------
.. code-block:: python
import logging
import sushy
# Enable logging at DEBUG level
LOG = logging.getLogger('sushy')
LOG.setLevel(logging.DEBUG)
LOG.addHandler(logging.StreamHandler())
s = sushy.Sushy('http://localhost:8000/redfish/v1',
username='foo', password='bar')
# Instantiate a SessionService object
sess_serv = s.get_session_service()
# Get SessionCollection
sess_col = sess_serv.sessions
# Print the ID of the sessions available in the collection
print(sess_col.members_identities)
# Get a list of systems objects available in the collection
sess_col_insts = sess_col.get_members()
# Instantiate a session object, same as getting it directly
sess_inst = sess_col.get_member(sess_col.members_identities[0])
# Getting it directly
sess_inst = s.get_session(sess_col.members_identities[0])
# Delete the session
sess_inst.delete()
# Create a new session
session_key, session_id = sess_serv.create_session(
username='foo', password='bar')
# Delete a session
sess_serv.close_session(sess_col.members_identities[0])
If you do not have any real baremetal machine that supports the Redfish
protocol you can look at the :ref:`contributing` page to learn how to

View File

@ -0,0 +1,6 @@
---
features:
- |
Adds "SessionService" and "Sessions" to the library.
- |
Adds the abillity to specify authentication type on creation of root sushy object.

241
sushy/auth.py Normal file
View File

@ -0,0 +1,241 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Sushy Redfish Authentication Modes
import abc
import logging
import six
from sushy import exceptions
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class AuthBase(object):
def __init__(self, username=None, password=None):
"""A class representing a base Sushy authentication mechanism
:param username: User account with admin/server-profile
access privilege.
:param password: User account password.
"""
self._username = username
self._password = password
self._root_resource = None
self._connector = None
def set_context(self, root_resource, connector):
"""Set the context of the authentication object.
:param root_resource: Root sushy object
:param connector: Connector for http connections
"""
self._root_resource = root_resource
self._connector = connector
self._connector.set_auth(self)
def authenticate(self):
"""Perform authentication.
:raises: RuntimeError
"""
if self._root_resource is None or self._connector is None:
raise RuntimeError('_root_resource / _connector is missing. '
'Forgot to call set_context()?')
self._do_authenticate()
@abc.abstractmethod
def _do_authenticate(self):
"""Method to establish a session to a Redfish controller.
Needs to be implemented by extending auth class,
because each authentication type will authenticate in its own way.
"""
@abc.abstractmethod
def can_refresh_session(self):
"""Method to assert if session based refresh can be done."""
def __enter__(self):
"""Allow object to be called with the 'with' statement."""
return self
def __exit__(self, exception_type, exception_value, traceback):
"""Allow object to be called with the 'with' statement.
Allow object to be called with the 'with' statement but
also ensure we call close method on exit.
"""
self.close()
class BasicAuth(AuthBase):
"""Basic Authentication class.
This is a class used to encapsulate a basic authentication session.
:param username: User account with admin/server-profile
access privilege.
:param password: User account password.
"""
def _do_authenticate(self):
"""Attempts to establish a Basic Authentication Session.
"""
self._connector.set_http_basic_auth(self._username, self._password)
def can_refresh_session(self):
"""Method to assert if session based refresh can be done."""
return False
class SessionAuth(AuthBase):
"""Session Authentication class.
This is a class used to encapsulate a redfish session.
"""
def __init__(self, username=None, password=None):
"""A class representing a Session Authentication object.
:param username: User account with admin/server-profile access
privilege.
:param password: User account password.
"""
self._session_key = None
"""Our Sessions Key"""
self._session_resource_id = None
"""Our Sessions Unique Resource ID or URL"""
super(SessionAuth, self).__init__(username,
password)
def get_session_key(self):
"""Returns the session key.
:returns: The session key.
"""
return self._session_key
def get_session_resource_id(self):
"""Returns the session resource id.
:returns: The session resource id.
"""
return self._session_resource_id
def _do_authenticate(self):
"""Establish a redfish session.
:raises: MissingXAuthToken
:raises: ConnectionError
:raises: AccessError
:raises: HTTPError
"""
session_service = self._root_resource.get_session_service()
session_auth_token, session_uri = (
session_service.create_session(self._username,
self._password))
self._session_key = session_auth_token
self._session_resource_id = session_uri
self._connector.set_http_session_auth(session_auth_token)
def can_refresh_session(self):
"""Method to assert if session based refresh can be done."""
return True
def refresh_session(self):
"""Method to refresh a session to a Redfish controller.
This method is called to create a new session after
a session that has already been established
has timed-out or expired.
:raises: MissingXAuthToken
:raises: ConnectionError
:raises: AccessError
:raises: HTTPError
"""
self.reset_session_attrs()
self._do_authenticate()
def close(self):
"""Close the Redfish Session.
Attempts to close an established RedfishSession by
deleting it from the remote Redfish controller.
"""
if self._session_resource_id is not None:
try:
self._connector.delete(self._session_resource_id)
except (exceptions.AccessError,
exceptions.ServerSideError) as exc:
LOG.warning('Received exception "%(exception)s" while '
'attempting to delete the active session: '
'%(session_id)s',
{'exception': exc,
'session_id': self._session_resource_id})
self.reset_session_attrs()
def reset_session_attrs(self):
"""Reset active session related attributes."""
self._session_key = None
self._session_resource_id = None
class SessionOrBasicAuth(SessionAuth):
def __init__(self, username=None, password=None):
super(SessionOrBasicAuth, self).__init__(username, password)
self.basic_auth = BasicAuth(username=username, password=password)
def _do_authenticate(self):
"""Establish a RedfishSession.
We will attempt to establish a redfish session. If we are unable
to establish one, fallback to basic authentication.
"""
try:
# Attempt session based authentication
super(SessionOrBasicAuth, self)._do_authenticate()
except exceptions.SushyError as e:
LOG.debug('Received exception "%(exception)s" while '
'attempting to establish a session. '
'Falling back to basic authentication.',
{'exception': e})
# Fall back to basic authentication
self.reset_session_attrs()
self.basic_auth.set_context(self._root_resource, self._connector)
self.basic_auth.authenticate()
def can_refresh_session(self):
"""Method to assert if session based refresh can be done."""
return (self._session_key is not None and
self._session_resource_id is not None)
def refresh_session(self):
"""Method to refresh a session to a Redfish controller.
This method is called to create a new RedfishSession
if we have previously established a RedfishSession and
the previous session has timed-out or expired.
If we did not previously have an established session,
we simply return our BasicAuthentication requests.Session.
"""
if self.can_refresh_session():
super(SessionOrBasicAuth, self).refresh_session()

View File

@ -26,12 +26,24 @@ LOG = logging.getLogger(__name__)
class Connector(object):
def __init__(self, url, username=None, password=None, verify=True):
def __init__(self, url, verify=True):
self._url = url
self._verify = verify
self._session = requests.Session()
self._session.verify = verify
if username and password:
self._session.auth = (username, password)
self._session.verify = self._verify
def set_auth(self, auth):
"""Sets the authentication mechanism for our connector."""
self._auth = auth
def set_http_basic_auth(self, username, password):
"""Sets the http basic authentication information."""
self._session.auth = (username, password)
def set_http_session_auth(self, session_auth_token):
"""Sets the session authentication information."""
self._session.auth = None
self._session.headers.update({'X-Auth-Token': session_auth_token})
def close(self):
"""Close this connector and the associated HTTP session."""
@ -49,11 +61,12 @@ class Connector(object):
:raises: ConnectionError
:raises: HTTPError
"""
json_data = None
if headers is None:
headers = {}
if data is not None:
data = json.dumps(data)
json_data = json.dumps(data)
headers['Content-Type'] = 'application/json'
url = parse.urljoin(self._url, path)
@ -62,14 +75,30 @@ class Connector(object):
LOG.debug('HTTP request: %(method)s %(url)s; '
'headers: %(headers)s; body: %(data)s',
{'method': method, 'url': url, 'headers': headers,
'data': data})
'data': json_data})
try:
response = self._session.request(method, url, data=data,
response = self._session.request(method, url,
data=json_data,
headers=headers)
except requests.ConnectionError as e:
raise exceptions.ConnectionError(url=url, error=e)
# If we received an AccessError, and we
# previously established a redfish session
# there is a chance that the session has timed-out.
# Attempt to re-establish a session.
try:
exceptions.raise_for_response(method, url, response)
except exceptions.AccessError:
if self._auth.can_refresh_session():
self._auth.refresh_session()
LOG.debug("Authentication refreshed successfully, "
"retrying the call.")
response = self._session.request(method, url,
data=json_data,
headers=headers)
else:
raise
exceptions.raise_for_response(method, url, response)
LOG.debug('HTTP response for %(method)s %(url)s: '
'status code: %(code)s',
{'method': method, 'url': url,

View File

@ -115,6 +115,11 @@ class AccessError(HTTPError):
pass
class MissingXAuthToken(HTTPError):
message = ('No X-Auth-Token returned from remote host when '
'attempting to establish a session. Error: %(error)s')
def raise_for_response(method, url, response):
"""Raise a correct error class, if needed."""
if response.status_code < http_client.BAD_REQUEST:

View File

@ -12,12 +12,18 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from sushy import auth as sushy_auth
from sushy import connector
from sushy.resources import base
from sushy.resources.manager import manager
from sushy.resources.sessionservice import session
from sushy.resources.sessionservice import sessionservice
from sushy.resources.system import system
LOG = logging.getLogger(__name__)
class Sushy(base.ResourceBase):
@ -36,8 +42,13 @@ class Sushy(base.ResourceBase):
_managers_path = base.Field(['Managers', '@odata.id'], required=True)
"""ManagerCollection path"""
_session_service_path = base.Field(['SessionService', '@odata.id'],
required=True)
"""SessionService path"""
def __init__(self, base_url, username=None, password=None,
root_prefix='/redfish/v1/', verify=True):
root_prefix='/redfish/v1/', verify=True,
auth=None):
"""A class representing a RootService
:param base_url: The base URL to the Redfish controller. It
@ -54,11 +65,24 @@ class Sushy(base.ResourceBase):
the driver will ignore verifying the SSL certificate; if it's
a path the driver will use the specified certificate or one of
the certificates in the directory. Defaults to True.
:param auth: An authentication mechanism to utilize.
"""
self._root_prefix = root_prefix
if (auth is not None and (password is not None or
username is not None)):
msg = ('Username or Password were provided to Sushy '
'when an authentication mechanism was specified.')
raise ValueError(msg)
else:
auth = sushy_auth.SessionOrBasicAuth(username=username,
password=password)
super(Sushy, self).__init__(
connector.Connector(base_url, username, password, verify),
connector.Connector(base_url, verify),
path=self._root_prefix)
self._auth = auth
self._auth.set_context(self, self._conn)
self._auth.authenticate()
def _parse_attributes(self):
super(Sushy, self)._parse_attributes()
@ -101,3 +125,22 @@ class Sushy(base.ResourceBase):
"""
return manager.Manager(self._conn, identity,
redfish_version=self.redfish_version)
def get_session_service(self):
"""Get the SessionService object
:raises: MissingAttributeError, if the collection attribue is not found
:returns: as SessionCollection object
"""
return sessionservice.SessionService(
self._conn, self._session_service_path,
redfish_version=self.redfish_version)
def get_session(self, identity):
"""Given the identity return a Session object
:param identity: The identity of the session resource
:returns: The Session object
"""
return session.Session(self._conn, identity,
redfish_version=self.redfish_version)

View File

@ -0,0 +1,76 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from sushy.resources import base
LOG = logging.getLogger(__name__)
class Session(base.ResourceBase):
description = base.Field('Description')
"""The session service description"""
identity = base.Field('Id', required=True)
"""The session service identify string"""
name = base.Field('Name', required=True)
"""The session service name"""
username = base.Field('UserName')
"""The UserName for the account for this session."""
def __init__(self, connector, identity, redfish_version=None):
"""A class representing a Session
:param connector: A Connector instance
:param identity: The identity of the Session resource
:param redfish_version: The version of RedFish. Used to construct
the object according to schema of given version.
"""
super(Session, self).__init__(connector, identity, redfish_version)
def delete(self):
"""Method for deleting a Session.
:raises: ServerSideError
"""
self._conn.delete(self.path)
class SessionCollection(base.ResourceCollectionBase):
name = base.Field('Name')
"""The session collection name"""
description = base.Field('Description')
"""The session collection description"""
@property
def _resource_type(self):
return Session
def __init__(self, connector, identity, redfish_version=None):
"""A class representing a SessionCollection
:param connector: A Connector instance
:param identity: The identity of the Session resource
:param redfish_version: The version of RedFish. Used to construct
the object according to schema of given version.
"""
super(SessionCollection, self).__init__(
connector, identity, redfish_version)

View File

@ -0,0 +1,128 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from sushy import exceptions
from sushy.resources import base
from sushy.resources.sessionservice import session
LOG = logging.getLogger(__name__)
class SessionService(base.ResourceBase):
description = base.Field('Description')
"""The session service description"""
identity = base.Field('Id', required=True)
"""The session service identify string"""
name = base.Field('Name', required=True)
"""The session service name"""
service_enabled = base.Field('ServiceEnabled')
"""Tells us if session service is enabled"""
_sessions = None # ref to SessionCollection instance
session_timeout = base.Field('SessionTimeout')
"""The session service timeout"""
def __init__(self, connector, identity, redfish_version=None):
"""A class representing a SessionService
:param connector: A Connector instance
:param identity: The identity of the SessionService resource
:param redfish_version: The version of RedFish. Used to construct
the object according to schema of given version.
"""
try:
super(SessionService, self).__init__(
connector, identity, redfish_version)
except exceptions.AccessError as ae:
LOG.warning('Received access error "%(ae)s". '
'Unable to refresh SessionService.',
{'ae': ae})
def _get_sessions_collection_path(self):
"""Helper function to find the SessionCollections path"""
sessions_col = self.json.get('Sessions')
if not sessions_col:
raise exceptions.MissingAttributeError(
attribute='Sessions', resource=self._path)
return sessions_col.get('@odata.id')
@property
def sessions(self):
"""Property to provide reference to the `SessionCollection` instance
It is calculated once when the first time it is queried. On refresh,
this property gets reset.
"""
if self._sessions is None:
self._sessions = session.SessionCollection(
self._conn, self._get_sessions_collection_path(),
redfish_version=self.redfish_version)
self._sessions.refresh(force=False)
return self._sessions
def _do_refresh(self, force=False):
"""Do custom resource specific refresh activities
On refresh, all sub-resources are marked as stale, i.e.
greedy-refresh not done for them unless forced by ``force``
argument.
"""
if self._sessions is not None:
self._sessions.invalidate(force)
def close_session(self, session_uri):
"""This function is for closing a session based on its id.
:raises: ServerSideError
"""
self._conn.delete(session_uri)
def create_session(self, username, password):
"""This function will try to create a session.
:returns: A session key and uri in the form of a tuple
:raises: MissingXAuthToken
:raises: ConnectionError
:raises: AccessError
:raises: HTTPError
"""
try:
target_uri = self._get_sessions_collection_path()
except Exception:
# Defaulting to /Sessions
target_uri = self.path + '/Sessions'
data = {'UserName': username, 'Password': password}
headers = {'X-Auth-Token': None}
rsp = self._conn.post(target_uri, data=data, headers=headers)
session_key = rsp.headers.get('X-Auth-Token')
if session_key is None:
raise exceptions.MissingXAuthToken(
method='POST', url=target_uri, response=rsp)
session_uri = rsp.headers.get('Location')
if session_uri is None:
LOG.warning("Received X-Auth-Token but NO session uri.")
return session_key, session_uri

View File

@ -0,0 +1,11 @@
{
"@odata.type": "#Session.v1_0_2.Session",
"Id": "1234567890ABCDEF",
"Name": "User Session",
"Description": "Manager User Session",
"UserName": "Administrator",
"Oem": {},
"@odata.context": "/redfish/v1/$metadata#Session.Session",
"@odata.id": "/redfish/v1/SessionService/Sessions/1234567890ABCDEF",
"@Redfish.Copyright": "Copyright 2014-2016 Distributed Management Task Force, Inc. (DMTF). For the full DMTF copyright policy, see http://www.dmtf.org/about/policies/copyright."
}

View File

@ -0,0 +1,12 @@
{
"@odata.type": "#SessionCollection.SessionCollection",
"Name": "Session Collection",
"Members@odata.count": 1,
"@odata.id": "/redfish/v1/SessionService/Sessions",
"@odata.context": "/redfish/v1/$metadata#SessionService/Sessions/$entity",
"Members": [
{
"@odata.id": "/redfish/v1/SessionService/Sessions/104f9d68f58abb85"
}
]
}

View File

@ -0,0 +1,18 @@
{
"Content-Security-Policy": "default-src 'none'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; connect-src 'self'; img-src 'self'; frame-src 'self'; font-src 'self'; object-src 'self'; style-src 'self' 'unsafe-inline'",
"ETag": "'W/\"7dc5e2b9\"'",
"Cache-Control": "max-age=0, no-cache, no-store, must-revalidate",
"Location": "/redfish/v1/SessionService/Sessions/151edd65d41c0b89",
"Connection": "Keep-Alive",
"X-XSS-Protection": "1; mode=block",
"X-Auth-Token": "adc530e2016a0ea98c76c087f0e4b76f",
"Expires": "0",
"X-Frame-Options": "SAMEORIGIN",
"Content-Length": "392",
"X-Content-Type-Options": "nosniff",
"Content-Type": "application/json;charset=utf-8",
"OData-Version": "4.0",
"Keep-Alive": "timeout=1, max=32",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"Date": "Tue, 06 Jun 2017 17:07:48 GMT"
}

View File

@ -0,0 +1,17 @@
{
"error": {
"code": "Base.1.0.GeneralError",
"message": "A general error has occurred. See ExtendedInfo for more information.",
"@Message.ExtendedInfo": [
{
"@odata.type": "/redfish/v1/$metadata#MessageRegistry.1.0.0.MessageRegistry",
"MessageId": "Base.1.0.NoValidSession",
"RelatedProperties": [],
"Message": "There is no valid session established with the implementation.",
"MessageArgs": [],
"Severity": "Critical",
"Resolution": "Establish as session before attempting any operations."
}
]
}
}

View File

@ -0,0 +1,18 @@
{
"@odata.type": "#SessionService.v1_0_2.SessionService",
"Id": "SessionService",
"Name": "Session Service",
"Description": "Session Service",
"Status": {
"State": "Enabled",
"Health": "OK"
},
"ServiceEnabled": true,
"SessionTimeout": 30,
"Sessions": {
"@odata.id": "/redfish/v1/SessionService/Sessions"
},
"@odata.context": "/redfish/v1/$metadata#SessionService",
"@odata.id": "/redfish/v1/SessionService",
"@Redfish.Copyright": "Copyright 2014-2016 Distributed Management Task Force, Inc. (DMTF). For the full DMTF copyright policy, see http://www.dmtf.org/about/policies/copyright."
}

View File

@ -0,0 +1,100 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
from sushy import exceptions
from sushy.resources.sessionservice import session
from sushy.tests.unit import base
class SessionTestCase(base.TestCase):
def setUp(self):
super(SessionTestCase, self).setUp()
self.conn = mock.Mock()
self.auth = mock.Mock()
with open('sushy/tests/unit/json_samples/session.json', 'r') as f:
sample_json = json.loads(f.read())
self.conn.get.return_value.json.return_value = sample_json
self.auth._session_key = 'fake_x_auth_token'
self.auth._session_uri = sample_json['@odata.id']
self.conn._auth = self.auth
self.sess_inst = session.Session(
self.conn, '/redfish/v1/SessionService/Sessions/1234567890ABCDEF',
redfish_version='1.0.2')
def test__parse_attributes(self):
self.sess_inst._parse_attributes()
self.assertEqual('1.0.2', self.sess_inst.redfish_version)
self.assertEqual('1234567890ABCDEF', self.sess_inst.identity)
self.assertEqual('User Session', self.sess_inst.name)
exp_path = '/redfish/v1/SessionService/Sessions/1234567890ABCDEF'
self.assertEqual(exp_path, self.sess_inst.path)
def test__parse_attributes_missing_identity(self):
self.sess_inst.json.pop('Id')
self.assertRaisesRegex(
exceptions.MissingAttributeError, 'attribute Id',
self.sess_inst._parse_attributes)
def test_session_close(self):
session_key = self.sess_inst._conn._auth._session_key
session_uri = self.sess_inst._conn._auth._session_uri
self.assertEqual(session_key, 'fake_x_auth_token')
self.assertEqual(session_uri, self.sess_inst.path)
self.sess_inst.delete()
self.sess_inst._conn.delete.assert_called_with(session_uri)
class SessionCollectionTestCase(base.TestCase):
def setUp(self):
super(SessionCollectionTestCase, self).setUp()
self.conn = mock.Mock()
js_f = 'sushy/tests/unit/json_samples/session_collection.json'
with open(js_f, 'r') as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
self.sess_col = session.SessionCollection(
self.conn, '/redfish/v1/SessionService/Sessions',
redfish_version='1.0.2')
def test__parse_attributes(self):
path = '/redfish/v1/SessionService/Sessions/104f9d68f58abb85'
self.sess_col._parse_attributes()
self.assertEqual('1.0.2', self.sess_col.redfish_version)
self.assertEqual('Session Collection', self.sess_col.name)
self.assertEqual((path,), self.sess_col.members_identities)
@mock.patch.object(session, 'Session', autospec=True)
def test_get_member(self, mock_session):
path = '/redfish/v1/SessionService/Sessions/104f9d68f58abb85'
self.sess_col.get_member(path)
mock_session.assert_called_once_with(
self.sess_col._conn, path,
redfish_version=self.sess_col.redfish_version)
@mock.patch.object(session, 'Session', autospec=True)
def test_get_members(self, mock_session):
path = '/redfish/v1/SessionService/Sessions/104f9d68f58abb85'
members = self.sess_col.get_members()
mock_session.assert_called_once_with(
self.sess_col._conn, path,
redfish_version=self.sess_col.redfish_version)
self.assertIsInstance(members, list)
self.assertEqual(1, len(members))

View File

@ -0,0 +1,175 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
from sushy import exceptions
from sushy.resources.sessionservice import session
from sushy.resources.sessionservice import sessionservice
from sushy.tests.unit import base
class SessionServiceTestCase(base.TestCase):
def setUp(self):
super(SessionServiceTestCase, self).setUp()
self.conn = mock.Mock()
js_f = 'sushy/tests/unit/json_samples/session_service.json'
with open(js_f, 'r') as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
self.sess_serv_inst = sessionservice.SessionService(
self.conn, '/redfish/v1/SessionService',
redfish_version='1.0.2')
@mock.patch.object(sessionservice, 'LOG', autospec=True)
def test__init_throws_exception(self, mock_LOG):
self.conn.get.return_value.json.reset_mock()
self.conn.get.return_value.json.side_effect = (
exceptions.AccessError(
'GET', 'any_url', mock.MagicMock()))
sessionservice.SessionService(
self.conn, '/redfish/v1/SessionService', redfish_version='1.0.2')
self.assertTrue(mock_LOG.warning.called)
def test__parse_attributes(self):
self.sess_serv_inst._parse_attributes()
exp_path = '/redfish/v1/SessionService'
self.assertEqual('1.0.2', self.sess_serv_inst.redfish_version)
self.assertEqual('SessionService', self.sess_serv_inst.identity)
self.assertEqual('Session Service', self.sess_serv_inst.name)
self.assertEqual(30, self.sess_serv_inst.session_timeout)
self.assertEqual(exp_path, self.sess_serv_inst.path)
self.assertIsNone(self.sess_serv_inst._sessions)
def test__parse_attributes_missing_timeout(self):
self.sess_serv_inst.json.pop('SessionTimeout')
self.assertRaisesRegex(
exceptions.MissingAttributeError, 'attribute SessionTimeout',
self.sess_serv_inst._parse_attributes())
def test__get_sessions_collection_path(self):
self.sess_serv_inst.json.pop('Sessions')
self.assertRaisesRegex(
exceptions.MissingAttributeError, 'attribute Sessions',
self.sess_serv_inst._get_sessions_collection_path)
@mock.patch.object(session, 'SessionCollection', autospec=True)
def test_session_collection(self, mock_sess_col):
self.sess_serv_inst.sessions
mock_sess_col.assert_called_once_with(
self.sess_serv_inst._conn,
'/redfish/v1/SessionService/Sessions',
redfish_version=self.sess_serv_inst.redfish_version)
def test_create_session(self):
with open('sushy/tests/unit/json_samples/'
'session_creation_headers.json', 'r') as f:
self.conn.post.return_value.headers = json.loads(f.read())
session_key, session_uri = (
self.sess_serv_inst.create_session('foo', 'secret'))
self.assertEqual('adc530e2016a0ea98c76c087f0e4b76f', session_key)
self.assertEqual(
'/redfish/v1/SessionService/Sessions/151edd65d41c0b89',
session_uri)
def test_create_session_unknown_path(self):
del self.sess_serv_inst.json['Sessions']
with open('sushy/tests/unit/json_samples/'
'session_creation_headers.json', 'r') as f:
self.conn.post.return_value.headers = json.loads(f.read())
session_key, session_uri = (
self.sess_serv_inst.create_session('foo', 'secret'))
self.assertEqual('adc530e2016a0ea98c76c087f0e4b76f', session_key)
self.assertEqual(
'/redfish/v1/SessionService/Sessions/151edd65d41c0b89',
session_uri)
uri = self.sess_serv_inst.path + '/Sessions'
data = {'UserName': 'foo', 'Password': 'secret'}
headers = {'X-Auth-Token': None}
self.conn.post.assert_called_once_with(uri,
data=data,
headers=headers)
def test_create_session_missing_x_auth_token(self):
with open('sushy/tests/unit/json_samples/'
'session_creation_headers.json', 'r') as f:
self.conn.post.return_value.headers = json.loads(f.read())
self.conn.post.return_value.headers.pop('X-Auth-Token')
self.assertRaisesRegex(
exceptions.MissingXAuthToken, 'No X-Auth-Token returned',
self.sess_serv_inst.create_session, 'foo', 'bar')
@mock.patch.object(sessionservice, 'LOG', autospec=True)
def test_create_session_missing_location(self, mock_LOG):
with open('sushy/tests/unit/json_samples/'
'session_creation_headers.json', 'r') as f:
self.conn.post.return_value.headers = json.loads(f.read())
self.conn.post.return_value.headers.pop('Location')
self.sess_serv_inst.create_session('foo', 'bar')
self.assertTrue(mock_LOG.warning.called)
def _setUp_sessions(self):
self.conn.get.return_value.json.reset_mock()
successive_return_values = []
with open('sushy/tests/unit/json_samples/session.json', 'r') as f:
successive_return_values.append(json.loads(f.read()))
self.conn.get.return_value.json.side_effect = successive_return_values
def test_sessions(self):
# check for the underneath variable value
self.assertIsNone(self.sess_serv_inst._sessions)
# | GIVEN |
self._setUp_sessions()
# | WHEN |
actual_sessions = self.sess_serv_inst.sessions
# | THEN |
self.assertIsInstance(actual_sessions, session.SessionCollection)
self.conn.get.return_value.json.assert_called_once_with()
# reset mock
self.conn.get.return_value.json.reset_mock()
# | WHEN & THEN |
# tests for same object on invoking subsequently
self.assertIs(actual_sessions, self.sess_serv_inst.sessions)
self.conn.get.return_value.json.assert_not_called()
def test_sessions_on_refresh(self):
# | GIVEN |
self._setUp_sessions()
# | WHEN & THEN |
self.assertIsInstance(self.sess_serv_inst.sessions,
session.SessionCollection)
self.conn.get.return_value.json.side_effect = None
# On refreshing the sess_serv_inst instance...
with open('sushy/tests/unit/json_samples/session.json', 'r') as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
self.sess_serv_inst.refresh(force=True)
# | WHEN & THEN |
self.assertIsNotNone(self.sess_serv_inst._sessions)
self.assertFalse(self.sess_serv_inst._sessions._is_stale)
def test_close_session(self):
self.sess_serv_inst.close_session('session/identity')
self.conn.delete.assert_called_once_with('session/identity')

View File

@ -0,0 +1,325 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from sushy import auth
from sushy import connector
from sushy import exceptions
from sushy import main
from sushy.tests.unit import base
class BasicAuthTestCase(base.TestCase):
@mock.patch.object(main, 'Sushy', autospec=True)
@mock.patch.object(connector, 'Connector', autospec=True)
def setUp(self, mock_connector, mock_root):
super(BasicAuthTestCase, self).setUp()
self.username = 'TestUsername'
self.password = 'TestP@$$W0RD'
self.base_auth = auth.BasicAuth(self.username,
self.password)
self.conn = mock_connector.return_value
self.root = mock_root.return_value
def test_init(self):
self.assertEqual(self.username,
self.base_auth._username)
self.assertEqual(self.password,
self.base_auth._password)
self.assertIsNone(self.base_auth._root_resource)
self.assertIsNone(self.base_auth._connector)
def test_set_context(self):
self.base_auth.set_context(self.root, self.conn)
self.assertEqual(self.base_auth._root_resource,
self.root)
self.assertEqual(self.base_auth._connector,
self.conn)
def test__do_authenticate_no_context(self):
self.assertRaises(RuntimeError,
self.base_auth.authenticate)
def test__do_authenticate(self):
self.base_auth.set_context(self.root, self.conn)
self.base_auth.authenticate()
self.conn.set_http_basic_auth.assert_called_once_with(self.username,
self.password)
def test_can_refresh_session(self):
self.assertFalse(self.base_auth.can_refresh_session())
class SessionAuthTestCase(base.TestCase):
@mock.patch.object(main, 'Sushy', autospec=True)
@mock.patch.object(connector, 'Connector', autospec=True)
def setUp(self, mock_connector, mock_root):
super(SessionAuthTestCase, self).setUp()
self.username = 'TestUsername'
self.password = 'TestP@$$W0RD'
self.sess_key = 'TestingKey'
self.sess_uri = ('https://testing:8000/redfish/v1/'
'SessionService/Sessions/testing')
self.sess_auth = auth.SessionAuth(self.username,
self.password)
self.conn = mock_connector.return_value
self.root = mock_root.return_value
def test_init(self):
self.assertEqual(self.username,
self.sess_auth._username)
self.assertEqual(self.password,
self.sess_auth._password)
self.assertIsNone(self.sess_auth._root_resource)
self.assertIsNone(self.sess_auth._connector)
self.assertIsNone(self.sess_auth._session_key)
self.assertIsNone(self.sess_auth._session_resource_id)
def test_get_session_key(self):
self.sess_auth._session_key = self.sess_key
self.assertEqual(self.sess_key,
self.sess_auth.get_session_key())
def test_get_session_resource_id(self):
self.sess_auth._session_resource_id = self.sess_uri
self.assertEqual(self.sess_uri,
self.sess_auth.get_session_resource_id())
def test_reset_session_attrs(self):
self.sess_auth._session_key = self.sess_key
self.sess_auth._session_resource_id = self.sess_uri
self.assertEqual(self.sess_uri,
self.sess_auth.get_session_resource_id())
self.assertEqual(self.sess_key,
self.sess_auth.get_session_key())
self.sess_auth.reset_session_attrs()
self.assertIsNone(self.sess_auth.get_session_resource_id())
self.assertIsNone(self.sess_auth.get_session_key())
def test_set_context(self):
self.sess_auth.set_context(self.root, self.conn)
self.assertEqual(self.sess_auth._root_resource,
self.root)
self.assertEqual(self.sess_auth._connector,
self.conn)
def test__do_authenticate_no_context(self):
self.assertRaises(RuntimeError,
self.sess_auth.authenticate)
def test__do_authenticate(self):
self.assertIsNone(self.sess_auth.get_session_resource_id())
self.assertIsNone(self.sess_auth.get_session_key())
mock_sess_serv = mock.Mock()
mock_sess_serv.create_session.return_value = (self.sess_key,
self.sess_uri)
self.root.get_session_service.return_value = mock_sess_serv
self.sess_auth.set_context(self.root, self.conn)
self.sess_auth.authenticate()
self.assertEqual(self.sess_uri,
self.sess_auth.get_session_resource_id())
self.assertEqual(self.sess_key,
self.sess_auth.get_session_key())
self.conn.set_http_session_auth.assert_called_once_with(self.sess_key)
def test_can_refresh_session(self):
self.assertTrue(self.sess_auth.can_refresh_session())
def test_refresh(self):
self.assertIsNone(self.sess_auth.get_session_resource_id())
self.assertIsNone(self.sess_auth.get_session_key())
mock_sess_serv = mock.Mock()
mock_sess_serv.create_session.return_value = (self.sess_key,
self.sess_uri)
self.root.get_session_service.return_value = mock_sess_serv
self.sess_auth.set_context(self.root, self.conn)
self.sess_auth.refresh_session()
self.assertEqual(self.sess_uri,
self.sess_auth.get_session_resource_id())
self.assertEqual(self.sess_key,
self.sess_auth.get_session_key())
self.conn.set_http_session_auth.assert_called_once_with(self.sess_key)
def test_close_do_nothing(self):
self.sess_auth._session_key = None
self.sess_auth.set_context(self.root, self.conn)
self.sess_auth.close()
self.conn.delete.assert_not_called()
def test_close(self):
self.sess_auth._session_key = self.sess_key
self.sess_auth._session_resource_id = self.sess_uri
self.sess_auth.set_context(self.root, self.conn)
self.sess_auth.close()
self.conn.delete.assert_called_once_with(self.sess_uri)
self.assertIsNone(self.sess_auth.get_session_resource_id())
self.assertIsNone(self.sess_auth.get_session_key())
@mock.patch.object(auth, 'LOG', autospec=True)
def test_close_fail(self, mock_LOG):
self.sess_auth._session_key = self.sess_key
self.sess_auth._session_resource_id = self.sess_uri
self.conn.delete.side_effect = (
exceptions.ServerSideError(
'DELETE', 'any_url', mock.MagicMock()))
self.sess_auth.set_context(self.root, self.conn)
self.sess_auth.close()
self.assertTrue(mock_LOG.warning.called)
self.assertIsNone(self.sess_auth.get_session_resource_id())
self.assertIsNone(self.sess_auth.get_session_key())
class SessionOrBasicAuthTestCase(base.TestCase):
@mock.patch.object(main, 'Sushy', autospec=True)
@mock.patch.object(connector, 'Connector', autospec=True)
def setUp(self, mock_connector, mock_root):
super(SessionOrBasicAuthTestCase, self).setUp()
self.username = 'TestUsername'
self.password = 'TestP@$$W0RD'
self.sess_key = 'TestingKey'
self.sess_uri = ('https://testing:8000/redfish/v1/'
'SessionService/Sessions/testing')
self.conn = mock_connector.return_value
self.root = mock_root.return_value
self.sess_basic_auth = auth.SessionOrBasicAuth(self.username,
self.password)
def test_init(self):
self.assertEqual(self.username,
self.sess_basic_auth._username)
self.assertEqual(self.password,
self.sess_basic_auth._password)
self.assertIsNone(self.sess_basic_auth._root_resource)
self.assertIsNone(self.sess_basic_auth._connector)
self.assertIsNone(self.sess_basic_auth._session_key)
self.assertIsNone(self.sess_basic_auth._session_resource_id)
def test_get_session_key(self):
self.sess_basic_auth._session_key = self.sess_key
self.assertEqual(self.sess_key,
self.sess_basic_auth.get_session_key())
def test_get_session_resource_id(self):
self.sess_basic_auth._session_resource_id = self.sess_uri
self.assertEqual(self.sess_uri,
self.sess_basic_auth.get_session_resource_id())
def test_reset_session_attrs(self):
self.sess_basic_auth._session_key = self.sess_key
self.sess_basic_auth._session_resource_id = self.sess_uri
self.assertEqual(self.sess_uri,
self.sess_basic_auth.get_session_resource_id())
self.assertEqual(self.sess_key,
self.sess_basic_auth.get_session_key())
self.sess_basic_auth.reset_session_attrs()
self.assertIsNone(self.sess_basic_auth.get_session_resource_id())
self.assertIsNone(self.sess_basic_auth.get_session_key())
def test_set_context(self):
self.sess_basic_auth.set_context(self.root, self.conn)
self.assertEqual(self.sess_basic_auth._root_resource,
self.root)
self.assertEqual(self.sess_basic_auth._connector,
self.conn)
def test__do_authenticate_no_context(self):
self.assertRaises(RuntimeError,
self.sess_basic_auth.authenticate)
def test__do_authenticate(self):
self.assertIsNone(self.sess_basic_auth.get_session_resource_id())
self.assertIsNone(self.sess_basic_auth.get_session_key())
mock_sess_serv = mock.Mock()
mock_sess_serv.create_session.return_value = (self.sess_key,
self.sess_uri)
self.root.get_session_service.return_value = mock_sess_serv
self.sess_basic_auth.set_context(self.root, self.conn)
self.sess_basic_auth.authenticate()
self.assertEqual(self.sess_uri,
self.sess_basic_auth.get_session_resource_id())
self.assertEqual(self.sess_key,
self.sess_basic_auth.get_session_key())
self.conn.set_http_session_auth.assert_called_once_with(self.sess_key)
def test__do_authenticate_for_basic_auth(self):
self.assertIsNone(self.sess_basic_auth.get_session_resource_id())
self.assertIsNone(self.sess_basic_auth.get_session_key())
mock_sess_serv = mock.Mock()
mock_sess_serv.create_session.side_effect = exceptions.SushyError
self.root.get_session_service.return_value = mock_sess_serv
self.sess_basic_auth.set_context(self.root, self.conn)
self.sess_basic_auth.authenticate()
self.assertIsNone(self.sess_basic_auth.get_session_resource_id())
self.assertIsNone(self.sess_basic_auth.get_session_key())
self.conn.set_http_basic_auth.assert_called_once_with(
self.username, self.password)
def test_can_refresh_session(self):
mock_sess_serv = mock.Mock()
mock_sess_serv.create_session.return_value = (self.sess_key,
self.sess_uri)
self.root.get_session_service.return_value = mock_sess_serv
self.sess_basic_auth.set_context(self.root, self.conn)
self.sess_basic_auth.authenticate()
self.assertTrue(self.sess_basic_auth.can_refresh_session())
def test_refresh_no_previous_session(self):
self.assertIsNone(self.sess_basic_auth.get_session_resource_id())
self.assertIsNone(self.sess_basic_auth.get_session_key())
self.sess_basic_auth.set_context(self.root, self.conn)
self.sess_basic_auth.refresh_session()
self.assertIsNone(self.sess_basic_auth.get_session_resource_id())
self.assertIsNone(self.sess_basic_auth.get_session_key())
self.conn.set_http_session_auth.assert_not_called()
self.conn.set_http_basic_auth.assert_not_called()
def test_refresh_previous_session_exists(self):
self.sess_basic_auth._session_key = 'ThisisFirstKey'
test_url = ('https://testing:8000/redfish/v1/SessionService'
'/Sessions/testingfirst')
self.sess_basic_auth._session_resource_id = test_url
mock_sess_serv = mock.Mock()
mock_sess_serv.create_session.return_value = (self.sess_key,
self.sess_uri)
self.root.get_session_service.return_value = mock_sess_serv
self.sess_basic_auth.set_context(self.root, self.conn)
self.sess_basic_auth.refresh_session()
self.assertEqual(self.sess_uri,
self.sess_basic_auth.get_session_resource_id())
self.assertEqual(self.sess_key,
self.sess_basic_auth.get_session_key())
self.conn.set_http_session_auth.assert_called_once_with(self.sess_key)
def test_close_do_nothing(self):
self.conn.delete.assert_not_called()
def test_close(self):
self.sess_basic_auth._session_key = self.sess_key
self.sess_basic_auth._session_resource_id = self.sess_uri
self.sess_basic_auth.set_context(self.root, self.conn)
self.sess_basic_auth.close()
self.conn.delete.assert_called_once_with(self.sess_uri)
self.assertIsNone(self.sess_basic_auth.get_session_resource_id())
self.assertIsNone(self.sess_basic_auth.get_session_key())

View File

@ -19,6 +19,7 @@ import mock
import requests
from six.moves import http_client
from sushy import auth as sushy_auth
from sushy import connector
from sushy import exceptions
from sushy.tests.unit import base
@ -26,11 +27,13 @@ from sushy.tests.unit import base
class ConnectorMethodsTestCase(base.TestCase):
def setUp(self):
@mock.patch.object(sushy_auth, 'SessionOrBasicAuth', autospec=True)
def setUp(self, mock_auth):
mock_auth.get_session_key.return_value = None
super(ConnectorMethodsTestCase, self).setUp()
self.conn = connector.Connector(
'http://foo.bar:1234', username='user',
password='pass', verify=True)
'http://foo.bar:1234', verify=True)
self.conn._auth = mock_auth
self.data = {'fake': 'data'}
self.headers = {'X-Fake': 'header'}
@ -69,14 +72,39 @@ class ConnectorMethodsTestCase(base.TestCase):
mock__op.assert_called_once_with(mock.ANY, 'DELETE', 'fake/path',
self.data, self.headers)
def test_set_auth(self):
mock_auth = mock.MagicMock()
self.conn.set_auth(mock_auth)
self.assertEqual(mock_auth, self.conn._auth)
def test_set_http_basic_auth(self):
self.conn.set_http_basic_auth('foo', 'secret')
self.assertEqual(('foo', 'secret'), self.conn._session.auth)
def test_set_http_session_auth(self):
self.conn.set_http_session_auth('hash-token')
self.assertTrue('X-Auth-Token' in self.conn._session.headers)
self.assertEqual(
'hash-token', self.conn._session.headers['X-Auth-Token'])
def test_close(self):
session = mock.Mock(spec=requests.Session)
self.conn._session = session
self.conn.close()
session.close.assert_called_once_with()
class ConnectorOpTestCase(base.TestCase):
def setUp(self):
@mock.patch.object(sushy_auth, 'SessionOrBasicAuth', autospec=True)
def setUp(self, mock_auth):
mock_auth.get_session_key.return_value = None
mock_auth._session_key = None
self.auth = mock_auth
super(ConnectorOpTestCase, self).setUp()
self.conn = connector.Connector(
'http://foo.bar:1234', username='user',
password='pass', verify=True)
'http://foo.bar:1234', verify=True)
self.conn._auth = mock_auth
self.data = {'fake': 'data'}
self.headers = {'X-Fake': 'header'}
self.session = mock.Mock(spec=requests.Session)
@ -120,6 +148,55 @@ class ConnectorOpTestCase(base.TestCase):
'DELETE', 'http://foo.bar:1234/fake/path',
data=None, headers=expected_headers)
def test_ok_post_with_session(self):
self.conn._session.headers = {}
self.conn._session.headers['X-Auth-Token'] = 'asdf1234'
expected_headers = self.headers.copy()
expected_headers['Content-Type'] = 'application/json'
self.conn._op('POST', path='fake/path', data=self.data,
headers=self.headers)
self.request.assert_called_once_with(
'POST', 'http://foo.bar:1234/fake/path',
data=json.dumps(self.data), headers=expected_headers)
self.assertEqual(self.conn._session.headers,
{'X-Auth-Token': 'asdf1234'})
def test_timed_out_session_unable_to_create_session(self):
self.conn._auth.can_refresh_session.return_value = False
expected_headers = self.headers.copy()
expected_headers['Content-Type'] = 'application/json'
self.conn._session = self.session
self.request = self.session.request
self.request.return_value.status_code = http_client.FORBIDDEN
self.request.return_value.json.side_effect = ValueError('no json')
with self.assertRaisesRegex(exceptions.AccessError,
'unknown error') as ae:
self.conn._op('POST', path='fake/path', data=self.data,
headers=self.headers)
exc = ae.exception
self.assertEqual(http_client.FORBIDDEN, exc.status_code)
def test_timed_out_session_re_established(self):
self.auth._session_key = 'asdf1234'
self.auth.get_session_key.return_value = 'asdf1234'
self.conn._auth = self.auth
self.session = mock.Mock(spec=requests.Session)
self.conn._session = self.session
self.request = self.session.request
first_expected_headers = self.headers.copy()
first_expected_headers['Content-Type'] = 'application/json'
first_response = mock.Mock()
first_response.status_code = http_client.FORBIDDEN
second_response = mock.Mock()
second_response.status_code = http_client.OK
second_response.json = {'Test': 'Testing'}
self.request.side_effect = [first_response, second_response]
response = self.conn._op('POST', path='fake/path', data=self.data,
headers=self.headers)
self.auth.refresh_session.assert_called_with()
self.assertEqual(response.json, second_response.json)
def test_connection_error(self):
self.request.side_effect = requests.exceptions.ConnectionError
self.assertRaises(exceptions.ConnectionError, self.conn._op, 'GET')
@ -171,6 +248,8 @@ class ConnectorOpTestCase(base.TestCase):
self.assertEqual(http_client.INTERNAL_SERVER_ERROR, exc.status_code)
def test_access_error(self):
self.conn._auth.can_refresh_session.return_value = False
self.request.return_value.status_code = http_client.FORBIDDEN
self.request.return_value.json.side_effect = ValueError('no json')

View File

@ -17,27 +17,34 @@ import json
import mock
from sushy import auth
from sushy import connector
from sushy import main
from sushy.resources.manager import manager
from sushy.resources.sessionservice import session
from sushy.resources.sessionservice import sessionservice
from sushy.resources.system import system
from sushy.tests.unit import base
class MainTestCase(base.TestCase):
@mock.patch.object(auth, 'SessionOrBasicAuth', autospec=True)
@mock.patch.object(connector, 'Connector', autospec=True)
def setUp(self, mock_connector):
@mock.patch.object(sessionservice, 'SessionService', autospec=True)
def setUp(self, mock_session_service, mock_connector, mock_auth):
super(MainTestCase, self).setUp()
self.conn = mock.Mock()
self.sess_serv = mock.Mock()
self.sess_serv.create_session.return_value = (None, None)
mock_session_service.return_value = self.sess_serv
mock_connector.return_value = self.conn
with open('sushy/tests/unit/json_samples/root.json', 'r') as f:
self.conn.get.return_value.json.return_value = json.loads(f.read())
self.root = main.Sushy(
'http://foo.bar:1234', username='foo', password='bar',
verify=True)
self.root = main.Sushy('http://foo.bar:1234',
verify=True, auth=mock_auth)
mock_connector.assert_called_once_with(
'http://foo.bar:1234', 'foo', 'bar', True)
'http://foo.bar:1234', True)
def test__parse_attributes(self):
self.root._parse_attributes()
@ -48,6 +55,14 @@ class MainTestCase(base.TestCase):
self.root.uuid)
self.assertEqual('/redfish/v1/Systems', self.root._systems_path)
self.assertEqual('/redfish/v1/Managers', self.root._managers_path)
self.assertEqual('/redfish/v1/SessionService',
self.root._session_service_path)
@mock.patch.object(connector, 'Connector', autospec=True)
def test__init_throws_exception(self, mock_Connector):
self.assertRaises(
ValueError, main.Sushy, 'http://foo.bar:1234',
'foo', 'bar', auth=mock.MagicMock())
@mock.patch.object(system, 'SystemCollection', autospec=True)
def test_get_system_collection(self, mock_system_collection):
@ -76,3 +91,17 @@ class MainTestCase(base.TestCase):
Manager_mock.assert_called_once_with(
self.root._conn, 'fake-manager-id',
redfish_version=self.root.redfish_version)
@mock.patch.object(sessionservice, 'SessionService', autospec=True)
def test_get_sessionservice(self, mock_sess_serv):
self.root.get_session_service()
mock_sess_serv.assert_called_once_with(
self.root._conn, '/redfish/v1/SessionService',
redfish_version=self.root.redfish_version)
@mock.patch.object(session, 'Session', autospec=True)
def test_get_session(self, mock_sess):
self.root.get_session('asdf')
mock_sess.assert_called_once_with(
self.root._conn, 'asdf',
redfish_version=self.root.redfish_version)