#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import itertools
import re
import logging
from django.conf import settings
from django.db import models
from django.urls import reverse
from django.utils.timezone import now
from django.utils.functional import classproperty
from django.utils.translation import ugettext_lazy as _
from tinymce.models import HTMLField
from geonode.client.hooks import hookset
from geonode.utils import build_absolute_uri, check_shp_columnnames
from geonode.security.models import PermissionLevelMixin
from geonode.groups.conf import settings as groups_settings
from geonode.security.permissions import (
VIEW_PERMISSIONS,
OWNER_PERMISSIONS,
DOWNLOAD_PERMISSIONS,
DATASET_ADMIN_PERMISSIONS,
)
from geonode.base.models import ResourceBase, ResourceBaseManager, LinkedResource
[docs]
logger = logging.getLogger("geonode.layers.models")
[docs]
vec_exts = shp_exts + csv_exts + kml_exts
[docs]
cov_exts = [".tif", ".tiff", ".geotiff", ".geotif", ".asc"]
[docs]
TIME_REGEX = (
("[0-9]{8}", _("YYYYMMDD")),
("[0-9]{8}T[0-9]{6}", _("YYYYMMDD'T'hhmmss")),
("[0-9]{8}T[0-9]{6}Z", _("YYYYMMDD'T'hhmmss'Z'")),
)
[docs]
class Style(models.Model, PermissionLevelMixin):
"""Model for storing styles."""
[docs]
name = models.CharField(_("style name"), max_length=255, unique=True)
[docs]
sld_title = models.CharField(max_length=255, null=True, blank=True)
[docs]
sld_body = models.TextField(_("sld text"), null=True, blank=True)
[docs]
sld_version = models.CharField(_("sld version"), max_length=12, null=True, blank=True)
[docs]
sld_url = models.CharField(_("sld url"), null=True, max_length=1000)
[docs]
workspace = models.CharField(max_length=255, null=True, blank=True)
[docs]
def __str__(self):
return str(self.name)
[docs]
def absolute_url(self):
if self.sld_url:
if self.sld_url.startswith(settings.OGC_SERVER["default"]["LOCATION"]):
return self.sld_url.split(settings.OGC_SERVER["default"]["LOCATION"], 1)[1]
elif self.sld_url.startswith(settings.OGC_SERVER["default"]["PUBLIC_LOCATION"]):
return self.sld_url.split(settings.OGC_SERVER["default"]["PUBLIC_LOCATION"], 1)[1]
return self.sld_url
else:
logger.error(f"SLD URL is empty for Style {self.name}")
return None
[docs]
def get_self_resource(self):
"""Get associated resource base."""
# Associate this model with resource
try:
dataset = self.dataset_styles.first()
""":type: Dataset"""
return dataset.get_self_resource()
except Exception:
return None
[docs]
class DatasetManager(ResourceBaseManager):
def __init__(self):
models.Manager.__init__(self)
[docs]
class Dataset(ResourceBase):
"""
Dataset (inherits ResourceBase fields)
"""
[docs]
PERMISSIONS = {
"write": [
"change_dataset_data",
"change_dataset_style",
]
}
# internal fields
[docs]
objects = DatasetManager()
[docs]
workspace = models.CharField(_("Workspace"), max_length=255)
[docs]
store = models.CharField(_("Store"), max_length=255)
[docs]
name = models.CharField(_("Name"), max_length=255)
[docs]
typename = models.CharField(_("Typename"), max_length=255, null=True, blank=True)
[docs]
ows_url = models.URLField(
_("ows URL"),
null=True,
blank=True,
help_text=_("The URL of the OWS service providing this layer, if any exists."),
)
[docs]
is_mosaic = models.BooleanField(_("Is mosaic?"), default=False)
[docs]
has_time = models.BooleanField(_("Has time?"), default=False)
[docs]
has_elevation = models.BooleanField(_("Has elevation?"), default=False)
[docs]
time_regex = models.CharField(_("Time regex"), max_length=128, null=True, blank=True, choices=TIME_REGEX)
[docs]
elevation_regex = models.CharField(_("Elevation regex"), max_length=128, null=True, blank=True)
[docs]
ptype = models.CharField(_("P-Type"), null=False, blank=False, max_length=255, default="gxp_wmscsource")
[docs]
default_style = models.ForeignKey(
Style, on_delete=models.SET_NULL, related_name="dataset_default_style", null=True, blank=True
)
[docs]
styles = models.ManyToManyField(Style, related_name="dataset_styles")
[docs]
remote_service = models.ForeignKey("services.Service", null=True, blank=True, on_delete=models.CASCADE)
[docs]
charset = models.CharField(max_length=255, default="UTF-8")
[docs]
use_featureinfo_custom_template = models.BooleanField(
_("use featureinfo custom template?"),
help_text=_("specifies wether or not use a custom GetFeatureInfo template."),
default=False,
)
[docs]
featureinfo_custom_template = HTMLField(
_("featureinfo custom template"),
help_text=_("the custom GetFeatureInfo template HTML contents."),
unique=False,
blank=True,
null=True,
)
[docs]
def is_vector(self):
return self.subtype in ["vector", "vector_time"]
@property
[docs]
def is_raster(self):
return self.subtype == "raster"
@property
[docs]
def display_type(self):
if self.subtype in ["vector", "vector_time"]:
return "Vector Data"
elif self.subtype == "raster":
return "Raster Data"
else:
return "Data"
@property
[docs]
def data_model(self):
if hasattr(self, "modeldescription_set"):
lmd = self.modeldescription_set.all()
if lmd.exists():
return lmd.get().get_django_model()
return None
@property
[docs]
def data_objects(self):
if self.data_model is not None:
return self.data_model.objects.using("datastore")
return None
@property
[docs]
def attributes(self):
if self.attribute_set and self.attribute_set.count():
_attrs = self.attribute_set
else:
_attrs = Attribute.objects.filter(dataset=self)
return _attrs.exclude(attribute="the_geom").order_by("display_order")
@property
[docs]
def service_typename(self):
return f"{self.remote_typename}:{self.alternate}" if self.remote_typename else self.alternate
# layer geometry type.
@property
[docs]
def gtype(self):
# return attribute type without 'gml:' and 'PropertyType'
if self.attribute_set and self.attribute_set.count():
_attrs = self.attribute_set
else:
_attrs = Attribute.objects.filter(dataset=self)
if _attrs.filter(attribute="the_geom").exists():
_att_type = _attrs.filter(attribute="the_geom").first().attribute_type
_gtype = re.match(r"gml:(.*)PropertyType", _att_type)
return _gtype.group(1) if _gtype else None
return None
[docs]
def get_base_file(self):
"""Get the shp or geotiff file for this layer."""
# If there was no upload_session return None
try:
if self.upload_session is None:
return None, None
except Exception:
return None, None
base_exts = [x.replace(".", "") for x in cov_exts + vec_exts]
base_files = self.upload_session.layerfile_set.filter(name__in=base_exts)
base_files_count = base_files.count()
# If there are no files in the upload_session return None
if base_files_count == 0:
return None, None
msg = f"There should only be one main file (.shp or .geotiff or .asc), found {base_files_count}"
assert base_files_count == 1, msg
# we need to check, for shapefile, if column names are valid
list_col = None
if self.subtype in ["vector", "vector_time"]:
valid_shp, wrong_column_name, list_col = check_shp_columnnames(self)
if wrong_column_name:
msg = f"Shapefile has an invalid column name: {wrong_column_name}"
else:
msg = _("File cannot be opened, maybe check the encoding")
# AF: Removing assertion since if the original file does not exists anymore
# it won't be possible to update Metadata anymore
# assert valid_shp, msg
# no error, let's return the base files
return base_files.get(), list_col
[docs]
def get_absolute_url(self):
return hookset.dataset_detail_url(self)
@property
[docs]
def capabilities_url(self):
from geonode.geoserver.helpers import get_dataset_capabilities_url
return get_dataset_capabilities_url(self)
@property
[docs]
def dataset_ows_url(self):
from geonode.geoserver.helpers import get_layer_ows_url
return get_layer_ows_url(self)
@property
[docs]
def embed_url(self):
try:
if self.service_typename:
return reverse("dataset_embed", kwargs={"layername": self.service_typename})
except Exception as e:
logger.exception(e)
return None
[docs]
def attribute_config(self):
# Get custom attribute sort order and labels if any
cfg = {}
visible_attributes = self.attribute_set.visible()
if visible_attributes.exists():
cfg["getFeatureInfo"] = {
"fields": [lyr.attribute for lyr in visible_attributes],
"propertyNames": {lyr.attribute: lyr.attribute_label for lyr in visible_attributes},
"displayTypes": {lyr.attribute: lyr.featureinfo_type for lyr in visible_attributes},
}
if self.use_featureinfo_custom_template:
cfg["ftInfoTemplate"] = self.featureinfo_custom_template
return cfg
[docs]
def __str__(self):
return str(self.alternate)
# Permission Level Constants
# LEVEL_NONE inherited
[docs]
LEVEL_READ = "dataset_readonly"
[docs]
LEVEL_WRITE = "dataset_readwrite"
[docs]
LEVEL_ADMIN = "dataset_admin"
@property
[docs]
def maps(self):
from geonode.maps.models import Map
map_ids = list(self.maplayers.values_list("map__id", flat=True))
return Map.objects.filter(id__in=map_ids)
[docs]
def get_linked_resources(self, as_target: bool = False):
ret = super().get_linked_resources(as_target)
if as_target:
# create LinkedResources on the fly to report MapLayer relationship
res = (LinkedResource(source=map, target=self, internal=True) for map in self.maps)
ret = itertools.chain(ret, res)
return ret
@property
[docs]
def download_url(self):
if self.subtype not in ["vector", "raster", "vector_time"]:
logger.info("Download URL is available only for datasets that have been harvested and copied locally")
return None
return build_absolute_uri(reverse("dataset_download", args=(self.alternate,)))
@property
[docs]
def maplayers(self):
from geonode.maps.models import MapLayer
return MapLayer.objects.filter(name=self.alternate)
@classproperty
[docs]
def allowed_permissions(cls):
return {
"anonymous": VIEW_PERMISSIONS + DOWNLOAD_PERMISSIONS,
"default": OWNER_PERMISSIONS + DOWNLOAD_PERMISSIONS + DATASET_ADMIN_PERMISSIONS,
groups_settings.REGISTERED_MEMBERS_GROUP_NAME: OWNER_PERMISSIONS
+ DOWNLOAD_PERMISSIONS
+ DATASET_ADMIN_PERMISSIONS,
}
@property
[docs]
def class_name(self):
return self.__class__.__name__
[docs]
def view_count_up(self, user, do_local=False):
"""increase view counter, if user is not owner and not super
@param user which views layer
@type User model
@param do_local - do local counter update even if pubsub is enabled
@type bool
"""
if user == self.owner or user.is_superuser:
return
if not do_local:
from geonode.messaging import producer
producer.viewing_dataset(str(user), str(self.owner), self.id)
else:
Dataset.objects.filter(id=self.id).update(popular_count=models.F("popular_count") + 1)
[docs]
class AttributeManager(models.Manager):
"""Helper class to access filtered attributes"""
[docs]
def visible(self):
return self.get_queryset().filter(visible=True).order_by("display_order")
[docs]
class Attribute(models.Model):
"""
Auxiliary model for storing layer attributes.
This helps reduce the need for runtime lookups
to other servers, and lets users customize attribute titles,
sort order, and visibility.
"""
[docs]
dataset = models.ForeignKey(
Dataset, blank=False, null=False, unique=False, on_delete=models.CASCADE, related_name="attribute_set"
)
[docs]
attribute = models.CharField(
_("attribute name"),
help_text=_("name of attribute as stored in shapefile/spatial database"),
max_length=255,
blank=False,
null=True,
unique=False,
)
[docs]
description = models.CharField(
_("attribute description"),
help_text=_("description of attribute to be used in metadata"),
max_length=255,
blank=True,
null=True,
)
[docs]
attribute_label = models.CharField(
_("attribute label"),
help_text=_("title of attribute as displayed in GeoNode"),
max_length=255,
blank=True,
null=True,
unique=False,
)
[docs]
attribute_type = models.CharField(
_("attribute type"),
help_text=_("the data type of the attribute (integer, string, geometry, etc)"),
max_length=50,
blank=False,
null=False,
default="xsd:string",
unique=False,
)
[docs]
visible = models.BooleanField(
_("visible?"), help_text=_("specifies if the attribute should be displayed in identify results"), default=True
)
[docs]
display_order = models.IntegerField(
_("display order"),
help_text=_("specifies the order in which attribute should be displayed in identify results"),
default=1,
)
"""
Attribute FeatureInfo-Type list
"""
[docs]
TYPE_PROPERTY = "type_property"
[docs]
TYPE_HREF = "type_href"
[docs]
TYPE_IMAGE = "type_image"
[docs]
TYPE_VIDEO_MP4 = "type_video_mp4"
[docs]
TYPE_VIDEO_OGG = "type_video_ogg"
[docs]
TYPE_VIDEO_WEBM = "type_video_webm"
[docs]
TYPE_VIDEO_3GP = "type_video_3gp"
[docs]
TYPE_VIDEO_FLV = "type_video_flv"
[docs]
TYPE_VIDEO_YOUTUBE = "type_video_youtube"
[docs]
TYPE_AUDIO = "type_audio"
[docs]
TYPE_IFRAME = "type_iframe"
[docs]
TYPES = (
(
TYPE_PROPERTY,
_("Label"),
),
(
TYPE_HREF,
_("URL"),
),
(
TYPE_IMAGE,
_(
"Image",
),
),
(
TYPE_VIDEO_MP4,
_(
"Video (mp4)",
),
),
(
TYPE_VIDEO_OGG,
_(
"Video (ogg)",
),
),
(
TYPE_VIDEO_WEBM,
_(
"Video (webm)",
),
),
(
TYPE_VIDEO_3GP,
_(
"Video (3gp)",
),
),
(
TYPE_VIDEO_FLV,
_(
"Video (flv)",
),
),
(
TYPE_VIDEO_YOUTUBE,
_(
"Video (YouTube/VIMEO - embedded)",
),
),
(
TYPE_AUDIO,
_(
"Audio",
),
),
(
TYPE_IFRAME,
_(
"IFRAME",
),
),
)
[docs]
featureinfo_type = models.CharField(
_("featureinfo type"),
help_text=_("specifies if the attribute should be rendered with an HTML widget on GetFeatureInfo template."),
max_length=255,
unique=False,
blank=False,
null=False,
default=TYPE_PROPERTY,
choices=TYPES,
)
# statistical derivations
[docs]
count = models.IntegerField(_("count"), help_text=_("count value for this field"), default=1)
[docs]
min = models.CharField(
_("min"),
help_text=_("minimum value for this field"),
max_length=255,
blank=False,
null=True,
unique=False,
default="NA",
)
[docs]
max = models.CharField(
_("max"),
help_text=_("maximum value for this field"),
max_length=255,
blank=False,
null=True,
unique=False,
default="NA",
)
[docs]
average = models.CharField(
_("average"),
help_text=_("average value for this field"),
max_length=255,
blank=False,
null=True,
unique=False,
default="NA",
)
[docs]
stddev = models.CharField(
_("standard deviation"),
help_text=_("standard deviation for this field"),
max_length=255,
blank=False,
null=True,
unique=False,
default="NA",
)
[docs]
sum = models.CharField(
_("sum"),
help_text=_("sum value for this field"),
max_length=255,
blank=False,
null=True,
unique=False,
default="NA",
)
[docs]
unique_values = models.TextField(_("unique values for this field"), null=True, blank=True, default="NA")
[docs]
last_stats_updated = models.DateTimeField(
_("last modified"), default=now, help_text=_("date when attribute statistics were last updated")
) # passing the method itself, not
[docs]
objects = AttributeManager()
[docs]
def __str__(self):
return str(self.attribute_label if self.attribute_label else self.attribute)
[docs]
def unique_values_as_list(self):
return self.unique_values.split(",")