Source code for geonode.harvesting.tests.test_tasks

##############################################
#
# Copyright (C) 2021 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
from unittest import mock

from django.contrib.auth import get_user_model
from django.utils.timezone import now
from geonode.tests.base import GeoNodeBaseTestSupport

from .. import (
    models,
    tasks,
)


[docs] class TasksTestCase(GeoNodeBaseTestSupport):
[docs] harvester: models.Harvester
[docs] harvester_remote_url = "fake url"
[docs] harvester_name = "harvester1"
[docs] harvester_owner = get_user_model().objects.get(username="AnonymousUser")
[docs] harvester_type = "geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester"
@classmethod
[docs] def setUpTestData(cls): cls.harvester = models.Harvester.objects.create( remote_url=cls.harvester_remote_url, name=cls.harvester_name, status=models.Harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES, default_owner=cls.harvester_owner, harvester_type=cls.harvester_type, ) cls.harvesting_session = models.AsynchronousHarvestingSession.objects.create( harvester=cls.harvester, session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING ) for index in range(3): models.HarvestableResource.objects.create( unique_identifier=f"fake-identifier-{index}", title=f"fake-title-{index}", harvester=cls.harvester, remote_resource_type="fake-remote-resource-type", last_refreshed=now(), )
@mock.patch("geonode.harvesting.tasks.update_asynchronous_session")
[docs] def test_harvest_resource_updates_geonode_when_remote_resource_exists(self, mock_update_asynchronous_session): """Test that `worker.get_resource()` is called by the `_harvest_resource()` task and that the related workflow is called too. Verify that `worker.get_resource()` is always called. Then verify that if the result of `worker.get_resource()` is not `None`, the `worker.update_geonode_resource()` is called and `worker.update_harvesting_session()` is called too. """ harvestable_resource_id = "fake id" with mock.patch("geonode.harvesting.tasks.models") as mock_models: mock_worker = mock.MagicMock() mock_worker.get_resource.return_value = "fake_gotten_resource" mock_worker.should_copy_resource.return_value = False mock_harvestable_resource = mock.MagicMock(models.HarvestableResource) mock_harvestable_resource.harvester.get_harvester_worker.return_value = mock_worker mock_models.HarvestableResource.objects.get.return_value = mock_harvestable_resource tasks._harvest_resource(harvestable_resource_id, self.harvesting_session.id) mock_models.HarvestableResource.objects.get.assert_called_with(pk=harvestable_resource_id) mock_worker.get_resource.assert_called() mock_worker.update_geonode_resource.assert_called() mock_update_asynchronous_session.assert_called()
[docs] def test_harvest_resource_does_not_update_geonode_when_remote_resource_does_not_exist(self): """Test that the worker does not try to update existing GeoNode resources when the remote resource cannot be harvested.""" harvestable_resource_id = "fake id" with mock.patch("geonode.harvesting.tasks.models") as mock_models: mock_worker = mock.MagicMock() mock_worker.get_resource.return_value = None # this means the remote resource was not harvested mock_worker.should_copy_resource.return_value = False mock_harvestable_resource = mock.MagicMock(models.HarvestableResource) mock_harvestable_resource.harvester.get_harvester_worker.return_value = mock_worker mock_models.HarvestableResource.objects.get.return_value = mock_harvestable_resource tasks._harvest_resource(harvestable_resource_id, self.harvesting_session.id) mock_models.HarvestableResource.objects.get.assert_called_with(pk=harvestable_resource_id) mock_worker.get_resource.assert_called() mock_worker.update_geonode_resource.assert_not_called()
[docs] def test_finish_harvesting_updates_harvester_status(self): tasks._finish_harvesting(self.harvesting_session.id) self.harvester.refresh_from_db() self.harvesting_session.refresh_from_db() self.assertEqual(self.harvester.status, models.Harvester.STATUS_READY) self.assertIsNotNone(self.harvesting_session.ended)
[docs] def test_handle_harvesting_error_cleans_up_harvest_execution(self): tasks._handle_harvesting_error( None, harvester_id=self.harvester.id, harvesting_session_id=self.harvesting_session.id ) self.harvester.refresh_from_db() self.harvesting_session.refresh_from_db() self.assertEqual(self.harvester.status, models.Harvester.STATUS_READY) self.assertIsNotNone(self.harvesting_session.ended)
@mock.patch("geonode.harvesting.tasks.models.Harvester")
[docs] def test_check_harvester_available(self, mock_harvester_model): mock_harvester = mock.MagicMock(spec=models.Harvester).return_value mock_harvester_model.objects.get.return_value = mock_harvester tasks.check_harvester_available(1000) mock_harvester.update_availability.assert_called()
@mock.patch("geonode.harvesting.tasks._handle_harvestable_resources_update_error") @mock.patch("geonode.harvesting.tasks._finish_harvestable_resources_update") @mock.patch("geonode.harvesting.tasks._update_harvestable_resources_batch") @mock.patch("geonode.harvesting.tasks.chord") @mock.patch("geonode.harvesting.tasks.models")
[docs] def test_update_harvestable_resources_sends_batched_requests( self, mock_models, mock_chord, mock_batch, mock_finalizer, mock_error_handler ): """Verify that the `update_harvestable_resources` task creates a celery chord with the batched task, a finalizer and an error handler.""" mock_worker = mock.MagicMock() mock_worker.get_num_available_resources.return_value = 1 mock_harvester = mock.MagicMock(models.Harvester) mock_models.Harvester.objects.get.return_value = mock_harvester mock_harvester.get_harvester_worker.return_value = mock_worker tasks.update_harvestable_resources("fake harvester id") mock_batch.signature.assert_called() mock_finalizer.signature.assert_called() mock_error_handler.signature.assert_called() mock_chord.assert_called() mock_chord.return_value.apply_async.assert_called()
[docs] def test_harvesting_scheduler(self): mock_harvester = mock.MagicMock(spec=models.Harvester).return_value mock_harvester.scheduling_enabled = True mock_harvester.is_harvestable_resources_refresh_due.return_value = True mock_harvester.is_harvesting_due.return_value = True with mock.patch("geonode.harvesting.tasks.models.Harvester.objects") as mock_qs: mock_qs.all.return_value = [mock_harvester] tasks.harvesting_scheduler() mock_harvester.is_availability_check_due.assert_called() mock_harvester.is_harvestable_resources_refresh_due.assert_called() mock_harvester.initiate_update_harvestable_resources.assert_called() mock_harvester.is_harvesting_due.assert_called() mock_harvester.initiate_perform_harvesting.assert_called()