Source code for geonode.harvesting.tests.test_models

##############################################
#
# Copyright (C) 2021 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import datetime as dt
from unittest import mock

from django.contrib.auth import get_user_model
from django.test import SimpleTestCase
from geonode.tests.base import GeoNodeBaseTestSupport

from .. import models


[docs] class HarvesterTestCase(GeoNodeBaseTestSupport):
[docs] remote_url = "test.com"
[docs] name = "This is geonode harvester"
[docs] user = get_user_model().objects.get(username="AnonymousUser")
[docs] harvester_type = "geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester"
[docs] def setUp(self): super().setUp() self.harvester = models.Harvester.objects.create( remote_url=self.remote_url, name=self.name, default_owner=self.user, harvester_type=self.harvester_type )
[docs] def test_get_worker_works(self): worker = self.harvester.get_harvester_worker() self.assertEqual(worker.remote_url, self.remote_url)
@mock.patch("geonode.harvesting.models.jsonschema") @mock.patch("geonode.harvesting.models.import_string")
[docs] def test_validate_worker_configuration(self, mock_import_string, mock_jsonschema): extra_config_schema = "fake_config_schema" mock_worker_class = mock.MagicMock() mock_worker_class.get_extra_config_schema.return_value = extra_config_schema mock_import_string.return_value = mock_worker_class harvester_type = "fake_harvester_type" configuration = {"fake_key": "fake_configuration"} models.validate_worker_configuration(harvester_type, configuration) mock_import_string.assert_called_with(harvester_type) mock_worker_class.get_extra_config_schema.assert_called() mock_jsonschema.validate.assert_called_with(configuration, extra_config_schema)
@mock.patch("geonode.harvesting.models.timezone")
[docs] def test_get_next_check_availability_dispatch_time(self, mock_timezone): fixtures = [ ("2020-01-01T00:00:00", "2020-01-01T00:05:00", 10, "2020-01-01T00:10:00"), ("2020-01-01T00:00:00", "2020-01-01T00:11:00", 10, "2020-01-01T00:11:00"), ] for last_checked, now, frequency, expected in fixtures: mock_timezone.now.return_value = dt.datetime.fromisoformat(now).replace(tzinfo=dt.timezone.utc) harvester = models.Harvester(check_availability_frequency=frequency) harvester.last_checked_availability = dt.datetime.fromisoformat(last_checked).replace( tzinfo=dt.timezone.utc ) result = harvester.get_next_check_availability_dispatch_time() expected_result = dt.datetime.fromisoformat(expected).replace(tzinfo=dt.timezone.utc) self.assertEqual(result, expected_result)
@mock.patch("geonode.harvesting.models.timezone")
[docs] def test_get_next_dispatch_time(self, mock_timezone): fixtures = [ ("2020-01-01T00:00:00", "2020-01-01T00:05:00", 10, "2020-01-01T00:10:00"), ("2020-01-01T00:00:00", "2020-01-01T00:11:00", 10, "2020-01-01T00:11:00"), ] for last_check, now, frequency, expected in fixtures: mock_timezone.now.return_value = dt.datetime.fromisoformat(now).replace(tzinfo=dt.timezone.utc) with mock.patch.object( models.Harvester, "latest_refresh_session", new_callable=mock.PropertyMock ) as mock_latest_refresh_session: mock_latest_refresh_session.return_value = mock.MagicMock( started=dt.datetime.fromisoformat(last_check).replace(tzinfo=dt.timezone.utc) ) harvester = models.Harvester( harvesting_session_update_frequency=10, refresh_harvestable_resources_update_frequency=10, ) result = harvester._get_next_dispatch_time( models.AsynchronousHarvestingSession.TYPE_DISCOVER_HARVESTABLE_RESOURCES ) expected_result = dt.datetime.fromisoformat(expected).replace(tzinfo=dt.timezone.utc) self.assertEqual(result, expected_result)
[docs] class AsynchronousHarvestingSessionTestCase(GeoNodeBaseTestSupport):
[docs] remote_url = "test.com"
[docs] name = "This is geonode harvester"
[docs] user = get_user_model().objects.get(username="AnonymousUser")
[docs] harvester_type = "geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester"
[docs] def setUp(self): super().setUp() self.harvester = models.Harvester.objects.create( remote_url=self.remote_url, name=self.name, default_owner=self.user, harvester_type=self.harvester_type ) self.harvesting_session = models.AsynchronousHarvestingSession.objects.create( harvester=self.harvester, session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING )
[docs] def test_check_attributes(self): """ Test attributes of harvester_session after created. """ self.assertIsNotNone(self.harvesting_session.pk) self.assertEqual(self.harvesting_session.harvester, self.harvester) self.assertEqual(self.harvesting_session.records_done, 0)
[docs] class HarvestableResourceTestCase(GeoNodeBaseTestSupport):
[docs] unique_identifier = "id"
[docs] title = "Test"
[docs] remote_url = "test.com"
[docs] name = "This is geonode harvester"
[docs] user = get_user_model().objects.get(username="AnonymousUser")
[docs] harvester_type = "geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester"
[docs] def setUp(self): super().setUp() self.harvester = models.Harvester.objects.create( remote_url=self.remote_url, name=self.name, default_owner=self.user, harvester_type=self.harvester_type ) self.harvestable_resource = models.HarvestableResource.objects.create( unique_identifier=self.unique_identifier, title=self.title, harvester=self.harvester, last_refreshed=dt.datetime.now(), )
[docs] def test_check_attributes(self): self.assertIsNotNone(self.harvestable_resource.pk) self.assertEqual(self.harvestable_resource.harvester, self.harvester) self.assertEqual(self.harvestable_resource.title, self.title) self.assertEqual(self.harvestable_resource.unique_identifier, self.unique_identifier) self.assertFalse(self.harvestable_resource.should_be_harvested) self.assertEqual(self.harvestable_resource.status, models.HarvestableResource.STATUS_READY)
[docs] class WorkerConfigValidationTestCase(SimpleTestCase): @mock.patch("geonode.harvesting.models.jsonschema") @mock.patch("geonode.harvesting.models.import_string")
[docs] def test_validate_worker_configuration(self, mock_import_string, mock_jsonschema): extra_config_schema = "fake_config_schema" mock_worker_class = mock.MagicMock() mock_worker_class.get_extra_config_schema.return_value = extra_config_schema mock_import_string.return_value = mock_worker_class harvester_type = "fake_harvester_type" configuration = {"somekey": "fake_configuration"} models.validate_worker_configuration(harvester_type, configuration) mock_import_string.assert_called_with(harvester_type) mock_worker_class.get_extra_config_schema.assert_called() mock_jsonschema.validate.assert_called_with(configuration, extra_config_schema)