#########################################################################
#
# Copyright (C) 2020 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import os
import copy
import shutil
from unittest import TestCase
import zipfile
import tempfile
from django.test import override_settings
from osgeo import ogr
from unittest.mock import patch
from datetime import datetime, timedelta
from django.contrib.gis.geos import GEOSGeometry, Polygon
from django.contrib.auth import get_user_model
from django.core.management import call_command
from geonode.maps.models import Dataset
from geonode.layers.models import Attribute
from geonode.geoserver.helpers import set_attributes
from geonode.tests.base import GeoNodeBaseTestSupport
from geonode.br.management.commands.utils.utils import ignore_time
from geonode.utils import copy_tree, fixup_shp_columnnames, get_supported_datasets_file_types, unzip_file, bbox_to_wkt
from geonode import settings
[docs]
class TestCopyTree(GeoNodeBaseTestSupport):
@patch("shutil.copy2")
@patch("os.path.getmtime", return_value=(datetime.now() - timedelta(days=1)).timestamp())
@patch("os.listdir", return_value=["Erling_Haaland.jpg"])
@patch("os.path.isdir", return_value=False)
[docs]
def test_backup_of_root_files_with_modification_dates_meeting_less_than_filter_criteria(
self, patch_isdir, patch_listdir, patch_getmtime, patch_shutil_copy2
):
"""
Test that all root directories whose modification dates meet the 'data_dt_filter' less-than iso timestamp are backed-up successfully
"""
copy_tree("/src", "/dst", ignore=ignore_time("<", datetime.now().isoformat()))
self.assertTrue(patch_shutil_copy2.called)
@patch("shutil.copy2")
@patch("os.path.getmtime", return_value=(datetime.now() + timedelta(days=1)).timestamp())
@patch("os.listdir", return_value=["Sancho.jpg"])
@patch("os.path.isdir", return_value=False)
[docs]
def test_skipped_backup_of_root_files_with_modification_dates_not_meeting_less_than_filter_criteria(
self, patch_isdir, patch_listdir, patch_getmtime, patch_shutil_copy2
):
"""
Test that all root directories whose modification dates do not meet the 'data_dt_filter' less-than iso timestamp are not backed-up
"""
copy_tree("/src", "/dst", ignore=ignore_time("<", datetime.now().isoformat()))
self.assertFalse(patch_shutil_copy2.called)
@patch("shutil.copy2")
@patch("os.path.getmtime", return_value=(datetime.now() - timedelta(days=1)).timestamp())
@patch("os.listdir", return_value=["Saala.jpg"])
@patch("os.path.isdir", return_value=False)
[docs]
def test_backup_of_root_files_with_modification_dates_meeting_greater_than_filter_criteria(
self, patch_isdir, patch_listdir, patch_getmtime, patch_shutil_copy2
):
"""
Test that all root directories whose modification dates do not meet the 'data_dt_filter' greater-than iso timestamp are backed-up successfully
"""
copy_tree("/src", "/dst", ignore=ignore_time(">", datetime.now().isoformat()))
self.assertFalse(patch_shutil_copy2.called)
@patch("shutil.copy2")
@patch("os.path.getmtime", return_value=(datetime.now() + timedelta(days=1)).timestamp())
@patch("os.listdir", return_value=["Sadio.jpg"])
@patch("os.path.isdir", return_value=False)
[docs]
def test_skipped_backup_of_root_files_with_modification_dates_not_meeting_greater_than_filter_criteria(
self, patch_isdir, patch_listdir, patch_getmtime, patch_shutil_copy2
):
"""
Test that all root directories whose modification dates do not meet the 'data_dt_filter' less-than iso timestamp are not backed-up
"""
copy_tree("/src", "/dst", ignore=ignore_time(">", datetime.now().isoformat()))
self.assertTrue(patch_shutil_copy2.called)
@patch("os.path.exists", return_value=True)
@patch("shutil.copytree")
@patch("os.path.getmtime", return_value=0)
@patch("os.listdir", return_value=["an_awesome_directory"])
@patch("os.path.isdir", return_value=True)
[docs]
def test_backup_of_child_directories(
self, patch_isdir, patch_listdir, patch_getmtime, patch_shutil_copytree, path_os_exists
):
"""
Test that all directories which meet the 'ignore criteria are backed-up'
"""
copy_tree("/src", "/dst", ignore=ignore_time(">=", datetime.now().isoformat()))
self.assertTrue(patch_shutil_copytree.called)
[docs]
class TestFixupShp(GeoNodeBaseTestSupport):
[docs]
def test_fixup_shp_columnnames(self):
project_root = os.path.abspath(os.path.dirname(__file__))
dataset_zip = os.path.join(project_root, "data", "ming_female_1.zip")
self.failUnless(zipfile.is_zipfile(dataset_zip))
dataset_shp = unzip_file(dataset_zip)
expected_fieldnames = [
"ID",
"_f",
"__1",
"__2",
"m",
"_",
"_M2",
"_M2_1",
"l",
"x",
"y",
"_WU",
"_1",
]
_, _, fieldnames = fixup_shp_columnnames(dataset_shp, "windows-1258")
inDriver = ogr.GetDriverByName("ESRI Shapefile")
inDataSource = inDriver.Open(dataset_shp, 0)
inLayer = inDataSource.GetLayer()
inLayerDefn = inLayer.GetLayerDefn()
self.assertEqual(inLayerDefn.GetFieldCount(), len(expected_fieldnames))
for i, fn in enumerate(expected_fieldnames):
self.assertEqual(inLayerDefn.GetFieldDefn(i).GetName(), fn)
inDataSource.Destroy()
# Cleanup temp dir
shp_parent = os.path.dirname(dataset_shp)
if shp_parent.startswith(tempfile.gettempdir()):
shutil.rmtree(shp_parent, ignore_errors=True)
[docs]
class TestSetAttributes(GeoNodeBaseTestSupport):
[docs]
def setUp(self):
super().setUp()
# Load users to log in as
call_command("loaddata", "people_data", verbosity=0)
self.user = get_user_model().objects.get(username="admin")
[docs]
def test_set_attributes_creates_attributes(self):
"""
Test utility function set_attributes() which creates Attribute instances attached to a Dataset instance.
"""
# Creating a dataset requires being logged in
self.client.login(username="norman", password="norman")
# Create dummy dataset to attach attributes to
_l = Dataset.objects.create(
owner=self.user,
name="dummy_dataset",
bbox_polygon=Polygon.from_bbox((-180, -90, 180, 90)),
srid="EPSG:4326",
)
attribute_map = [
["id", "Integer"],
["date", "IntegerList"],
["enddate", "Real"],
["date_as_date", "xsd:dateTime"],
]
# attribute_map gets modified as a side-effect of the call to set_attributes()
expected_results = copy.deepcopy(attribute_map)
# set attributes for resource
set_attributes(_l, attribute_map.copy())
# 2 items in attribute_map should translate into 2 Attribute instances
self.assertEqual(_l.attributes.count(), len(expected_results))
# The name and type should be set as provided by attribute map
for a in _l.attributes:
self.assertIn([a.attribute, a.attribute_type], expected_results)
# GeoNode cleans up local duplicated attributes
for attribute in attribute_map:
field = attribute[0]
ftype = attribute[1]
if field:
la = Attribute.objects.create(dataset=_l, attribute=field)
la.visible = ftype.find("gml:") != 0
la.attribute_type = ftype
la.description = None
la.attribute_label = None
la.display_order = 0
# set attributes for resource
set_attributes(_l, attribute_map.copy())
# 2 items in attribute_map should translate into 2 Attribute instances
self.assertEqual(_l.attributes.count(), len(expected_results))
# The name and type should be set as provided by attribute map
for a in _l.attributes:
self.assertIn([a.attribute, a.attribute_type], expected_results)
# Test that deleted attributes from GeoServer gets deleted on GeoNode too
attribute_map = [
["id", "Integer"],
["date_as_date", "xsd:dateTime"],
]
# attribute_map gets modified as a side-effect of the call to set_attributes()
expected_results = copy.deepcopy(attribute_map)
# set attributes for resource
set_attributes(_l, attribute_map.copy())
# 2 items in attribute_map should translate into 2 Attribute instances
self.assertEqual(_l.attributes.count(), len(expected_results))
# The name and type should be set as provided by attribute map
for a in _l.attributes:
self.assertIn([a.attribute, a.attribute_type], expected_results)
[docs]
class TestSupportedTypes(TestCase):
[docs]
def setUp(self):
self.replaced = [
{
"id": "shp",
"label": "Replaced type",
"format": "vector",
"ext": ["shp"],
"requires": ["shp", "prj", "dbf", "shx"],
"optional": ["xml", "sld"],
},
]
@override_settings(
ADDITIONAL_DATASET_FILE_TYPES=[
{"id": "dummy_type", "label": "Dummy Type", "format": "dummy", "ext": ["dummy"]},
]
)
[docs]
def test_should_append_additional_type_if_config_is_provided(self):
prev_count = len(settings.SUPPORTED_DATASET_FILE_TYPES)
supported_types = get_supported_datasets_file_types()
supported_keys = [t.get("id") for t in supported_types]
self.assertIn("dummy_type", supported_keys)
self.assertEqual(len(supported_keys), prev_count + 1)
@override_settings(
ADDITIONAL_DATASET_FILE_TYPES=[
{
"id": "shp",
"label": "Replaced type",
"format": "vector",
"ext": ["shp"],
"requires": ["shp", "prj", "dbf", "shx"],
"optional": ["xml", "sld"],
},
]
)
[docs]
def test_should_replace_the_type_id_if_already_exists(self):
prev_count = len(settings.SUPPORTED_DATASET_FILE_TYPES)
supported_types = get_supported_datasets_file_types()
supported_keys = [t.get("id") for t in supported_types]
self.assertIn("shp", supported_keys)
self.assertEqual(len(supported_keys), prev_count)
shp_type = [t for t in supported_types if t["id"] == "shp"][0]
self.assertEqual(shp_type["label"], "Replaced type")
[docs]
class TestRegionsCrossingDateLine(TestCase):
[docs]
def setUp(self):
self.test_region_coords = [
[112.921111999999994, -108.872910000000005, -54.640301000000001, 20.417120000000001], # Pacific
[174.866196000000002, -178.203156000000007, -21.017119999999998, -12.466220000000000], # Fiji
[158.418335000000013, -150.208359000000002, -11.437030000000000, 4.719560000000000], # Kiribati
[165.883803999999998, -175.987198000000006, -52.618591000000002, -29.209969999999998], # New Zeland
]
self.region_across_idl = [
112.921111999999994,
-108.872910000000005,
-54.640301000000001,
20.417120000000001,
] # Pacific
self.region_not_across_idl = [
-31.266000999999999,
39.869301000000000,
27.636310999999999,
81.008797000000001,
] # Europe
self.dataset_crossing = GEOSGeometry(
"POLYGON ((-179.30100799101168718 -17.31310259828852693, -170.41740336774466869 -9.63481116511300328, -164.30155495876181249 -19.7237289784715415, \
-177.91712988386967709 -30.43762400150689018, -179.30100799101168718 -17.31310259828852693))",
srid=4326,
)
self.dataset_not_crossing = GEOSGeometry(
"POLYGON ((-41.86461347546176626 5.43160371103088835, 34.20404118809119609 4.3602142087273279, 9.56208263510924894 -48.8521310723496498, \
-42.22174330956295307 -32.42415870369499942, -41.86461347546176626 5.43160371103088835))",
srid=4326,
)
[docs]
def test_region_across_dateline_do_not_intersect_areas_outside(self):
for i, region_coords in enumerate(self.test_region_coords):
geographic_bounding_box = bbox_to_wkt(
region_coords[0], region_coords[1], region_coords[2], region_coords[3]
)
_, wkt = geographic_bounding_box.split(";")
poly = GEOSGeometry(wkt, srid=4326)
self.assertFalse(
poly.intersection(self.dataset_crossing).empty, f"True intersection not detected for region {i}"
)
self.assertTrue(poly.intersection(self.dataset_not_crossing).empty, "False intersection detected")
[docs]
def test_region_wkt_multipolygon_if_across_idl(self):
bbox_across_idl = bbox_to_wkt(
self.region_not_across_idl[0],
self.region_not_across_idl[1],
self.region_not_across_idl[2],
self.region_not_across_idl[3],
)
_, wkt = bbox_across_idl.split(";")
poly = GEOSGeometry(wkt, srid=4326)
self.assertEqual(poly.geom_type, "Polygon", f"Expexted 'Polygon' type but received {poly.geom_type}")
bbox_across_idl = bbox_to_wkt(
self.region_across_idl[0], self.region_across_idl[1], self.region_across_idl[2], self.region_across_idl[3]
)
_, wkt = bbox_across_idl.split(";")
poly = GEOSGeometry(wkt, srid=4326)
self.assertEqual(poly.geom_type, "MultiPolygon", f"Expexted 'MultiPolygon' type but received {poly.geom_type}")