Source code for geonode.api.views

#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import json

from django.utils import timezone
from django.http import HttpResponse
from django.contrib.auth import get_user_model
from django.views.decorators.csrf import csrf_exempt

from guardian.models import Group

from oauth2_provider.models import AccessToken
from oauth2_provider.exceptions import OAuthToolkitError, FatalClientError
from allauth.account.utils import user_field, user_email, user_username

from ..utils import json_response
from ..decorators import superuser_or_apiauth
from ..base.auth import get_token_object_from_session, extract_headers, get_auth_token


def verify_access_token(request, key):
    """Resolve ``key`` to an access token object and check that it is usable.

    The token stored on the request's session is tried first; when there is
    no session token, or it does not match ``key``, the token is looked up
    through ``AccessToken.objects.get(token=key)``.

    Args:
        request: The current request, or a falsy value to skip the session
            lookup entirely.
        key: The raw access-token string to verify.

    Returns:
        The token object when it was found, ``is_valid()`` is truthy and
        ``is_expired()`` is falsy; ``None`` when any other error occurs
        while resolving or checking the token.

    Raises:
        FatalClientError: If no ``AccessToken`` matches ``key``.
    """
    try:
        token = None
        if request:
            token = get_token_object_from_session(request.session)
        # NOTE(review): the session token is compared through ``.key`` while
        # the database lookup filters on ``token=``; confirm the object
        # returned by get_token_object_from_session exposes ``key``.
        if not token or token.key != key:
            token = AccessToken.objects.get(token=key)
        # The two errors below are raised inside the ``try`` and are therefore
        # caught by the generic handler further down: callers get ``None``,
        # not an OAuthToolkitError.
        if not token.is_valid():
            raise OAuthToolkitError("AccessToken is not valid.")
        if token.is_expired():
            raise OAuthToolkitError("AccessToken has expired.")
    except AccessToken.DoesNotExist as exc:
        # Chain explicitly so the original lookup failure stays attached as
        # the cause of the error reported to the caller.
        raise FatalClientError("AccessToken not found at all.") from exc
    except Exception:
        return None
    return token
@csrf_exempt
def user_info(request):
    """Return a JSON description of the requesting user and their token.

    The access token is taken from the ``Authorization`` header when that
    header contains ``Bearer``; otherwise it is obtained from
    ``get_auth_token`` for the request's user.

    Responses:
        * 401 (via ``json_response``) when ``request.user`` is falsy.
        * 403 (via ``json_response``) when no bearer header was sent and
          ``get_auth_token`` yields nothing.
        * Otherwise a JSON ``HttpResponse`` with the user's id, names, email,
          username, group names and access token, marked as non-cacheable.
    """
    headers = extract_headers(request)
    user = request.user

    # NOTE(review): this guard relies on the truthiness of ``request.user``;
    # confirm that it actually catches unauthenticated requests.
    if not user:
        return json_response(
            {"success": False, "status": "error", "errors": {"user": ["User is not authenticated"]}},
            status=401,
        )

    sent_bearer = "Authorization" in headers and "Bearer" in headers["Authorization"]
    if sent_bearer:
        access_token = headers["Authorization"].replace("Bearer ", "")
    else:
        access_token = get_auth_token(user)
        if not access_token:
            return json_response(
                {"success": False, "status": "error", "errors": {"auth": ["No token provided."]}},
                status=403,
            )

    group_names = [membership.name for membership in user.groups.all()]
    # Superusers are reported as members of an extra "admin" group.
    if user.is_superuser:
        group_names.append("admin")

    claims = {
        "sub": str(user.id),
        "name": " ".join([user_field(user, "first_name"), user_field(user, "last_name")]),
        "given_name": user_field(user, "first_name"),
        "family_name": user_field(user, "last_name"),
        "email": user_email(user),
        "preferred_username": user_username(user),
        "groups": group_names,
        "access_token": str(access_token),
    }

    response = HttpResponse(json.dumps(claims), content_type="application/json")
    # The body carries a token, so ask clients and proxies not to cache it.
    response["Cache-Control"] = "no-store"
    response["Pragma"] = "no-cache"
    return response
