deb-python-pysaml2/src/saml2/soap.py

281 lines
8.5 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
Support for the client part of the SAML2.0 SOAP binding.
"""
import logging
from saml2 import create_class_from_element_tree
from saml2.samlp import NAMESPACE as SAMLP_NAMESPACE
from saml2.schema import soapenv
try:
from xml.etree import cElementTree as ElementTree
except ImportError:
try:
import cElementTree as ElementTree
except ImportError:
#noinspection PyUnresolvedReferences
from elementtree import ElementTree
import defusedxml.ElementTree
logger = logging.getLogger(__name__)
class XmlParseError(Exception):
    """Raised when a SOAP message cannot be parsed as XML."""
class WrongMessageType(Exception):
    """Raised when the enveloped SAML element has an unexpected tag."""
def parse_soap_enveloped_saml_response(text):
    """Extract a SAML response from a SOAP envelope.

    Delegates to parse_soap_enveloped_saml_thingy, accepting either a
    samlp Response or a samlp LogoutResponse element.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns
    """
    accepted = [pattern % SAMLP_NAMESPACE
                for pattern in ('{%s}Response', '{%s}LogoutResponse')]
    return parse_soap_enveloped_saml_thingy(text, accepted)
def parse_soap_enveloped_saml_logout_response(text):
    """Extract a SAML logout response from a SOAP envelope.

    Delegates to parse_soap_enveloped_saml_thingy, accepting either a
    samlp Response or a samlp LogoutResponse element.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns
    """
    accepted = [pattern % SAMLP_NAMESPACE
                for pattern in ('{%s}Response', '{%s}LogoutResponse')]
    return parse_soap_enveloped_saml_thingy(text, accepted)
def parse_soap_enveloped_saml_attribute_query(text):
    """Extract a samlp AttributeQuery from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        AttributeQuery is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}AttributeQuery' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_attribute_response(text):
    """Extract a SAML attribute response from a SOAP envelope.

    Delegates to parse_soap_enveloped_saml_thingy, accepting either a
    samlp Response or a samlp AttributeResponse element.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns
    """
    accepted = [pattern % SAMLP_NAMESPACE
                for pattern in ('{%s}Response', '{%s}AttributeResponse')]
    return parse_soap_enveloped_saml_thingy(text, accepted)
