#########################################################################
#
# Copyright (C) 2020 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
from oauth2_provider.settings import oauth2_settings
from oauth2_provider.oauth2_validators import OAuth2Validator
import json
import base64
import hashlib
import logging
from datetime import datetime, timedelta
from django.utils import dateformat, timezone
from jwcrypto import jwk, jwt
[docs]
log = logging.getLogger(__name__)
[docs]
class OIDCValidator(OAuth2Validator):
"""
Example:
Check if the username and password correspond to a valid and active User.
If authentication fails, try Facebook token authentication.
Example method:
.. code-block:: python
def validate_user(self, username, password, client, request, *args, **kwargs):
u = authenticate(username=username, password=password)
if u is None or not u.is_active:
u = authenticate_with_facebook()
if u is not None and u.is_active:
request.user = u
return True
return False
"""
[docs]
def get_authorization_code_nonce(self, client_id, code, redirect_uri, request):
return None
[docs]
def get_id_token(self, token, token_handler, request):
key = jwk.JWK.from_pem(oauth2_settings.OIDC_RSA_PRIVATE_KEY.encode("utf8"))
# TODO: http://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken2
# Save the id_token on database bound to code when the request come to
# Authorization Endpoint and return the same one when request come to
# Token Endpoint
# TODO: Check if at this point this request parameters are alredy validated
expiration_time = timezone.now() + timedelta(seconds=oauth2_settings.ID_TOKEN_EXPIRE_SECONDS)
# Required ID Token claims
claims = {
"iss": oauth2_settings.OIDC_ISS_ENDPOINT,
"sub": str(request.user.id),
"aud": request.client_id,
"exp": int(dateformat.format(expiration_time, "U")),
"iat": int(dateformat.format(datetime.utcnow(), "U")),
"auth_time": int(dateformat.format(request.user.last_login, "U")),
}
nonce = getattr(request, "nonce", None)
if nonce:
claims["nonce"] = nonce
# TODO: create a function to check if we should add at_hash
# http://openid.net/specs/openid-connect-core-1_0.html#CodeIDToken
# http://openid.net/specs/openid-connect-core-1_0.html#ImplicitIDToken
# if request.grant_type in 'authorization_code' and 'access_token' in token:
if (
(request.grant_type == "authorization_code" and "access_token" in token)
or request.response_type == "code id_token token"
or (request.response_type == "id_token token" and "access_token" in token)
):
acess_token = token["access_token"]
sha256 = hashlib.sha256(acess_token.encode("ascii"))
bits128 = sha256.hexdigest()[:16]
at_hash = base64.urlsafe_b64encode(bits128.encode("ascii"))
claims["at_hash"] = at_hash.decode("utf8")
# TODO: create a function to check if we should include c_hash
# http://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken
if request.response_type in ("code id_token", "code id_token token"):
code = token["code"]
sha256 = hashlib.sha256(code.encode("ascii"))
bits256 = sha256.hexdigest()[:32]
c_hash = base64.urlsafe_b64encode(bits256.encode("ascii"))
claims["c_hash"] = c_hash.decode("utf8")
jwt_token = jwt.JWT(header=json.dumps({"alg": "RS256"}, default=str), claims=json.dumps(claims, default=str))
jwt_token.make_signed_token(key)
id_token = self._save_id_token(jwt_token, request, expiration_time)
# this is needed by django rest framework
request.access_token = id_token
request.id_token = id_token
return jwt_token.serialize()