geonode.harvesting.tests.test_tasks =================================== .. py:module:: geonode.harvesting.tests.test_tasks Classes ------- .. autoapisummary:: geonode.harvesting.tests.test_tasks.TasksTestCase Module Contents --------------- .. py:class:: TasksTestCase Bases: :py:obj:`geonode.tests.base.GeoNodeBaseTestSupport` .. py:attribute:: harvester :type: geonode.harvesting.models.Harvester .. py:attribute:: harvester_remote_url :value: 'fake url' .. py:attribute:: harvester_name :value: 'harvester1' .. py:attribute:: harvester_owner .. py:attribute:: harvester_type :value: 'geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester' .. py:method:: setUpTestData() :classmethod: .. py:method:: test_harvest_resource_updates_geonode_when_remote_resource_exists(mock_update_asynchronous_session) Test that `worker.get_resource()` is called by the `_harvest_resource()` task and that the related workflow is called too. Verify that `worker.get_resource()` is always called. Then verify that if the result of `worker.get_resource()` is not `None`, the `worker.update_geonode_resource()` is called and `worker.update_harvesting_session()` is called too. .. py:method:: test_harvest_resource_does_not_update_geonode_when_remote_resource_does_not_exist() Test that the worker does not try to update existing GeoNode resources when the remote resource cannot be harvested. .. py:method:: test_finish_harvesting_updates_harvester_status() .. py:method:: test_handle_harvesting_error_cleans_up_harvest_execution() .. py:method:: test_check_harvester_available(mock_harvester_model) .. py:method:: test_update_harvestable_resources_sends_batched_requests(mock_models, mock_chord, mock_batch, mock_finalizer, mock_error_handler) Verify that the `update_harvestable_resources` task creates a celery chord with the batched task, a finalizer and an error handler. .. py:method:: test_harvesting_scheduler()