#########################################################################
#
# Copyright (C) 2017 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import copy
import logging
import operator
import traceback
from functools import reduce
from django.db.models import Q
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist
from django.contrib.auth.models import Group, Permission
from django.contrib.contenttypes.models import ContentType
from guardian.shortcuts import get_perms, get_groups_with_perms, get_anonymous_user
from geonode.groups.models import GroupProfile
from geonode.groups.conf import settings as groups_settings
from geonode.security.utils import get_user_groups, AdvancedSecurityWorkflowManager
from .permissions import (
VIEW_PERMISSIONS,
DOWNLOAD_PERMISSIONS,
ADMIN_PERMISSIONS,
SERVICE_PERMISSIONS,
DATASET_ADMIN_PERMISSIONS,
DATASET_EDIT_DATA_PERMISSIONS,
DATASET_EDIT_STYLE_PERMISSIONS,
)
from .utils import get_users_with_perms, get_user_obj_perms_model, skip_registered_members_common_group
[docs]
logger = logging.getLogger(__name__)
[docs]
class PermissionLevelError(Exception):
pass
[docs]
class PermissionLevelMixin:
"""
Mixin for adding "Permission Level" methods to a model class -- eg role systems where a user has exactly one assigned role with respect to an object representing an "access level"
"""
[docs]
def get_all_level_info(self):
"""
Translates the current object guardian permissions into a JSON-like "perm_spec" object in the form:
.. code-block:: json
{
"users": {
"<Profile AnonymousUser>": ["view"],
"<Profile username>": ["perm1", "perm2", "perm3"],
"<Profile username2>": ["perm1", "perm2", "perm3"]
},
"groups": {
"<Group groupname>": ["perm1", "perm2", "perm3"],
"<Group groupname2>": ["perm1", "perm2", "perm3"]
}
}
"""
resource = self.get_self_resource()
users = get_users_with_perms(resource)
groups = get_groups_with_perms(resource, attach_perms=True)
info = {"users": users, "groups": groups}
try:
if hasattr(self, "dataset"):
info_dataset = {
"users": get_users_with_perms(self.dataset),
"groups": get_groups_with_perms(self.dataset, attach_perms=True),
}
for user in info_dataset["users"]:
if user in info["users"]:
info["users"][user] = info["users"][user] + info_dataset["users"][user]
else:
info["users"][user] = info_dataset["users"][user]
for group in info_dataset["groups"]:
if group in info["groups"]:
info["groups"][group] = list(
dict.fromkeys(info["groups"][group] + info_dataset["groups"][group])
)
else:
info["groups"][group] = info_dataset["groups"][group]
except Exception:
tb = traceback.format_exc()
logger.debug(tb)
for _k, _v in info.items():
for _kk in list(_v):
# Remove AnonymousUser if set for some reason (legacy code)
if _kk == get_anonymous_user():
logger.warning(
"Guardian permisions for AnonymouUser on resource {resource.id} were found in the DB, which is unexpected"
)
del info[_k][_kk]
continue
_vv = _v[_kk]
if _vv and isinstance(_vv, list):
# Remove duplicated perms
info[_k][_kk] = list(set(_vv))
return info
[docs]
def get_self_resource(self):
"""
Returns the "ResourceBase" associated to this "object".
"""
try:
if hasattr(self, "resourcebase_ptr_id"):
return self.resourcebase_ptr
except ObjectDoesNotExist:
pass
return self
[docs]
def get_group_managers(self, group=None):
"""
Given the groups belonging to a "user", this method returns a tuple containing:
- The "groups" perms spec with resource access permissions (at least VIEW ones)
- The list of "group managers" of the groups above
"""
obj_groups = []
obj_group_managers = []
if group:
user_groups = get_user_groups(self.owner, group=group)
if user_groups:
for _user_group in user_groups:
if not skip_registered_members_common_group(Group.objects.get(name=_user_group)):
try:
_group_profile = GroupProfile.objects.get(slug=_user_group)
managers = _group_profile.get_managers()
if managers:
for manager in managers:
if manager not in obj_group_managers and not manager.is_superuser:
obj_group_managers.append(manager)
except GroupProfile.DoesNotExist:
tb = traceback.format_exc()
logger.debug(tb)
if self.group:
obj_groups.append(self.group)
for x in self.owner.groupmember_set.all():
if x.group.slug != groups_settings.REGISTERED_MEMBERS_GROUP_NAME:
obj_groups.append(x.group.group)
managers = x.group.get_managers()
if managers:
for manager in managers:
if manager not in obj_group_managers and not manager.is_superuser:
obj_group_managers.append(manager)
return list(set(obj_groups)), list(set(obj_group_managers))
[docs]
def set_default_permissions(self, owner=None, created=False):
"""
Removes all the permissions except for the owner and assign the view permission to the anonymous group.
"""
from geonode.resource.manager import resource_manager
# default permissions for anonymous users
anonymous_group, _ = Group.objects.get_or_create(name="anonymous")
if not anonymous_group:
raise Exception("Could not acquire 'anonymous' Group.")
perm_spec = copy.deepcopy(self.get_all_level_info())
if "users" not in perm_spec:
perm_spec["users"] = {}
if "groups" not in perm_spec:
perm_spec["groups"] = {}
# default permissions for owner and owner's groups
_owner = owner or self.owner
user_groups = Group.objects.filter(name__in=_owner.groupmember_set.values_list("group__slug", flat=True))
# Anonymous
anonymous_can_view = settings.DEFAULT_ANONYMOUS_VIEW_PERMISSION
if anonymous_can_view:
perm_spec["groups"][anonymous_group] = ["view_resourcebase"]
else:
for user_group in user_groups:
if not skip_registered_members_common_group(user_group):
perm_spec["groups"][user_group] = ["view_resourcebase"]
anonymous_can_download = settings.DEFAULT_ANONYMOUS_DOWNLOAD_PERMISSION
if anonymous_can_download:
perm_spec["groups"][anonymous_group] = ["view_resourcebase", "download_resourcebase"]
else:
for user_group in user_groups:
if not skip_registered_members_common_group(user_group):
perm_spec["groups"][user_group] = ["view_resourcebase", "download_resourcebase"]
AdvancedSecurityWorkflowManager.handle_moderated_uploads(self.uuid, instance=self)
return resource_manager.set_permissions(
self.uuid, instance=self, owner=owner, permissions=perm_spec, created=created
)
[docs]
def set_permissions(self, perm_spec=None, created=False, approval_status_changed=False, group_status_changed=False):
"""
Sets an object's the permission levels based on the perm_spec JSON.
the mapping looks like:
.. code-block:: bash
{
'users': {
'AnonymousUser': ['view'],
'username': ['perm1','perm2','perm3'],
'username2': ['perm1','perm2','perm3']
...
},
'groups': [
'groupname': ['perm1','perm2','perm3'],
'groupname2': ['perm1','perm2','perm3'],
...
]
}
"""
from geonode.resource.manager import resource_manager
return resource_manager.set_permissions(
self.uuid,
instance=self,
permissions=perm_spec,
created=created,
approval_status_changed=approval_status_changed,
group_status_changed=group_status_changed,
)
[docs]
def handle_moderated_uploads(self):
AdvancedSecurityWorkflowManager.handle_moderated_uploads(self.uuid, instance=self)
[docs]
def compare_perms(self, prev_perm_spec, perm_spec):
"""
Compare two perm_specs in the form
.. code-block:: bash
{
'users': {
<Profile AnonymousUser>: ['view'],
<Profile username>: ['perm1','perm2','perm3'],
<Profile username2>: ['perm1','perm2','perm3']
...
},
'groups': [
<Group groupname>: ['perm1','perm2','perm3'],
<Group groupname2>: ['perm1','perm2','perm3'],
...
]
}
"""
if "users" in prev_perm_spec:
if "users" in perm_spec:
if len(prev_perm_spec["users"]) != len(perm_spec["users"]):
return False
else:
_users_iterator = (
prev_perm_spec["users"].items()
if isinstance(prev_perm_spec["users"], dict)
else prev_perm_spec["users"]
)
for _user, _perms in _users_iterator:
if sorted(_perms) != sorted(perm_spec["users"].get(_user, [])):
return False
else:
return False
if "groups" in prev_perm_spec:
if "groups" in perm_spec:
if len(prev_perm_spec["groups"]) != len(perm_spec["groups"]):
return False
else:
_groups_iterator = (
prev_perm_spec["groups"].items()
if isinstance(prev_perm_spec["groups"], dict)
else prev_perm_spec["groups"]
)
for _group, _perms in _groups_iterator:
if sorted(_perms) != sorted(perm_spec["groups"].get(_group, [])):
return False
else:
return False
return True
[docs]
def fixup_perms(self, perm_spec):
"""
Transform a perm_spec in the form
.. code-block:: bash
{
'users': {
'AnonymousUser': ['view'],
'username': ['perm1','perm2','perm3'],
'username2': ['perm1','perm2','perm3']
...
},
'groups': [
'groupname': ['perm1','perm2','perm3'],
'groupname2': ['perm1','perm2','perm3'],
...
]
}
to the one in the form:
{
'users': {
<Profile AnonymousUser>: ['view'],
<Profile username>: ['perm1','perm2','perm3'],
<Profile username2>: ['perm1','perm2','perm3']
...
},
'groups': [
<Group groupname>: ['perm1','perm2','perm3'],
<Group groupname2>: ['perm1','perm2','perm3'],
...
]
}
It also removes items with empty permissions, e.g.:
'AnonymousUser': [] # the item will completely removed
"""
perm_spec_fixed = copy.deepcopy(perm_spec)
if "users" in perm_spec:
_users_iterator = perm_spec["users"].items() if isinstance(perm_spec["users"], dict) else perm_spec["users"]
for _user, _perms in _users_iterator:
if not isinstance(_user, get_user_model()):
perm_spec_fixed["users"].pop(_user)
if _perms and get_user_model().objects.filter(username=_user).count() == 1:
perm_spec_fixed["users"][get_user_model().objects.get(username=_user)] = _perms
if "groups" in perm_spec:
_groups_iterator = (
perm_spec["groups"].items() if isinstance(perm_spec["groups"], dict) else perm_spec["groups"]
)
for _group, _perms in _groups_iterator:
if not isinstance(_group, Group):
perm_spec_fixed["groups"].pop(_group)
if _perms and Group.objects.filter(name=_group).count() == 1:
perm_spec_fixed["groups"][Group.objects.get(name=_group)] = _perms
return perm_spec_fixed
[docs]
def get_user_perms(self, user):
"""
Returns a list of permissions a user has on a given resource.
"""
# To avoid circular import
from geonode.base.models import Configuration
config = Configuration.load()
ctype = ContentType.objects.get_for_model(self)
ctype_resource_base = ContentType.objects.get_for_model(self.get_self_resource())
PERMISSIONS_TO_FETCH = VIEW_PERMISSIONS + DOWNLOAD_PERMISSIONS + ADMIN_PERMISSIONS + SERVICE_PERMISSIONS
# include explicit permissions appliable to "subtype == 'vector'"
if self.subtype in ["vector", "vector_time"]:
PERMISSIONS_TO_FETCH += DATASET_ADMIN_PERMISSIONS
elif self.subtype == "raster":
PERMISSIONS_TO_FETCH += DATASET_EDIT_STYLE_PERMISSIONS
resource_perms = Permission.objects.filter(
codename__in=PERMISSIONS_TO_FETCH, content_type_id__in=[ctype.id, ctype_resource_base.id]
).values_list("codename", flat=True)
# Don't filter for admin users
if not user.is_superuser:
user_model = get_user_obj_perms_model(self)
user_resource_perms = user_model.objects.filter(
object_pk=self.pk,
content_type_id__in=[ctype.id, ctype_resource_base.id],
user__username=str(user),
permission__codename__in=resource_perms,
)
# get user's implicit perms for anyone flag
implicit_perms = get_perms(user, self)
# filter out implicit permissions unappliable to "subtype != 'vector'"
if self.subtype == "raster":
implicit_perms = list(set(implicit_perms) - set(DATASET_EDIT_DATA_PERMISSIONS))
elif self.subtype != "vector":
implicit_perms = list(set(implicit_perms) - set(DATASET_ADMIN_PERMISSIONS))
resource_perms = user_resource_perms.union(
user_model.objects.filter(permission__codename__in=implicit_perms)
).values_list("permission__codename", flat=True)
# filter out permissions for edit, change or publish if readonly mode is active
perm_prefixes = ["change", "delete", "publish"]
if config.read_only:
clauses = (Q(codename__contains=prefix) for prefix in perm_prefixes)
query = reduce(operator.or_, clauses)
if user.is_superuser:
resource_perms = resource_perms.exclude(query)
else:
perm_objects = Permission.objects.filter(codename__in=resource_perms)
resource_perms = perm_objects.exclude(query).values_list("codename", flat=True)
return resource_perms
[docs]
def user_can(self, user, permission):
"""
Checks if a has a given permission to the resource.
"""
resource = self.get_self_resource()
user_perms = self.get_user_perms(user).union(resource.get_user_perms(user))
if permission not in user_perms:
# TODO cater for permissions with syntax base.permission_codename
# eg 'base.change_resourcebase'
return False
return True