#########################################################################
#
# Copyright (C) 2021 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
"""
Permissions will be managed according to a "compact" set:
- view: view resource
- download: view and download
- edit: view download and edit (metadata, style, data)
- manage: change permissions, delete resource, etc.
The GET method will return:
users:
- username
- first name
- last name
- permissions (view | download | edit | manage)
organizations:
- title
- name
- permissions (view | download | edit | manage)
groups:
- title
- name
- permissions (view | download | edit | manage)
"""
import copy
import json
import pprint
import jsonschema
import collections
from avatar.templatetags.avatar_tags import avatar_url
from guardian.shortcuts import get_group_perms, get_anonymous_user
from django.conf import settings
from django.contrib.auth.models import Group
from django.contrib.auth import get_user_model
from geonode.utils import build_absolute_uri
from geonode.groups.conf import settings as groups_settings
# Permissions mapping
[docs]
PERMISSIONS = {
"add_resourcebase": "add_resource",
}
[docs]
DOWNLOADABLE_RESOURCES = ["dataset", "document"]
[docs]
DATA_EDITABLE_RESOURCES_SUBTYPES = ["vector", "vector_time"]
[docs]
DATA_STYLABLE_RESOURCES_SUBTYPES = ["raster", "vector", "vector_time"]
# The following permissions will be filtered out when READ_ONLY mode is active
[docs]
READ_ONLY_AFFECTED_PERMISSIONS = [
"add_resource",
]
# Permissions on resources
[docs]
VIEW_PERMISSIONS = [
"view_resourcebase",
]
[docs]
DOWNLOAD_PERMISSIONS = [
"download_resourcebase",
]
[docs]
EDIT_PERMISSIONS = [
"change_resourcebase",
"change_resourcebase_metadata",
]
[docs]
BASIC_MANAGE_PERMISSIONS = [
"delete_resourcebase",
"change_resourcebase_permissions",
]
[docs]
MANAGE_PERMISSIONS = BASIC_MANAGE_PERMISSIONS + [
"publish_resourcebase",
]
[docs]
ADMIN_PERMISSIONS = MANAGE_PERMISSIONS + EDIT_PERMISSIONS
[docs]
OWNER_PERMISSIONS = ADMIN_PERMISSIONS + VIEW_PERMISSIONS
[docs]
DATASET_EDIT_DATA_PERMISSIONS = [
"change_dataset_data",
]
[docs]
DATASET_EDIT_STYLE_PERMISSIONS = [
"change_dataset_style",
]
[docs]
DATASET_ADMIN_PERMISSIONS = DATASET_EDIT_DATA_PERMISSIONS + DATASET_EDIT_STYLE_PERMISSIONS
[docs]
SERVICE_PERMISSIONS = ["add_service", "delete_service", "change_resourcebase_metadata", "add_resourcebase_from_service"]
[docs]
DEFAULT_PERMISSIONS = []
if settings.DEFAULT_ANONYMOUS_VIEW_PERMISSION:
DEFAULT_PERMISSIONS += VIEW_PERMISSIONS
if settings.DEFAULT_ANONYMOUS_DOWNLOAD_PERMISSION:
DEFAULT_PERMISSIONS += DOWNLOAD_PERMISSIONS
[docs]
DEFAULT_PERMS_SPEC = json.dumps({"users": {"AnonymousUser": DEFAULT_PERMISSIONS}, "groups": {}})
[docs]
DOWNLOAD_RIGHTS = "download"
[docs]
MANAGE_RIGHTS = "manage"
[docs]
COMPACT_RIGHT_MODES = (
(VIEW_RIGHTS, "view"),
(DOWNLOAD_RIGHTS, "download"),
(EDIT_RIGHTS, "edit"),
(MANAGE_RIGHTS, "manage"),
(OWNER_RIGHTS, "owner"),
)
[docs]
PERM_SPEC_COMPACT_SCHEMA = {
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"users": {
"type": "array",
"items": [
{
"type": "object",
"properties": {
"avatar": {"type": "string"},
"first_name": {"type": "string"},
"id": {"type": "integer"},
"last_name": {"type": "string"},
"permissions": {
"type": "string",
"enum": ["none", "view", "download", "edit", "manage", "owner"],
},
"username": {"type": "string"},
"is_staff": {"type": "boolean"},
"is_superuser": {"type": "boolean"},
},
"required": [
"avatar",
"first_name",
"id",
"last_name",
"permissions",
"username",
"is_staff",
"is_superuser",
],
}
],
},
"organizations": {
"type": "array",
"items": [
{
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"permissions": {
"type": "string",
"enum": ["none", "view", "download", "edit", "manage", "owner"],
},
"title": {"type": "string"},
},
"required": ["id", "name", "permissions", "title"],
}
],
},
"groups": {
"type": "array",
"items": [
{
"type": "object",
"properties": {
"id": {"type": "integer"},
"title": {"type": "string"},
"name": {"type": "string"},
"permissions": {
"type": "string",
"enum": ["none", "view", "download", "edit", "manage", "owner"],
},
},
"required": ["id", "title", "name", "permissions"],
}
],
},
},
"required": ["users", "organizations", "groups"],
}
[docs]
def _to_extended_perms(
perm: str, resource_type: str = None, resource_subtype: str = None, is_owner: bool = False
) -> list:
"""
Explode "compact" permissions into an "extended" set, accordingly to the schema below:
- view: view resource
- download: view and download
- edit: view download and edit (metadata, style, data)
- manage: change permissions, delete resource, etc.
- owner: admin permissions
"""
def safe_list(perms):
return sorted(list(set(copy.deepcopy(perms)))) if perms else []
if is_owner:
if resource_type and resource_type.lower() in DOWNLOADABLE_RESOURCES:
if resource_subtype and resource_subtype.lower() in safe_list(
DATA_EDITABLE_RESOURCES_SUBTYPES + DATA_STYLABLE_RESOURCES_SUBTYPES
):
return safe_list(DATASET_ADMIN_PERMISSIONS + OWNER_PERMISSIONS + DOWNLOAD_PERMISSIONS)
else:
return safe_list(OWNER_PERMISSIONS + DOWNLOAD_PERMISSIONS)
else:
return safe_list(OWNER_PERMISSIONS)
elif perm is None or len(perm) == 0 or perm == NONE_RIGHTS:
return []
elif perm == VIEW_RIGHTS:
return safe_list(VIEW_PERMISSIONS)
elif perm == DOWNLOAD_RIGHTS:
if resource_type and resource_type.lower() in DOWNLOADABLE_RESOURCES:
return safe_list(VIEW_PERMISSIONS + DOWNLOAD_PERMISSIONS)
else:
return safe_list(VIEW_PERMISSIONS)
elif perm == EDIT_RIGHTS:
if resource_type and resource_type.lower() in DOWNLOADABLE_RESOURCES:
if resource_subtype and resource_subtype.lower() in safe_list(
DATA_EDITABLE_RESOURCES_SUBTYPES + DATA_STYLABLE_RESOURCES_SUBTYPES
):
return safe_list(DATASET_ADMIN_PERMISSIONS + VIEW_PERMISSIONS + EDIT_PERMISSIONS + DOWNLOAD_PERMISSIONS)
else:
return safe_list(VIEW_PERMISSIONS + EDIT_PERMISSIONS + DOWNLOAD_PERMISSIONS)
else:
return safe_list(VIEW_PERMISSIONS + EDIT_PERMISSIONS)
elif perm == MANAGE_RIGHTS:
if resource_type and resource_type.lower() in DOWNLOADABLE_RESOURCES:
if resource_subtype and resource_subtype.lower() in safe_list(
DATA_EDITABLE_RESOURCES_SUBTYPES + DATA_STYLABLE_RESOURCES_SUBTYPES
):
return safe_list(
DATASET_ADMIN_PERMISSIONS + VIEW_PERMISSIONS + ADMIN_PERMISSIONS + DOWNLOAD_PERMISSIONS
)
else:
return safe_list(VIEW_PERMISSIONS + ADMIN_PERMISSIONS + DOWNLOAD_PERMISSIONS)
else:
return safe_list(VIEW_PERMISSIONS + ADMIN_PERMISSIONS)
[docs]
def _to_compact_perms(
perms: list, resource_type: str = None, resource_subtype: str = None, is_owner: bool = False
) -> str:
"""
Compress standard permissions into a "compact" set, accordingly to the schema below:
- view: view resource
- download: view and download
- edit: view download and edit (metadata, style, data)
- manage: change permissions, delete resource, etc.
- owner: admin permissions
"""
if is_owner:
return OWNER_RIGHTS
if perms is None or len(perms) == 0:
return NONE_RIGHTS
if any(_p in MANAGE_PERMISSIONS for _p in perms):
return MANAGE_RIGHTS
elif (
resource_type
and resource_type.lower() in DOWNLOADABLE_RESOURCES
and any(_p in DATASET_ADMIN_PERMISSIONS + EDIT_PERMISSIONS for _p in perms)
):
return EDIT_RIGHTS
elif any(_p in DATASET_ADMIN_PERMISSIONS + EDIT_PERMISSIONS for _p in perms):
return EDIT_RIGHTS
elif (
resource_type
and resource_type.lower() in DOWNLOADABLE_RESOURCES
and any(_p in DOWNLOAD_PERMISSIONS for _p in perms)
):
return DOWNLOAD_RIGHTS
elif any(_p in VIEW_PERMISSIONS for _p in perms):
return VIEW_RIGHTS
return NONE_RIGHTS
[docs]
_Binding = collections.namedtuple("Binding", ["name", "expected", "ro", "binding"])
[docs]
_User = collections.namedtuple(
"User", ["id", "username", "last_name", "first_name", "avatar", "is_superuser", "is_staff"]
)
[docs]
_Group = collections.namedtuple("Group", ["id", "title", "name", "logo"])
[docs]
def _binding(name, expected=True, ro=True, binding=None):
return _Binding(name, expected, ro, binding)
[docs]
class BindingFailed(Exception):
"""
Something in the API has changed
"""
pass
[docs]
class PermSpecConverterBase(object):
def __init__(self, json, resource, parent=None):
[docs]
self._resource = resource
if parent == self:
raise Exception("bogus")
if json is not None:
self._bind_json(json)
[docs]
def _bind_json(self, json):
# generally, not required for override. instead use _bind_custom_json
# if possible
if not isinstance(json, dict):
self._binding_failed("expected dict, got %s", type(json))
for binding in self._bindings:
val = json.pop(binding.name, None)
if binding.expected and val is None:
self._binding_failed("expected val for %s", binding.name)
if binding.binding and val is not None:
if issubclass(binding.binding, PermSpecConverterBase):
if isinstance(val, list):
val = [binding.binding(v, getattr(self, "_resource", None), parent=self) for v in val]
else:
val = binding.binding(val, getattr(self, "_resource", None), parent=self)
else:
if isinstance(val, list):
val = [binding.binding(v, parent=self) for v in val]
else:
val = binding.binding(val, parent=self)
setattr(self, binding.name, val)
self._bind_custom_json(json)
[docs]
def _bind_custom_json(self, json):
# do any custom binding like for a shortcut
pass
[docs]
def _binding_failed(self, msg, args):
raise BindingFailed(f"[{type(self)}] {msg % args}")
[docs]
def _to_json_object(self, deep=True, top_level=True):
_json = {}
for binding in self._bindings:
val = getattr(self, binding.name, None)
if isinstance(val, PermSpecConverterBase):
val = val._to_json_object(top_level=False)
if val is not None:
_json[binding.name] = val
self._to_json_object_custom(_json)
if top_level and self._object_name:
_json = {self._object_name: _json}
return _json.copy()
[docs]
def _to_json_object_custom(self, json):
pass
[docs]
def __repr__(self):
_json = self._to_json_object(deep=True, top_level=False)
try:
return json.dumps(_json)
except Exception:
return pprint.pformat(_json, indent=2)
[docs]
class PermSpec(PermSpecConverterBase):
[docs]
_object_name = "perm_spec"
[docs]
_bindings = (
_binding("users"),
_binding("groups"),
)
@property
[docs]
def compact(self):
"""
Converts a standard and verbose 'perm_spec' into 'compact mode'.
- This method also recognizes special/internal security groups, such as 'anonymous' and 'registered-members',
and places their permissions in a specific node called 'groups'.
- Every security group different from the former ones, associated with a GeoNode 'GroupProfile',
will instead be placed in a node called 'organizations'.
Example:
.. code-block:: json
{
"users": [
{
"id": 1001,
"username": "afabiani",
"first_name": "",
"last_name": "",
"avatar": "",
"permissions": "manage",
"is_superuser": true,
"is_staff": false
}
],
"organizations": [],
"groups": [
{
"id": 3,
"title": "Registered Members",
"name": "registered-members",
"permissions": "edit"
},
{
"id": 2,
"title": "anonymous",
"name": "anonymous",
"permissions": "download"
}
]
}
"""
json = {}
user_perms = []
group_perms = []
anonymous_perms = None
contributors_perms = None
organization_perms = []
for _k in self.users:
_perms = self.users[_k]
if isinstance(_k, str):
_k = get_user_model().objects.get(username=_k)
if not _k.is_anonymous and _k.username != "AnonymousUser":
avatar = build_absolute_uri(avatar_url(_k, 240))
user = _User(_k.id, _k.username, _k.last_name, _k.first_name, avatar, _k.is_superuser, _k.is_staff)
is_owner = _k == self._resource.owner
user_perms.append(
{
"id": user.id,
"username": user.username,
"first_name": user.first_name,
"last_name": user.last_name,
"avatar": user.avatar,
"permissions": _to_compact_perms(
_perms, self._resource.resource_type, self._resource.subtype, is_owner
),
"is_superuser": user.is_superuser,
"is_staff": user.is_staff,
}
)
else:
anonymous_perms = {
"id": Group.objects.get(name="anonymous").id,
"title": "anonymous",
"name": "anonymous",
"permissions": _to_compact_perms(_perms, self._resource.resource_type, self._resource.subtype),
}
# Let's make sure we don't lose control over the resource
if not any([_u.get("id", None) == self._resource.owner.id for _u in user_perms]):
user_perms.append(
{
"id": self._resource.owner.id,
"username": self._resource.owner.username,
"first_name": self._resource.owner.first_name,
"last_name": self._resource.owner.last_name,
"avatar": build_absolute_uri(avatar_url(self._resource.owner, 240)),
"permissions": OWNER_RIGHTS,
"is_superuser": self._resource.owner.is_superuser,
"is_staff": self._resource.owner.is_staff,
}
)
for user in get_user_model().objects.filter(is_superuser=True):
if not any([_u.get("id", None) == user.id for _u in user_perms]):
user_perms.append(
{
"id": user.id,
"username": user.username,
"first_name": user.first_name,
"last_name": user.last_name,
"avatar": build_absolute_uri(avatar_url(user, 240)),
"permissions": MANAGE_RIGHTS,
"is_superuser": user.is_superuser,
"is_staff": user.is_staff,
}
)
for _k in self.groups:
_perms = self.groups[_k]
if isinstance(_k, str):
_k = Group.objects.get(name=_k)
if _k.name == "anonymous":
anonymous_perms = {
"id": _k.id,
"title": "anonymous",
"name": "anonymous",
"permissions": _to_compact_perms(_perms, self._resource.resource_type, self._resource.subtype),
}
elif hasattr(_k, "groupprofile"):
group = _Group(_k.id, _k.groupprofile.title, _k.name, _k.groupprofile.logo_url)
if _k.name == groups_settings.REGISTERED_MEMBERS_GROUP_NAME:
contributors_perms = {
"id": group.id,
"title": group.title,
"name": group.name,
"permissions": _to_compact_perms(_perms, self._resource.resource_type, self._resource.subtype),
}
else:
organization_perms.append(
{
"id": group.id,
"title": group.title,
"name": group.name,
"logo": group.logo,
"permissions": _to_compact_perms(
_perms, self._resource.resource_type, self._resource.subtype
),
}
)
if anonymous_perms:
group_perms.append(anonymous_perms)
else:
anonymous_group = Group.objects.get(name="anonymous")
group_perms.append(
{
"id": anonymous_group.id,
"title": "anonymous",
"name": "anonymous",
"permissions": _to_compact_perms(
get_group_perms(anonymous_group, self._resource),
self._resource.resource_type,
self._resource.subtype,
),
}
)
if contributors_perms:
group_perms.append(contributors_perms)
elif Group.objects.filter(name=groups_settings.REGISTERED_MEMBERS_GROUP_NAME).exists():
contributors_group = Group.objects.get(name=groups_settings.REGISTERED_MEMBERS_GROUP_NAME)
group_perms.append(
{
"id": contributors_group.id,
"title": "Registered Members",
"name": contributors_group.name,
"permissions": _to_compact_perms(
get_group_perms(contributors_group, self._resource),
self._resource.resource_type,
self._resource.subtype,
),
}
)
json["users"] = sorted(user_perms, key=lambda x: x["id"], reverse=True)
json["organizations"] = sorted(organization_perms, key=lambda x: x["id"], reverse=True)
json["groups"] = sorted(group_perms, key=lambda x: x["id"], reverse=True)
return json.copy()
[docs]
class PermSpecUserCompact(PermSpecConverterBase):
[docs]
_object_name = "perm_spec_user_compact"
[docs]
_bindings = (
_binding("id"),
_binding("username", expected=False),
_binding("first_name", expected=False),
_binding("last_name", expected=False),
_binding("avatar", expected=False),
_binding("permissions"),
_binding("is_superuser", expected=False),
_binding("is_staff", expected=False),
)
[docs]
class PermSpecGroupCompact(PermSpecConverterBase):
[docs]
_object_name = "perm_spec_group_compact"
[docs]
_bindings = (
_binding("id"),
_binding("title", expected=False),
_binding("name", expected=False),
_binding("logo", expected=False),
_binding("permissions"),
)
[docs]
class PermSpecCompact(PermSpecConverterBase):
[docs]
_object_name = "perm_spec_compact"
[docs]
_bindings = (
_binding("users", expected=False, binding=PermSpecUserCompact),
_binding("organizations", expected=False, binding=PermSpecGroupCompact),
_binding("groups", expected=False, binding=PermSpecGroupCompact),
)
@classmethod
[docs]
def validate(cls, perm_spec):
try:
jsonschema.validate(perm_spec, PERM_SPEC_COMPACT_SCHEMA)
return True
except jsonschema.ValidationError:
return False
@property
[docs]
def extended(self):
"""
Converts a 'perm_spec' in 'compact mode' into a standard and verbose format.
Example:
.. code-block:: json
{
"groups": {
"<Group: registered-members>": [
"view_resourcebase",
"download_resourcebase",
"change_resourcebase"
],
"<Group: anonymous>": [
"view_resourcebase"
]
},
"users": {
"<Profile: AnonymousUser>": [
"view_resourcebase"
],
"<Profile: afabiani>": [
"view_resourcebase",
"download_resourcebase",
"change_resourcebase_metadata",
"change_resourcebase",
"delete_resourcebase",
"change_resourcebase_permissions",
"publish_resourcebase"
]
}
}
"""
json = {"users": {}, "groups": {}}
for _u in self.users:
_user_profile = get_user_model().objects.get(id=_u.id)
_is_owner = _user_profile == self._resource.owner
_perms = OWNER_RIGHTS if _is_owner else MANAGE_RIGHTS if _user_profile.is_superuser else _u.permissions
json["users"][_user_profile.username] = _to_extended_perms(
_perms, self._resource.resource_type, self._resource.subtype, _is_owner
)
for _go in self.organizations:
_group = Group.objects.get(id=_go.id)
json["groups"][_group.name] = _to_extended_perms(
_go.permissions, self._resource.resource_type, self._resource.subtype
)
for _go in self.groups:
_group = Group.objects.get(id=_go.id)
json["groups"][_group.name] = _to_extended_perms(
_go.permissions, self._resource.resource_type, self._resource.subtype
)
if _go.name == "anonymous":
_user_profile = get_anonymous_user()
json["users"][_user_profile.username] = _to_extended_perms(
_go.permissions, self._resource.resource_type, self._resource.subtype
)
return json.copy()
[docs]
def merge(self, perm_spec_compact_patch: "PermSpecCompact"):
"""Merges 'perm_spec_compact_patch' to the current one.
- Existing elements will be overridden.
- Non existing elements will be added.
- If you need to delete elements you cannot use this method.
"""
for _elem in [_b.name for _b in self._bindings]:
for _up in getattr(perm_spec_compact_patch, _elem, []) or []:
_merged = False
for _i, _u in enumerate(getattr(self, _elem, []) or []):
if _up.id == _u.id:
getattr(self, _elem)[_i] = _up
getattr(self, _elem)[_i].parent = self
_merged = True
break
if not _merged:
if isinstance(getattr(self, _elem), list):
getattr(self, _elem).append(_up)
else:
getattr(self, _elem).add(_up)
[docs]
def get_compact_perms_list(
perms: list,
resource_type: str = None,
resource_subtype: str = None,
is_owner: bool = False,
is_none_allowed: bool = True,
compact_perms_labels: dict = {},
) -> list:
"""
Transforms an extended "perm_spec" into a list of compact perms.
"""
def _get_labeled_compact_perm(compact_perm: str):
return {"name": compact_perm, "label": compact_perms_labels.get(compact_perm, compact_perm)}
_perms_list = []
_perm = _to_compact_perms(perms, resource_type, resource_subtype, is_owner)
if _perm:
for _p in COMPACT_RIGHT_MODES:
if (
_p[1] not in [DOWNLOAD_RIGHTS] + DATASET_ADMIN_PERMISSIONS
or _p[1] in [DOWNLOAD_RIGHTS]
and any(__p in DOWNLOAD_PERMISSIONS for __p in perms)
or _p[1] in DATASET_ADMIN_PERMISSIONS
and any(__p in DATA_EDITABLE_RESOURCES_SUBTYPES + DATA_STYLABLE_RESOURCES_SUBTYPES for __p in perms)
):
_perms_list.append(_get_labeled_compact_perm(_p[1]))
if _p[1] == _perm:
break
if is_owner and OWNER_RIGHTS not in _perms_list:
_perms_list.append(_get_labeled_compact_perm(OWNER_RIGHTS))
if is_none_allowed and NONE_RIGHTS not in _perms_list:
_perms_list.insert(0, _get_labeled_compact_perm(NONE_RIGHTS))
return _perms_list