# Source code for geonode.upload.upload_validators

#########################################################################
#
# Copyright (C) 2018 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

"""Tools for performing validation of uploaded spatial files."""
import re
import os.path
import logging
import zipfile

from collections import namedtuple

from django import forms
from django.utils.translation import ugettext as _

from ..geoserver.helpers import ogc_server_settings
from . import files
from .utils import get_kml_doc

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Describes one auxiliary file of a shapefile upload: its extension (without
# the leading dot) and whether validation fails when that file is missing.
ShapefileAux = namedtuple("ShapefileAux", ["extension", "mandatory"])
[docs] def _supported_type(ext, supported_types): return any([type_.matches(ext) for type_ in supported_types])
def validate_uploaded_files(cleaned, uploaded_files, field_spatial_types, base_file_path):
    """Validate an upload and return the extensions recognised in it.

    The extension of ``base_file_path`` selects the validation strategy:
    zip archives are probed for a shapefile, a KMZ, a KML or raster imagery
    (in that order); kmz, shp, kml and GeoTIFF uploads go through their
    dedicated validators; any other supported type is accepted as is.

    :arg cleaned: Cleaned form data; ``cleaned.get(field_name)`` is expected
        to return an object with a ``name`` attribute, or something without one
    :arg uploaded_files: The uploaded files, as handed over by the caller
    :arg field_spatial_types: Names of the form fields that may hold spatial files
    :arg base_file_path: Path of the main uploaded file
    :returns: The extensions considered valid for this upload
    :raises: forms.ValidationError if the base file type is unsupported or
        its content does not pass validation

    """
    logger.debug(f"uploaded_files: {uploaded_files}")

    # csv and kml are only accepted when a datastore is configured.
    datastore_only_codes = () if ogc_server_settings.DATASTORE else ("csv", "kml")
    types = [t for t in files.types if t.code not in datastore_only_codes]

    # Lower-cased once here, so the comparisons below need no further folding.
    base_ext = os.path.splitext(os.path.basename(base_file_path))[-1].lower()[1:]

    if not _supported_type(base_ext, types) and base_ext != "zip":
        raise forms.ValidationError(
            "%(supported)s files are supported. You uploaded a " "%(uploaded)s file",
            params={"supported": " , ".join(t.name for t in types), "uploaded": base_ext},
        )
    elif base_ext == "zip":
        if not zipfile.is_zipfile(base_file_path):
            raise forms.ValidationError(_("Invalid zip file detected"))
        # Probe the archive for each supported payload in turn: an ESRI
        # Shapefile, a KMZ, a KML and finally any raster image.
        zip_probes = (validate_shapefile, validate_kmz, validate_kml_zip, validate_raster_zip)
        for probe in zip_probes:
            valid_extensions = probe(base_file_path)
            if valid_extensions:
                break
        else:
            # No suitable data have been found on the ZIP file
            raise forms.ValidationError(_("Could not find any valid spatial file inside the uploaded zip"))
    elif base_ext == "kmz":
        if not zipfile.is_zipfile(base_file_path):
            raise forms.ValidationError(_("Invalid kmz file detected"))
        valid_extensions = validate_kmz(base_file_path)
        if not valid_extensions:
            raise forms.ValidationError(_("Could not find any kml files inside the uploaded kmz"))
    elif base_ext == "shp":
        # NOTE(review): this branch reads uploaded_files.values() while the kml
        # and raster branches iterate uploaded_files directly - confirm the
        # type callers pass in.
        file_paths = list(uploaded_files.values())
        if base_file_path not in file_paths:
            file_paths.append(base_file_path)
        valid_extensions = _validate_shapefile_components(file_paths)
    elif base_ext == "kml":
        valid_extensions = validate_kml(uploaded_files)
    elif base_ext in files._tif_extensions:
        valid_extensions = validate_raster(uploaded_files)
    else:
        # default behavior just assumes files are valid
        valid_extensions = []
        for field_name in field_spatial_types:
            django_file = cleaned.get(field_name)
            try:
                valid_extensions.append(os.path.splitext(django_file.name)[1][1:])
            except AttributeError:
                # Nothing usable was submitted for this field.
                pass
        # NOTE(review): the flattened source does not show whether this
        # fallback belongs to the default branch only or to the whole
        # function - confirm against the upstream file.
        if not valid_extensions:
            valid_extensions = [base_ext]
    return valid_extensions
def _validate_shapefile_components(possible_filenames):
    """Validate that a shapefile can be loaded from the input file paths.

    The main component is the single ``.shp`` file. When there is none, the
    first ``.dbf``/``.shx``/``.prj`` file stands in for it and the auxiliary
    files stop being mandatory.

    :arg possible_filenames: Paths (or archive member names) of the uploaded files
    :type possible_filenames: list
    :returns: Extensions of the components found, main component first, or
        ``None`` when no shapefile-related file is present
    :raises: forms.ValidationError if more than one ``.shp`` file is given
        or a mandatory auxiliary file is missing

    """
    # Normalise once so that every check below works on plain strings.
    candidates = [str(f) for f in possible_filenames]

    shp_files = [f for f in candidates if f.lower().endswith(".shp")]
    aux_mandatory = True
    if len(shp_files) > 1:
        raise forms.ValidationError(_("Only one shapefile per zip is allowed"))
    if not shp_files:
        shp_files = [f for f in candidates if os.path.splitext(f.lower())[1] in (".shp", ".dbf", ".shx", ".prj")]
        aux_mandatory = False
    if not shp_files:
        return None

    base_name, base_extension = os.path.splitext(os.path.basename(shp_files[0]))
    components = [base_extension[1:]]

    # The base name is user-controlled text, not a pattern: escape it so that
    # names such as "data(1)" or "c++" are matched literally. The match stays
    # a case-insensitive prefix match, as before.
    name_pattern = re.compile(re.escape(base_name), re.IGNORECASE)

    # (name without extension, lower-cased extension) of every candidate,
    # computed once instead of once per auxiliary component.
    parsed = []
    for path in candidates:
        stem, extension = os.path.splitext(os.path.basename(path))
        parsed.append((stem, extension[1:].lower()))

    shapefile_additional = [
        ShapefileAux(extension="dbf", mandatory=aux_mandatory),
        ShapefileAux(extension="shx", mandatory=aux_mandatory),
        ShapefileAux(extension="prj", mandatory=False),
        ShapefileAux(extension="xml", mandatory=False),
        ShapefileAux(extension="sld", mandatory=False),
    ]
    for aux in shapefile_additional:
        if any(extension == aux.extension and name_pattern.match(stem) for stem, extension in parsed):
            components.append(aux.extension)
        elif aux.mandatory:
            raise forms.ValidationError(
                f"Could not find {aux.extension} file, which is mandatory for " "shapefile uploads"
            )
    logger.debug(f"shapefile components: {components}")
    return components
def _validate_kml_bytes(kml_bytes, other_files):
    """Check the GroundOverlay declared in a KML document, if there is one.

    :arg kml_bytes: Raw content of the KML file
    :arg other_files: Names of the files that accompany the KML
    :returns: ``("kml", "sld", <image extension>)`` when the document holds
        exactly one GroundOverlay whose image is among ``other_files``;
        ``None`` when it holds no GroundOverlay
    :raises: forms.ValidationError if there is more than one GroundOverlay
        or the declared image is not among ``other_files``

    """
    kml_doc, namespaces = get_kml_doc(kml_bytes)
    overlays = kml_doc.xpath("//kml:GroundOverlay", namespaces=namespaces)
    overlay_count = len(overlays)

    if overlay_count > 1:
        raise forms.ValidationError(_("kml files with more than one GroundOverlay are not supported"))
    if overlay_count != 1:
        return None

    hrefs = overlays[0].xpath("kml:Icon/kml:href/text()", namespaces=namespaces)
    # An overlay without an icon reference is handled as an empty image path.
    image_path = hrefs[0].strip() if hrefs else ""
    logger.debug(f"image_path: {image_path}")
    logger.debug(f"other_files: {other_files}")

    if image_path not in other_files:
        raise forms.ValidationError(_("Ground overlay image declared in kml file cannot be found"))
    return ("kml", "sld", os.path.splitext(image_path)[-1][1:])
def validate_kml(possible_files):
    """Validate an uploaded KML file and a possible companion image file.

    KML files that specify vectorial data types are uploaded standalone.
    However, if the KML specifies a GroundOverlay type (raster) they are
    uploaded together with a raster file.

    :arg possible_files: File objects exposing ``name``, ``seek`` and ``read``
    :returns: The GroundOverlay validation result when there is one,
        ``("kml", "sld")`` for any other document that parses, else the
        (empty) GroundOverlay result

    """
    kml_suffix = ".kml"
    kml_candidates = [f for f in possible_files if f.name.lower().endswith(kml_suffix)]
    kml_file = kml_candidates[0]
    companion_names = [f.name for f in possible_files if not f.name.lower().endswith(kml_suffix)]

    # Rewind first: the file may already have been read.
    kml_file.seek(0)
    kml_bytes = kml_file.read()

    result = _validate_kml_bytes(kml_bytes, companion_names)
    if result:
        return result

    kml_doc, namespaces = get_kml_doc(kml_bytes)
    if kml_doc and namespaces:
        return ("kml", "sld")
    return result
def validate_kml_zip(kmz_django_file):
    """Check whether a zip archive holds exactly one parseable KML file.

    :arg kmz_django_file: Path or file-like object of the zip archive
    :returns: ``("zip",)`` when the archive's single KML file parses,
        ``None`` when there is no KML file or parsing yields nothing
    :raises: forms.ValidationError if the archive holds more than one KML file

    """
    with zipfile.ZipFile(kmz_django_file, allowZip64=True) as archive:
        kml_members = [name for name in archive.namelist() if name.lower().endswith(".kml")]
        if not kml_members:
            return None
        if len(kml_members) > 1:
            raise forms.ValidationError(_("Only one kml file per ZIP is allowed"))
        kml_bytes = archive.read(kml_members[0])

    kml_doc, namespaces = get_kml_doc(kml_bytes)
    return ("zip",) if kml_doc and namespaces else None
def validate_kmz(kmz_django_file):
    """Check whether an archive is a KMZ whose KML passes GroundOverlay validation.

    :arg kmz_django_file: Path or file-like object of the archive
    :returns: ``("kmz",)`` when ``_validate_kml_bytes`` accepts the archive's
        single KML file together with the other archive members; ``None``
        when there is no KML file or that validation returns nothing
    :raises: forms.ValidationError if the archive holds more than one KML
        file, or if the KML validation itself fails

    """
    with zipfile.ZipFile(kmz_django_file, allowZip64=True) as zip_handler:
        zip_contents = zip_handler.namelist()
        kml_files = [i for i in zip_contents if i.lower().endswith(".kml")]
        if len(kml_files) > 1:
            raise forms.ValidationError(_("Only one kml file per kmz is allowed"))
        # Explicit guard instead of catching IndexError around the read, so
        # that an unrelated IndexError raised while reading is not mistaken
        # for "no kml file".
        if not kml_files:
            return None
        kml_bytes = zip_handler.read(kml_files[0])

    other_filenames = [i for i in zip_contents if not i.lower().endswith(".kml")]
    if _validate_kml_bytes(kml_bytes, other_filenames):
        return ("kmz",)
    return None
def validate_shapefile(zip_django_file):
    """Check whether a zip archive holds a valid set of shapefile components.

    :arg zip_django_file: Path or file-like object of the zip archive
    :returns: ``("zip",)`` when the archive members pass
        ``_validate_shapefile_components``, ``None`` otherwise
    :raises: forms.ValidationError propagated from the component validation

    """
    with zipfile.ZipFile(zip_django_file, allowZip64=True) as archive:
        members = archive.namelist()
        if _validate_shapefile_components(members):
            return ("zip",)
    return None
def validate_raster(contents, allow_multiple=False):
    """Classify a set of file names as a single raster, a mosaic, or neither.

    Files are split into rasters and "others" by comparing their lower-cased
    extension with the codes and aliases of the raster types in
    ``files.types``. The extensions reported back keep their original case.

    :arg contents: File names (or objects whose ``str()`` is a file name)
    :arg allow_multiple: Whether several rasters may be uploaded together
    :returns: ``("zip-mosaic",)`` for a mosaic, a tuple made of the raster
        extensions followed by the other extensions for a plain raster
        upload, or ``None`` when no raster file is present
    :raises: forms.ValidationError if the combination of rasters is not allowed

    """
    raster_types = [t for t in files.types if t.dataset_type == files.raster]
    raster_exts = [f".{t.code}" for t in raster_types]
    for raster_type in raster_types:
        aliases = raster_type.aliases
        if aliases:
            raster_exts.extend(f".{alias}" for alias in aliases)

    def is_raster(entry):
        return os.path.splitext(str(entry).lower())[1] in raster_exts

    def bare_extension(entry):
        return os.path.splitext(str(entry))[1][1:]

    raster_files = [entry for entry in contents if is_raster(entry)]
    other_files = [entry for entry in contents if not is_raster(entry)]

    all_extensions = [bare_extension(entry) for entry in raster_files]
    other_extensions = tuple({bare_extension(entry) for entry in other_files})
    valid_extensions = tuple(set(all_extensions))

    # Raster extensions that occur more than once, i.e. several rasters of
    # the same kind in one upload.
    seen = set()
    repeated = set()
    for extension in all_extensions:
        if extension in seen:
            repeated.add(extension)
        else:
            seen.add(extension)

    if repeated:
        if any(extension in files._mosaics_extensions for extension in other_extensions):
            return ("zip-mosaic",)
        if any(extension in files._tif_extensions for extension in repeated):
            if allow_multiple:
                return ("zip-mosaic",)
            raise forms.ValidationError(
                _("You are trying to upload multiple GeoTIFFs without a valid 'indexer.properties' file.")
            )
        raise forms.ValidationError(_("Only one raster file per ZIP is allowed"))

    if not valid_extensions:
        return None
    if len(valid_extensions) > 1 and not allow_multiple:
        raise forms.ValidationError(_("No multiple rasters allowed"))

    has_style_or_metadata = "sld" in other_extensions or "xml" in other_extensions
    if not allow_multiple or ("properties" not in other_extensions and has_style_or_metadata):
        return valid_extensions + other_extensions
    return ("zip-mosaic",)
def validate_raster_zip(zip_django_file):
    """Check whether a zip archive holds raster data.

    :arg zip_django_file: Path or file-like object of the zip archive
    :returns: ``("zip-mosaic",)`` when ``validate_raster`` reports a mosaic,
        ``("zip",)`` for any other non-empty result, ``None`` when no raster
        is found
    :raises: forms.ValidationError propagated from ``validate_raster``

    """
    with zipfile.ZipFile(zip_django_file, allowZip64=True) as archive:
        found = validate_raster(archive.namelist(), allow_multiple=True)
    if not found:
        return None
    return ("zip-mosaic",) if "zip-mosaic" in found else ("zip",)