#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
from django.db.models import Q
from tastypie.authentication import ApiKeyAuthentication
from tastypie.authorization import DjangoAuthorization
from tastypie.exceptions import Unauthorized
from tastypie.compat import get_user_model, get_username_field
from guardian.shortcuts import get_objects_for_user
from tastypie.http import HttpUnauthorized
from django.conf import settings
from geonode import geoserver
from geonode.utils import check_ogc_backend
[docs]
class GeoNodeAuthorization(DjangoAuthorization):
"""
Object level API authorization based on GeoNode granular
permission system
"""
[docs]
def read_list(self, object_list, bundle):
permitted_ids = []
try:
permitted_ids = get_objects_for_user(bundle.request.user, "base.view_resourcebase").values("id")
except Exception:
pass
return object_list.filter(id__in=permitted_ids)
[docs]
def read_detail(self, object_list, bundle):
if "schema" in bundle.request.path:
return True
return bundle.request.user.has_perm("view_resourcebase", bundle.obj.get_self_resource())
[docs]
def create_list(self, object_list, bundle):
# TODO implement if needed
raise Unauthorized()
[docs]
def create_detail(self, object_list, bundle):
return bundle.request.user.has_perm("add_resourcebase", bundle.obj.get_self_resource())
[docs]
def update_list(self, object_list, bundle):
# TODO implement if needed
raise Unauthorized()
[docs]
def update_detail(self, object_list, bundle):
return bundle.request.user.has_perm("change_resourcebase", bundle.obj.get_self_resource())
[docs]
def delete_list(self, object_list, bundle):
# TODO implement if needed
raise Unauthorized()
[docs]
def delete_detail(self, object_list, bundle):
return bundle.request.user.has_perm("delete_resourcebase", bundle.obj.get_self_resource())
[docs]
class GeonodeApiKeyAuthentication(ApiKeyAuthentication):
"""
Override ApiKeyAuthentication to prevent 401 response when no api key is provided.
"""
[docs]
def is_authenticated(self, request, **kwargs):
"""
Finds the user and checks their API key.
Should return either ``True`` if allowed, ``False`` if not or an
``HttpResponse`` if you need something custom.
"""
try:
username, api_key = self.extract_credentials(request)
except ValueError:
return self._unauthorized()
if not username or not api_key:
return True
username_field = get_username_field()
User = get_user_model()
try:
lookup_kwargs = {username_field: username}
user = User.objects.get(**lookup_kwargs)
except (User.DoesNotExist, User.MultipleObjectsReturned):
return self._unauthorized()
if not self.check_active(user):
return False
key_auth_check = self.get_key(user, api_key)
if key_auth_check and not isinstance(key_auth_check, HttpUnauthorized):
request.user = user
return key_auth_check
[docs]
class GeoNodeStyleAuthorization(GeoNodeAuthorization):
"""
Object level API authorization based on GeoNode granular permission system.
Style object permissions should follow it's layer permissions.
"""
[docs]
def filter_by_resource_ids(self, object_list, permitted_ids):
"""
Filter Style queryset by permitted resource ids.
"""
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
return object_list.filter(dataset_styles__id__in=permitted_ids)
[docs]
def read_list(self, object_list, bundle):
permitted_ids = get_objects_for_user(bundle.request.user, "base.view_resourcebase").values("id")
return self.filter_by_resource_ids(object_list, permitted_ids)
[docs]
def delete_detail(self, object_list, bundle):
permitted_ids = get_objects_for_user(bundle.request.user, "layer.change_dataset_style").values("id")
resource_obj = bundle.obj.get_self_resource()
return resource_obj in permitted_ids
[docs]
class ApiLockdownAuthorization(DjangoAuthorization):
"""
API authorization for all resources which are not protected by others authentication/authorization mechanism.
If setting "API_LOCKDOWN" is set to True, resource can only be accessed by authenticated users. For anonymous requests, empty lists are returned.
"""
[docs]
def read_list(self, object_list, bundle):
user = bundle.request.user
if settings.API_LOCKDOWN and not user.is_authenticated:
# return empty list
return []
else:
return object_list
[docs]
class GeoNodePeopleAuthorization(DjangoAuthorization):
"""
API authorization that allows only authenticated users to view list of users
"""
[docs]
def read_list(self, object_list, bundle):
user = bundle.request.user
if not user.is_authenticated:
# return empty list
return []
return object_list
[docs]
class GroupAuthorization(ApiLockdownAuthorization):
[docs]
def read_list(self, object_list, bundle):
groups = super().read_list(object_list, bundle)
user = bundle.request.user
if groups:
if not user.is_authenticated or user.is_anonymous:
return groups.exclude(groupprofile__access="private")
elif not user.is_superuser:
return groups.filter(Q(groupprofile__in=user.group_list_all()) | ~Q(groupprofile__access="private"))
return groups
[docs]
class GroupProfileAuthorization(ApiLockdownAuthorization):
[docs]
def read_list(self, object_list, bundle):
groups = super().read_list(object_list, bundle)
user = bundle.request.user
if groups:
if not user.is_authenticated or user.is_anonymous:
return groups.exclude(access="private")
elif not user.is_superuser:
return groups.filter(Q(pk__in=user.group_list_all()) | ~Q(access="private"))
return groups