Source code for geonode.upload.forms
#########################################################################
#
# Copyright (C) 2018 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import ast
import logging
from django import forms
from django.core.exceptions import ValidationError
from django.utils.translation import ugettext_lazy as _
from geonode.storage.manager import StorageManager
from geonode.upload.utils import UploadLimitValidator
from .. import geoserver
from ..utils import check_ogc_backend
from ..layers.forms import JSONField
from .upload_validators import validate_uploaded_files
[docs]
class LayerUploadForm(forms.Form):
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
sld_file_path = forms.CharField(required=False)
# Adding style file based on the backend
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
spatial_files.append("sld_file")
spatial_files = tuple(spatial_files)
def __init__(self, *args, **kwargs):
super(LayerUploadForm, self).__init__(*args, **kwargs)
[docs]
def clean_store_spatial_files(self):
store_spatial_files = self.data.get("store_spatial_files")
if store_spatial_files is None:
store_spatial_files = True
self.cleaned_data["store_spatial_files"] = True
elif isinstance(store_spatial_files, str):
store_spatial_files = ast.literal_eval(store_spatial_files.lower().capitalize())
return store_spatial_files if isinstance(store_spatial_files, bool) else True
[docs]
def clean(self):
cleaned = super().clean()
if cleaned.get("store_spatial_files") is None:
cleaned["store_spatial_files"] = True
uploaded, files = self._get_files_paths_or_objects(cleaned)
cleaned["uploaded"] = uploaded
base_file = files.get("base_file")
if not base_file and "base_file" not in self.errors and "base_file_path" not in self.errors:
logger.error("Base file must be a file or url.")
raise ValidationError(_("Base file must be a file or url."))
if self.errors:
# Something already went wrong
return cleaned
upload_validator = UploadLimitValidator(user=self.user)
upload_validator.validate_parallelism_limit_per_user()
# Validate form file sizes
upload_validator.validate_files_sum_of_sizes(self.files)
# Get remote files
file_manager = StorageManager(remote_files=files)
file_manager.clone_remote_files()
self.data_retriever = file_manager.data_retriever
cleaned["data_retriever"] = self.data_retriever
cleaned["storage_manager"] = file_manager
# Validate remote file sizes
upload_validator.validate_files_sum_of_sizes(self.data_retriever)
file_paths_without_base = self.data_retriever.get_paths()
base_file_path = file_paths_without_base.pop("base_file")
valid_extensions = validate_uploaded_files(
cleaned=cleaned,
uploaded_files=file_paths_without_base,
field_spatial_types=self.spatial_files,
base_file_path=base_file_path,
)
cleaned["valid_extensions"] = valid_extensions
return cleaned
[docs]
def _get_files_paths_or_objects(self, cleaned_data):
"""Return a dict with all of the uploaded files"""
files = {}
uploaded = True
file_fields = (
("base_file", "base_file_path"),
("dbf_file", "dbf_file_path"),
("shx_file", "shx_file_path"),
("prj_file", "prj_file_path"),
("xml_file", "xml_file_path"),
("sld_file", "sld_file_path"),
)
for file_field in file_fields:
field_name = file_field[0]
file_field_value = cleaned_data.get(file_field[0], None)
path_field_value = cleaned_data.get(file_field[1], None)
if file_field_value and path_field_value:
raise ValidationError(
_(f"`{field_name}` field cannot have both a file and a path. Please choose one and try again.")
)
if path_field_value:
uploaded = False
files[field_name] = path_field_value
elif file_field_value:
uploaded = True
files[field_name] = file_field_value
return uploaded, files
[docs]
class TimeForm(forms.Form):
[docs]
precision_step = forms.ChoiceField(
required=False,
choices=[("years",) * 2, ("months",) * 2, ("days",) * 2, ("hours",) * 2, ("minutes",) * 2, ("seconds",) * 2],
)
def __init__(self, *args, **kwargs):
# have to remove these from kwargs or Form gets mad
super().__init__(*args, **kwargs)
self._build_choice("time_attribute", self._time_names)
self._build_choice("end_time_attribute", self._time_names)
self._build_choice("text_attribute", self._text_names)
self._build_choice("end_text_attribute", self._text_names)
if self._text_names:
self.fields["text_attribute_format"] = forms.CharField(required=False, widget=widget)
self.fields["end_text_attribute_format"] = forms.CharField(required=False, widget=widget)
self._build_choice("year_attribute", self._year_names)
self._build_choice("end_year_attribute", self._year_names)
[docs]
def _resolve_attribute_and_type(self, *name_and_types):
return [(self.cleaned_data[n], t) for n, t in name_and_types if self.cleaned_data.get(n, None)]
[docs]
def _build_choice(self, att, names):
if names:
names.sort()
choices = [("", "<None>")] + [(a, a) for a in names]
self.fields[att] = forms.ChoiceField(choices=choices, required=False)
@property
@property
@property
[docs]
def clean(self):
starts = self._resolve_attribute_and_type(
("time_attribute", "Date"),
("text_attribute", "Text"),
("year_attribute", "Number"),
)
if len(starts) > 1:
raise ValidationError("multiple start attributes")
ends = self._resolve_attribute_and_type(
("end_time_attribute", "Date"),
("end_text_attribute", "Text"),
("end_year_attribute", "Number"),
)
if len(ends) > 1:
raise ValidationError("multiple end attributes")
if len(starts) > 0:
self.cleaned_data["start_attribute"] = starts[0]
if len(ends) > 0:
self.cleaned_data["end_attribute"] = ends[0]
return self.cleaned_data
# @todo implement clean