#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import io
import itertools
import os
import shutil
import gisdata
import logging
import zipfile
from uuid import uuid4
from unittest.mock import MagicMock, patch
from collections import namedtuple
from pinax.ratings.models import OverallRating
from django.urls import reverse
from django.test import TestCase
from django.forms import ValidationError
from django.test.client import RequestFactory
from django.contrib.contenttypes.models import ContentType
from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.management import call_command
from django.contrib.auth.models import Group
from django.contrib.gis.geos import Polygon
from django.db.models import Count
from django.contrib.auth import get_user_model
from django.http import HttpResponse
from django.conf import settings
from django.test.utils import override_settings
from django.contrib.admin.sites import AdminSite
from geonode.geoserver.createlayer.utils import create_dataset
from geonode.layers import utils
from geonode.layers.utils import clear_dataset_download_handlers
from geonode.base import enumerations
from geonode.layers import DatasetAppConfig
from geonode.layers.admin import DatasetAdmin
from geonode.decorators import on_ogc_backend
from geonode.maps.models import Map, MapLayer
from geonode.utils import DisableDjangoSignals, mkdtemp
from geonode.layers.views import _resolve_dataset
from geonode import GeoNodeException, geoserver
from geonode.people.utils import get_valid_user
from geonode.people import Roles
from guardian.shortcuts import get_anonymous_user
from geonode.tests.base import GeoNodeBaseTestSupport
from geonode.resource.manager import resource_manager
from geonode.tests.utils import NotificationsTestsHelper
from geonode.layers.models import Dataset, Style, Attribute
from geonode.layers.forms import DatasetForm, DatasetTimeSerieForm, JSONField, LayerUploadForm
from geonode.layers.populate_datasets_data import create_dataset_data
from geonode.base.models import TopicCategory, License, Region, Link
from geonode.utils import check_ogc_backend, set_resource_default_links
from geonode.layers.metadata import convert_keyword, set_metadata, parse_metadata
from geonode.groups.models import GroupProfile
from geonode.layers.utils import (
is_sld_upload_only,
is_xml_upload_only,
dataset_type,
get_files,
get_valid_name,
get_valid_dataset_name,
surrogate_escape_string,
validate_input_source,
)
from geonode.base.populate_test_data import all_public, create_models, remove_models, create_single_dataset
from geonode.layers.download_handler import DatasetDownloadHandler
[docs]
logger = logging.getLogger(__name__)
[docs]
class DatasetsTest(GeoNodeBaseTestSupport):
"""Tests geonode.layers app/module"""
[docs]
fixtures = ["initial_data.json", "group_test_data.json", "default_oauth_apps.json"]
@classmethod
[docs]
def setUpClass(cls):
super().setUpClass()
create_models(type=cls.get_type, integration=cls.get_integration)
all_public()
@classmethod
[docs]
def tearDownClass(cls):
super().tearDownClass()
remove_models(cls.get_obj_ids, type=cls.get_type, integration=cls.get_integration)
[docs]
def setUp(self):
super().setUp()
self.user = "admin"
self.passwd = "admin"
self.anonymous_user = get_anonymous_user()
self.exml_path = f"{settings.PROJECT_ROOT}/base/fixtures/test_xml.xml"
self.sld_path = f"{settings.PROJECT_ROOT}/base/fixtures/test_sld.sld"
self.maxDiff = None
self.sut = create_single_dataset("single_point")
create_dataset_data(self.sut.resourcebase_ptr_id)
create_dataset_data(Dataset.objects.first().resourcebase_ptr_id)
self.r = namedtuple("GSCatalogRes", ["resource"])
site = AdminSite()
self.admin = DatasetAdmin(Dataset, site)
self.request_admin = RequestFactory().get("/admin")
self.request_admin.user = get_user_model().objects.get(username="admin")
# Admin Tests
[docs]
def test_admin_save_model(self):
obj = Dataset.objects.first()
self.assertEqual(len(obj.keywords.all()), 2)
form = self.admin.get_form(self.request_admin, obj=obj, change=True)
self.admin.save_model(self.request_admin, obj, form, True)
[docs]
def test_default_sourcetype(self):
obj = Dataset.objects.first()
self.assertEqual(obj.sourcetype, enumerations.SOURCE_TYPE_LOCAL)
# Data Tests
[docs]
def test_describe_data_2(self):
"""/data/geonode:CA/metadata -> Test accessing the description of a layer"""
self.assertEqual(10, get_user_model().objects.all().count())
response = self.client.get(reverse("dataset_metadata", args=("geonode:CA",)))
# Since we are not authenticated, we should not be able to access it
self.assertEqual(response.status_code, 302)
# but if we log in ...
self.client.login(username="admin", password="admin")
# ... all should be good
response = self.client.get(reverse("dataset_metadata", args=("geonode:CA",)))
self.assertEqual(response.status_code, 200)
[docs]
def test_describe_data_3(self):
"""/data/geonode:CA/metadata_detail -> Test accessing the description of a layer"""
self.client.login(username="admin", password="admin")
# ... all should be good
response = self.client.get(reverse("dataset_metadata_detail", args=("geonode:CA",)))
self.assertEqual(response.status_code, 200)
self.assertContains(response, "Approved", count=1, status_code=200, msg_prefix="", html=False)
self.assertContains(response, "Published", count=1, status_code=200, msg_prefix="", html=False)
self.assertContains(response, "Featured", count=3, status_code=200, msg_prefix="", html=False)
self.assertContains(response, "<dt>Group</dt>", count=0, status_code=200, msg_prefix="", html=False)
# ... now assigning a Group to the Dataset
lyr = Dataset.objects.get(alternate="geonode:CA")
group = Group.objects.first()
lyr.group = group
lyr.save()
response = self.client.get(reverse("dataset_metadata_detail", args=("geonode:CA",)))
self.assertEqual(response.status_code, 200)
self.assertContains(response, "<dt>Group</dt>", count=1, status_code=200, msg_prefix="", html=False)
lyr.group = None
lyr.save()
# Dataset Tests
[docs]
def test_dataset_name_clash(self):
_ll_1 = Dataset.objects.create(
uuid=str(uuid4()),
owner=get_user_model().objects.get(username=self.user),
name="states",
store="geonode_data",
subtype="vector",
alternate="geonode:states",
)
_ll_2 = Dataset.objects.create(
uuid=str(uuid4()),
owner=get_user_model().objects.get(username=self.user),
name="geonode:states",
store="httpfooremoteservce",
subtype="remote",
alternate="geonode:states",
)
_ll_1.set_permissions({"users": {"bobby": ["base.view_resourcebase"]}})
_ll_2.set_permissions({"users": {"bobby": ["base.view_resourcebase"]}})
self.client.login(username="bobby", password="bob")
_request = self.client.request()
_request.user = get_user_model().objects.get(username="bobby")
_ll = _resolve_dataset(_request, alternate="geonode:states")
self.assertIsNotNone(_ll)
self.assertEqual(_ll.name, _ll_1.name)
[docs]
def test_describe_data(self):
"""/data/geonode:CA/metadata -> Test accessing the description of a layer"""
self.assertEqual(10, get_user_model().objects.all().count())
response = self.client.get(reverse("dataset_metadata", args=("geonode:CA",)))
# Since we are not authenticated, we should not be able to access it
self.assertEqual(response.status_code, 302)
# but if we log in ...
self.client.login(username="admin", password="admin")
# ... all should be good
response = self.client.get(reverse("dataset_metadata", args=("geonode:CA",)))
self.assertEqual(response.status_code, 200)
[docs]
def test_dataset_attributes(self):
lyr = Dataset.objects.all().first()
# There should be a total of 3 attributes
self.assertEqual(len(lyr.attribute_set.all()), 4)
# 2 out of 3 attributes should be visible
custom_attributes = lyr.attribute_set.visible()
self.assertEqual(len(custom_attributes), 3)
# place_ name should come before description
self.assertEqual(custom_attributes[0].attribute_label, "Place Name")
self.assertEqual(custom_attributes[1].attribute_label, "Description")
self.assertEqual(custom_attributes[2].attribute, "N\xfamero_De_M\xe9dicos")
# TODO: do test against layer with actual attribute statistics
self.assertEqual(custom_attributes[1].count, 1)
self.assertEqual(custom_attributes[1].min, "NA")
self.assertEqual(custom_attributes[1].max, "NA")
self.assertEqual(custom_attributes[1].average, "NA")
self.assertEqual(custom_attributes[1].median, "NA")
self.assertEqual(custom_attributes[1].stddev, "NA")
self.assertEqual(custom_attributes[1].sum, "NA")
self.assertEqual(custom_attributes[1].unique_values, "NA")
[docs]
def test_dataset_bbox(self):
lyr = Dataset.objects.all().first()
dataset_bbox = lyr.bbox[0:4]
logger.debug(dataset_bbox)
def decimal_encode(bbox):
_bbox = [float(o) for o in bbox]
# Must be in the form : [x0, x1, y0, y1
return [_bbox[0], _bbox[2], _bbox[1], _bbox[3]]
from geonode.utils import bbox_to_projection
projected_bbox = decimal_encode(
bbox_to_projection(
[float(coord) for coord in dataset_bbox]
+ [
lyr.srid,
],
target_srid=4326,
)[:4]
)
logger.debug(projected_bbox)
self.assertEqual(projected_bbox, [-180.0, -90.0, 180.0, 90.0])
logger.debug(lyr.ll_bbox)
self.assertEqual(lyr.ll_bbox, [-180.0, 180.0, -90.0, 90.0, "EPSG:4326"])
projected_bbox = decimal_encode(
bbox_to_projection(
[float(coord) for coord in dataset_bbox]
+ [
lyr.srid,
],
target_srid=3857,
)[:4]
)
solution = [-20037397.023298454, -74299743.40065672, 20037397.02329845, 74299743.40061197]
logger.debug(projected_bbox)
for coord, check in zip(projected_bbox, solution):
self.assertAlmostEqual(coord, check, places=3)
[docs]
def test_dataset_attributes_feature_catalogue(self):
"""Test layer feature catalogue functionality"""
self.assertTrue(self.client.login(username="admin", password="admin"))
# test a non-existing layer
url = reverse("dataset_feature_catalogue", args=("bad_dataset",))
response = self.client.get(url)
self.assertEqual(response.status_code, 404)
# Get the layer to work with
layer = Dataset.objects.all()[3]
url = reverse("dataset_feature_catalogue", args=(layer.alternate,))
response = self.client.get(url)
self.assertNotEqual(response.status_code, 404)
[docs]
def test_dataset_attribute_config(self):
lyr = Dataset.objects.all().first()
attribute_config = lyr.attribute_config()
custom_attributes = attribute_config["getFeatureInfo"]
self.assertEqual(custom_attributes["fields"], ["place_name", "description", "N\xfamero_De_M\xe9dicos"])
self.assertEqual(custom_attributes["propertyNames"]["description"], "Description")
self.assertEqual(custom_attributes["propertyNames"]["place_name"], "Place Name")
attributes = Attribute.objects.filter(dataset=lyr)
for _att in attributes:
self.assertEqual(_att.featureinfo_type, "type_property")
lyr.featureinfo_custom_template = "<h1>Test HTML</h1>"
lyr.use_featureinfo_custom_template = True
lyr.save()
attribute_config = lyr.attribute_config()
self.assertTrue("ftInfoTemplate" in attribute_config)
self.assertEqual(attribute_config["ftInfoTemplate"], "<h1>Test HTML</h1>")
lyr.use_featureinfo_custom_template = False
lyr.save()
attribute_config = lyr.attribute_config()
self.assertTrue("ftInfoTemplate" not in attribute_config)
[docs]
def test_dataset_styles(self):
lyr = Dataset.objects.all().first()
# There should be a total of 3 styles
self.assertEqual(len(lyr.styles.all()), 4)
# One of the style is the default one
self.assertEqual(lyr.default_style, Style.objects.get(id=lyr.default_style.id))
try:
[str(style) for style in lyr.styles.all()]
except UnicodeEncodeError:
self.fail("str of the Style model throws a UnicodeEncodeError with special characters.")
[docs]
def test_dataset_links(self):
lyr = Dataset.objects.filter(subtype="vector").first()
self.assertEqual(lyr.subtype, "vector")
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
links = Link.objects.filter(resource=lyr.resourcebase_ptr, link_type="metadata")
self.assertIsNotNone(links)
for ll in links:
self.assertEqual(ll.link_type, "metadata")
_def_link_types = ("data", "image", "original", "html", "OGC:WMS", "OGC:WFS", "OGC:WCS")
Link.objects.filter(resource=lyr.resourcebase_ptr, link_type__in=_def_link_types).delete()
links = Link.objects.filter(resource=lyr.resourcebase_ptr, link_type="data")
self.assertIsNotNone(links)
set_resource_default_links(lyr, lyr)
links = Link.objects.filter(resource=lyr.resourcebase_ptr, link_type="metadata")
self.assertIsNotNone(links)
for ll in links:
self.assertEqual(ll.link_type, "metadata")
links = Link.objects.filter(resource=lyr.resourcebase_ptr, link_type="data")
self.assertIsNotNone(links)
links = Link.objects.filter(resource=lyr.resourcebase_ptr, link_type="image")
self.assertIsNotNone(links)
lyr = Dataset.objects.filter(subtype="raster").first()
self.assertEqual(lyr.subtype, "raster")
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
links = Link.objects.filter(resource=lyr.resourcebase_ptr, link_type="metadata")
self.assertIsNotNone(links)
for ll in links:
self.assertEqual(ll.link_type, "metadata")
_def_link_types = ("data", "image", "original", "html", "OGC:WMS", "OGC:WFS", "OGC:WCS")
Link.objects.filter(resource=lyr.resourcebase_ptr, link_type__in=_def_link_types).delete()
links = Link.objects.filter(resource=lyr.resourcebase_ptr, link_type="data")
self.assertIsNotNone(links)
set_resource_default_links(lyr, lyr)
links = Link.objects.filter(resource=lyr.resourcebase_ptr, link_type="metadata")
self.assertIsNotNone(links)
for ll in links:
self.assertEqual(ll.link_type, "metadata")
links = Link.objects.filter(resource=lyr.resourcebase_ptr, link_type="data")
self.assertIsNotNone(links)
links = Link.objects.filter(resource=lyr.resourcebase_ptr, link_type="image")
self.assertIsNotNone(links)
[docs]
def test_get_valid_user(self):
# Verify it accepts an admin user
adminuser = get_user_model().objects.get(is_superuser=True)
valid_user = get_valid_user(adminuser)
msg = f'Passed in a valid admin user "{adminuser}" but got "{valid_user}" in return'
assert valid_user.id == adminuser.id, msg
# Verify it returns a valid user after receiving None
valid_user = get_valid_user(None)
msg = f'Expected valid user after passing None, got "{valid_user}"'
assert isinstance(valid_user, get_user_model()), msg
newuser = get_user_model().objects.create(username="arieluser")
valid_user = get_valid_user(newuser)
msg = f'Passed in a valid user "{newuser}" but got "{valid_user}" in return'
assert valid_user.id == newuser.id, msg
valid_user = get_valid_user("arieluser")
msg = 'Passed in a valid user by username "arieluser" but got' f' "{valid_user}" in return'
assert valid_user.username == "arieluser", msg
nn = get_anonymous_user()
self.assertRaises(GeoNodeException, get_valid_user, nn)
[docs]
def testShapefileValidation(self):
files = dict(
base_file=SimpleUploadedFile("foo.shp", b" "),
shx_file=SimpleUploadedFile("foo.shx", b" "),
dbf_file=SimpleUploadedFile("foo.dbf", b" "),
prj_file=SimpleUploadedFile("foo.prj", b" "),
)
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(
base_file=SimpleUploadedFile("foo.SHP", b" "),
shx_file=SimpleUploadedFile("foo.SHX", b" "),
dbf_file=SimpleUploadedFile("foo.DBF", b" "),
prj_file=SimpleUploadedFile("foo.PRJ", b" "),
)
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(
base_file=SimpleUploadedFile("foo.SHP", b" "),
shx_file=SimpleUploadedFile("foo.shx", b" "),
dbf_file=SimpleUploadedFile("foo.dbf", b" "),
)
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(
base_file=SimpleUploadedFile("foo.SHP", b" "),
shx_file=SimpleUploadedFile("foo.shx", b" "),
dbf_file=SimpleUploadedFile("foo.dbf", b" "),
prj_file=SimpleUploadedFile("foo.PRJ", b" "),
)
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(
base_file=SimpleUploadedFile("foo.SHP", b" "),
shx_file=SimpleUploadedFile("bar.shx", b" "),
dbf_file=SimpleUploadedFile("bar.dbf", b" "),
prj_file=SimpleUploadedFile("bar.PRJ", b" "),
)
self.assertFalse(LayerUploadForm(dict(), files).is_valid())
files = dict(
base_file=SimpleUploadedFile("foo.shp", b" "),
dbf_file=SimpleUploadedFile("foo.dbf", b" "),
prj_file=SimpleUploadedFile("foo.PRJ", b" "),
)
self.assertFalse(LayerUploadForm(dict(), files).is_valid())
files = dict(
base_file=SimpleUploadedFile("foo.txt", b" "),
shx_file=SimpleUploadedFile("foo.shx", b" "),
dbf_file=SimpleUploadedFile("foo.sld", b" "),
prj_file=SimpleUploadedFile("foo.prj", b" "),
)
self.assertFalse(LayerUploadForm(dict(), files).is_valid())
[docs]
def testGeoTiffValidation(self):
files = dict(base_file=SimpleUploadedFile("foo.tif", b" "))
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(base_file=SimpleUploadedFile("foo.TIF", b" "))
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(base_file=SimpleUploadedFile("foo.tiff", b" "))
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(base_file=SimpleUploadedFile("foo.TIF", b" "))
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(base_file=SimpleUploadedFile("foo.geotif", b" "))
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(base_file=SimpleUploadedFile("foo.GEOTIF", b" "))
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(base_file=SimpleUploadedFile("foo.geotiff", b" "))
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(base_file=SimpleUploadedFile("foo.GEOTIF", b" "))
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
[docs]
def testASCIIValidation(self):
files = dict(base_file=SimpleUploadedFile("foo.asc", b" "))
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
files = dict(base_file=SimpleUploadedFile("foo.ASC", b" "))
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
[docs]
def testZipValidation(self):
the_zip = zipfile.ZipFile("test_upload.zip", "w")
in_memory_file = io.StringIO()
in_memory_file.write("test")
the_zip.writestr("foo.shp", in_memory_file.getvalue())
the_zip.writestr("foo.dbf", in_memory_file.getvalue())
the_zip.writestr("foo.shx", in_memory_file.getvalue())
the_zip.writestr("foo.prj", in_memory_file.getvalue())
the_zip.close()
files = dict(base_file=SimpleUploadedFile("test_upload.zip", open("test_upload.zip", mode="rb").read()))
self.assertTrue(LayerUploadForm(dict(), files).is_valid())
os.remove("test_upload.zip")
[docs]
def testWriteFiles(self):
files = dict(
base_file=SimpleUploadedFile("foo.shp", b" "),
shx_file=SimpleUploadedFile("foo.shx", b" "),
dbf_file=SimpleUploadedFile("foo.dbf", b" "),
prj_file=SimpleUploadedFile("foo.prj", b" "),
)
form = LayerUploadForm(dict(), files)
self.assertTrue(form.is_valid())
tempdir = form.write_files()[0]
self.assertEqual(set(os.listdir(tempdir)), {"foo.shp", "foo.shx", "foo.dbf", "foo.prj"})
the_zip = zipfile.ZipFile("test_upload.zip", "w")
in_memory_file = io.StringIO()
in_memory_file.write("test")
the_zip.writestr("foo.shp", in_memory_file.getvalue())
the_zip.writestr("foo.dbf", in_memory_file.getvalue())
the_zip.writestr("foo.shx", in_memory_file.getvalue())
the_zip.writestr("foo.prj", in_memory_file.getvalue())
the_zip.close()
files = dict(base_file=SimpleUploadedFile("test_upload.zip", open("test_upload.zip", mode="rb").read()))
form = LayerUploadForm(dict(), files)
self.assertTrue(form.is_valid())
tempdir = form.write_files()[0]
self.assertEqual(set(os.listdir(tempdir)), {"foo.shp", "foo.shx", "foo.dbf", "foo.prj"})
os.remove("test_upload.zip")
[docs]
def test_dataset_type(self):
self.assertEqual(dataset_type("foo.shp"), "vector")
self.assertEqual(dataset_type("foo.SHP"), "vector")
self.assertEqual(dataset_type("foo.sHp"), "vector")
self.assertEqual(dataset_type("foo.tif"), "raster")
self.assertEqual(dataset_type("foo.TIF"), "raster")
self.assertEqual(dataset_type("foo.TiF"), "raster")
self.assertEqual(dataset_type("foo.geotif"), "raster")
self.assertEqual(dataset_type("foo.GEOTIF"), "raster")
self.assertEqual(dataset_type("foo.gEoTiF"), "raster")
self.assertEqual(dataset_type("foo.tiff"), "raster")
self.assertEqual(dataset_type("foo.TIFF"), "raster")
self.assertEqual(dataset_type("foo.TiFf"), "raster")
self.assertEqual(dataset_type("foo.geotiff"), "raster")
self.assertEqual(dataset_type("foo.GEOTIFF"), "raster")
self.assertEqual(dataset_type("foo.gEoTiFf"), "raster")
self.assertEqual(dataset_type("foo.asc"), "raster")
self.assertEqual(dataset_type("foo.ASC"), "raster")
self.assertEqual(dataset_type("foo.AsC"), "raster")
# basically anything else should produce a GeoNodeException
self.assertRaises(GeoNodeException, lambda: dataset_type("foo.gml"))
[docs]
def test_get_files(self):
def generate_files(*extensions):
if extensions[0].lower() != "shp":
return
d = None
expected_files = None
try:
d = mkdtemp()
fnames = [f"foo.{ext}" for ext in extensions]
expected_files = {ext.lower(): fname for ext, fname in zip(extensions, fnames)}
for f in fnames:
path = os.path.join(d, f)
# open and immediately close to create empty file
open(path, "w").close()
finally:
return d, expected_files
# Check that a well-formed Shapefile has its components all picked up
d = None
_tmpdir = None
try:
d, expected_files = generate_files("shp", "shx", "prj", "dbf")
gotten_files, _tmpdir = get_files(os.path.join(d, "foo.shp"))
gotten_files = {k: os.path.basename(v) for k, v in gotten_files.items()}
self.assertEqual(gotten_files, expected_files)
finally:
if d is not None:
shutil.rmtree(d, ignore_errors=True)
if _tmpdir is not None:
shutil.rmtree(_tmpdir, ignore_errors=True)
# Check that a Shapefile missing required components raises an
# exception
d = None
try:
d, expected_files = generate_files("shp", "shx", "prj")
self.assertRaises(GeoNodeException, lambda: get_files(os.path.join(d, "foo.shp")))
finally:
if d is not None:
shutil.rmtree(d, ignore_errors=True)
# Check that including an SLD with a valid shapefile results in the SLD
# getting picked up
d = None
_tmpdir = None
try:
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
d, expected_files = generate_files("shp", "shx", "prj", "dbf", "sld")
gotten_files, _tmpdir = get_files(os.path.join(d, "foo.shp"))
gotten_files = {k: os.path.basename(v) for k, v in gotten_files.items()}
self.assertEqual(gotten_files, expected_files)
finally:
if d is not None:
shutil.rmtree(d, ignore_errors=True)
if _tmpdir is not None:
shutil.rmtree(_tmpdir, ignore_errors=True)
# Check that capitalized extensions are ok
d = None
_tmpdir = None
try:
d, expected_files = generate_files("SHP", "SHX", "PRJ", "DBF")
gotten_files, _tmpdir = get_files(os.path.join(d, "foo.SHP"))
gotten_files = {k: os.path.basename(v) for k, v in gotten_files.items()}
self.assertEqual(gotten_files, expected_files)
finally:
if d is not None:
shutil.rmtree(d, ignore_errors=True)
if _tmpdir is not None:
shutil.rmtree(_tmpdir, ignore_errors=True)
# Check that mixed capital and lowercase extensions are ok
d = None
_tmpdir = None
try:
d, expected_files = generate_files("SHP", "shx", "pRJ", "DBF")
gotten_files, _tmpdir = get_files(os.path.join(d, "foo.SHP"))
gotten_files = {k: os.path.basename(v) for k, v in gotten_files.items()}
self.assertEqual(gotten_files, expected_files)
finally:
if d is not None:
shutil.rmtree(d, ignore_errors=True)
if _tmpdir is not None:
shutil.rmtree(_tmpdir, ignore_errors=True)
# Check that including both capital and lowercase extensions raises an
# exception
d = None
try:
d, expected_files = generate_files("SHP", "SHX", "PRJ", "DBF", "shp", "shx", "prj", "dbf")
# Only run the tests if this is a case sensitive OS
if len(os.listdir(d)) == len(expected_files):
self.assertRaises(GeoNodeException, lambda: get_files(os.path.join(d, "foo.SHP")))
self.assertRaises(GeoNodeException, lambda: get_files(os.path.join(d, "foo.shp")))
finally:
if d is not None:
shutil.rmtree(d, ignore_errors=True)
# Check that including both capital and lowercase PRJ (this is
# special-cased in the implementation)
d = None
try:
d, expected_files = generate_files("SHP", "SHX", "PRJ", "DBF", "prj")
# Only run the tests if this is a case sensitive OS
if len(os.listdir(d)) == len(expected_files):
self.assertRaises(GeoNodeException, lambda: get_files(os.path.join(d, "foo.SHP")))
self.assertRaises(GeoNodeException, lambda: get_files(os.path.join(d, "foo.shp")))
finally:
if d is not None:
shutil.rmtree(d, ignore_errors=True)
# Check that including both capital and lowercase SLD (this is
# special-cased in the implementation)
d = None
try:
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
d, expected_files = generate_files("SHP", "SHX", "PRJ", "DBF", "SLD", "sld")
# Only run the tests if this is a case sensitive OS
if len(os.listdir(d)) == len(expected_files):
self.assertRaises(GeoNodeException, lambda: get_files(os.path.join(d, "foo.SHP")))
self.assertRaises(GeoNodeException, lambda: get_files(os.path.join(d, "foo.shp")))
finally:
if d is not None:
shutil.rmtree(d, ignore_errors=True)
[docs]
def test_get_valid_name(self):
self.assertEqual(get_valid_name("blug"), "blug")
self.assertEqual(get_valid_name("<-->"), "_")
self.assertEqual(get_valid_name("<ab>"), "_ab_")
self.assertNotEqual(get_valid_name("CA"), "CA_1")
self.assertNotEqual(get_valid_name("CA"), "CA_1")
[docs]
def test_get_valid_dataset_name(self):
self.assertEqual(get_valid_dataset_name("blug", False), "blug")
self.assertEqual(get_valid_dataset_name("blug", True), "blug")
self.assertEqual(get_valid_dataset_name("<ab>", False), "_ab_")
self.assertEqual(get_valid_dataset_name("<ab>", True), "<ab>")
self.assertEqual(get_valid_dataset_name("<-->", False), "_")
self.assertEqual(get_valid_dataset_name("<-->", True), "<-->")
self.assertNotEqual(get_valid_dataset_name("CA", False), "CA_1")
self.assertNotEqual(get_valid_dataset_name("CA", False), "CA_1")
self.assertEqual(get_valid_dataset_name("CA", True), "CA")
self.assertEqual(get_valid_dataset_name("CA", True), "CA")
layer = Dataset.objects.get(name="CA")
self.assertNotEqual(get_valid_dataset_name(layer, False), "CA_1")
self.assertEqual(get_valid_dataset_name(layer, True), "CA")
self.assertRaises(GeoNodeException, get_valid_dataset_name, 12, False)
self.assertRaises(GeoNodeException, get_valid_dataset_name, 12, True)
# NOTE: we don't care about file content for many of these tests (the
# forms under test validate based only on file name, and leave actual
# content inspection to GeoServer) but Django's form validation will omit
# any files with empty bodies.
#
# That is, this leads to mysterious test failures:
# SimpleUploadedFile('foo', ' '.encode("UTF-8"))
#
# And this should be used instead to avoid that:
# SimpleUploadedFile('foo', ' '.encode("UTF-8"))
[docs]
def testJSONField(self):
field = JSONField()
# a valid JSON document should pass
field.clean('{ "users": [] }')
# text which is not JSON should fail
self.assertRaises(ValidationError, lambda: field.clean("<users></users>"))
[docs]
def test_rating_dataset_remove(self):
"""Test layer rating is removed on layer remove"""
# Get the layer to work with
layer = Dataset.objects.all()[3]
dataset_id = layer.id
# Create the rating with the correct content type
ctype = ContentType.objects.get(model="dataset")
OverallRating.objects.create(category=2, object_id=dataset_id, content_type=ctype, rating=3)
rating = OverallRating.objects.all()
self.assertEqual(rating.count(), 1)
# Remove the layer
resource_manager.delete(layer.uuid)
# Check there are no ratings matching the remove layer
rating = OverallRating.objects.all()
self.assertEqual(rating.count(), 0)
[docs]
def test_sld_upload(self):
"""Test layer remove functionality"""
layer = Dataset.objects.all().first()
url = reverse("dataset_sld_upload", args=(layer.alternate,))
# Now test with a valid user
self.client.login(username="admin", password="admin")
# test a method other than POST and GET
response = self.client.put(url)
content = response.content.decode("utf-8")
self.assertEqual(response.status_code, 200)
self.assertFalse("#modal_perms" in content)
[docs]
def test_category_counts(self):
topics = TopicCategory.objects.all()
topics = topics.annotate(**{"dataset_count": Count("resourcebase__dataset__category")})
location = topics.get(identifier="location")
# there are three layers with location category
self.assertEqual(location.dataset_count, 3)
# change the category of one layers_count
layer = Dataset.objects.filter(category=location)[0]
elevation = topics.get(identifier="elevation")
layer.category = elevation
layer.save()
# reload the categories since it's caching the old count
topics = topics.annotate(**{"dataset_count": Count("resourcebase__dataset__category")})
location = topics.get(identifier="location")
elevation = topics.get(identifier="elevation")
self.assertEqual(location.dataset_count, 2)
self.assertEqual(elevation.dataset_count, 4)
# delete a layer and check the count update
# use the first since it's the only one which has styles
layer = Dataset.objects.all().first()
elevation = topics.get(identifier="elevation")
self.assertEqual(elevation.dataset_count, 4)
layer.delete()
topics = topics.annotate(**{"dataset_count": Count("resourcebase__dataset__category")})
elevation = topics.get(identifier="elevation")
self.assertEqual(elevation.dataset_count, 3)
[docs]
def test_assign_change_dataset_data_perm(self):
"""
Ensure set_permissions supports the change_dataset_data permission.
"""
layer = Dataset.objects.first()
user = get_anonymous_user()
layer.set_permissions({"users": {user.username: ["change_dataset_data"]}})
perms = layer.get_all_level_info()
self.assertNotIn(user, perms["users"])
self.assertNotIn(user.username, perms["users"])
[docs]
def test_batch_edit(self):
"""
Test batch editing of metadata fields.
"""
Model = Dataset
view = "dataset_batch_metadata"
resources = Model.objects.all()[:3]
ids = ",".join(str(element.pk) for element in resources)
# test non-admin access
self.client.login(username="bobby", password="bob")
response = self.client.get(reverse(view))
self.assertTrue(response.status_code in (401, 403))
# test group change
group = Group.objects.first()
self.client.login(username="admin", password="admin")
response = self.client.post(
reverse(view),
data={"group": group.pk, "ids": ids, "regions": 1},
)
self.assertEqual(response.status_code, 302)
resources = Model.objects.filter(id__in=[r.pk for r in resources])
for resource in resources:
self.assertEqual(resource.group, group)
# test owner change
owner = get_user_model().objects.first()
response = self.client.post(
reverse(view),
data={"owner": owner.pk, "ids": ids, "regions": 1},
)
self.assertEqual(response.status_code, 302)
resources = Model.objects.filter(id__in=[r.pk for r in resources])
for resource in resources:
self.assertEqual(resource.owner, owner)
# test license change
license = License.objects.first()
response = self.client.post(
reverse(view),
data={"license": license.pk, "ids": ids, "regions": 1},
)
self.assertEqual(response.status_code, 302)
resources = Model.objects.filter(id__in=[r.pk for r in resources])
for resource in resources:
self.assertEqual(resource.license, license)
# test regions change
region = Region.objects.first()
response = self.client.post(
reverse(view),
data={"region": region.pk, "ids": ids, "regions": 1},
)
self.assertEqual(response.status_code, 302)
resources = Model.objects.filter(id__in=[r.pk for r in resources])
for resource in resources:
if resource.regions.all():
self.assertTrue(region in resource.regions.all())
# test language change
language = "eng"
response = self.client.post(
reverse(view),
data={"language": language, "ids": ids, "regions": 1},
)
resources = Model.objects.filter(id__in=[r.pk for r in resources])
for resource in resources:
self.assertEqual(resource.language, language)
# test keywords change
keywords = "some,thing,new"
response = self.client.post(
reverse(view),
data={"keywords": keywords, "ids": ids, "regions": 1},
)
resources = Model.objects.filter(id__in=[r.pk for r in resources])
for resource in resources:
for word in resource.keywords.all():
self.assertTrue(word.name in keywords.split(","))
[docs]
def test_surrogate_escape_string(self):
surrogate_escape_raw = "Zo\udcc3\udcab"
surrogate_escape_expected = "Zoë"
surrogate_escape_result = surrogate_escape_string(
surrogate_escape_raw, "UTF-8"
) # add more test cases using different charsets?
self.assertEqual(
surrogate_escape_result,
surrogate_escape_expected,
"layers.utils.surrogate_escape_string did not produce expected result. "
f"Expected {surrogate_escape_expected}, received {surrogate_escape_result}",
)
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs]
def test_assign_remove_permissions(self):
# Assing
layer = Dataset.objects.all().first()
perm_spec = layer.get_all_level_info()
self.assertNotIn(get_user_model().objects.get(username="norman"), perm_spec["users"])
utils.set_datasets_permissions(
"edit", resources_names=[layer.name], users_usernames=["norman"], delete_flag=False, verbose=True
)
perm_spec = layer.get_all_level_info()
_c = 0
if "users" in perm_spec:
for _u in perm_spec["users"]:
if _u == "norman" or _u == get_user_model().objects.get(username="norman"):
_c += 1
# "norman" has both read & write permissions
self.assertEqual(_c, 1)
# Remove
utils.set_datasets_permissions(
"read", resources_names=[layer.name], users_usernames=["norman"], delete_flag=True, verbose=True
)
perm_spec = layer.get_all_level_info()
_c = 0
if "users" in perm_spec:
for _u in perm_spec["users"]:
if _u == "norman" or _u == get_user_model().objects.get(username="norman"):
_c += 1
# "norman" has no permissions
self.assertEqual(_c, 0)
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs]
def test_assign_remove_permissions_for_groups(self):
# Assing
layer = Dataset.objects.all().first()
perm_spec = layer.get_all_level_info()
group_profile = GroupProfile.objects.create(slug="group1", title="group1", access="public")
self.assertNotIn(group_profile, perm_spec["groups"])
# giving manage permissions to the group
utils.set_datasets_permissions(
"manage", resources_names=[layer.name], groups_names=["group1"], delete_flag=False, verbose=True
)
perm_spec = layer.get_all_level_info()
expected = {
"change_dataset_data",
"change_dataset_style",
"change_resourcebase",
"change_resourcebase_metadata",
"change_resourcebase_permissions",
"delete_resourcebase",
"download_resourcebase",
"publish_resourcebase",
"view_resourcebase",
}
# checking the perms list
self.assertSetEqual(expected, set(perm_spec["groups"][group_profile.group]))
# Chaning perms to the group from manage to read
utils.set_datasets_permissions(
"view", resources_names=[layer.name], groups_names=["group1"], delete_flag=False, verbose=True
)
perm_spec = layer.get_all_level_info()
expected = {"view_resourcebase"}
# checking the perms list
self.assertSetEqual(expected, set(perm_spec["groups"][group_profile.group]))
# Chaning perms to the group from manage to read
utils.set_datasets_permissions(
"view", resources_names=[layer.name], groups_names=["group1"], delete_flag=True, verbose=True
)
perm_spec = layer.get_all_level_info()
# checking the perms list
self.assertTrue(group_profile.group not in perm_spec["groups"])
if group_profile:
group_profile.delete()
[docs]
def test_xml_should_return_404_if_the_dataset_does_not_exists(self):
params = {
"permissions": '{ "users": {"AnonymousUser": ["view_resourcebase"]} , "groups":{}}',
"base_file": open(self.exml_path),
"xml_file": open(self.exml_path),
"dataset_title": "Fake layer title",
"metadata_upload_form": True,
"time": False,
"charset": "UTF-8",
}
self.client.login(username="admin", password="admin")
resp = self.client.post(reverse("dataset_upload"), params)
self.assertEqual(404, resp.status_code)
[docs]
def test_xml_should_update_the_dataset_with_the_expected_values(self):
params = {
"permissions": '{ "users": {"AnonymousUser": ["view_resourcebase"]} , "groups":{}}',
"base_file": open(self.exml_path),
"xml_file": open(self.exml_path),
"dataset_title": "geonode:single_point",
"metadata_upload_form": True,
"time": False,
"charset": "UTF-8",
}
self.client.login(username="admin", password="admin")
prev_dataset = Dataset.objects.get(typename="geonode:single_point")
self.assertEqual(0, prev_dataset.keywords.count())
resp = self.client.post(reverse("dataset_upload"), params)
self.assertEqual(404, resp.status_code)
self.assertEqual(
resp.json()["errors"], "The UUID identifier from the XML Metadata, is different from the one saved"
)
[docs]
def test_sld_should_raise_500_if_is_invalid(self):
layer = Dataset.objects.get(typename="geonode:single_point")
params = {
"permissions": '{ "users": {"AnonymousUser": ["view_resourcebase"]} , "groups":{}}',
"base_file": open(self.sld_path),
"sld_file": open(self.sld_path),
"dataset_title": "random",
"metadata_upload_form": False,
"time": False,
"charset": "UTF-8",
}
self.client.login(username="admin", password="admin")
self.assertGreaterEqual(layer.styles.count(), 1)
self.assertIsNotNone(layer.styles.first())
resp = self.client.post(reverse("dataset_upload"), params)
self.assertEqual(500, resp.status_code)
self.assertFalse(resp.json().get("success"))
self.assertEqual("No Dataset matches the given query.", resp.json().get("errors"))
[docs]
def test_sld_should_update_the_dataset_with_the_expected_values(self):
layer = Dataset.objects.get(typename="geonode:single_point")
params = {
"permissions": '{ "users": {"AnonymousUser": ["view_resourcebase"]} , "groups":{}}',
"base_file": open(self.sld_path),
"sld_file": open(self.sld_path),
"dataset_title": f"geonode:{layer.name}",
"metadata_upload_form": False,
"time": False,
"charset": "UTF-8",
}
self.client.login(username="admin", password="admin")
self.assertGreaterEqual(layer.styles.count(), 1)
self.assertIsNotNone(layer.styles.first())
resp = self.client.post(reverse("dataset_upload"), params)
self.assertEqual(200, resp.status_code)
updated_dataset = Dataset.objects.get(alternate=f"geonode:{layer.name}")
# just checking some values if are updated
self.assertGreaterEqual(updated_dataset.styles.all().count(), 1)
self.assertIsNotNone(updated_dataset.styles.first())
self.assertEqual(layer.styles.first().sld_title, updated_dataset.styles.first().sld_title)
[docs]
def test_xml_should_raise_an_error_if_the_uuid_is_changed(self):
"""
If the UUID coming from the XML and the one saved in the DB are different
The system should raise an error
"""
params = {
"permissions": '{ "users": {"AnonymousUser": ["view_resourcebase"]} , "groups":{}}',
"base_file": open(self.exml_path),
"xml_file": open(self.exml_path),
"dataset_title": "geonode:single_point",
"metadata_upload_form": True,
"time": False,
"charset": "UTF-8",
}
self.client.login(username="admin", password="admin")
prev_dataset = Dataset.objects.get(typename="geonode:single_point")
self.assertEqual(0, prev_dataset.keywords.count())
resp = self.client.post(reverse("dataset_upload"), params)
self.assertEqual(404, resp.status_code)
expected = {
"success": False,
"errors": "The UUID identifier from the XML Metadata, is different from the one saved",
}
self.assertDictEqual(expected, resp.json())
[docs]
def test_will_raise_exception_for_replace_vector_dataset_with_raster(self):
layer = Dataset.objects.get(name="single_point")
filename = "/tpm/filename.tif"
files = ["/opt/file1.shp", "/opt/file2.ccc"]
with self.assertRaises(Exception) as e:
validate_input_source(layer, filename, files, action_type="append")
expected = "You are attempting to append a vector dataset with a raster."
self.assertEqual(expected, e.exception.args[0])
[docs]
def test_will_raise_exception_for_replace_dataset_with_different_file_name(self):
layer = Dataset.objects.get(name="single_point")
file_path = gisdata.VECTOR_DATA
filename = os.path.join(file_path, "san_andres_y_providencia_highway.shp")
files = {
"shp": filename,
"dbf": f"{file_path}/san_andres_y_providencia_highway.sbf",
"prj": f"{file_path}/san_andres_y_providencia_highway.prj",
"shx": f"{file_path}/san_andres_y_providencia_highway.shx",
}
with self.assertRaises(Exception) as e:
validate_input_source(layer, filename, files, action_type="append")
expected = (
"Some error occurred while trying to access the uploaded schema: "
"Please ensure the name is consistent with the file you are trying to append."
)
self.assertEqual(expected, e.exception.args[0])
@patch("geonode.layers.utils.gs_catalog")
[docs]
def test_will_raise_exception_for_not_existing_dataset_in_the_catalog(self, catalog):
catalog.get_layer.return_value = None
create_single_dataset("san_andres_y_providencia_water")
layer = Dataset.objects.get(name="san_andres_y_providencia_water")
file_path = gisdata.VECTOR_DATA
filename = os.path.join(file_path, "san_andres_y_providencia_water.shp")
files = {
"shp": filename,
"dbf": f"{file_path}/san_andres_y_providencia_water.sbf",
"prj": f"{file_path}/san_andres_y_providencia_water.prj",
"shx": f"{file_path}/san_andres_y_providencia_water.shx",
}
with self.assertRaises(Exception) as e:
validate_input_source(layer, filename, files, action_type="append")
expected = (
"Some error occurred while trying to access the uploaded schema: "
"The selected Dataset does not exists in the catalog."
)
self.assertEqual(expected, e.exception.args[0])
@patch("geonode.layers.utils.gs_catalog")
[docs]
def test_will_raise_exception_if_schema_is_not_equal_between_catalog_and_file(self, catalog):
attr = namedtuple("GSCatalogAttr", ["attributes"])
attr.attributes = []
self.r.resource = attr
catalog.get_layer.return_value = self.r
create_single_dataset("san_andres_y_providencia_water")
layer = Dataset.objects.filter(name="san_andres_y_providencia_water")[0]
file_path = gisdata.VECTOR_DATA
filename = os.path.join(file_path, "san_andres_y_providencia_water.shp")
files = {
"shp": filename,
"dbf": f"{file_path}/san_andres_y_providencia_water.sbf",
"prj": f"{file_path}/san_andres_y_providencia_water.prj",
"shx": f"{file_path}/san_andres_y_providencia_water.shx",
}
with self.assertRaises(Exception) as e:
validate_input_source(layer, filename, files, action_type="append")
expected = (
"Some error occurred while trying to access the uploaded schema: "
"Please ensure that the dataset structure is consistent with the file you are trying to append."
)
self.assertEqual(expected, e.exception.args[0])
@patch("geonode.layers.utils.gs_catalog")
[docs]
def test_validation_will_pass_for_valid_append(self, catalog):
attr = namedtuple("GSCatalogAttr", ["attributes"])
attr.attributes = ["NATURAL", "NAME"]
self.r.resource = attr
catalog.get_layer.return_value = self.r
create_single_dataset("san_andres_y_providencia_water")
layer = Dataset.objects.filter(name="san_andres_y_providencia_water")[0]
file_path = gisdata.VECTOR_DATA
filename = os.path.join(file_path, "san_andres_y_providencia_water.shp")
files = {
"shp": filename,
"dbf": f"{file_path}/san_andres_y_providencia_water.sbf",
"prj": f"{file_path}/san_andres_y_providencia_water.prj",
"shx": f"{file_path}/san_andres_y_providencia_water.shx",
}
actual = validate_input_source(layer, filename, files, action_type="append")
self.assertTrue(actual)
[docs]
def test_dataset_download_not_found_for_non_existing_dataset(self):
self.client.login(username="admin", password="admin")
url = reverse("dataset_download", args=["foo-dataset"])
response = self.client.get(url)
self.assertEqual(404, response.status_code)
@override_settings(USE_GEOSERVER=False)
[docs]
def test_dataset_download_redirect_to_proxy_url(self):
# if settings.USE_GEOSERVER is false, the URL must be redirected
self.client.login(username="admin", password="admin")
dataset = Dataset.objects.first()
url = reverse("dataset_download", args=[dataset.alternate])
response = self.client.get(url)
self.assertEqual(302, response.status_code)
self.assertEqual(f"/download/{dataset.id}", response.url)
@patch("geonode.layers.download_handler.HttpClient.request")
[docs]
def test_dataset_download_call_the_catalog_raise_error_for_no_200(self, mocked_catalog):
_response = MagicMock(status_code=500, content="foo-bar")
mocked_catalog.return_value = _response, "foo-bar"
# if settings.USE_GEOSERVER is false, the URL must be redirected
self.client.login(username="admin", password="admin")
dataset = Dataset.objects.first()
url = reverse("dataset_download", args=[dataset.alternate])
response = self.client.get(url)
self.assertEqual(500, response.status_code)
self.assertDictEqual({"error": "Download dataset exception: error during call with GeoServer"}, response.json())
[docs]
def test_dataset_download_call_the_catalog_raise_error_for_error_content(self):
content = """<?xml version="1.0" encoding="UTF-8"?>
<ows:ExceptionReport xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="1.1.0" xsi:schemaLocation="http://www.opengis.net/ows/1.1 http://localhost:8080/geoserver/schemas/ows/1.1.0/owsAll.xsd">
<ows:Exception exceptionCode="InvalidParameterValue" locator="ResponseDocument">
<ows:ExceptionText>Foo Bar Exception</ows:ExceptionText>
</ows:Exception>
</ows:ExceptionReport>
""" # noqa
_response = MagicMock(status_code=200, text=content, headers={"Content-Type": "text/xml"})
# if settings.USE_GEOSERVER is false, the URL must be redirected
self.client.login(username="admin", password="admin")
dataset = Dataset.objects.first()
with patch("geonode.layers.download_handler.HttpClient.request") as mocked_catalog:
mocked_catalog.return_value = _response, content
url = reverse("dataset_download", args=[dataset.alternate])
response = self.client.get(url)
self.assertEqual(500, response.status_code)
self.assertDictEqual({"error": "InvalidParameterValue: Foo Bar Exception"}, response.json())
[docs]
def test_dataset_download_call_the_catalog_works(self):
# if settings.USE_GEOSERVER is false, the URL must be redirected
_response = MagicMock(status_code=200, text="", headers={"Content-Type": ""}) # noqa
self.client.login(username="admin", password="admin")
dataset = Dataset.objects.first()
layer = create_dataset(dataset.title, dataset.title, dataset.owner, "Point")
with patch("geonode.layers.download_handler.HttpClient.request") as mocked_catalog:
mocked_catalog.return_value = _response, ""
url = reverse("dataset_download", args=[layer.alternate])
response = self.client.get(url)
self.assertTrue(response.status_code == 200)
[docs]
def test_dataset_download_call_the_catalog_not_work_without_download_resurcebase_perm(self):
dataset = Dataset.objects.first()
dataset.set_permissions({"users": {"bobby": ["base.view_resourcebase"]}})
self.client.login(username="bobby", password="bob")
url = reverse("dataset_download", args=[dataset.alternate])
response = self.client.get(url)
self.assertEqual(404, response.status_code)
[docs]
def test_dataset_download_call_the_catalog_work_anonymous(self):
# if settings.USE_GEOSERVER is false, the URL must be redirected
_response = MagicMock(status_code=200, text="", headers={"Content-Type": ""}) # noqa
dataset = Dataset.objects.first()
layer = create_dataset(dataset.title, dataset.title, dataset.owner, "Point")
with patch("geonode.layers.download_handler.HttpClient.request") as mocked_catalog:
mocked_catalog.return_value = _response, ""
url = reverse("dataset_download", args=[layer.alternate])
response = self.client.get(url)
self.assertTrue(response.status_code == 200)
@override_settings(USE_GEOSERVER=True)
@patch("geonode.layers.download_handler.get_template")
[docs]
def test_dataset_download_call_the_catalog_work_for_raster(self, pathed_template):
# if settings.USE_GEOSERVER is false, the URL must be redirected
_response = MagicMock(status_code=200, text="", headers={"Content-Type": ""}) # noqa
dataset = Dataset.objects.filter(subtype="raster").first()
layer = create_dataset(dataset.title, dataset.title, dataset.owner, "Point")
Dataset.objects.filter(alternate=layer.alternate).update(subtype="raster")
with patch("geonode.layers.download_handler.HttpClient.request") as mocked_catalog:
mocked_catalog.return_value = _response, ""
url = reverse("dataset_download", args=[layer.alternate])
response = self.client.get(url)
self.assertTrue(response.status_code == 200)
"""
Evaluate that the context used by the template contains the right mimetype for the resource
"""
self.assertTupleEqual(
({"alternate": layer.alternate, "download_format": "image/tiff"},), pathed_template.mock_calls[1].args
)
@override_settings(USE_GEOSERVER=True)
@patch("geonode.layers.download_handler.get_template")
[docs]
def test_dataset_download_call_the_catalog_work_for_vector(self, pathed_template):
# if settings.USE_GEOSERVER is false, the URL must be redirected
_response = MagicMock(status_code=200, text="", headers={"Content-Type": ""}) # noqa
dataset = Dataset.objects.filter(subtype="vector").first()
layer = create_dataset(dataset.title, dataset.title, dataset.owner, "Point")
with patch("geonode.layers.download_handler.HttpClient.request") as mocked_catalog:
mocked_catalog.return_value = _response, ""
url = reverse("dataset_download", args=[layer.alternate])
response = self.client.get(url)
self.assertTrue(response.status_code == 200)
"""
Evaluate that the context used by the template contains the right mimetype for the resource
"""
self.assertTupleEqual(
({"alternate": layer.alternate, "download_format": "application/zip"},), pathed_template.mock_calls[1].args
)
[docs]
class TestLayerDetailMapViewRights(GeoNodeBaseTestSupport):
[docs]
fixtures = ["initial_data.json", "group_test_data.json", "default_oauth_apps.json"]
@classmethod
[docs]
def setUpClass(cls):
super().setUpClass()
create_models(type=cls.get_type, integration=cls.get_integration)
all_public()
create_single_dataset("single_point")
@classmethod
[docs]
def tearDownClass(cls):
super().tearDownClass()
remove_models(cls.get_obj_ids, type=cls.get_type, integration=cls.get_integration)
[docs]
def setUp(self):
super().setUp()
self.user = get_user_model().objects.create(username="dybala", email="dybala@gmail.com")
self.user.set_password("very-secret")
self.admin = get_user_model().objects.get(username="admin")
self.map = Map.objects.create(uuid=str(uuid4()), owner=self.admin, title="test", is_approved=True)
self.not_admin = get_user_model().objects.create(username="r-lukaku", is_active=True)
self.not_admin.set_password("very-secret")
self.not_admin.save()
self.layer = Dataset.objects.all().first()
create_dataset_data(self.layer.resourcebase_ptr_id)
with DisableDjangoSignals():
self.map_dataset = MapLayer.objects.create(
name=self.layer.alternate,
map=self.map,
)
[docs]
def test_that_keyword_multiselect_is_disabled_for_non_admin_users(self):
"""
Test that keyword multiselect widget is disabled when the user is not an admin
"""
self.test_dataset = resource_manager.create(
None, resource_type=Dataset, defaults=dict(owner=self.not_admin, title="test", is_approved=True)
)
url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
self.client.login(username=self.not_admin.username, password="very-secret")
with self.settings(FREETEXT_KEYWORDS_READONLY=True):
response = self.client.get(url)
self.assertTrue(response.context["form"]["keywords"].field.disabled, self.test_dataset.alternate)
[docs]
def test_that_keyword_multiselect_is_not_disabled_for_admin_users(self):
"""
Test that only admin users can create/edit keywords when FREETEXT_KEYWORDS_READONLY=True
"""
admin = self.not_admin
admin.is_superuser = True
admin.save()
self.test_dataset = resource_manager.create(
None, resource_type=Dataset, defaults=dict(owner=admin, title="test", is_approved=True)
)
url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
self.client.login(username=admin.username, password="very-secret")
with self.settings(FREETEXT_KEYWORDS_READONLY=True):
response = self.client.get(url)
self.assertFalse(response.context["form"]["keywords"].field.disabled, self.test_dataset.alternate)
[docs]
def test_that_featured_enabling_and_disabling_for_users(self):
self.test_dataset = resource_manager.create(
None, resource_type=Dataset, defaults=dict(owner=self.not_admin, title="test", is_approved=True)
)
url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
# Non Admins
self.client.login(username=self.not_admin.username, password="very-secret")
response = self.client.get(url)
self.assertFalse(self.not_admin.is_superuser)
self.assertEqual(response.status_code, 200)
self.assertTrue(response.context["form"]["featured"].field.disabled)
# Admin
self.client.login(username="admin", password="admin")
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
self.assertFalse(response.context["form"]["featured"].field.disabled)
[docs]
def test_that_non_admin_user_cannot_create_edit_keyword(self):
"""
Test that non admin users cannot edit/create keywords when FREETEXT_KEYWORDS_READONLY=True
"""
self.test_dataset = resource_manager.create(
None, resource_type=Dataset, defaults=dict(owner=self.not_admin, title="test", is_approved=True)
)
url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
self.client.login(username=self.not_admin.username, password="very-secret")
with self.settings(FREETEXT_KEYWORDS_READONLY=True):
response = self.client.post(url, data={"resource-keywords": "wonderful-keyword"})
self.assertEqual(response.status_code, 401)
self.assertEqual(response.content, b"Unauthorized: Cannot edit/create Free-text Keywords")
[docs]
def test_that_keyword_multiselect_is_enabled_for_non_admin_users_when_freetext_keywords_readonly_istrue(self):
"""
Test that keyword multiselect widget is not disabled when the user is not an admin
and FREETEXT_KEYWORDS_READONLY=False
"""
self.test_dataset = resource_manager.create(
None, resource_type=Dataset, defaults=dict(owner=self.not_admin, title="test", is_approved=True)
)
url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
self.client.login(username=self.not_admin.username, password="very-secret")
with self.settings(FREETEXT_KEYWORDS_READONLY=False):
response = self.client.get(url)
self.assertFalse(response.context["form"]["keywords"].field.disabled, self.test_dataset.alternate)
response = self.client.get(reverse("dataset_embed", args=(self.layer.alternate,)))
self.assertIsNotNone(response.context["resource"])
[docs]
def test_that_only_users_with_permissions_can_view_maps_in_dataset_view(self):
"""
Test only users with view permissions to a map can view them in layer detail view
"""
resource_manager.remove_permissions(self.map.uuid, instance=self.map.get_self_resource())
self.client.login(username="admin", password="admin")
response = self.client.get(reverse("dataset_embed", args=(self.layer.alternate,)))
self.assertEqual(response.context["resource"].alternate, self.map_dataset.name)
[docs]
def test_update_with_a_comma_in_title_is_replaced_by_undescore(self):
"""
Test that when changing the dataset title, if the entered title has a comma it is replaced by an undescore.
"""
self.test_dataset = None
try:
self.test_dataset = resource_manager.create(
None, resource_type=Dataset, defaults=dict(owner=self.not_admin, title="test", is_approved=True)
)
data = {
"resource-title": "test,comma,2021",
"resource-owner": self.test_dataset.owner.id,
"resource-date": str(self.test_dataset.date),
"resource-date_type": self.test_dataset.date_type,
"resource-language": self.test_dataset.language,
"dataset_attribute_set-TOTAL_FORMS": 0,
"dataset_attribute_set-INITIAL_FORMS": 0,
}
url = reverse("dataset_metadata", args=(self.test_dataset.alternate,))
self.client.login(username=self.not_admin.username, password="very-secret")
response = self.client.post(url, data=data)
self.test_dataset.refresh_from_db()
self.assertEqual(self.test_dataset.title, "test_comma_2021")
self.assertEqual(response.status_code, 200)
finally:
if self.test_dataset:
self.test_dataset.delete()
[docs]
class LayerNotificationsTestCase(NotificationsTestsHelper):
@classmethod
[docs]
def setUpClass(cls):
super().setUpClass()
create_models(type=cls.get_type, integration=cls.get_integration)
all_public()
@classmethod
[docs]
def tearDownClass(cls):
super().tearDownClass()
remove_models(cls.get_obj_ids, type=cls.get_type, integration=cls.get_integration)
[docs]
def setUp(self):
super().setUp()
self.user = "admin"
self.passwd = "admin"
self.anonymous_user = get_anonymous_user()
self.u = get_user_model().objects.get(username=self.user)
self.u.email = "test@email.com"
self.u.is_active = True
self.u.save()
self.setup_notifications_for(DatasetAppConfig.NOTIFICATIONS, self.u)
self.norman = get_user_model().objects.get(username="norman")
self.norman.email = "norman@email.com"
self.norman.is_active = True
self.norman.save()
self.setup_notifications_for(DatasetAppConfig.NOTIFICATIONS, self.norman)
[docs]
def testLayerNotifications(self):
with self.settings(
EMAIL_ENABLE=True,
NOTIFICATION_ENABLED=True,
NOTIFICATIONS_BACKEND="pinax.notifications.backends.email.EmailBackend",
PINAX_NOTIFICATIONS_QUEUE_ALL=False,
):
self.clear_notifications_queue()
self.client.login(username=self.user, password=self.passwd)
_l = resource_manager.create(
None,
resource_type=Dataset,
defaults=dict(
name="test notifications",
title="test notifications",
bbox_polygon=Polygon.from_bbox((-180, -90, 180, 90)),
srid="EPSG:4326",
owner=self.norman,
),
)
self.assertTrue(self.check_notification_out("dataset_created", self.u))
# Ensure "resource.owner" won't be notified for having uploaded its own resource
self.assertFalse(self.check_notification_out("dataset_created", self.norman))
self.clear_notifications_queue()
_l.name = "test notifications 2"
_l.save(notify=True)
self.assertTrue(self.check_notification_out("dataset_updated", self.u))
self.clear_notifications_queue()
lct = ContentType.objects.get_for_model(_l)
if "pinax.ratings" in settings.INSTALLED_APPS:
self.clear_notifications_queue()
from pinax.ratings.models import Rating
rating = Rating(user=self.norman, content_type=lct, object_id=_l.id, content_object=_l, rating=5)
rating.save()
self.assertTrue(self.check_notification_out("dataset_rated", self.u))
"""
Smoke test to explain how the uuidhandler will override the uuid for the layers
Documentation of the handler is available here:
https://github.com/GeoNode/documentation/blob/703cc6ba92b7b7a83637a874fb449420a9f8b78a/basic/settings/index.rst#uuid-handler
"""
[docs]
class DummyUUIDHandler:
def __init__(self, instance):
[docs]
self.instance = instance
[docs]
def create_uuid(self):
return f"abc:{self.instance.uuid}"
[docs]
class TestCustomUUidHandler(TestCase):
[docs]
def setUp(self):
User = get_user_model()
self.user = User.objects.create(username="test", email="test@test.com")
self.sut = Dataset.objects.create(
name="testLayer", owner=self.user, title="test", is_approved=True, uuid="abc-1234-abc"
)
[docs]
def test_dataset_will_maintain_his_uud_if_no_handler_is_definded(self):
expected = "abc-1234-abc"
self.assertEqual(expected, self.sut.uuid)
@override_settings(LAYER_UUID_HANDLER="geonode.layers.tests.DummyUUIDHandler")
[docs]
def test_dataset_will_override_the_uuid_if_handler_is_defined(self):
resource_manager.update(None, instance=self.sut, keywords=["updating", "values"])
expected = "abc:abc-1234-abc"
actual = Dataset.objects.get(id=self.sut.id)
self.assertEqual(expected, actual.uuid)
self.assertIsNotNone(self.sut.ows_url)
self.assertIsNotNone(self.sut.ptype)
self.assertIsNotNone(self.sut.sourcetype)
"""
Smoke test to explain how the new function for multiple parsers will work
Is required to define a fuction that takes 1 parameters (the metadata xml) and return 4 parameters.
Parameters:
xml (str): TextIOWrapper read example: open(self.exml_path).read())
Returns:
Tuple (tuple):
- (identifier, vals, regions, keywords)
identifier(str): default empy,
vals(dict): default empty,
regions(list): default empty,
keywords(list(dict)): default empty
"""
"""
Just a dummy function required for the smoke test above
"""
[docs]
class TestIsXmlUploadOnly(TestCase):
"""
This function will check if the files uploaded is a metadata file
"""
[docs]
def setUp(self):
self.exml_path = f"{settings.PROJECT_ROOT}/base/fixtures/test_xml.xml"
self.request = RequestFactory()
[docs]
def test_give_single_file_should_return_True(self):
with open(self.exml_path, "rb") as f:
request = self.request.post("/random/url")
request.FILES["base_file"] = f
actual = is_xml_upload_only(request)
self.assertTrue(actual)
[docs]
def test_give_single_file_should_return_False(self):
base_path = gisdata.GOOD_DATA
with open(f"{base_path}/vector/single_point.shp", "rb") as f:
request = self.request.post("/random/url")
request.FILES["base_file"] = f
actual = is_xml_upload_only(request)
self.assertFalse(actual)
[docs]
class TestIsSldUploadOnly(TestCase):
"""
This function will check if the files uploaded is a metadata file
"""
[docs]
def setUp(self):
self.exml_path = f"{settings.PROJECT_ROOT}/base/fixtures/test_sld.sld"
self.request = RequestFactory()
[docs]
def test_give_single_file_should_return_True(self):
with open(self.exml_path, "rb") as f:
request = self.request.post("/random/url")
request.FILES["base_file"] = f
actual = is_sld_upload_only(request)
self.assertTrue(actual)
[docs]
def test_give_single_file_should_return_False(self):
base_path = gisdata.GOOD_DATA
with open(f"{base_path}/vector/single_point.shp", "rb") as f:
request = self.request.post("/random/url")
request.FILES["base_file"] = f
actual = is_sld_upload_only(request)
self.assertFalse(actual)
[docs]
class SetLayersPermissionsCommand(GeoNodeBaseTestSupport):
"""
Unittest to ensure that the management command "set_layers_permissions"
behaves as expected
"""
[docs]
def test_user_get_the_download_permissions_for_the_selected_dataset(self):
"""
Given a user, the compact perms and the resource id, it shoul set the
permissions for the selected resource
"""
try:
expected_perms = {"view_resourcebase", "download_resourcebase"}
dataset, args, username, opts = self._create_arguments(perms_type="download")
call_command("set_layers_permissions", *args, **opts)
self._assert_perms(expected_perms, dataset, username)
finally:
if dataset:
dataset.delete()
[docs]
def test_user_get_the_view_permissions_for_the_selected_dataset(self):
"""
Given a user, the compact perms and the resource id, it shoul set the
permissions for the selected resource
"""
try:
expected_perms = {"view_resourcebase"}
dataset, args, username, opts = self._create_arguments(perms_type="view")
call_command("set_layers_permissions", *args, **opts)
self._assert_perms(expected_perms, dataset, username)
finally:
if dataset:
dataset.delete()
[docs]
def test_user_get_the_edit_permissions_for_the_selected_dataset(self):
"""
Given a user, the compact perms and the resource id, it shoul set the
permissions for the selected resource
"""
try:
expected_perms = {
"view_resourcebase",
"change_dataset_style",
"download_resourcebase",
"change_resourcebase_metadata",
"change_dataset_data",
"change_resourcebase",
}
dataset, args, username, opts = self._create_arguments(perms_type="edit")
call_command("set_layers_permissions", *args, **opts)
self._assert_perms(expected_perms, dataset, username)
finally:
if dataset:
dataset.delete()
[docs]
def test_user_get_the_manage_permissions_for_the_selected_dataset(self):
"""
Given a user, the compact perms and the resource id, it shoul set the
permissions for the selected resource
"""
try:
expected_perms = {
"delete_resourcebase",
"change_resourcebase",
"view_resourcebase",
"change_resourcebase_permissions",
"change_dataset_style",
"change_resourcebase_metadata",
"publish_resourcebase",
"change_dataset_data",
"download_resourcebase",
}
dataset, args, username, opts = self._create_arguments(perms_type="manage")
call_command("set_layers_permissions", *args, **opts)
self._assert_perms(expected_perms, dataset, username)
finally:
if dataset:
dataset.delete()
[docs]
def test_anonymous_user_cannot_get_edit_permissions(self):
"""
Given the Anonymous user, we should get an error trying to set "edit" permissions.
"""
try:
expected_perms = {}
dataset, args, username, opts = self._create_arguments(perms_type="edit")
username = "AnonymousUser"
opts["users"] = ["AnonymousUser"]
call_command("set_layers_permissions", *args, **opts)
self._assert_perms(expected_perms, dataset, username, assertion=False)
finally:
if dataset:
dataset.delete()
[docs]
def test_unset_anonymous_view_permissions(self):
"""
Given the Anonymous user, we should be able to unset any paermission.
"""
try:
expected_perms = {}
dataset, args, username, opts = self._create_arguments(perms_type="view", mode="unset")
username = "AnonymousUser"
opts["users"] = ["AnonymousUser"]
call_command("set_layers_permissions", *args, **opts)
self._assert_perms(expected_perms, dataset, username, assertion=False)
finally:
if dataset:
dataset.delete()
[docs]
def _create_arguments(self, perms_type, mode="set"):
dataset = create_single_dataset("dataset_for_management_command")
args = []
username = get_user_model().objects.exclude(username="admin").exclude(username="AnonymousUser").first().username
opts = {
"permission": perms_type,
"users": [username],
"resources": str(dataset.id),
"delete": True if mode == "unset" else False,
}
return dataset, args, username, opts
[docs]
def _assert_perms(self, expected_perms, dataset, username, assertion=True):
dataset.refresh_from_db()
perms = dataset.get_all_level_info()
if assertion:
self.assertTrue(username in [user.username for user in perms["users"]])
actual = set(
itertools.chain.from_iterable(
[perms for user, perms in perms["users"].items() if user.username == username]
)
)
self.assertSetEqual(expected_perms, actual)
else:
self.assertFalse(username in [user.username for user in perms["users"]])
[docs]
class TestDatasetDownloadHandler(GeoNodeBaseTestSupport):
[docs]
def setUp(self):
user = get_user_model().objects.first()
request = RequestFactory().get("http://test_url.com")
request.user = user
self.dataset = create_single_dataset("test_dataset_for_download")
self.sut = DatasetDownloadHandler(request, self.dataset.alternate)
[docs]
def test_download_url_without_original_link(self):
expected_url = reverse("dataset_download", args=[self.dataset.alternate])
self.assertEqual(expected_url, self.sut.download_url)
[docs]
def test_download_url_with_original_link(self):
Link.objects.update_or_create(
resource=self.dataset.resourcebase_ptr,
url="https://custom_dowonload_url.com",
defaults=dict(
extension="zip",
name="Original Dataset",
mime="application/octet-stream",
link_type="original",
),
)
expected_url = "https://custom_dowonload_url.com"
self.assertEqual(expected_url, self.sut.download_url)
[docs]
def test_get_resource_exists(self):
self.assertIsNotNone(self.sut.get_resource())
[docs]
def test_process_dowload(self):
response = self.sut.get_download_response()
self.assertIsNotNone(response)
[docs]
class DummyDownloadHandler(DatasetDownloadHandler):
[docs]
def get_download_response(self):
return HttpResponse(content=b"abcsfd2")
[docs]
class TestCustomDownloadHandler(GeoNodeBaseTestSupport):
@override_settings(DEFAULT_DATASET_DOWNLOAD_HANDLER="geonode.layers.tests.DummyDownloadHandler")
[docs]
def test_download_custom_handler(self):
clear_dataset_download_handlers()
dataset = create_single_dataset("test_custom_download_dataset")
url = reverse("dataset_download", args=[dataset.alternate])
self.client.login(username="admin", password="admin")
response = self.client.get(url)
self.assertTrue(response.status_code == 200)
self.assertEqual(response.content, b"abcsfd2")