@csrf_exempt
def verify_token(request):
    """Verify the access token sent in the ``token`` POST field.

    Responses (all JSON):
        * 403 ``{"error": "invalid_request"}`` when the request has no POST
          data or no ``token`` field.
        * 403 ``{"error": <message>}`` when ``verify_access_token`` raises.
        * 403 ``{"error": "No access_token from server."}`` when
          ``verify_access_token`` returns a falsy value.
        * Otherwise a description of the token (client, user, remaining
          lifetime) with an ``Authorization: Bearer <token>`` response header.
    """

    def _forbidden(message):
        # Every failure path answers with the same 403 JSON envelope.
        return HttpResponse(json.dumps({"error": message}), status=403, content_type="application/json")

    if not (request.POST and "token" in request.POST):
        return _forbidden("invalid_request")

    try:
        access_token = request.POST.get("token")
        verified = verify_access_token(request, access_token)
    except Exception as e:
        return _forbidden(str(e))

    if not verified:
        return _forbidden("No access_token from server.")

    owner = verified.user
    details = {
        "client_id": verified.application.client_id,
        "user_id": owner.id,
        "username": owner.username,
        "issued_to": owner.username,
        "access_token": access_token,
        "email": owner.email,
        "verified_email": "true",
        "access_type": "online",
        # Remaining lifetime in seconds, scaled by 1000.
        "expires_in": (verified.expires - timezone.now()).total_seconds() * 1000,
    }

    response = HttpResponse(json.dumps(details), content_type="application/json")
    response["Authorization"] = f"Bearer {access_token}"
    return response
@csrf_exempt
@superuser_or_apiauth()
def roles(request):
    """Return the names of all groups, plus ``"admin"``, as JSON.

    The response body has the form ``{"groups": [<name>, ..., "admin"]}``.
    Access is gated by the ``superuser_or_apiauth`` decorator.
    """
    # "admin" is always listed last, after every stored group name.
    role_names = [*(entry.name for entry in Group.objects.all()), "admin"]
    body = json.dumps({"groups": role_names})
    return HttpResponse(body, content_type="application/json")
@csrf_exempt
@superuser_or_apiauth()
def users(request):
    """Return users and their group names as JSON.

    The last segment of the request path selects the user:

    * empty or ``"users"``: every user is listed;
    * anything else: users whose username equals the segment, falling back
      to users whose email equals it when no username matches.

    The response body has the form
    ``{"users": [{"username": ..., "groups": [...]}, ...]}``; superusers get
    an extra ``"admin"`` entry in their group list. Access is gated by the
    ``superuser_or_apiauth`` decorator.
    """
    requested = request.path_info.rsplit("/", 1)[-1]
    user_model = get_user_model()

    if not requested or requested == "users":
        matches = list(user_model.objects.all())
    else:
        matches = list(user_model.objects.filter(username=requested))
        if not matches:
            # Nothing by that username: treat the segment as an email.
            matches = list(user_model.objects.filter(email=requested))

    entries = []
    for account in matches:
        group_names = [membership.name for membership in account.groups.all()]
        if account.is_superuser:
            group_names.append("admin")
        entries.append({"username": account.username, "groups": group_names})

    body = json.dumps({"users": entries})
    return HttpResponse(body, content_type="application/json")
@csrf_exempt
@superuser_or_apiauth()
def admin_role(request):
    """Return the name of the administrator role as JSON.

    The response body is always ``{"adminRole": "admin"}``. Access is gated
    by the ``superuser_or_apiauth`` decorator.
    """
    body = json.dumps({"adminRole": "admin"})
    return HttpResponse(body, content_type="application/json")