#########################################################################
#
# Copyright (C) 2018 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import copy
import json
import logging
import collections
from itertools import chain
from django.db.models import Q
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.models import Group, Permission
from guardian.utils import get_user_obj_perms_model
from guardian.shortcuts import get_objects_for_user, get_objects_for_group
from geonode.groups.conf import settings as groups_settings
from geonode.groups.models import GroupProfile
from geonode.security.permissions import (
PermSpecCompact,
EDIT_PERMISSIONS,
VIEW_PERMISSIONS,
ADMIN_PERMISSIONS,
SERVICE_PERMISSIONS,
DOWNLOAD_PERMISSIONS,
DOWNLOADABLE_RESOURCES,
BASIC_MANAGE_PERMISSIONS,
DATASET_ADMIN_PERMISSIONS,
DATASET_EDIT_DATA_PERMISSIONS,
DATASET_EDIT_STYLE_PERMISSIONS,
DATA_EDITABLE_RESOURCES_SUBTYPES,
DATA_STYLABLE_RESOURCES_SUBTYPES,
)
[docs]
logger = logging.getLogger(__name__)
[docs]
def get_visible_resources(
queryset,
user,
request=None,
metadata_only=False,
admin_approval_required=False,
unpublished_not_visible=False,
private_groups_not_visibile=False,
):
# Get the list of objects the user has access to
from geonode.groups.models import GroupProfile
is_admin = user.is_superuser if user and user.is_authenticated else False
anonymous_group = None
public_groups = GroupProfile.objects.exclude(access="private").values("group")
groups = []
group_list_all = []
try:
group_list_all = user.group_list_all().values("group")
except Exception:
pass
try:
anonymous_group = Group.objects.get(name="anonymous")
if anonymous_group and anonymous_group not in groups:
groups.append(anonymous_group)
except Exception:
pass
filter_set = queryset.filter(dirty_state=False)
if metadata_only is not None:
# Hide Dirty State Resources
filter_set = filter_set.filter(metadata_only=metadata_only)
if not is_admin:
if user:
_allowed_resources = get_objects_for_user(
user, ["base.view_resourcebase", "base.change_resourcebase"], any_perm=True
)
filter_set = filter_set.filter(id__in=_allowed_resources.values("id"))
if admin_approval_required and not AdvancedSecurityWorkflowManager.is_simplified_workflow():
if not user or not user.is_authenticated or user.is_anonymous:
filter_set = filter_set.filter(
Q(is_published=True) | Q(group__in=public_groups) | Q(group__in=groups)
).exclude(is_approved=False)
# Hide Unpublished Resources to Anonymous Users
if unpublished_not_visible:
if not user or not user.is_authenticated or user.is_anonymous:
filter_set = filter_set.exclude(is_published=False)
# Hide Resources Belonging to Private Groups
if private_groups_not_visibile:
private_groups = GroupProfile.objects.filter(access="private").values("group")
if user and user.is_authenticated:
filter_set = filter_set.exclude(
Q(group__in=private_groups) & ~(Q(owner__username__iexact=str(user)) | Q(group__in=group_list_all))
)
else:
filter_set = filter_set.exclude(group__in=private_groups)
return filter_set
[docs]
def get_users_with_perms(obj):
"""
Override of the Guardian get_users_with_perms
"""
ctype = ContentType.objects.get_for_model(obj)
ctype_resource_base = ContentType.objects.get_for_model(obj.get_self_resource())
permissions = {}
PERMISSIONS_TO_FETCH = VIEW_PERMISSIONS + DOWNLOAD_PERMISSIONS + ADMIN_PERMISSIONS + SERVICE_PERMISSIONS
# include explicit permissions appliable to "subtype == 'vector'"
if obj.subtype == "vector":
PERMISSIONS_TO_FETCH += DATASET_ADMIN_PERMISSIONS
for perm in Permission.objects.filter(
codename__in=PERMISSIONS_TO_FETCH, content_type_id__in=[ctype.id, ctype_resource_base.id]
):
permissions[perm.id] = perm.codename
elif obj.subtype == "raster":
PERMISSIONS_TO_FETCH += DATASET_EDIT_STYLE_PERMISSIONS
for perm in Permission.objects.filter(
codename__in=PERMISSIONS_TO_FETCH, content_type_id__in=[ctype.id, ctype_resource_base.id]
):
permissions[perm.id] = perm.codename
else:
PERMISSIONS_TO_FETCH += DATASET_EDIT_DATA_PERMISSIONS
for perm in Permission.objects.filter(codename__in=PERMISSIONS_TO_FETCH):
permissions[perm.id] = perm.codename
user_model = get_user_obj_perms_model(obj)
users_with_perms = user_model.objects.filter(object_pk=obj.pk, permission_id__in=permissions).values(
"user_id", "permission_id"
)
users = {}
for item in users_with_perms:
if item["user_id"] in users:
users[item["user_id"]].append(permissions[item["permission_id"]])
else:
users[item["user_id"]] = [
permissions[item["permission_id"]],
]
profiles = {}
for profile in get_user_model().objects.filter(id__in=list(users.keys())):
profiles[profile] = users[profile.id]
return profiles
[docs]
def perms_as_set(perm) -> set:
return perm if isinstance(perm, set) else set(perm if isinstance(perm, list) else [perm])
[docs]
def get_resources_with_perms(user, filter_options={}, shortcut_kwargs={}):
"""
Returns resources a user has access to.
"""
from geonode.base.models import ResourceBase
if settings.SKIP_PERMS_FILTER:
resources = ResourceBase.objects.all()
else:
resources = get_objects_for_user(
user, ["base.view_resourcebase", "base.change_resourcebase"], any_perm=True, **shortcut_kwargs
)
resources_with_perms = get_visible_resources(
resources,
user,
admin_approval_required=settings.ADMIN_MODERATE_UPLOADS,
unpublished_not_visible=settings.RESOURCE_PUBLISHING,
private_groups_not_visibile=settings.GROUP_PRIVATE_RESOURCES,
)
if filter_options:
if resources_with_perms and resources_with_perms.exists():
if filter_options.get("title_filter"):
resources_with_perms = resources_with_perms.filter(title__icontains=filter_options.get("title_filter"))
type_filters = []
if filter_options.get("type_filter"):
_type_filter = filter_options.get("type_filter")
if _type_filter:
type_filters.append(_type_filter)
# get subtypes for geoapps
if _type_filter == "geoapp":
type_filters.extend(get_geoapp_subtypes())
if type_filters:
resources_with_perms = resources_with_perms.filter(polymorphic_ctype__model__in=type_filters)
return resources_with_perms
[docs]
def get_geoapp_subtypes():
"""
Returns a list of geoapp subtypes.
Example ['geostory']
"""
from geonode.geoapps.models import GeoApp
if not globals().get("geoapp_subtypes"):
globals()["geoapp_subtypes"] = list(GeoApp.objects.values_list("resource_type", flat=True).distinct())
return globals().get("geoapp_subtypes", [])
[docs]
def skip_registered_members_common_group(user_group):
_members_group_name = groups_settings.REGISTERED_MEMBERS_GROUP_NAME
if (settings.RESOURCE_PUBLISHING or settings.ADMIN_MODERATE_UPLOADS) and _members_group_name == user_group.name:
return True
return False
[docs]
def get_user_groups(owner, group=None):
"""
Returns all the groups belonging to the "owner"
"""
user_groups = Group.objects.filter(name__in=owner.groupmember_set.values_list("group__slug", flat=True))
if group:
user_groups = chain(user_groups, [group.group if hasattr(group, "group") else group])
return list(set(user_groups))
[docs]
def get_user_visible_groups(user, include_public_invite: bool = False):
"""
Retrieves all the groups accordingly to the following conditions:
- The user is member of
- The group is public
"""
from geonode.groups.models import GroupProfile
metadata_author_groups = []
if user.is_superuser or user.is_staff:
metadata_author_groups = GroupProfile.objects.all()
else:
if include_public_invite:
group_profile_queryset = GroupProfile.objects.exclude(access="private")
else:
group_profile_queryset = GroupProfile.objects.exclude(access="private").exclude(access="public-invite")
try:
all_metadata_author_groups = chain(user.group_list_all(), group_profile_queryset)
except Exception:
all_metadata_author_groups = group_profile_queryset
[
metadata_author_groups.append(item)
for item in all_metadata_author_groups
if item not in metadata_author_groups
]
return metadata_author_groups
[docs]
AdminViewPermissionsSet = collections.namedtuple("AdminViewPermissionsSet", ["admin_perms", "view_perms"])
[docs]
ResourceGroupsAndMembersSet = collections.namedtuple(
"ResourceGroupsAndMembersSet",
["anonymous_group", "registered_members_group", "owner_groups", "resource_groups", "managers"],
)
[docs]
class AdvancedSecurityWorkflowManager:
@staticmethod
[docs]
def is_anonymous_can_view():
return settings.DEFAULT_ANONYMOUS_VIEW_PERMISSION
@staticmethod
[docs]
def is_anonymous_can_download():
return settings.DEFAULT_ANONYMOUS_DOWNLOAD_PERMISSION
@staticmethod
[docs]
def is_group_private_mode():
return settings.GROUP_PRIVATE_RESOURCES
@staticmethod
[docs]
def is_manager_publish_mode():
return settings.RESOURCE_PUBLISHING
@staticmethod
[docs]
def is_admin_moderate_mode():
return settings.ADMIN_MODERATE_UPLOADS
@staticmethod
[docs]
def is_auto_publishing_workflow():
"""
**AUTO PUBLISHING**
- `RESOURCE_PUBLISHING = False`
- `ADMIN_MODERATE_UPLOADS = False`
- When user creates a resource:
- OWNER gets all the owner permissions (publish resource included)
- ANONYMOUS can view and download
- No change to the Group Manager is applied
"""
return not settings.RESOURCE_PUBLISHING and not settings.ADMIN_MODERATE_UPLOADS
@staticmethod
[docs]
def is_simple_publishing_workflow():
"""
**SIMPLE PUBLISHING**
- `RESOURCE_PUBLISHING = True` (Autopublishing is disabled)
- `ADMIN_MODERATE_UPLOADS = False`
- When user creates a resource:
- OWNER gets all the owner permissions (`publish_resource` and `change_resourcebase_permissions` INCLUDED)
- Group MANAGERS of the user's groups will get the owner permissions (`publish_resource` EXCLUDED)
- Group MEMBERS of the user's groups will get the `view_resourcebase`, `download_resourcebase` permission
- ANONYMOUS can not view and download if the resource is not published
- When resource has a group assigned:
- OWNER gets all the owner permissions (`publish_resource` and `change_resourcebase_permissions` INCLUDED)
- Group MANAGERS of the *resource's group* will get the owner permissions (`publish_resource` EXCLUDED)
- Group MEMBERS of the *resource's group* will get the `view_resourcebase`, `download_resourcebase` permission
"""
return settings.RESOURCE_PUBLISHING and not settings.ADMIN_MODERATE_UPLOADS
@staticmethod
[docs]
def is_advanced_workflow():
"""
**ADVANCED WORKFLOW**
- `RESOURCE_PUBLISHING = True`
- `ADMIN_MODERATE_UPLOADS = True`
- When user creates a resource:
- OWNER gets all the owner permissions (`publish_resource` and `change_resourcebase_permissions` EXCLUDED)
- Group MANAGERS of the user's groups will get the owner permissions (`publish_resource` INCLUDED)
- Group MEMBERS of the user's groups will get the `view_resourcebase`, `download_resourcebase` permission
- ANONYMOUS can not view and download if the resource is not published
- When resource has a group assigned:
- OWNER gets all the owner permissions (`publish_resource` and `change_resourcebase_permissions` EXCLUDED)
- Group MANAGERS of the resource's group will get the owner permissions (`publish_resource` INCLUDED)
- Group MEMBERS of the resource's group will get the `view_resourcebase`, `download_resourcebase` permission
"""
return settings.RESOURCE_PUBLISHING and settings.ADMIN_MODERATE_UPLOADS
@staticmethod
[docs]
def is_simplified_workflow():
"""
**SIMPLIFIED WORKFLOW**
- `RESOURCE_PUBLISHING = False`
- `ADMIN_MODERATE_UPLOADS = True`
.. note::
Is it even possibile? when the resource is automatically published, can it be un-published?
If this combination is not allowed, we should either stop the process when reading the settings or log a warning and force a safe combination.
- When user creates a resource:
- OWNER gets all the owner permissions (`publish_resource` and `change_resourcebase_permissions` INCLUDED)
- Group MANAGERS of the user's groups will get the owner permissions (`publish_resource` INCLUDED)
- Group MEMBERS of the user's group will get the `view_resourcebase`, `download_resourcebase` permission
- ANONYMOUS can view and download
"""
return not settings.RESOURCE_PUBLISHING and settings.ADMIN_MODERATE_UPLOADS
@staticmethod
[docs]
def is_allowed_to_approve(user, resource):
ResourceGroupsAndMembersSet = AdvancedSecurityWorkflowManager.compute_resource_groups_and_members_set(
resource.uuid, instance=resource, group=resource.group
)
is_superuser = user.is_superuser
is_owner = user == resource.owner
is_manager = user in ResourceGroupsAndMembersSet.managers
can_change_metadata = user.has_perm("change_resourcebase_metadata", resource.get_self_resource())
if is_superuser:
return True
elif AdvancedSecurityWorkflowManager.is_admin_moderate_mode():
return is_manager and can_change_metadata
else:
return is_owner or is_manager or can_change_metadata
@staticmethod
[docs]
def is_allowed_to_publish(user, resource):
ResourceGroupsAndMembersSet = AdvancedSecurityWorkflowManager.compute_resource_groups_and_members_set(
resource.uuid, instance=resource, group=resource.group
)
is_superuser = user.is_superuser
is_owner = user == resource.owner
is_manager = user in ResourceGroupsAndMembersSet.managers
can_publish = user.has_perm("publish_resourcebase", resource.get_self_resource())
if is_superuser:
return True
elif AdvancedSecurityWorkflowManager.is_manager_publish_mode():
return is_manager and can_publish
else:
return is_owner or is_manager or can_publish
@staticmethod
[docs]
def assignable_perm_condition(perm, resource_type):
_assignable_perm_policy_condition = (
(perm in DOWNLOAD_PERMISSIONS and resource_type in DOWNLOADABLE_RESOURCES)
or (perm in DATASET_EDIT_DATA_PERMISSIONS and resource_type in DATA_EDITABLE_RESOURCES_SUBTYPES)
or (perm not in (DOWNLOAD_PERMISSIONS + DATASET_EDIT_DATA_PERMISSIONS))
)
logger.debug(
f" perm: {perm} - resource_type: {resource_type} --> assignable: {_assignable_perm_policy_condition}"
)
return _assignable_perm_policy_condition
@staticmethod
[docs]
def get_instance(uuid: str):
from geonode.base.models import ResourceBase
return ResourceBase.objects.filter(uuid=uuid).first()
@staticmethod
[docs]
def compute_admin_and_view_permissions_set(uuid: str, /, instance=None) -> AdminViewPermissionsSet:
"""
Returns a copy of the ADMIN_PERMISSIONS and VIEW_PERMISISONS of a resource accordinlgy to:
- The resource_type
- The resource_subtype
"""
_resource = instance or AdvancedSecurityWorkflowManager.get_instance(uuid)
view_perms = []
admin_perms = []
if _resource.polymorphic_ctype:
_resource_type = _resource.resource_type or _resource.polymorphic_ctype.name
_resource_subtype = _resource.subtype
view_perms = VIEW_PERMISSIONS.copy()
if _resource_type in DOWNLOADABLE_RESOURCES:
view_perms += DOWNLOAD_PERMISSIONS.copy()
admin_perms = ADMIN_PERMISSIONS.copy()
if _resource.polymorphic_ctype.name == "dataset":
if _resource_subtype in DATA_EDITABLE_RESOURCES_SUBTYPES:
admin_perms += DATASET_EDIT_DATA_PERMISSIONS.copy()
if _resource_subtype in DATA_STYLABLE_RESOURCES_SUBTYPES:
admin_perms += DATASET_EDIT_STYLE_PERMISSIONS.copy()
if _resource.polymorphic_ctype.name == "service":
admin_perms += SERVICE_PERMISSIONS.copy()
return AdminViewPermissionsSet(admin_perms, view_perms)
@staticmethod
[docs]
def compute_resource_groups_and_members_set(uuid: str, /, instance=None, group=None) -> ResourceGroupsAndMembersSet:
"""
Returns a tuple containing:
- The "Anonymous" Group
- The "Registered Members" Group
- The "Groups" belonging to the Resource Owner
- The "managers" of the Groups affecting the Resource
- The "members" of the Groups affecting the Resource
"""
_resource = instance or AdvancedSecurityWorkflowManager.get_instance(uuid)
anonymous_group = Group.objects.get(name="anonymous")
registered_members_group = None
registered_members_group_name = groups_settings.REGISTERED_MEMBERS_GROUP_NAME
if getattr(groups_settings, "AUTO_ASSIGN_REGISTERED_MEMBERS_TO_REGISTERED_MEMBERS_GROUP_NAME", False):
registered_members_group = Group.objects.get(name=registered_members_group_name)
user_groups = get_user_groups(_resource.owner, group=group)
resource_groups, group_managers = _resource.get_group_managers(group=group)
return ResourceGroupsAndMembersSet(
anonymous_group, registered_members_group, user_groups, resource_groups, group_managers
)
@staticmethod
[docs]
def get_workflow_permissions(
uuid: str,
/,
instance=None,
perm_spec: dict = {"users": {}, "groups": {}},
created: bool = False,
approval_status_changed: bool = False,
group_status_changed: bool = False,
) -> dict:
"""
Adapts the provided "perm_spec" according to the following schema:
**Publishing Workflow Schema**
+---------------------+-----------------------+-------------------------+
| | RESOURCE_PUBLISHING | ADMIN_MODERATE_UPLOADS |
+---------------------+-----------------------+-------------------------+
| AUTO PUBLISH | X | X |
+---------------------+-----------------------+-------------------------+
| SIMPLE PUBLISHING | ✓ | X |
+---------------------+-----------------------+-------------------------+
| SIMPLIFIED WORKFLOW | X | ✓ |
+---------------------+-----------------------+-------------------------+
| ADVANCED WORKFLOW | ✓ | ✓ |
+---------------------+-----------------------+-------------------------+
**General Rules:**
- **OWNER** can never publish, except in the **AUTO_PUBLISHING** workflow.
- **MANAGERS** can always "publish" the resource.
- **MEMBERS** can always "view" and "download" the resource.
- When the OWNER is also a MANAGER, the MANAGER wins and can publish.
- Others, except in the AUTO_PUBLISHING workflow.
**Approval and Publishing Schema**
+----------------+--------------+-------------+
| | N/PUBLISHED | PUBLISHED |
+----------------+--------------+-------------+
| N/APPROVED | GM/OWR | - |
+----------------+--------------+-------------+
| APPROVED | registered | all |
+----------------+--------------+-------------+
Exceptions based on the enabled workflow:
- **SIMPLIFIED WORKFLOW**: If the resource is "approved" or "published", OWNERS won't be able to change the resource data and permissions.
- **ADVANCED WORKFLOW**: If the resource is "approved" or "published", OWNERS won't be able to change the resource data, metadata, and permissions.
"""
_resource = instance or AdvancedSecurityWorkflowManager.get_instance(uuid)
_perm_spec = copy.deepcopy(perm_spec)
def safe_remove(perms, perm):
perms.remove(perm) if perm in perms else None
if _resource:
_resource = _resource.get_real_instance()
AdminViewPermissionsSet = AdvancedSecurityWorkflowManager.compute_admin_and_view_permissions_set(
uuid, instance=_resource
)
ResourceGroupsAndMembersSet = AdvancedSecurityWorkflowManager.compute_resource_groups_and_members_set(
uuid, instance=_resource, group=_resource.group
)
# Computing the OWNER Permissions
prev_perms = _perm_spec["users"].get(_resource.owner, []) if isinstance(_perm_spec["users"], dict) else []
prev_perms += AdminViewPermissionsSet.view_perms.copy() + AdminViewPermissionsSet.admin_perms.copy()
prev_perms = list(set(prev_perms))
if not AdvancedSecurityWorkflowManager.is_auto_publishing_workflow():
# Check if owner is a manager of any group and add admin_manager_perms accordingly
if _resource.owner not in ResourceGroupsAndMembersSet.managers:
safe_remove(prev_perms, "publish_resourcebase")
if not AdvancedSecurityWorkflowManager.is_simple_publishing_workflow() and (
_resource.is_approved or _resource.is_published
):
for _perm in EDIT_PERMISSIONS:
safe_remove(prev_perms, _perm)
if _resource.resource_type == "dataset":
for _perm in DATASET_ADMIN_PERMISSIONS:
safe_remove(prev_perms, _perm)
if AdvancedSecurityWorkflowManager.is_advanced_workflow():
for _perm in BASIC_MANAGE_PERMISSIONS:
safe_remove(prev_perms, _perm)
_perm_spec["users"][_resource.owner] = list(set(prev_perms))
# Computing the MANAGERs and MEMBERs Permissions
if not AdvancedSecurityWorkflowManager.is_auto_publishing_workflow():
if group_status_changed:
# Reset Groups/Manager Perms
_owner_perms = copy.deepcopy(_perm_spec["users"].get(_resource.owner, []))
_perm_spec["users"] = {_resource.owner: _owner_perms}
_perm_spec["groups"] = {}
if ResourceGroupsAndMembersSet.managers:
for user in ResourceGroupsAndMembersSet.managers:
prev_perms = _perm_spec["users"].get(user, []) if "users" in _perm_spec else []
prev_perms += (
AdminViewPermissionsSet.view_perms.copy() + AdminViewPermissionsSet.admin_perms.copy()
)
prev_perms = list(set(prev_perms))
_perm_spec["users"][user] = list(set(prev_perms))
if ResourceGroupsAndMembersSet.resource_groups:
for group in ResourceGroupsAndMembersSet.resource_groups:
prev_perms = _perm_spec["groups"].get(group, []) if "groups" in _perm_spec else []
prev_perms += AdminViewPermissionsSet.view_perms.copy()
prev_perms = list(set(prev_perms))
_perm_spec["groups"][group] = list(set(prev_perms))
elif len(_perm_spec["groups"]):
groups = copy.deepcopy(_perm_spec["groups"])
for group in groups:
if group not in (
ResourceGroupsAndMembersSet.anonymous_group,
ResourceGroupsAndMembersSet.registered_members_group,
):
try:
group = group if hasattr(group, "group") else GroupProfile.objects.get(group=group)
users = list(group.get_managers()) + list(group.get_members())
for user in users:
if _perm_spec["users"].get(user, None):
_perm_spec["users"].pop(user)
if _perm_spec["groups"].get(group.group, None):
_perm_spec["groups"].pop(group.group)
except Exception as e:
logger.exception(e)
# Computing the 'All Others' Permissions
if ResourceGroupsAndMembersSet.anonymous_group:
prev_perms = (
_perm_spec["groups"].get(ResourceGroupsAndMembersSet.anonymous_group, [])
if isinstance(_perm_spec["groups"], dict)
else []
)
if approval_status_changed and (_resource.is_approved or _resource.is_published):
prev_perms += AdminViewPermissionsSet.view_perms.copy()
prev_perms = list(set(prev_perms))
if created:
if not AdvancedSecurityWorkflowManager.is_anonymous_can_view():
safe_remove(prev_perms, "view_resourcebase")
if not AdvancedSecurityWorkflowManager.is_anonymous_can_download():
safe_remove(prev_perms, "download_resourcebase")
if not AdvancedSecurityWorkflowManager.is_auto_publishing_workflow():
if (
(
AdvancedSecurityWorkflowManager.is_simple_publishing_workflow()
or AdvancedSecurityWorkflowManager.is_advanced_workflow()
)
and not _resource.is_published
) or (
AdvancedSecurityWorkflowManager.is_simplified_workflow()
and not (_resource.is_approved or _resource.is_published)
):
for _perm in VIEW_PERMISSIONS + DOWNLOAD_PERMISSIONS:
safe_remove(prev_perms, _perm)
_perm_spec["groups"][ResourceGroupsAndMembersSet.anonymous_group] = list(set(prev_perms))
if ResourceGroupsAndMembersSet.registered_members_group and getattr(
groups_settings, "AUTO_ASSIGN_REGISTERED_MEMBERS_TO_REGISTERED_MEMBERS_GROUP_NAME", False
):
prev_perms = (
_perm_spec["groups"].get(ResourceGroupsAndMembersSet.registered_members_group, [])
if isinstance(_perm_spec["groups"], dict)
else []
)
if approval_status_changed and (_resource.is_approved or _resource.is_published):
prev_perms += AdminViewPermissionsSet.view_perms.copy()
prev_perms = list(set(prev_perms))
if not AdvancedSecurityWorkflowManager.is_auto_publishing_workflow() and not _resource.is_approved:
for _perm in VIEW_PERMISSIONS + DOWNLOAD_PERMISSIONS:
safe_remove(prev_perms, _perm)
_perm_spec["groups"][ResourceGroupsAndMembersSet.registered_members_group] = list(set(prev_perms))
return _perm_spec
@staticmethod
[docs]
def get_permissions(
uuid: str,
/,
instance=None,
permissions: dict = {},
created: bool = False,
approval_status_changed: bool = False,
group_status_changed: bool = False,
) -> dict:
"""
Fix-ups the perm_spec accordingly to the enabled workflow (if any).
For more details check the "get_workflow_permissions" method
"""
_resource = instance or AdvancedSecurityWorkflowManager.get_instance(uuid)
_permissions = None
if permissions:
if PermSpecCompact.validate(permissions):
_permissions = PermSpecCompact(copy.deepcopy(permissions), _resource).extended
else:
_permissions = copy.deepcopy(permissions)
if _resource:
perm_spec = _permissions or copy.deepcopy(_resource.get_all_level_info())
# Sanity checks
if isinstance(perm_spec, str):
perm_spec = json.loads(perm_spec)
if "users" not in perm_spec:
perm_spec["users"] = {}
elif isinstance(perm_spec["users"], list):
_users = {}
for _item in perm_spec["users"]:
_users[_item[0]] = _item[1]
perm_spec["users"] = _users
if "groups" not in perm_spec:
perm_spec["groups"] = {}
elif isinstance(perm_spec["groups"], list):
_groups = {}
for _item in perm_spec["groups"]:
_groups[_item[0]] = _item[1]
perm_spec["groups"] = _groups
# Make sure we're dealing with "Profile"s and "Group"s...
perm_spec = _resource.fixup_perms(perm_spec)
perm_spec = AdvancedSecurityWorkflowManager.get_workflow_permissions(
_resource.uuid,
instance=_resource,
perm_spec=perm_spec,
created=created,
approval_status_changed=approval_status_changed,
group_status_changed=group_status_changed,
)
return perm_spec
@staticmethod
[docs]
def handle_moderated_uploads(uuid: str, /, instance=None) -> object:
_resource = instance or AdvancedSecurityWorkflowManager.get_instance(uuid)
if _resource:
if not AdvancedSecurityWorkflowManager.is_auto_publishing_workflow():
_resource.is_approved = False
_resource.was_approved = False
_resource.is_published = False
_resource.was_published = False
from geonode.base.models import ResourceBase
ResourceBase.objects.filter(uuid=_resource.uuid).update(
is_approved=False, was_approved=False, is_published=False, was_published=False
)
return _resource
@staticmethod
[docs]
def set_group_member_permissions(user, group, role):
if not AdvancedSecurityWorkflowManager.is_auto_publishing_workflow():
"""
Internally the set_permissions function will automatically handle the permissions
that needs to be assigned to the resource.
Background at: https://github.com/GeoNode/geonode/pull/8145
If the user is demoted, we assign by default at least the view and the download permission
to the resource
"""
# Fetching all the resources belonging to Group "group"; i.e. assgined to "group" metadata
queryset = get_objects_for_user(
user, ["base.view_resourcebase", "base.change_resourcebase"], any_perm=True
).filter(group=group.group)
# Fetching and chaining all the resources belonging to Owner Group "group"
queryset = chain(
queryset,
(
get_objects_for_group(
group.group, ["base.view_resourcebase", "base.change_resourcebase"], any_perm=True
).filter(owner__groupmember__group=group)
),
)
_resources = list(set(queryset))
if len(_resources) == 0:
queryset = get_objects_for_user(
user, ["base.view_resourcebase", "base.change_resourcebase"], any_perm=True
).filter(owner=user)
_resources = queryset.iterator()
for _r in _resources:
perm_spec = _r.get_all_level_info()
if "users" not in perm_spec:
perm_spec["users"] = {}
if "groups" not in perm_spec:
perm_spec["groups"] = {}
AdminViewPermissionsSet = AdvancedSecurityWorkflowManager.compute_admin_and_view_permissions_set(
_r.uuid, instance=_r
)
prev_perms = AdminViewPermissionsSet.view_perms.copy()
if not role:
prev_perms = []
if user == _r.owner:
_group = group if hasattr(group, "group") else GroupProfile.objects.get(group=group)
_users = list(_group.get_managers()) + list(_group.get_members())
for _m in _users:
if perm_spec["users"].get(_m, None):
perm_spec["users"].pop(_m)
if perm_spec["groups"].get(_group.group, None):
perm_spec["groups"].pop(_group.group)
elif role == "manager":
prev_perms += AdminViewPermissionsSet.admin_perms.copy()
prev_perms = list(set(prev_perms))
perm_spec["users"][user] = list(set(prev_perms))
# Let's the ResourceManager finally decide which are the correct security settings to apply
_r.set_permissions(perm_spec)