def parse_soap_enveloped_saml_logout_request(text):
    """Extract a samlp LogoutRequest from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        LogoutRequest is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}LogoutRequest' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_authn_request(text):
    """Extract a samlp AuthnRequest from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        AuthnRequest is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}AuthnRequest' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_artifact_resolve(text):
    """Extract a samlp ArtifactResolve from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        ArtifactResolve is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}ArtifactResolve' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_artifact_response(text):
    """Extract a samlp ArtifactResponse from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        ArtifactResponse is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}ArtifactResponse' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_name_id_mapping_request(text):
    """Extract a samlp NameIDMappingRequest from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        NameIDMappingRequest is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}NameIDMappingRequest' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_name_id_mapping_response(text):
    """Extract a samlp NameIDMappingResponse from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        NameIDMappingResponse is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}NameIDMappingResponse' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_manage_name_id_request(text):
    """Extract a samlp ManageNameIDRequest from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        ManageNameIDRequest is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}ManageNameIDRequest' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_manage_name_id_response(text):
    """Extract a samlp ManageNameIDResponse from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        ManageNameIDResponse is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}ManageNameIDResponse' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_assertion_id_request(text):
    """Extract a samlp AssertionIDRequest from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        AssertionIDRequest is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}AssertionIDRequest' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_assertion_id_response(text):
    """Extract a SAML assertion ID response from a SOAP envelope.

    Delegates to parse_soap_enveloped_saml_thingy, accepting either a
    samlp Response or a samlp AssertionIDResponse element.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns
    """
    accepted = [pattern % SAMLP_NAMESPACE
                for pattern in ('{%s}Response', '{%s}AssertionIDResponse')]
    return parse_soap_enveloped_saml_thingy(text, accepted)
def parse_soap_enveloped_saml_authn_query(text):
    """Extract a samlp AuthnQuery from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        AuthnQuery is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}AuthnQuery' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_authn_query_response(text):
    """Extract the response to an authn query from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        the samlp Response tag is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}Response' % SAMLP_NAMESPACE])
def parse_soap_enveloped_saml_authn_response(text):
    """Extract an authentication response from a SOAP envelope.

    :param text: The SOAP message as XML
    :return: Whatever parse_soap_enveloped_saml_thingy returns when
        the samlp Response tag is the only accepted tag
    """
    return parse_soap_enveloped_saml_thingy(
        text, ['{%s}Response' % SAMLP_NAMESPACE])
#def parse_soap_enveloped_saml_logout_response(text):
# expected_tag = '{%s}LogoutResponse' % SAMLP_NAMESPACE
# return parse_soap_enveloped_saml_thingy(text, [expected_tag])
def parse_soap_enveloped_saml_thingy(text, expected_tags):
    """Parses a SOAP enveloped SAML thing and returns the thing as
    a string.

    :param text: The SOAP object as XML string
    :param expected_tags: What the tag of the SAML thingy is expected to be.
    :return: The single child of the SOAP Body serialized with
        ElementTree.tostring(encoding="UTF-8"), or an empty string when
        the envelope contains no SOAP Body element
    :raise AssertionError: if the root element is not a SOAP Envelope,
        the envelope has no children, or the Body does not hold exactly
        one element
    :raise WrongMessageType: if the Body's child tag is not one of
        expected_tags
    """
    envelope = defusedxml.ElementTree.fromstring(text)

    # Make sure it's a SOAP message. These checks are raised explicitly
    # rather than written as ``assert`` statements, because assert
    # statements are removed when Python runs with -O and the input
    # would then go unchecked. AssertionError is kept as the exception
    # type so existing callers see the same failure.
    envelope_tag = '{%s}Envelope' % soapenv.NAMESPACE
    if envelope.tag != envelope_tag:
        raise AssertionError(
            "Root element is '%s', expected '%s'" % (envelope.tag,
                                                     envelope_tag))
    if len(envelope) < 1:
        raise AssertionError("SOAP envelope has no child elements")

    body_tag = '{%s}Body' % soapenv.NAMESPACE
    body = None
    for part in envelope:
        if part.tag == body_tag:
            if len(part) != 1:
                raise AssertionError(
                    "SOAP body must contain exactly one element, found %d"
                    % len(part))
            body = part
            break

    if body is None:
        return ""

    saml_part = body[0]
    if saml_part.tag in expected_tags:
        return ElementTree.tostring(saml_part, encoding="UTF-8")

    raise WrongMessageType("Was '%s' expected one of %s" % (saml_part.tag,
                                                            expected_tags))
import re

# Splits a qualified tag of the form "{namespace}localname" into the
# namespace (group 1) and the local name (group 2).
# Written as a raw string: in a normal string literal "\{" is an invalid
# escape sequence that newer Python versions warn about.
NS_AND_TAG = re.compile(r"\{([^}]+)\}(.*)")
def instanciate_class(item, modules):
    """Create the schema class instance that corresponds to an element.

    The element's tag is split with NS_AND_TAG into namespace and local
    name. Every module whose ``NAMESPACE`` equals that namespace is asked,
    via its ``ELEMENT_BY_TAG`` mapping, for the class registered under
    the local name; the first one found is instantiated from the element.

    :param item: The element to convert; its ``tag`` attribute is read
    :param modules: Modules exposing ``NAMESPACE`` and ``ELEMENT_BY_TAG``
    :return: The result of create_class_from_element_tree(target, item)
    :raise Exception: if the tag is not of the form ``{namespace}name``
        or no module provides a class for it
    """
    m = NS_AND_TAG.match(item.tag)
    if m is None:
        # The pattern only matches namespace-qualified tags; without this
        # guard m.groups() would fail with an opaque AttributeError.
        raise Exception(
            "Unknown class: tag '%s' is not namespace qualified" % item.tag)

    ns, tag = m.groups()
    for module in modules:
        if module.NAMESPACE == ns:
            try:
                target = module.ELEMENT_BY_TAG[tag]
                return create_class_from_element_tree(target, item)
            except KeyError:
                # Not known to this module, another module may share
                # the namespace.
                continue

    raise Exception("Unknown class: ns='%s', tag='%s'" % (ns, tag))
def class_instances_from_soap_enveloped_saml_thingies(text, modules):
    """Parses a SOAP enveloped header and body SAML thing and returns the
    thing as a dictionary class instance.

    :param text: The SOAP object as XML
    :param modules: modules representing xsd schemas
    :return: The body and headers as class instances, as a dictionary
        with the keys "body" (a single instance or None) and "header"
        (a list of instances)
    :raise XmlParseError: if the text cannot be parsed
    :raise AssertionError: if the root element is not a SOAP Envelope,
        the envelope has no children, or a Body does not hold exactly
        one element
    """
    try:
        envelope = defusedxml.ElementTree.fromstring(text)
    except Exception as exc:
        raise XmlParseError("%s" % exc)

    # Explicit raises instead of ``assert`` statements: asserts are
    # removed when Python runs with -O, which would skip the validation.
    # AssertionError is kept so callers see the same exception type.
    envelope_tag = '{%s}Envelope' % soapenv.NAMESPACE
    if envelope.tag != envelope_tag:
        raise AssertionError(
            "Root element is '%s', expected '%s'" % (envelope.tag,
                                                     envelope_tag))
    if len(envelope) < 1:
        raise AssertionError("SOAP envelope has no child elements")

    body_tag = '{%s}Body' % soapenv.NAMESPACE
    header_tag = "{%s}Header" % soapenv.NAMESPACE
    env = {"header": [], "body": None}

    for part in envelope:
        if part.tag == body_tag:
            if len(part) != 1:
                raise AssertionError(
                    "SOAP body must contain exactly one element, found %d"
                    % len(part))
            env["body"] = instanciate_class(part[0], modules)
        elif part.tag == header_tag:
            for item in part:
                env["header"].append(instanciate_class(item, modules))

    return env
def open_soap_envelope(text):
    """Split a SOAP message into its serialized body and header parts.

    :param text: SOAP message
    :return: dictionary with two keys "body"/"header"; "body" is the
        single child of the SOAP Body serialized with
        ElementTree.tostring(encoding="UTF-8") (None if there is no
        Body), "header" is a list with every Header child serialized
        the same way
    :raise XmlParseError: if the text cannot be parsed
    :raise AssertionError: if the root element is not a SOAP Envelope,
        the envelope has no children, or a Body does not hold exactly
        one element
    """
    try:
        envelope = defusedxml.ElementTree.fromstring(text)
    except Exception as exc:
        raise XmlParseError("%s" % exc)

    # Explicit raises instead of ``assert`` statements: asserts are
    # removed when Python runs with -O, which would skip the validation.
    # AssertionError is kept so callers see the same exception type.
    envelope_tag = '{%s}Envelope' % soapenv.NAMESPACE
    if envelope.tag != envelope_tag:
        raise AssertionError(
            "Root element is '%s', expected '%s'" % (envelope.tag,
                                                     envelope_tag))
    if len(envelope) < 1:
        raise AssertionError("SOAP envelope has no child elements")

    body_tag = '{%s}Body' % soapenv.NAMESPACE
    header_tag = "{%s}Header" % soapenv.NAMESPACE
    content = {"header": [], "body": None}

    for part in envelope:
        if part.tag == body_tag:
            if len(part) != 1:
                raise AssertionError(
                    "SOAP body must contain exactly one element, found %d"
                    % len(part))
            content["body"] = ElementTree.tostring(part[0], encoding="UTF-8")
        elif part.tag == header_tag:
            for item in part:
                _str = ElementTree.tostring(item, encoding="UTF-8")
                content["header"].append(_str)

    return content
def make_soap_enveloped_saml_thingy(thingy, headers=None):
    """Wrap a SAML thingy in a SOAP envelope and render it as text.

    :param thingy: The SAML thingy, added to the SOAP body as an
        extension element
    :param headers: Optional elements; when given (and not empty) they
        are added as extension elements of a SOAP header
    :return: The SOAP envelope as a string
    """
    envelope = soapenv.Envelope()

    if headers:
        header_part = soapenv.Header()
        header_part.add_extension_elements(headers)
        envelope.header = header_part

    envelope.body = soapenv.Body()
    envelope.body.add_extension_element(thingy)

    return "%s" % envelope
def soap_fault(message=None, actor=None, code=None, detail=None):
    """Build a SOAP Fault and render it as text.

    Each argument that is given (truthy) is wrapped in the matching
    soapenv fault sub-element; parts that are not given are passed to
    the Fault as None.

    :param message: Human readable error message
    :param actor: Who discovered the error
    :param code: Error code
    :param detail: More specific error message
    :return: A SOAP Fault message as a string
    """
    fault_string = soapenv.Fault_faultstring(text=message) if message else None
    fault_actor = soapenv.Fault_faultactor(text=actor) if actor else None
    fault_code = soapenv.Fault_faultcode(text=code) if code else None
    fault_detail = soapenv.Fault_detail(text=detail) if detail else None

    return "%s" % soapenv.Fault(
        faultcode=fault_code,
        faultstring=fault_string,
        faultactor=fault_actor,
        detail=fault_detail,
    )