Source code for geonode.base.forms
#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import re
import html
import json
import logging
from django.db.models.query import QuerySet
from bootstrap3_datetime.widgets import DateTimePicker
from dal import autocomplete
import dal.forward
from django import forms
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.core import validators
from django.db.models import Prefetch, Q
from django.forms import models
from django.forms.fields import ChoiceField, MultipleChoiceField
from django.forms.utils import flatatt
from django.utils.encoding import force_text
from django.utils.html import format_html
from django.utils.safestring import mark_safe
from django.utils.translation import ugettext as _
from modeltranslation.forms import TranslationModelForm
from taggit.forms import TagField
from tinymce.widgets import TinyMCE
from django.contrib.admin.utils import flatten
from django.utils.translation import get_language
from geonode.base.enumerations import ALL_LANGUAGES
from geonode.base.models import (
HierarchicalKeyword,
License,
LinkedResource,
Region,
ResourceBase,
Thesaurus,
ThesaurusKeyword,
ThesaurusKeywordLabel,
ThesaurusLabel,
TopicCategory,
)
from geonode.base.widgets import TaggitSelect2Custom, TaggitProfileSelect2Custom
from geonode.base.fields import MultiThesauriField
from geonode.documents.models import Document
from geonode.layers.models import Dataset
from geonode.base.utils import validate_extra_metadata, remove_country_from_languagecode
from geonode.people import Roles
[docs]
def get_tree_data():
def rectree(parent, path):
children_list_of_tuples = list()
c = Region.objects.filter(parent=parent)
for child in c:
children_list_of_tuples.append(tuple((path + parent.name, tuple((child.id, child.name)))))
childrens = rectree(child, f"{parent.name}/")
if childrens:
children_list_of_tuples.extend(childrens)
return children_list_of_tuples
data = list()
try:
t = Region.objects.filter(Q(level=0) | Q(parent=None))
for toplevel in t:
data.append(tuple((toplevel.id, toplevel.name)))
childrens = rectree(toplevel, "")
if childrens:
data.append(tuple((toplevel.name, childrens)))
except Exception:
pass
return tuple(data)
[docs]
class AdvancedModelChoiceIterator(models.ModelChoiceIterator):
[docs]
def choice(self, obj):
return (self.field.prepare_value(obj), self.field.label_from_instance(obj), obj)
[docs]
class CategoryChoiceField(forms.ModelChoiceField):
[docs]
def _get_choices(self):
if hasattr(self, "_choices"):
return self._choices
return AdvancedModelChoiceIterator(self)
[docs]
def label_from_instance(self, obj):
return (
'<i class="fa ' + obj.fa_class + ' fa-2x unchecked"></i>'
'<i class="fa ' + obj.fa_class + ' fa-2x checked"></i>'
'<span class="has-popover" data-container="body" data-toggle="popover" data-placement="top" '
'data-content="' + obj.description + '" trigger="hover">'
"<br/><strong>" + obj.gn_description + "</strong></span>"
)
[docs]
class RegionsMultipleChoiceField(forms.MultipleChoiceField):
[docs]
def validate(self, value):
"""
Validates that the input is a list or tuple.
"""
if self.required and not value:
raise forms.ValidationError(self.error_messages["required"], code="required")
[docs]
class RegionsSelect(forms.Select):
[docs]
def render(self, name, value, attrs=None, renderer=None):
if value is None:
value = []
final_attrs = self.build_attrs(attrs)
final_attrs["name"] = name
output = [format_html('<select multiple="multiple"{}>', flatatt(final_attrs))]
options = self.render_options(value)
if options:
output.append(options)
output.append("</select>")
return mark_safe("\n".join(output))
[docs]
def value_from_datadict(self, data, files, name):
try:
getter = data.getlist
except AttributeError:
getter = data.get
return getter(name)
[docs]
def render_option_value(self, selected_choices, option_value, option_label, data_section=None):
if option_value is None:
option_value = ""
option_value = force_text(option_value)
if option_value in selected_choices:
selected_html = mark_safe(" selected")
if not self.allow_multiple_selected:
# Only allow for a single selection.
selected_choices.remove(option_value)
else:
selected_html = ""
label = force_text(option_label)
if data_section is None:
data_section = ""
else:
data_section = force_text(data_section)
if "/" in data_section:
label = format_html("{} [{}]", label, data_section.rsplit("/", 1)[1])
return format_html(
'<option data-section="{}" value="{}"{}>{}</option>', data_section, option_value, selected_html, label
)
[docs]
def render_options(self, selected_choices):
"""
Normalize to strings.
"""
def _region_id_from_choice(choice):
if isinstance(choice, int) or (isinstance(choice, str) and choice.isdigit()):
return int(choice)
else:
return choice.id
selected_choices = {force_text(_region_id_from_choice(v)) for v in selected_choices}
output = []
output.append(format_html('<optgroup label="{}">', "Global"))
for option_value, option_label in self.choices:
if not isinstance(option_label, (list, tuple)) and isinstance(option_label, str):
output.append(self.render_option_value(selected_choices, option_value, option_label))
output.append("</optgroup>")
for option_value, option_label in self.choices:
if isinstance(option_label, (list, tuple)) and not isinstance(option_label, str):
output.append(format_html('<optgroup label="{}">', force_text(option_value)))
for option in option_label:
if isinstance(option, (list, tuple)) and not isinstance(option, str):
if isinstance(option[1][0], (list, tuple)) and not isinstance(option[1][0], str):
for option_child in option[1][0]:
output.append(
self.render_option_value(
selected_choices, *option_child, data_section=force_text(option[1][0][0])
)
)
else:
output.append(
self.render_option_value(
selected_choices, *option[1], data_section=force_text(option[0])
)
)
else:
output.append(
self.render_option_value(selected_choices, *option, data_section=force_text(option_value))
)
output.append("</optgroup>")
return "\n".join(output)
[docs]
class CategoryForm(forms.Form):
[docs]
category_choice_field = CategoryChoiceField(
required=False,
label=f"*{_('Category')}",
empty_label=None,
queryset=TopicCategory.objects.filter(is_choice=True).extra(order_by=["description"]),
)
[docs]
def clean(self):
cleaned_data = self.data
ccf_data = cleaned_data.get("category_choice_field")
category_mandatory = getattr(settings, "TOPICCATEGORY_MANDATORY", False)
if category_mandatory and not ccf_data:
msg = _("Category is required.")
self._errors = self.error_class([msg])
# Always return the full collection of cleaned data.
return cleaned_data
[docs]
class TKeywordForm(forms.ModelForm):
[docs]
tkeywords = MultiThesauriField(
queryset=ThesaurusKeyword.objects.prefetch_related(
Prefetch("keyword", queryset=ThesaurusKeywordLabel.objects.filter(lang="en"))
),
widget=autocomplete.ModelSelect2Multiple(
url="thesaurus_autocomplete",
),
label=_("Keywords from Thesaurus"),
required=False,
help_text=_(
"List of keywords from Thesaurus",
),
)
[docs]
class ThesaurusAvailableForm(forms.Form):
"""
Separator at the beginning of the thesaurus search result and between results found in the local language and alt labels.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
for item in Thesaurus.objects.all().order_by("order", "id"):
tname = self._get_thesauro_title_label(item, lang)
if item.card_max == 0:
continue
elif item.card_max == 1 and item.card_min == 0:
self.fields[f"{item.id}"] = self._define_choicefield(item, False, tname, lang)
elif item.card_max == 1 and item.card_min == 1:
self.fields[f"{item.id}"] = self._define_choicefield(item, True, tname, lang)
elif item.card_max == -1 and item.card_min == 0:
self.fields[f"{item.id}"] = self._define_multifield(item, False, tname, lang)
elif item.card_max == -1 and item.card_min == 1:
self.fields[f"{item.id}"] = self._define_multifield(item, True, tname, lang)
[docs]
def cleanx(self, x):
cleaned_values = []
for key, value in x.items():
if isinstance(value, QuerySet):
for y in value:
cleaned_values.append(y.id)
elif value:
cleaned_values.append(value)
return ThesaurusKeyword.objects.filter(id__in=flatten(cleaned_values))
[docs]
def _define_multifield(self, item, required, tname, lang):
return MultipleChoiceField(
choices=self._get_thesauro_keyword_label(item, lang),
widget=autocomplete.Select2Multiple(
url=f"/base/thesaurus_available/?sysid={item.id}&lang={lang}",
attrs={"class": "treq" if required else ""},
),
label=f"{tname}",
required=False,
)
[docs]
def _define_choicefield(self, item, required, tname, lang):
return models.ChoiceField(
label=f"{tname}",
required=False,
widget=forms.Select(attrs={"class": "treq" if required else ""}),
choices=self._get_thesauro_keyword_label(item, lang),
)
@staticmethod
[docs]
def _get_thesauro_keyword_label(item, lang):
"""
Tries to find results for the given language (e.g., en-us).
If no results are found, it removes the country code from the language (e.g., en) and tries again.
"""
keyword_id_for_given_thesaurus = ThesaurusKeyword.objects.filter(thesaurus_id=item)
qs_keyword_ids = ThesaurusKeywordLabel.objects.filter(
lang=lang, keyword_id__in=keyword_id_for_given_thesaurus
).values("keyword_id")
if len(qs_keyword_ids) == 0:
lang = remove_country_from_languagecode(lang)
qs_keyword_ids = ThesaurusKeywordLabel.objects.filter(
lang=lang, keyword_id__in=keyword_id_for_given_thesaurus
).values("keyword_id")
not_qs_ids = (
ThesaurusKeywordLabel.objects.exclude(keyword_id__in=qs_keyword_ids)
.order_by("keyword_id")
.distinct("keyword_id")
.values("keyword_id")
)
qs_local = list(
ThesaurusKeywordLabel.objects.filter(lang=lang, keyword_id__in=keyword_id_for_given_thesaurus).values_list(
"keyword_id", "label"
)
)
qs_non_local = list(keyword_id_for_given_thesaurus.filter(id__in=not_qs_ids).values_list("id", "alt_label"))
return [THESAURUS_RESULT_LIST_SEPERATOR] + qs_local + [THESAURUS_RESULT_LIST_SEPERATOR] + qs_non_local
@staticmethod
[docs]
def _get_thesauro_title_label(item, lang):
tname = ThesaurusLabel.objects.values_list("label", flat=True).filter(thesaurus=item).filter(lang=lang)
if not tname:
return Thesaurus.objects.get(id=item.id).title
return tname.first()
[docs]
class ContactRoleMultipleChoiceField(forms.ModelMultipleChoiceField):
[docs]
def clean(self, value) -> QuerySet:
try:
users = get_user_model().objects.filter(username__in=value)
except TypeError:
# value of not supported type ...
raise forms.ValidationError(_("Something went wrong in finding the profile(s) in a contact role form ..."))
return users
[docs]
class LinkedResourceForm(forms.ModelForm):
[docs]
linked_resources = forms.ModelMultipleChoiceField(
label=_("Related resources"),
required=False,
queryset=None,
widget=autocomplete.ModelSelect2Multiple(url="autocomplete_linked_resource"),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# this is used to automatically validate the POSTed back values
# these are the LinkedResource already linked to this resource
# this is used by the autocomplete view to exclude current resource
self.fields["linked_resources"].widget.forward.append(
dal.forward.Const(
self.instance.id,
"exclude",
)
)
[docs]
def save_linked_resources(self, links_field="linked_resources"):
"""
Create and fetch desired links
"""
target_ids = []
for res in self.cleaned_data[links_field]:
LinkedResource.objects.get_or_create(source=self.instance, target=res, internal=False)
target_ids.append(res.pk)
# delete remaining links
(
LinkedResource.objects.filter(source_id=self.instance.id, internal=False)
.exclude(target_id__in=target_ids)
.delete()
)
[docs]
class ResourceBaseDateTimePicker(DateTimePicker):
[docs]
def build_attrs(self, base_attrs=None, extra_attrs=None, **kwargs):
"Helper function for building an attribute dictionary."
if extra_attrs:
base_attrs.update(extra_attrs)
base_attrs.update(kwargs)
return super().build_attrs(base_attrs)
# return base_attrs
[docs]
class ResourceBaseForm(TranslationModelForm, LinkedResourceForm):
"""
Base form for metadata, should be inherited by childres classes of ResourceBase
"""
[docs]
constraints_other = forms.CharField(label=_("Other constraints"), required=False, widget=TinyMCE())
[docs]
supplemental_information = forms.CharField(label=_("Supplemental information"), required=False, widget=TinyMCE())
[docs]
data_quality_statement = forms.CharField(label=_("Data quality statement"), required=False, widget=TinyMCE())
[docs]
owner = forms.ModelChoiceField(
empty_label=_(Roles.OWNER.label),
label=_(Roles.OWNER.label),
required=True,
queryset=get_user_model().objects.exclude(username="AnonymousUser"),
widget=autocomplete.ModelSelect2(url="autocomplete_profile"),
)
[docs]
date = forms.DateTimeField(
label=_("Date"),
localize=True,
input_formats=["%Y-%m-%d %H:%M %p"],
widget=ResourceBaseDateTimePicker(options={"format": "YYYY-MM-DD HH:mm a"}),
)
[docs]
temporal_extent_start = forms.DateTimeField(
label=_("temporal extent start"),
required=False,
localize=True,
input_formats=["%Y-%m-%d %H:%M %p"],
widget=ResourceBaseDateTimePicker(options={"format": "YYYY-MM-DD HH:mm a"}),
)
[docs]
temporal_extent_end = forms.DateTimeField(
label=_("temporal extent end"),
required=False,
localize=True,
input_formats=["%Y-%m-%d %H:%M %p"],
widget=ResourceBaseDateTimePicker(options={"format": "YYYY-MM-DD HH:mm a"}),
)
[docs]
metadata_author = ContactRoleMultipleChoiceField(
label=_(Roles.METADATA_AUTHOR.label),
required=Roles.METADATA_AUTHOR.is_required,
queryset=get_user_model().objects.exclude(username="AnonymousUser"),
widget=TaggitProfileSelect2Custom(url="autocomplete_profile"),
)
[docs]
processor = ContactRoleMultipleChoiceField(
label=_(Roles.PROCESSOR.label),
required=Roles.PROCESSOR.is_required,
queryset=get_user_model().objects.exclude(username="AnonymousUser"),
widget=TaggitProfileSelect2Custom(url="autocomplete_profile"),
)
[docs]
publisher = ContactRoleMultipleChoiceField(
label=_(Roles.PUBLISHER.label),
required=Roles.PUBLISHER.is_required,
queryset=get_user_model().objects.exclude(username="AnonymousUser"),
widget=TaggitProfileSelect2Custom(url="autocomplete_profile"),
)
[docs]
custodian = ContactRoleMultipleChoiceField(
label=_(Roles.CUSTODIAN.label),
required=Roles.CUSTODIAN.is_required,
queryset=get_user_model().objects.exclude(username="AnonymousUser"),
widget=TaggitProfileSelect2Custom(url="autocomplete_profile"),
)
[docs]
poc = ContactRoleMultipleChoiceField(
label=_(Roles.POC.label),
required=Roles.POC.is_required,
queryset=get_user_model().objects.exclude(username="AnonymousUser"),
widget=TaggitProfileSelect2Custom(url="autocomplete_profile"),
)
[docs]
distributor = ContactRoleMultipleChoiceField(
label=_(Roles.DISTRIBUTOR.label),
required=Roles.DISTRIBUTOR.is_required,
queryset=get_user_model().objects.exclude(username="AnonymousUser"),
widget=TaggitProfileSelect2Custom(url="autocomplete_profile"),
)
[docs]
resource_user = ContactRoleMultipleChoiceField(
label=_(Roles.RESOURCE_USER.label),
required=Roles.RESOURCE_USER.is_required,
queryset=get_user_model().objects.exclude(username="AnonymousUser"),
widget=TaggitProfileSelect2Custom(url="autocomplete_profile"),
)
[docs]
resource_provider = ContactRoleMultipleChoiceField(
label=_(Roles.RESOURCE_PROVIDER.label),
required=Roles.RESOURCE_PROVIDER.is_required,
queryset=get_user_model().objects.exclude(username="AnonymousUser"),
widget=TaggitProfileSelect2Custom(url="autocomplete_profile"),
)
[docs]
originator = ContactRoleMultipleChoiceField(
label=_(Roles.ORIGINATOR.label),
required=Roles.ORIGINATOR.is_required,
queryset=get_user_model().objects.exclude(username="AnonymousUser"),
widget=TaggitProfileSelect2Custom(url="autocomplete_profile"),
)
[docs]
principal_investigator = ContactRoleMultipleChoiceField(
label=_(Roles.PRINCIPAL_INVESTIGATOR.label),
required=Roles.PRINCIPAL_INVESTIGATOR.is_required,
queryset=get_user_model().objects.exclude(username="AnonymousUser"),
widget=TaggitProfileSelect2Custom(url="autocomplete_profile"),
)
[docs]
keywords = TagField(
label=_("Free-text Keywords"),
required=False,
help_text=_("A space or comma-separated list of keywords. Use the widget to select from Hierarchical tree."),
# widget=TreeWidget(url='autocomplete_hierachical_keyword'), #Needs updating to work with select2
widget=TaggitSelect2Custom(url="autocomplete_hierachical_keyword"),
)
[docs]
regions = RegionsMultipleChoiceField(label=_("Regions"), required=False, widget=RegionsSelect)
regions.widget.attrs = {"size": 20}
[docs]
extra_metadata = forms.CharField(
required=False,
widget=forms.Textarea,
help_text=_(
'Additional metadata, must be in format [\
{"metadata_key": "metadata_value"},\
{"metadata_key": "metadata_value"} \
]'
),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
[docs]
self.can_change_perms = self.user and self.user.has_perm(
"change_resourcebase_permissions", self.instance.get_self_resource()
)
if self.instance and self.instance.id and self.instance.metadata.exists():
self.fields["extra_metadata"].initial = [x.metadata for x in self.instance.metadata.all()]
for field in self.fields:
if field == "featured" and self.user and not self.user.is_superuser:
self.fields[field].disabled = True
help_text = self.fields[field].help_text
if help_text != "":
self.fields[field].widget.attrs.update(
{
"class": "has-popover",
"data-content": help_text,
"data-placement": "right",
"data-container": "body",
"data-html": "true",
}
)
if field in ["owner"] and not self.can_change_perms:
self.fields[field].disabled = True
[docs]
def disable_keywords_widget_for_non_superuser(self, user):
if settings.FREETEXT_KEYWORDS_READONLY and not user.is_superuser:
self["keywords"].field.disabled = True
[docs]
def clean_keywords(self):
keywords = self.cleaned_data["keywords"]
_unsescaped_kwds = []
for k in keywords:
_k = ("%s" % re.sub(r"%([A-Z0-9]{2})", r"&#x\g<1>;", k.strip())).split(",")
if not isinstance(_k, str):
for _kk in [html.unescape(x.strip()) for x in _k]:
# Simulate JS Unescape
_kk = (
_kk.replace("%u", r"\u")
.encode("unicode-escape")
.replace(b"\\\\u", b"\\u")
.decode("unicode-escape")
if "%u" in _kk
else _kk
)
_hk = HierarchicalKeyword.objects.filter(name__iexact=f"{_kk.strip()}")
if _hk and len(_hk) > 0:
_unsescaped_kwds.append(str(_hk[0]))
else:
_unsescaped_kwds.append(str(_kk))
else:
_hk = HierarchicalKeyword.objects.filter(name__iexact=_k.strip())
if _hk and len(_hk) > 0:
_unsescaped_kwds.append(str(_hk[0]))
else:
_unsescaped_kwds.append(str(_k))
return _unsescaped_kwds
[docs]
def clean_title(self):
title = self.cleaned_data.get("title", None)
if title:
title = title.replace(",", "_")
return title
[docs]
def clean_extra_metadata(self):
cleaned_data = self.cleaned_data.get("extra_metadata", [])
return json.dumps(validate_extra_metadata(cleaned_data, self.instance), indent=4)
[docs]
class Meta:
[docs]
exclude = (
"contacts",
"name",
"uuid",
"bbox_polygon",
"ll_bbox_polygon",
"srid",
"category",
"csw_typename",
"csw_schema",
"csw_mdsource",
"csw_type",
"csw_wkt_geometry",
"metadata_uploaded",
"metadata_xml",
"csw_anytext",
"popular_count",
"share_count",
"thumbnail",
"charset",
"rating",
"detail_url",
"tkeywords",
"users_geolimits",
"groups_geolimits",
"dirty_state",
"state",
"blob",
"files",
"was_approved",
"was_published",
)
[docs]
class ValuesListField(forms.Field):
[docs]
def to_python(self, value):
if value in validators.EMPTY_VALUES:
return []
value = [item.strip() for item in value.split(",") if item.strip()]
return value
[docs]
def clean(self, value):
value = self.to_python(value)
self.validate(value)
self.run_validators(value)
return value
[docs]
class BatchEditForm(forms.Form):
[docs]
group = forms.ModelChoiceField(label=_("Group"), queryset=Group.objects.all(), required=False)
[docs]
owner = forms.ModelChoiceField(label=_("Owner"), queryset=get_user_model().objects.all(), required=False)
[docs]
category = forms.ModelChoiceField(label=_("Category"), queryset=TopicCategory.objects.all(), required=False)
[docs]
license = forms.ModelChoiceField(label=_("License"), queryset=License.objects.all(), required=False)
[docs]
regions = forms.ModelChoiceField(label=_("Regions"), queryset=Region.objects.all(), required=False)
[docs]
def get_user_choices():
try:
return [(x.pk, x.title) for x in Dataset.objects.all().order_by("id")]
except Exception:
return []
[docs]
class UserAndGroupPermissionsForm(forms.Form):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
[docs]
layers = MultipleChoiceField(
choices=get_user_choices,
widget=autocomplete.Select2Multiple(url="datasets_autocomplete"),
label="Datasets",
required=False,
)
[docs]
permission_type = forms.ChoiceField(
required=True,
widget=forms.RadioSelect,
choices=(
("view", "View"),
("download", "Download"),
("edit", "Edit"),
),
)
[docs]
mode = forms.ChoiceField(
required=True,
widget=forms.RadioSelect,
choices=(
("set", "Set"),
("unset", "Unset"),
),
)
@staticmethod
def label_from_instance(obj):
return obj.title
[docs]
class OwnerRightsRequestForm(forms.Form):
[docs]
resource = forms.ModelChoiceField(
label=_("Resource"), queryset=ResourceBase.objects.all(), widget=forms.HiddenInput()
)
[docs]
reason = forms.CharField(
label=_("Reason"), widget=forms.Textarea, help_text=_("Short reasoning behind the request"), required=True
)