Merge "Support PKCE with v3oidcdeviceauthz"

This commit is contained in:
Zuul 2023-08-15 11:19:13 +00:00 committed by Gerrit Code Review
commit e6f3999c6f
2 changed files with 46 additions and 1 deletions

View File

@ -11,6 +11,9 @@
# under the License.
import abc
import base64
import hashlib
import os
import time
from urllib import parse as urlparse
import warnings
@ -481,10 +484,11 @@ class OidcDeviceAuthorization(_OidcBase):
HEADER_X_FORM = {"Content-Type": "application/x-www-form-urlencoded"}
def __init__(self, auth_url, identity_provider, protocol, # nosec
client_id, client_secret,
client_id, client_secret=None,
access_token_endpoint=None,
device_authorization_endpoint=None,
discovery_endpoint=None,
code_challenge=None, code_challenge_method=None,
**kwargs):
"""The OAuth 2.0 Device Authorization plugin expects the following.
@ -495,10 +499,14 @@ class OidcDeviceAuthorization(_OidcBase):
provided this value will override
the discovered one.
:type device_authorization_endpoint: string
:param code_challenge_method: PKCE Challenge Method (RFC 7636).
:type code_challenge_method: string
"""
# RFC 8628 only allows to retrieve an access_token
self.access_token_type = 'access_token'
self.device_authorization_endpoint = device_authorization_endpoint
self.code_challenge_method = code_challenge_method
super(OidcDeviceAuthorization, self).__init__(
auth_url=auth_url,
@ -536,6 +544,29 @@ class OidcDeviceAuthorization(_OidcBase):
raise exceptions.oidc.OidcDeviceAuthorizationEndpointNotFound()
return endpoint
def _generate_pkce_verifier(self):
"""Generate PKCE verifier string as defined in RFC 7636."""
raw_bytes = 42 # 32 is the minimum from the RFC, let's use a bit more
_rand = os.urandom(raw_bytes)
_rand_b64 = base64.urlsafe_b64encode(_rand).decode('ascii')
code_verifier = _rand_b64.rstrip('=') # strip padding as RFC says
return code_verifier
def _generate_pkce_challenge(self):
"""Generate PKCE challenge string as defined in RFC 7636."""
if self.code_challenge_method not in ('plain', 'S256'):
raise exceptions.OidcGrantTypeMissmatch()
self.code_verifier = self._generate_pkce_verifier()
if self.code_challenge_method == 'plain':
return self.code_verifier
elif self.code_challenge_method == 'S256':
_tmp = self.code_verifier.encode('ascii')
_hash = hashlib.sha256(_tmp).digest()
_tmp = base64.urlsafe_b64encode(_hash).decode('ascii')
code_challenge = _tmp.rstrip('=')
return code_challenge
def get_payload(self, session):
"""Get an authorization grant for the "device_code" grant type.
@ -548,9 +579,19 @@ class OidcDeviceAuthorization(_OidcBase):
client_auth = (self.client_id, self.client_secret)
device_authz_endpoint = \
self._get_device_authorization_endpoint(session)
payload = {}
if self.code_challenge_method:
self.code_challenge = self._generate_pkce_challenge()
payload.setdefault('code_challenge_method',
self.code_challenge_method)
payload.setdefault('code_challenge', self.code_challenge)
encoded_payload = urlparse.urlencode(payload)
op_response = session.post(device_authz_endpoint,
requests_auth=client_auth,
headers=self.HEADER_X_FORM,
data=encoded_payload,
authenticated=False)
self.expires_in = int(op_response.json()["expires_in"])
@ -563,6 +604,8 @@ class OidcDeviceAuthorization(_OidcBase):
op_response.json()["verification_uri_complete"]
payload = {'device_code': self.device_code}
if self.code_challenge_method:
payload.setdefault('code_verifier', self.code_verifier)
return payload
def _get_access_token(self, session, payload):

View File

@ -209,6 +209,8 @@ class OpenIDConnectDeviceAuthorization(_OpenIDConnectBase):
'that if a discovery document is being passed this '
'option will override the endpoint provided by the '
'server in the discovery document.'),
loading.Opt('code-challenge-method',
help='PKCE Challenge Method (RFC 7636)'),
])
return options