#########################################################################
#
# Copyright (C) 2020 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import logging
from django.conf import settings
from django.contrib.auth import get_user_model
from django.shortcuts import get_object_or_404
from rest_framework import permissions
from rest_framework.filters import BaseFilterBackend
from geonode.security.permissions import (
BASIC_MANAGE_PERMISSIONS,
DOWNLOAD_PERMISSIONS,
EDIT_PERMISSIONS,
VIEW_PERMISSIONS,
)
from distutils.util import strtobool
from geonode.security.utils import get_users_with_perms, get_visible_resources
from geonode.groups.models import GroupProfile
from rest_framework.permissions import DjangoModelPermissions
from guardian.shortcuts import get_objects_for_user
from itertools import chain
from guardian.shortcuts import get_groups_with_perms
[docs]
logger = logging.getLogger(__name__)
[docs]
class IsSelf(permissions.BasePermission):
"""Grant permission only if the current instance is the request user.
Used to allow users to edit their own account, nothing to others (even
superusers).
"""
[docs]
def has_permission(self, request, view):
"""Always return False here.
The fine-grained permissions are handled in has_object_permission().
"""
return False
[docs]
def has_object_permission(self, request, view, obj):
user = request.user
if user and isinstance(obj, get_user_model()) and obj.pk == user.pk:
return True
return False
[docs]
class IsSelfOrReadOnly(IsSelf):
"""Grant permissions if instance *IS* the request user, or read-only.
Used to allow users to edit their own account, and others to read.
"""
[docs]
def has_object_permission(self, request, view, obj):
if request.method in permissions.SAFE_METHODS:
return True
return IsSelf.has_object_permission(self, request, view, obj)
[docs]
class IsSelfOrAdmin(IsSelf):
"""Grant R/W to self and superusers/staff members. Deny others."""
[docs]
def has_permission(self, request, view):
user = request.user
if user and (user.is_superuser or user.is_staff):
return True
return IsSelf.has_permission(self, request, view)
[docs]
def has_object_permission(self, request, view, obj):
user = request.user
if user and (user.is_superuser or user.is_staff):
return True
return IsSelf.has_object_permission(self, request, view, obj)
[docs]
class IsSelfOrAdminOrReadOnly(IsSelfOrAdmin):
"""Grant R/W to self and superusers/staff members, R/O to others."""
[docs]
def has_permission(self, request, view):
if request.method in permissions.SAFE_METHODS:
return True
return IsSelfOrAdmin.has_permission(self, request, view)
[docs]
def has_object_permission(self, request, view, obj):
if request.method in permissions.SAFE_METHODS:
return True
return IsSelfOrAdmin.has_object_permission(self, request, view, obj)
[docs]
class IsSelfOrAdminOrAuthenticatedReadOnly(IsSelfOrAdmin):
"""Grant R/W to self and superusers/staff members, R/O to auth."""
[docs]
def has_object_permission(self, request, view, obj):
user = request.user
if request.method in permissions.SAFE_METHODS:
if user.is_authenticated():
return True
return IsSelfOrAdmin.has_object_permission(self, request, view, obj)
[docs]
class IsOwnerOrAdmin(permissions.BasePermission):
"""
Object-level permission to only allow admin and owners of an object to edit it.
Assumes the model instance has an `owner` attribute.
"""
[docs]
def has_object_permission(self, request, view, obj):
if request.user is None or (not request.user.is_anonymous and not request.user.is_active):
return False
if request.user.is_superuser or request.user.is_staff:
return True
# Instance must have an attribute named `owner`.
_request_matches = False
if isinstance(obj, get_user_model()) and obj == request.user:
_request_matches = True
elif hasattr(obj, "owner"):
_request_matches = obj.owner == request.user
elif hasattr(obj, "user"):
_request_matches = obj.user == request.user
if not _request_matches:
_request_matches = request.user in get_users_with_perms(obj)
return _request_matches
[docs]
class IsOwnerOrReadOnly(IsOwnerOrAdmin):
"""
Object-level permission to only allow owners of an object to edit it.
Assumes the model instance has an `owner` attribute.
"""
[docs]
def has_object_permission(self, request, view, obj):
# Read permissions are allowed to any request,
# so we'll always allow GET, HEAD or OPTIONS requests.
if request.method in permissions.SAFE_METHODS and not isinstance(obj, get_user_model()):
return True
return IsOwnerOrAdmin.has_object_permission(self, request, view, obj)
[docs]
class IsManagerEditOrAdmin(permissions.BasePermission):
"""
Object-level permission to only allow admin and managers to edit a group.
"""
[docs]
def has_permission(self, request, view):
if request.method in ["POST", "DELETE"]:
user = request.user
return user and (user.is_superuser or user.is_staff)
return True
[docs]
def has_object_permission(self, request, view, obj):
# Read permissions are allowed to any request,
# so we'll always allow GET, HEAD or OPTIONS requests
if request.method in permissions.SAFE_METHODS:
return True
user = request.user
if user and user.is_superuser or user.is_staff:
return True
is_group_manager = user and isinstance(obj, GroupProfile) and obj.user_is_role(user, "manager")
if is_group_manager and request.method == "PATCH":
return True
return False
[docs]
class ResourceBasePermissionsFilter(BaseFilterBackend):
"""
A filter backend that limits results to those where the requesting user
has read object level permissions.
"""
[docs]
def filter_queryset(self, request, queryset, view):
try:
metadata_only = strtobool(request.query_params.get("filter{metadata_only}", "None"))
except Exception:
metadata_only = None
return get_visible_resources(
queryset,
request.user,
metadata_only=metadata_only,
admin_approval_required=settings.ADMIN_MODERATE_UPLOADS,
unpublished_not_visible=settings.RESOURCE_PUBLISHING,
private_groups_not_visibile=settings.GROUP_PRIVATE_RESOURCES,
)
[docs]
class UserHasPerms(DjangoModelPermissions):
[docs]
perms_map = {
"GET": [f"base.{x}" for x in VIEW_PERMISSIONS + DOWNLOAD_PERMISSIONS],
"POST": ["base.add_resourcebase"] + [f"base.{x}" for x in EDIT_PERMISSIONS],
"PUT": [f"base.{x}" for x in EDIT_PERMISSIONS],
"PATCH": [f"base.{x}" for x in EDIT_PERMISSIONS],
"DELETE": [f"base.{x}" for x in BASIC_MANAGE_PERMISSIONS],
}
def __init__(self, perms_dict={}):
[docs]
self.perms_dict = perms_dict
[docs]
def __call__(self):
return self
[docs]
def has_permission(self, request, view):
from geonode.base.models import ResourceBase
queryset = self._queryset(view)
if request.user.is_superuser:
return True
if view.kwargs.get("pk"):
# if a single resource is called, we check the perms for that resource
res = get_object_or_404(ResourceBase, pk=view.kwargs.get("pk"))
# if the request is for a single resource, we take the specific or the default. If none is defined we keep the original one defined above
resource_type_specific_perms = self.perms_dict.get(
res.get_real_instance().resource_type, self.perms_dict.get("default", {})
)
perms = resource_type_specific_perms.get(request.method, []) or self.get_required_permissions(
request.method, queryset.model
)
# getting the user permission for that resource
resource_perms = list(res.get_user_perms(request.user))
if getattr(res, "get_real_instance", None):
resource_perms.extend(list(res.get_real_instance().get_user_perms(request.user)))
groups = get_groups_with_perms(res, attach_perms=True)
# we are making this because the request.user.groups sometimes returns empty si is not fully reliable
for group, perm in groups.items():
# checking if the user is in that group
if group.user_set.filter(username=request.user).exists():
resource_perms = list(chain(resource_perms, perm))
if request.user.has_perm("base.add_resourcebase"):
resource_perms.append("add_resourcebase")
# merging all available permissions into a single list
available_perms = list(set(resource_perms))
# fixup the permissions name
perms_without_base = [x.replace("base.", "") for x in perms]
# if at least one of the permissions is available the request is True
rule = resource_type_specific_perms.get("rule", any)
return rule([_perm in available_perms for _perm in perms_without_base])
if request.method in permissions.SAFE_METHODS:
return True
_default_defined_perms = self.perms_dict.get("default", {})
if _default_defined_perms.get(request.method):
_defined_perms = _default_defined_perms.get(request.method)
rule = _default_defined_perms.get("rule", any)
return rule([request.user.has_perm(_perm) for _perm in _defined_perms])
perms = self.perms_dict.get(request.method, None) or self.get_required_permissions(
request.method, queryset.model
)
# check if the user have one of the perms in all the resource available
return get_objects_for_user(request.user, perms).exists()