#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
from io import BytesIO
import logging
from unittest.mock import patch
from django.contrib.auth import get_user_model
from urllib.parse import urljoin
from django.conf import settings
from django.urls import reverse
from rest_framework.test import APITestCase
from guardian.shortcuts import assign_perm, get_anonymous_user
from geonode.geoserver.createlayer.utils import create_dataset
from geonode.base.models import Link
from geonode.base.populate_test_data import create_models, create_single_dataset
from geonode.layers.models import Attribute, Dataset
from geonode.maps.models import Map, MapLayer
[docs]
logger = logging.getLogger(__name__)
[docs]
class DatasetsApiTests(APITestCase):
[docs]
fixtures = ["initial_data.json", "group_test_data.json", "default_oauth_apps.json"]
[docs]
def setUp(self):
self.exml_path = f"{settings.PROJECT_ROOT}/base/fixtures/test_xml.xml"
create_models(b"document")
create_models(b"map")
create_models(b"dataset")
[docs]
def test_datasets(self):
"""
Ensure we can access the Layers list.
"""
url = reverse("datasets-list")
# Anonymous
response = self.client.get(url, format="json")
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 5)
self.assertEqual(response.data["total"], 8)
# Pagination
self.assertEqual(len(response.data["datasets"]), 8)
logger.debug(response.data)
for _l in response.data["datasets"]:
self.assertTrue(_l["resource_type"], "dataset")
# Test list response doesn't have attribute_set
self.assertIsNotNone(response.data["datasets"][0].get("ptype"))
self.assertIsNone(response.data["datasets"][0].get("attribute_set"))
self.assertIsNone(response.data["datasets"][0].get("featureinfo_custom_template"))
_dataset = Dataset.objects.first()
assign_perm("base.view_resourcebase", get_anonymous_user(), _dataset.get_self_resource())
# Test detail response has attribute_set
url = urljoin(f"{reverse('datasets-list')}/", f"{_dataset.pk}")
response = self.client.get(url, format="json")
self.assertIsNotNone(response.data["dataset"].get("ptype"))
self.assertIsNotNone(response.data["dataset"].get("subtype"))
self.assertIsNotNone(response.data["dataset"].get("attribute_set"))
# Test "featureinfo_custom_template"
_attribute, _ = Attribute.objects.get_or_create(dataset=_dataset, attribute="name")
try:
_attribute.visible = True
_attribute.attribute_type = Attribute.TYPE_PROPERTY
_attribute.description = "The Name"
_attribute.attribute_label = "Name"
_attribute.display_order = 1
_attribute.save()
url = urljoin(f"{reverse('datasets-list')}/", f"{_dataset.pk}")
response = self.client.get(url, format="json")
self.assertIsNotNone(response.data["dataset"].get("featureinfo_custom_template"))
self.assertEqual(
response.data["dataset"].get("featureinfo_custom_template"),
'<div style="overflow-x:hidden"><div class="row"><div class="col-xs-6" style="font-weight: bold; word-wrap: break-word;">Name:</div>\
<div class="col-xs-6" style="word-wrap: break-word;">${properties[\'name\']}</div></div></div>',
)
_dataset.featureinfo_custom_template = "<div>Foo Bar</div>"
_dataset.save()
url = urljoin(f"{reverse('datasets-list')}/", f"{_dataset.pk}")
response = self.client.get(url, format="json")
self.assertIsNotNone(response.data["dataset"].get("featureinfo_custom_template"))
self.assertEqual(
response.data["dataset"].get("featureinfo_custom_template"),
'<div style="overflow-x:hidden"><div class="row"><div class="col-xs-6" style="font-weight: bold; word-wrap: break-word;">Name:</div>\
<div class="col-xs-6" style="word-wrap: break-word;">${properties[\'name\']}</div></div></div>',
)
_dataset.use_featureinfo_custom_template = True
_dataset.save()
url = urljoin(f"{reverse('datasets-list')}/", f"{_dataset.pk}")
response = self.client.get(url, format="json")
self.assertIsNotNone(response.data["dataset"].get("featureinfo_custom_template"))
self.assertEqual(response.data["dataset"].get("featureinfo_custom_template"), "<div>Foo Bar</div>")
finally:
_attribute.delete()
_dataset.featureinfo_custom_template = None
_dataset.use_featureinfo_custom_template = False
_dataset.save()
[docs]
def test_raw_HTML_stripped_properties(self):
"""
Ensure "raw_*" properties returns no HTML or carriage-return tag
"""
dataset = Dataset.objects.first()
dataset.abstract = (
'<p><em>No abstract provided</em>.</p>\r\n<p><img src="data:image/jpeg;base64,/9j/4AAQSkZJR/>'
)
dataset.constraints_other = '<p><span style="text-decoration: underline;">None</span></p>'
dataset.supplemental_information = "<p>No information provided í</p> <p>£682m</p>"
dataset.data_quality_statement = '<p><strong>OK</strong></p>\r\n<table style="border-collapse: collapse; width:\
85.2071%;" border="1">\r\n<tbody>\r\n<tr>\r\n<td style="width: 49.6528%;">1</td>\r\n<td style="width:\
50%;">2</td>\r\n</tr>\r\n<tr>\r\n<td style="width: 49.6528%;">a</td>\r\n<td style="width: 50%;">b</td>\
\r\n</tr>\r\n</tbody>\r\n</table>'
dataset.save()
# Admin
self.assertTrue(self.client.login(username="admin", password="admin"))
url = reverse("datasets-detail", kwargs={"pk": dataset.pk})
response = self.client.get(url, format="json")
self.assertEqual(response.status_code, 200)
self.assertEqual(int(response.data["dataset"]["pk"]), int(dataset.pk))
self.assertEqual(response.data["dataset"]["raw_abstract"], "No abstract provided.")
self.assertEqual(response.data["dataset"]["raw_constraints_other"], "None")
self.assertEqual(response.data["dataset"]["raw_supplemental_information"], "No information provided í £682m")
self.assertEqual(response.data["dataset"]["raw_data_quality_statement"], "OK 1 2 a b")
[docs]
def test_layer_replace_anonymous_should_raise_error(self):
layer = Dataset.objects.first()
url = reverse("datasets-replace-dataset", args=(layer.id,))
expected = {
"success": False,
"errors": ["Authentication credentials were not provided."],
"code": "not_authenticated",
}
response = self.client.patch(url)
self.assertEqual(403, response.status_code)
self.assertDictEqual(expected, response.json())
[docs]
def test_layer_replace_should_redirect_for_not_accepted_method(self):
layer = Dataset.objects.first()
url = reverse("datasets-replace-dataset", args=(layer.id,))
self.client.login(username="admin", password="admin")
response = self.client.post(url)
self.assertEqual(405, response.status_code)
response = self.client.get(url)
self.assertEqual(405, response.status_code)
[docs]
def test_layer_replace_should_raise_error_if_layer_does_not_exists(self):
url = reverse("datasets-replace-dataset", args=(999999999999999,))
expected = {"success": False, "errors": ["Layer with ID 999999999999999 is not available"], "code": "not_found"}
self.client.login(username="admin", password="admin")
response = self.client.patch(url)
self.assertEqual(404, response.status_code)
self.assertDictEqual(expected, response.json())
[docs]
def test_dataset_append_anonymous_should_raise_error(self):
layer = Dataset.objects.first()
url = reverse("datasets-append-dataset", args=(layer.id,))
expected = {
"success": False,
"errors": ["Authentication credentials were not provided."],
"code": "not_authenticated",
}
response = self.client.patch(url)
self.assertEqual(403, response.status_code)
self.assertDictEqual(expected, response.json())
[docs]
def test_dataset_append_should_redirect_for_not_accepted_method(self):
layer = Dataset.objects.first()
url = reverse("datasets-append-dataset", args=(layer.id,))
self.client.login(username="admin", password="admin")
response = self.client.post(url)
self.assertEqual(405, response.status_code)
response = self.client.get(url)
self.assertEqual(405, response.status_code)
[docs]
def test_dataset_append_should_raise_error_if_layer_does_not_exists(self):
url = reverse("datasets-append-dataset", args=(999999999999999,))
expected = {"success": False, "errors": ["Layer with ID 999999999999999 is not available"], "code": "not_found"}
self.client.login(username="admin", password="admin")
response = self.client.patch(url)
self.assertEqual(404, response.status_code, response.json())
self.assertDictEqual(expected, response.json())
@patch("geonode.layers.api.views.validate_input_source")
[docs]
def test_layer_replace_should_work(self, _validate_input_source):
_validate_input_source.return_value = True
admin = get_user_model().objects.get(username="admin")
if Dataset.objects.filter(name="single_point").exists():
"""
If the dataset already exists in the test env, we dont want that the test fail
so we rename it and then we will rollback the cnahge
"""
_dataset = Dataset.objects.get(name="single_point")
_dataset.name = "single_point_2"
_dataset.save()
try:
layer = create_dataset(
"single_point",
"single_point",
admin,
"Point",
)
except Exception as e:
if "There is already a layer named" not in e.args[0]:
raise e
else:
layer = create_single_dataset("single_point")
cnt = Dataset.objects.count()
layer.refresh_from_db()
logger.error(layer.alternate)
# renaming the file in the same way as the lasyer name
# the filename must be consiste with the layer name
github_path = "https://github.com/GeoNode/gisdata/tree/master/gisdata/data/good/vector/"
payload = {
"store_spatial_files": False,
"base_file": f"{github_path}/single_point.shp",
"dbf_file": f"{github_path}/single_point.dbf",
"shx_file": f"{github_path}/single_point.shx",
"prj_file": f"{github_path}/single_point.prj",
}
url = reverse("datasets-replace-dataset", args=(layer.id,))
self.client.login(username="admin", password="admin")
response = self.client.patch(url, data=payload)
self.assertEqual(200, response.status_code, response.json())
layer.refresh_from_db()
# evaluate that the number of available layer is not changed
self.assertEqual(Dataset.objects.count(), cnt)
[docs]
def test_patch_processor(self):
layer = Dataset.objects.first()
url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}")
self.client.login(username="admin", password="admin")
get_user_model().objects.get_or_create(username="ninja")
get_user_model().objects.get_or_create(username="turtle")
users = get_user_model().objects.exclude(pk=-1)
user_ids = [user.pk for user in users]
patch_data = {"processor": [{"pk": uid} for uid in user_ids]}
response = self.client.patch(url, data=patch_data, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id in [processor.get("pk") for processor in response.json().get("dataset").get("processor")]
for user_id in user_ids
)
)
# Resetting all processors
response = self.client.patch(url, data={"processor": []}, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id not in [processor.get("pk") for processor in response.json().get("dataset").get("processor")]
for user_id in user_ids
)
)
[docs]
def test_patch_publisher(self):
layer = Dataset.objects.first()
url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}")
self.client.login(username="admin", password="admin")
get_user_model().objects.get_or_create(username="ninja")
get_user_model().objects.get_or_create(username="turtle")
users = get_user_model().objects.exclude(pk=-1)
user_ids = [user.pk for user in users]
patch_data = {"publisher": [{"pk": uid} for uid in user_ids]}
response = self.client.patch(url, data=patch_data, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id in [publisher.get("pk") for publisher in response.json().get("dataset").get("publisher")]
for user_id in user_ids
)
)
# Resetting all publishers
response = self.client.patch(url, data={"publisher": []}, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id not in [publisher.get("pk") for publisher in response.json().get("dataset").get("publisher")]
for user_id in user_ids
)
)
[docs]
def test_patch_custodian(self):
layer = Dataset.objects.first()
url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}")
self.client.login(username="admin", password="admin")
get_user_model().objects.get_or_create(username="ninja")
get_user_model().objects.get_or_create(username="turtle")
users = get_user_model().objects.exclude(pk=-1)
user_ids = [user.pk for user in users]
patch_data = {"custodian": [{"pk": uid} for uid in user_ids]}
response = self.client.patch(url, data=patch_data, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id in [custodian.get("pk") for custodian in response.json().get("dataset").get("custodian")]
for user_id in user_ids
)
)
# Resetting all custodians
response = self.client.patch(url, data={"custodian": []}, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id not in [custodian.get("pk") for custodian in response.json().get("dataset").get("custodian")]
for user_id in user_ids
)
)
[docs]
def test_patch_distributor(self):
layer = Dataset.objects.first()
url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}")
self.client.login(username="admin", password="admin")
get_user_model().objects.get_or_create(username="ninja")
get_user_model().objects.get_or_create(username="turtle")
users = get_user_model().objects.exclude(pk=-1)
user_ids = [user.pk for user in users]
patch_data = {"distributor": [{"pk": uid} for uid in user_ids]}
response = self.client.patch(url, data=patch_data, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id in [distributor.get("pk") for distributor in response.json().get("dataset").get("distributor")]
for user_id in user_ids
)
)
# Resetting all distributers
response = self.client.patch(url, data={"distributor": []}, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id
not in [distributor.get("pk") for distributor in response.json().get("dataset").get("distributor")]
for user_id in user_ids
)
)
[docs]
def test_patch_resource_user(self):
layer = Dataset.objects.first()
url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}")
self.client.login(username="admin", password="admin")
get_user_model().objects.get_or_create(username="ninja")
get_user_model().objects.get_or_create(username="turtle")
users = get_user_model().objects.exclude(pk=-1)
user_ids = [user.pk for user in users]
patch_data = {"resource_user": [{"pk": uid} for uid in user_ids]}
response = self.client.patch(url, data=patch_data, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id
in [resource_user.get("pk") for resource_user in response.json().get("dataset").get("resource_user")]
for user_id in user_ids
)
)
# Resetting all resource users
response = self.client.patch(url, data={"resource_user": []}, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id
not in [
resource_user.get("pk") for resource_user in response.json().get("dataset").get("resource_user")
]
for user_id in user_ids
)
)
[docs]
def test_patch_resource_provider(self):
layer = Dataset.objects.first()
url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}")
self.client.login(username="admin", password="admin")
get_user_model().objects.get_or_create(username="ninja")
get_user_model().objects.get_or_create(username="turtle")
users = get_user_model().objects.exclude(pk=-1)
user_ids = [user.pk for user in users]
patch_data = {"resource_provider": [{"pk": uid} for uid in user_ids]}
response = self.client.patch(url, data=patch_data, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id
in [
resource_provider.get("pk")
for resource_provider in response.json().get("dataset").get("resource_provider")
]
for user_id in user_ids
)
)
# Resetting all principal investigator
response = self.client.patch(url, data={"resource_provider": []}, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id
not in [
resource_provider.get("pk")
for resource_provider in response.json().get("dataset").get("resource_provider")
]
for user_id in user_ids
)
)
[docs]
def test_patch_originator(self):
layer = Dataset.objects.first()
url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}")
self.client.login(username="admin", password="admin")
get_user_model().objects.get_or_create(username="ninja")
get_user_model().objects.get_or_create(username="turtle")
users = get_user_model().objects.exclude(pk=-1)
user_ids = [user.pk for user in users]
patch_data = {"originator": [{"pk": uid} for uid in user_ids]}
response = self.client.patch(url, data=patch_data, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id in [originator.get("pk") for originator in response.json().get("dataset").get("originator")]
for user_id in user_ids
)
)
# Resetting all originators
response = self.client.patch(url, data={"originator": []}, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id not in [originator.get("pk") for originator in response.json().get("dataset").get("originator")]
for user_id in user_ids
)
)
[docs]
def test_patch_principal_investigator(self):
layer = Dataset.objects.first()
url = urljoin(f"{reverse('datasets-list')}/", f"{layer.id}")
self.client.login(username="admin", password="admin")
get_user_model().objects.get_or_create(username="ninja")
get_user_model().objects.get_or_create(username="turtle")
users = get_user_model().objects.exclude(pk=-1)
user_ids = [user.pk for user in users]
patch_data = {"principal_investigator": [{"pk": uid} for uid in user_ids]}
response = self.client.patch(url, data=patch_data, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id
in [
principal_investigator.get("pk")
for principal_investigator in response.json().get("dataset").get("principal_investigator")
]
for user_id in user_ids
)
)
# Resetting all principal investigator
response = self.client.patch(url, data={"principal_investigator": []}, format="json")
self.assertEqual(200, response.status_code)
self.assertTrue(
all(
user_id
not in [
principal_investigator.get("pk")
for principal_investigator in response.json().get("dataset").get("principal_investigator")
]
for user_id in user_ids
)
)
[docs]
def test_unsupported_file_throws_error(self):
layer = Dataset.objects.first()
url = reverse("datasets-replace-metadata", args=(layer.id,))
self.client.login(username="admin", password="admin")
data = '<?xml version="1.0" encoding="UTF-8"?><invalid></invalid>'
f = BytesIO(bytes(data, encoding="utf-8"))
f.name = "metadata.xml"
put_data = {"metadata_file": f}
response = self.client.put(url, data=put_data)
self.assertEqual(500, response.status_code)
[docs]
def test_permissions_for_not_permitted_user(self):
get_user_model().objects.create_user(
username="some_user",
password="some_password",
email="some_user@geonode.org",
)
layer = Dataset.objects.first()
url = reverse("datasets-replace-metadata", args=(layer.id,))
self.client.login(username="some_user", password="some_password")
uuid = layer.uuid
data = open(self.exml_path).read()
data = data.replace("7cfbc42c-efa7-431c-8daa-1399dff4cd19", uuid)
f = BytesIO(bytes(data, encoding="utf-8"))
f.name = "metadata.xml"
put_data = {"metadata_file": f}
response = self.client.put(url, data=put_data)
self.assertEqual(403, response.status_code)
[docs]
def test_permissions_for_permitted_user(self):
another_non_admin_user = get_user_model().objects.create_user(
username="some_other_user",
password="some_other_password",
email="some_other_user@geonode.org",
)
layer = Dataset.objects.first()
assign_perm("base.change_resourcebase_metadata", another_non_admin_user, layer.get_self_resource())
url = reverse("datasets-replace-metadata", args=(layer.id,))
self.client.login(username="some_other_user", password="some_other_password")
uuid = layer.uuid
data = open(self.exml_path).read()
data = data.replace("7cfbc42c-efa7-431c-8daa-1399dff4cd19", uuid)
f = BytesIO(bytes(data, encoding="utf-8"))
f.name = "metadata.xml"
put_data = {"metadata_file": f}
response = self.client.put(url, data=put_data)
self.assertEqual(200, response.status_code)
[docs]
def test_download_api(self):
dataset = create_single_dataset("test_dataset")
url = reverse("datasets-detail", kwargs={"pk": dataset.pk})
response = self.client.get(url)
self.assertTrue(response.status_code == 200)
data = response.json()["dataset"]
download_url_data = data["download_urls"][0]
download_url = reverse("dataset_download", args=[dataset.alternate])
self.assertEqual(download_url_data["default"], True)
self.assertEqual(download_url_data["ajax_safe"], True)
self.assertEqual(download_url_data["url"], download_url)
link = Link(link_type="original", url="https://myoriginal.org", resource=dataset)
link.save()
response = self.client.get(url)
data = response.json()["dataset"]
download_url_data = data["download_urls"][0]
download_url = reverse("dataset_download", args=[dataset.alternate])
self.assertEqual(download_url_data["default"], True)
self.assertEqual(download_url_data["ajax_safe"], False)
self.assertEqual(download_url_data["url"], "https://myoriginal.org")