Source code for geonode.layers.api.tests

#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
from io import BytesIO
import logging

from unittest.mock import patch
from django.contrib.auth import get_user_model
from urllib.parse import urljoin

from django.conf import settings
from django.urls import reverse
from rest_framework.test import APITestCase

from guardian.shortcuts import assign_perm, get_anonymous_user
from geonode.geoserver.createlayer.utils import create_dataset

from geonode.base.models import Link
from geonode.base.populate_test_data import create_models, create_single_dataset
from geonode.layers.models import Attribute, Dataset
from geonode.maps.models import Map, MapLayer

[docs] logger = logging.getLogger(__name__)
[docs] class DatasetsApiTests(APITestCase):
[docs] fixtures = ["initial_data.json", "group_test_data.json", "default_oauth_apps.json"]
[docs] def setUp(self): self.exml_path = f"{settings.PROJECT_ROOT}/base/fixtures/test_xml.xml" create_models(b"document") create_models(b"map") create_models(b"dataset")
[docs] def test_datasets(self): """ Ensure we can access the Layers list. """ url = reverse("datasets-list") # Anonymous response = self.client.get(url, format="json") self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 5) self.assertEqual(response.data["total"], 8) # Pagination self.assertEqual(len(response.data["datasets"]), 8) logger.debug(response.data) for _l in response.data["datasets"]: self.assertTrue(_l["resource_type"], "dataset") # Test list response doesn't have attribute_set self.assertIsNotNone(response.data["datasets"][0].get("ptype")) self.assertIsNone(response.data["datasets"][0].get("attribute_set")) self.assertIsNone(response.data["datasets"][0].get("featureinfo_custom_template")) _dataset = Dataset.objects.first() assign_perm("base.view_resourcebase", get_anonymous_user(), _dataset.get_self_resource()) # Test detail response has attribute_set url = urljoin(f"{reverse('datasets-list')}/", f"{_dataset.pk}") response = self.client.get(url, format="json") self.assertIsNotNone(response.data["dataset"].get("ptype")) self.assertIsNotNone(response.data["dataset"].get("subtype")) self.assertIsNotNone(response.data["dataset"].get("attribute_set")) # Test "featureinfo_custom_template" _attribute, _ = Attribute.objects.get_or_create(dataset=_dataset, attribute="name") try: _attribute.visible = True _attribute.attribute_type = Attribute.TYPE_PROPERTY _attribute.description = "The Name" _attribute.attribute_label = "Name" _attribute.display_order = 1 _attribute.save() url = urljoin(f"{reverse('datasets-list')}/", f"{_dataset.pk}") response = self.client.get(url, format="json") self.assertIsNotNone(response.data["dataset"].get("featureinfo_custom_template")) self.assertEqual( response.data["dataset"].get("featureinfo_custom_template"), '<div style="overflow-x:hidden"><div class="row"><div class="col-xs-6" style="font-weight: bold; word-wrap: break-word;">Name:</div>\ <div class="col-xs-6" style="word-wrap: break-word;">${properties[\'name\']}</div></div></div>', ) _dataset.featureinfo_custom_template = "<div>Foo Bar</div>" _dataset.save() url = urljoin(f"{reverse('datasets-list')}/", f"{_dataset.pk}") response = self.client.get(url, format="json") self.assertIsNotNone(response.data["dataset"].get("featureinfo_custom_template")) self.assertEqual( response.data["dataset"].get("featureinfo_custom_template"), '<div style="overflow-x:hidden"><div class="row"><div class="col-xs-6" style="font-weight: bold; word-wrap: break-word;">Name:</div>\ <div class="col-xs-6" style="word-wrap: break-word;">${properties[\'name\']}</div></div></div>', ) _dataset.use_featureinfo_custom_template = True _dataset.save() url = urljoin(f"{reverse('datasets-list')}/", f"{_dataset.pk}") response = self.client.get(url, format="json") self.assertIsNotNone(response.data["dataset"].get("featureinfo_custom_template")) self.assertEqual(response.data["dataset"].get("featureinfo_custom_template"), "<div>Foo Bar</div>") finally: _attribute.delete() _dataset.featureinfo_custom_template = None _dataset.use_featureinfo_custom_template = False _dataset.save()
[docs] def test_extra_metadata_included_with_param(self): _dataset = Dataset.objects.first() url = urljoin(f"{reverse('datasets-list')}/", f"{_dataset.pk}") data = {"include[]": "metadata"} response = self.client.get(url, format="json", data=data) self.assertIsNotNone(response.data["dataset"].get("metadata")) response = self.client.get(url, format="json") self.assertNotIn("metadata", response.data["dataset"])
[docs] def test_raw_HTML_stripped_properties(self): """ Ensure "raw_*" properties returns no HTML or carriage-return tag """ dataset = Dataset.objects.first() dataset.abstract = ( '<p><em>No abstract provided</em>.</p>\r\n<p><img src="data:image/jpeg;base64,/9j/4AAQSkZJR/>' ) dataset.constraints_other = '<p><span style="text-decoration: underline;">None</span></p>' dataset.supplemental_information = "<p>No information provided &iacute;</p> <p>&pound;682m</p>" dataset.data_quality_statement = '<p><strong>OK</strong></p>\r\n<table style="border-collapse: collapse; width:\ 85.2071%;" border="1">\r\n<tbody>\r\n<tr>\r\n<td style="width: 49.6528%;">1</td>\r\n<td style="width:\ 50%;">2</td>\r\n</tr>\r\n<tr>\r\n<td style="width: 49.6528%;">a</td>\r\n<td style="width: 50%;">b</td>\ \r\n</tr>\r\n</tbody>\r\n</table>' dataset.save() # Admin self.assertTrue(self.client.login(username="admin", password="admin")) url = reverse("datasets-detail", kwargs={"pk": dataset.pk}) response = self.client.get(url, format="json") self.assertEqual(response.status_code, 200) self.assertEqual(int(response.data["dataset"]["pk"]), int(dataset.pk)) self.assertEqual(response.data["dataset"]["raw_abstract"], "No abstract provided.") self.assertEqual(response.data["dataset"]["raw_constraints_other"], "None") self.assertEqual(response.data["dataset"]["raw_supplemental_information"], "No information provided í £682m") self.assertEqual(response.data["dataset"]["raw_data_quality_statement"], "OK 1 2 a b")
[docs] def test_layer_replace_anonymous_should_raise_error(self): layer = Dataset.objects.first() url = reverse("datasets-replace-dataset", args=(layer.id,)) expected = { "success": False, "errors": ["Authentication credentials were not provided."], "code": "not_authenticated", } response = self.client.patch(url) self.assertEqual(403, response.status_code) self.assertDictEqual(expected, response.json())
[docs] def test_layer_replace_should_redirect_for_not_accepted_method(self): layer = Dataset.objects.first() url = reverse("datasets-replace-dataset", args=(layer.id,)) self.client.login(username="admin", password="admin") response = self.client.post(url) self.assertEqual(405, response.status_code) response = self.client.get(url) self.assertEqual(405, response.status_code)
[docs] def test_layer_replace_should_raise_error_if_layer_does_not_exists(self): url = reverse("datasets-replace-dataset", args=(999999999999999,)) expected = {"success": False, "errors": ["Layer with ID 999999999999999 is not available"], "code": "not_found"} self.client.login(username="admin", password="admin") response = self.client.patch(url) self.assertEqual(404, response.status_code) self.assertDictEqual(expected, response.json())
[docs] def test_dataset_append_anonymous_should_raise_error(self): layer = Dataset.objects.first() url = reverse("datasets-append-dataset", args=(layer.id,)) expected = { "success": False, "errors": ["Authentication credentials were not provided."], "code": "not_authenticated", } response = self.client.patch(url) self.assertEqual(403, response.status_code) self.assertDictEqual(expected, response.json())
[docs] def test_dataset_append_should_redirect_for_not_accepted_method(self): layer = Dataset.objects.first() url = reverse("datasets-append-dataset", args=(layer.id,)) self.client.login(username="admin", password="admin") response = self.client.post(url) self.assertEqual(405, response.status_code) response = self.client.get(url) self.assertEqual(405, response.status_code)
[docs] def test_dataset_append_should_raise_error_if_layer_does_not_exists(self): url = reverse("datasets-append-dataset", args=(999999999999999,)) expected = {"success": False, "errors": ["Layer with ID 999999999999999 is not available"], "code": "not_found"} self.client.login(username="admin", password="admin") response = self.client.patch(url) self.assertEqual(404, response.status_code, response.json()) self.assertDictEqual(expected, response.json())
@patch("geonode.layers.api.views.validate_input_source")
[docs] def test_layer_replace_should_work(self, _validate_input_source): _validate_input_source.return_value = True admin = get_user_model().objects.get(username="admin") if Dataset.objects.filter(name="single_point").exists(): """ If the dataset already exists in the test env, we dont want that the test fail so we rename it and then we will rollback the cnahge """ _dataset = Dataset.objects.get(name="single_point") _dataset.name = "single_point_2" _dataset.save() try: layer = create_dataset( "single_point", "single_point", admin, "Point", ) except Exception as e: if "There is already a layer named" not in e.args[0]: raise e else: layer = create_single_dataset("single_point") cnt = Dataset.objects.count() layer.refresh_from_db() logger.error(layer.alternate) # renaming the file in the same way as the lasyer name # the filename must be consiste with the layer name github_path = "https://github.com/GeoNode/gisdata/tree/master/gisdata/data/good/vector/" payload = { "store_spatial_files": False, "base_file": f"{github_path}/single_point.shp", "dbf_file": f"{github_path}/single_point.dbf", "shx_file": f"{github_path}/single_point.shx", "prj_file": f"{github_path}/single_point.prj", } url = reverse("datasets-replace-dataset", args=(layer.id,)) self.client.login(username="admin", password="admin") response = self.client.patch(url, data=payload) self.assertEqual(200, response.status_code, response.json()) layer.refresh_from_db() # evaluate that the number of available layer is not changed self.assertEqual(Dataset.objects.count(), cnt)
[docs] def test_patch_point_of_contact(self): layer = Dataset.objects.first() url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}") self.client.login(username="admin", password="admin") get_user_model().objects.get_or_create(username="ninja") get_user_model().objects.get_or_create(username="turtle") users = get_user_model().objects.exclude(pk=-1) user_ids = [user.pk for user in users] patch_data = {"poc": [{"pk": uid} for uid in user_ids]} response = self.client.patch(url, data=patch_data, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all(user_id in [poc.get("pk") for poc in response.json().get("dataset").get("poc")] for user_id in user_ids) ) # Resetting all point of contact response = self.client.patch(url, data={"poc": []}, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id not in [poc.get("pk") for poc in response.json().get("dataset").get("poc")] for user_id in user_ids ) )
[docs] def test_patch_metadata_author(self): layer = Dataset.objects.first() url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}") self.client.login(username="admin", password="admin") get_user_model().objects.get_or_create(username="ninja") get_user_model().objects.get_or_create(username="turtle") users = get_user_model().objects.exclude(pk=-1) user_ids = [user.pk for user in users] patch_data = {"metadata_author": [{"pk": uid} for uid in user_ids]} response = self.client.patch(url, data=patch_data, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id in [ metadata_author.get("pk") for metadata_author in response.json().get("dataset").get("metadata_author") ] for user_id in user_ids ) ) # Resetting all metadata authors response = self.client.patch(url, data={"metadata_author": []}, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id not in [ metadata_author.get("pk") for metadata_author in response.json().get("dataset").get("metadata_author") ] for user_id in user_ids ) )
[docs] def test_patch_processor(self): layer = Dataset.objects.first() url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}") self.client.login(username="admin", password="admin") get_user_model().objects.get_or_create(username="ninja") get_user_model().objects.get_or_create(username="turtle") users = get_user_model().objects.exclude(pk=-1) user_ids = [user.pk for user in users] patch_data = {"processor": [{"pk": uid} for uid in user_ids]} response = self.client.patch(url, data=patch_data, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id in [processor.get("pk") for processor in response.json().get("dataset").get("processor")] for user_id in user_ids ) ) # Resetting all processors response = self.client.patch(url, data={"processor": []}, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id not in [processor.get("pk") for processor in response.json().get("dataset").get("processor")] for user_id in user_ids ) )
[docs] def test_patch_publisher(self): layer = Dataset.objects.first() url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}") self.client.login(username="admin", password="admin") get_user_model().objects.get_or_create(username="ninja") get_user_model().objects.get_or_create(username="turtle") users = get_user_model().objects.exclude(pk=-1) user_ids = [user.pk for user in users] patch_data = {"publisher": [{"pk": uid} for uid in user_ids]} response = self.client.patch(url, data=patch_data, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id in [publisher.get("pk") for publisher in response.json().get("dataset").get("publisher")] for user_id in user_ids ) ) # Resetting all publishers response = self.client.patch(url, data={"publisher": []}, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id not in [publisher.get("pk") for publisher in response.json().get("dataset").get("publisher")] for user_id in user_ids ) )
[docs] def test_patch_custodian(self): layer = Dataset.objects.first() url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}") self.client.login(username="admin", password="admin") get_user_model().objects.get_or_create(username="ninja") get_user_model().objects.get_or_create(username="turtle") users = get_user_model().objects.exclude(pk=-1) user_ids = [user.pk for user in users] patch_data = {"custodian": [{"pk": uid} for uid in user_ids]} response = self.client.patch(url, data=patch_data, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id in [custodian.get("pk") for custodian in response.json().get("dataset").get("custodian")] for user_id in user_ids ) ) # Resetting all custodians response = self.client.patch(url, data={"custodian": []}, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id not in [custodian.get("pk") for custodian in response.json().get("dataset").get("custodian")] for user_id in user_ids ) )
[docs] def test_patch_distributor(self): layer = Dataset.objects.first() url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}") self.client.login(username="admin", password="admin") get_user_model().objects.get_or_create(username="ninja") get_user_model().objects.get_or_create(username="turtle") users = get_user_model().objects.exclude(pk=-1) user_ids = [user.pk for user in users] patch_data = {"distributor": [{"pk": uid} for uid in user_ids]} response = self.client.patch(url, data=patch_data, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id in [distributor.get("pk") for distributor in response.json().get("dataset").get("distributor")] for user_id in user_ids ) ) # Resetting all distributers response = self.client.patch(url, data={"distributor": []}, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id not in [distributor.get("pk") for distributor in response.json().get("dataset").get("distributor")] for user_id in user_ids ) )
[docs] def test_patch_resource_user(self): layer = Dataset.objects.first() url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}") self.client.login(username="admin", password="admin") get_user_model().objects.get_or_create(username="ninja") get_user_model().objects.get_or_create(username="turtle") users = get_user_model().objects.exclude(pk=-1) user_ids = [user.pk for user in users] patch_data = {"resource_user": [{"pk": uid} for uid in user_ids]} response = self.client.patch(url, data=patch_data, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id in [resource_user.get("pk") for resource_user in response.json().get("dataset").get("resource_user")] for user_id in user_ids ) ) # Resetting all resource users response = self.client.patch(url, data={"resource_user": []}, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id not in [ resource_user.get("pk") for resource_user in response.json().get("dataset").get("resource_user") ] for user_id in user_ids ) )
[docs] def test_patch_resource_provider(self): layer = Dataset.objects.first() url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}") self.client.login(username="admin", password="admin") get_user_model().objects.get_or_create(username="ninja") get_user_model().objects.get_or_create(username="turtle") users = get_user_model().objects.exclude(pk=-1) user_ids = [user.pk for user in users] patch_data = {"resource_provider": [{"pk": uid} for uid in user_ids]} response = self.client.patch(url, data=patch_data, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id in [ resource_provider.get("pk") for resource_provider in response.json().get("dataset").get("resource_provider") ] for user_id in user_ids ) ) # Resetting all principal investigator response = self.client.patch(url, data={"resource_provider": []}, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id not in [ resource_provider.get("pk") for resource_provider in response.json().get("dataset").get("resource_provider") ] for user_id in user_ids ) )
[docs] def test_patch_originator(self): layer = Dataset.objects.first() url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}") self.client.login(username="admin", password="admin") get_user_model().objects.get_or_create(username="ninja") get_user_model().objects.get_or_create(username="turtle") users = get_user_model().objects.exclude(pk=-1) user_ids = [user.pk for user in users] patch_data = {"originator": [{"pk": uid} for uid in user_ids]} response = self.client.patch(url, data=patch_data, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id in [originator.get("pk") for originator in response.json().get("dataset").get("originator")] for user_id in user_ids ) ) # Resetting all originators response = self.client.patch(url, data={"originator": []}, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id not in [originator.get("pk") for originator in response.json().get("dataset").get("originator")] for user_id in user_ids ) )
[docs] def test_patch_principal_investigator(self): layer = Dataset.objects.first() url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}") self.client.login(username="admin", password="admin") get_user_model().objects.get_or_create(username="ninja") get_user_model().objects.get_or_create(username="turtle") users = get_user_model().objects.exclude(pk=-1) user_ids = [user.pk for user in users] patch_data = {"principal_investigator": [{"pk": uid} for uid in user_ids]} response = self.client.patch(url, data=patch_data, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id in [ principal_investigator.get("pk") for principal_investigator in response.json().get("dataset").get("principal_investigator") ] for user_id in user_ids ) ) # Resetting all principal investigator response = self.client.patch(url, data={"principal_investigator": []}, format="json") self.assertEqual(200, response.status_code) self.assertTrue( all( user_id not in [ principal_investigator.get("pk") for principal_investigator in response.json().get("dataset").get("principal_investigator") ] for user_id in user_ids ) )
[docs] def test_metadata_update_for_not_supported_method(self): layer = Dataset.objects.first() url = reverse("datasets-replace-metadata", args=(layer.id,)) self.client.login(username="admin", password="admin") response = self.client.post(url) self.assertEqual(405, response.status_code) response = self.client.get(url) self.assertEqual(405, response.status_code)
[docs] def test_metadata_update_for_not_authorized_user(self): layer = Dataset.objects.first() url = reverse("datasets-replace-metadata", args=(layer.id,)) response = self.client.put(url) self.assertEqual(403, response.status_code)
[docs] def test_unsupported_file_throws_error(self): layer = Dataset.objects.first() url = reverse("datasets-replace-metadata", args=(layer.id,)) self.client.login(username="admin", password="admin") data = '<?xml version="1.0" encoding="UTF-8"?><invalid></invalid>' f = BytesIO(bytes(data, encoding="utf-8")) f.name = "metadata.xml" put_data = {"metadata_file": f} response = self.client.put(url, data=put_data) self.assertEqual(500, response.status_code)
[docs] def test_valid_metadata_file_with_different_uuid(self): layer = Dataset.objects.first() url = reverse("datasets-replace-metadata", args=(layer.id,)) self.client.login(username="admin", password="admin") f = open(self.exml_path, "r") put_data = {"metadata_file": f} response = self.client.put(url, data=put_data) self.assertEqual(500, response.status_code)
[docs] def test_permissions_for_not_permitted_user(self): get_user_model().objects.create_user( username="some_user", password="some_password", email="some_user@geonode.org", ) layer = Dataset.objects.first() url = reverse("datasets-replace-metadata", args=(layer.id,)) self.client.login(username="some_user", password="some_password") uuid = layer.uuid data = open(self.exml_path).read() data = data.replace("7cfbc42c-efa7-431c-8daa-1399dff4cd19", uuid) f = BytesIO(bytes(data, encoding="utf-8")) f.name = "metadata.xml" put_data = {"metadata_file": f} response = self.client.put(url, data=put_data) self.assertEqual(403, response.status_code)
[docs] def test_permissions_for_permitted_user(self): another_non_admin_user = get_user_model().objects.create_user( username="some_other_user", password="some_other_password", email="some_other_user@geonode.org", ) layer = Dataset.objects.first() assign_perm("base.change_resourcebase_metadata", another_non_admin_user, layer.get_self_resource()) url = reverse("datasets-replace-metadata", args=(layer.id,)) self.client.login(username="some_other_user", password="some_other_password") uuid = layer.uuid data = open(self.exml_path).read() data = data.replace("7cfbc42c-efa7-431c-8daa-1399dff4cd19", uuid) f = BytesIO(bytes(data, encoding="utf-8")) f.name = "metadata.xml" put_data = {"metadata_file": f} response = self.client.put(url, data=put_data) self.assertEqual(200, response.status_code)
[docs] def test_valid_metadata_file(self): layer = Dataset.objects.first() url = reverse("datasets-replace-metadata", args=(layer.id,)) self.client.login(username="admin", password="admin") uuid = layer.uuid data = open(self.exml_path).read() data = data.replace("7cfbc42c-efa7-431c-8daa-1399dff4cd19", uuid) f = BytesIO(bytes(data, encoding="utf-8")) f.name = "metadata.xml" put_data = {"metadata_file": f} response = self.client.put(url, data=put_data) self.assertEqual(200, response.status_code)
[docs] def test_download_api(self): dataset = create_single_dataset("test_dataset") url = reverse("datasets-detail", kwargs={"pk": dataset.pk}) response = self.client.get(url) self.assertTrue(response.status_code == 200) data = response.json()["dataset"] download_url_data = data["download_urls"][0] download_url = reverse("dataset_download", args=[dataset.alternate]) self.assertEqual(download_url_data["default"], True) self.assertEqual(download_url_data["ajax_safe"], True) self.assertEqual(download_url_data["url"], download_url) link = Link(link_type="original", url="https://myoriginal.org", resource=dataset) link.save() response = self.client.get(url) data = response.json()["dataset"] download_url_data = data["download_urls"][0] download_url = reverse("dataset_download", args=[dataset.alternate]) self.assertEqual(download_url_data["default"], True) self.assertEqual(download_url_data["ajax_safe"], False) self.assertEqual(download_url_data["url"], "https://myoriginal.org")