Source code for geonode.storage.tests

#########################################################################
#
# Copyright (C) 2021 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this profgram. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import io
import os
import shutil
from django.test import override_settings
import gisdata
from unittest.mock import patch

from django.test.testcases import SimpleTestCase, TestCase

from geonode.utils import mkdtemp
from geonode.storage.aws import AwsStorageManager
from geonode.storage.exceptions import DataRetrieverExcepion
from geonode.storage.manager import StorageManager
from geonode.storage.gcs import GoogleStorageManager
from geonode.storage.dropbox import DropboxStorageManager
from geonode.base.populate_test_data import create_single_dataset
from geonode.tests.base import GeoNodeBaseTestSupport


[docs] class TestDropboxStorageManager(SimpleTestCase):
[docs] def setUp(self): with self.settings(DROPBOX_OAUTH2_TOKEN="auth_token"): self.sut = DropboxStorageManager()
@patch("geonode.storage.dropbox.DropBoxStorage.delete")
[docs] def test_dropbox_deleted(self, dbx): """ Will test that the function returns the expected result and that the DropBoxStorage function as been called with the expected parameters """ dbx.return_value = None output = self.sut.delete("filename") self.assertIsNone(output) dbx.assert_called_once_with("filename")
@patch("geonode.storage.dropbox.DropBoxStorage.exists")
[docs] def test_dropbox_exists(self, dbx): """ Will test that the function returns the expected result and that the DropBoxStorage function as been called with the expected parameters """ dbx.return_value = True output = self.sut.exists("filename") self.assertTrue(output) dbx.assert_called_once_with("filename")
@patch("geonode.storage.dropbox.DropBoxStorage.listdir")
[docs] def test_dropbox_listdir(self, dbx): """ Will test that the function returns the expected result and that the DropBoxStorage function as been called with the expected parameters """ dbx.return_value = (["folder1"], ["file1", "file2"]) output = self.sut.listdir("Apps/") self.assertTupleEqual((["folder1"], ["file1", "file2"]), output) dbx.assert_called_once_with("Apps/")
@patch("geonode.storage.dropbox.DropBoxStorage._open")
[docs] def test_dropbox_open(self, dbx): """ Will test that the function returns the expected result and that the DropBoxStorage function as been called with the expected parameters """ dbx.return_value = io.StringIO() output = self.sut.open("name", mode="xx") self.assertEqual(type(output), io.StringIO().__class__) dbx.assert_called_once_with("name", "xx")
@patch("geonode.storage.dropbox.DropBoxStorage._full_path")
[docs] def test_dropbox_path(self, dbx): """ Will test that the function returns the expected result and that the DropBoxStorage function as been called with the expected parameters """ dbx.return_value = "/opt/full/path/to/file" output = self.sut.path("file") self.assertEqual("/opt/full/path/to/file", output) dbx.assert_called_once_with("file")
@patch("geonode.storage.dropbox.DropBoxStorage.save")
[docs] def test_dropbox_save(self, dbx): """ Will test that the function returns the expected result and that the DropBoxStorage function as been called with the expected parameters """ dbx.return_value = "cleaned_name" output = self.sut.save("file_name", "content") self.assertEqual("cleaned_name", output) dbx.assert_called_once_with("file_name", "content")
@patch("geonode.storage.dropbox.DropBoxStorage.size")
[docs] def test_dropbox_size(self, dbx): """ Will test that the function returns the expected result and that the DropBoxStorage function as been called with the expected parameters """ dbx.return_value = 1 output = self.sut.size("name") self.assertEqual(1, output) dbx.assert_called_once_with("name")
[docs] class TestGoogleStorageManager(SimpleTestCase):
[docs] def setUp(self): self.sut = GoogleStorageManager
@patch("storages.backends.gcloud.GoogleCloudStorage.delete")
[docs] def test_google_deleted(self, gcs): """ Will test that the function returns the expected result and that the GoogleCloudStorage function as been called with the expected parameters """ gcs.return_value = None output = self.sut().delete("filename") self.assertIsNone(output) gcs.assert_called_once_with("filename")
@patch("storages.backends.gcloud.GoogleCloudStorage.exists")
[docs] def test_google_exists(self, gcs): """ Will test that the function returns the expected result and that the GoogleCloudStorage function as been called with the expected parameters """ gcs.return_value = True output = self.sut().exists("filename") self.assertTrue(output) gcs.assert_called_once_with("filename")
@patch("storages.backends.gcloud.GoogleCloudStorage.listdir")
[docs] def test_google_listdir(self, gcs): """ Will test that the function returns the expected result and that the GoogleCloudStorage function as been called with the expected parameters """ gcs.return_value = (["folder1"], ["file1", "file2"]) output = self.sut().listdir("Apps/") self.assertTupleEqual((["folder1"], ["file1", "file2"]), output) gcs.assert_called_once_with("Apps/")
@patch("storages.backends.gcloud.GoogleCloudStorage._open")
[docs] def test_google_open(self, gcs): """ Will test that the function returns the expected result and that the GoogleCloudStorage function as been called with the expected parameters """ gcs.return_value = io.StringIO() output = self.sut().open("name", mode="xx") self.assertEqual(type(output), io.StringIO().__class__) gcs.assert_called_once_with("name", "xx")
[docs] def test_google_path(self): """ Will test that the function returns the expected result and that the GoogleCloudStorage function as been called with the expected parameters """ with self.assertRaises(NotImplementedError): self.sut().path("file")
@patch("storages.backends.gcloud.GoogleCloudStorage.save")
[docs] def test_google_save(self, gcs): """ Will test that the function returns the expected result and that the GoogleCloudStorage function as been called with the expected parameters """ gcs.return_value = "cleaned_name" output = self.sut().save("file_name", "content") self.assertEqual("cleaned_name", output) gcs.assert_called_once_with("file_name", "content")
@patch("storages.backends.gcloud.GoogleCloudStorage.size")
[docs] def test_google_size(self, gcs): """ Will test that the function returns the expected result and that the GoogleCloudStorage function as been called with the expected parameters """ gcs.return_value = 1 output = self.sut().size("name") self.assertEqual(1, output) gcs.assert_called_once_with("name")
@override_settings(AWS_STORAGE_BUCKET_NAME="my-bucket-name")
[docs] class TestAwsStorageManager(SimpleTestCase):
[docs] def setUp(self): self.sut = AwsStorageManager
@patch("storages.backends.s3boto3.S3Boto3Storage.delete")
[docs] def test_aws_deleted(self, aws): """ Will test that the function returns the expected result and that the AwsStorageManager function as been called with the expected parameters """ aws.return_value = None output = self.sut().delete("filename") self.assertIsNone(output) aws.assert_called_once_with("filename")
@patch("storages.backends.s3boto3.S3Boto3Storage.exists")
[docs] def test_aws_exists(self, aws): """ Will test that the function returns the expected result and that the AwsStorageManager function as been called with the expected parameters """ aws.return_value = True output = self.sut().exists("filename") self.assertTrue(output) aws.assert_called_once_with("filename")
@patch("storages.backends.s3boto3.S3Boto3Storage.listdir")
[docs] def test_aws_listdir(self, aws): """ Will test that the function returns the expected result and that the AwsStorageManager function as been called with the expected parameters """ aws.return_value = (["folder1"], ["file1", "file2"]) output = self.sut().listdir("Apps/") self.assertTupleEqual((["folder1"], ["file1", "file2"]), output) aws.assert_called_once_with("Apps/")
@patch("storages.backends.s3boto3.S3Boto3Storage._open")
[docs] def test_aws_open(self, aws): """ Will test that the function returns the expected result and that the AwsStorageManager function as been called with the expected parameters """ aws.return_value = io.StringIO() output = self.sut().open("name", mode="xx") self.assertEqual(type(output), io.StringIO().__class__) aws.assert_called_once_with("name", "xx")
@patch("storages.backends.s3boto3.S3Boto3Storage._normalize_name")
[docs] def test_aws_path(self, aws): """ Will test that the function returns the expected result and that the AwsStorageManager function as been called with the expected parameters """ aws.return_value = "/opt/full/path/to/file" output = self.sut().path("file") self.assertEqual("/opt/full/path/to/file", output) aws.assert_called_once_with("file")
@patch("storages.backends.s3boto3.S3Boto3Storage.save")
[docs] def test_aws_save(self, aws): """ Will test that the function returns the expected result and that the AwsStorageManager function as been called with the expected parameters """ aws.return_value = True output = self.sut().save("file_name", "content") self.assertTrue(output) aws.assert_called_once_with("file_name", "content")
@patch("storages.backends.s3boto3.S3Boto3Storage.size")
[docs] def test_aws_size(self, aws): """ Will test that the function returns the expected result and that the AwsStorageManager function as been called with the expected parameters """ aws.return_value = 1 output = self.sut().size("name") self.assertEqual(1, output) aws.assert_called_once_with("name")
[docs] class TestStorageManager(GeoNodeBaseTestSupport):
[docs] def setUp(self): self.sut = StorageManager self.project_root = os.path.abspath(os.path.dirname(__file__))
@patch("django.core.files.storage.FileSystemStorage.delete")
[docs] def test_storage_manager_deleted(self, strg): """ Will test that the function returns the expected result and that the StorageManager function as been called with the expected parameters """ strg.return_value = None output = self.sut().delete("filename") self.assertIsNone(output) strg.assert_called_once_with("filename")
@patch("django.core.files.storage.FileSystemStorage.exists")
[docs] def test_storage_manager_exists(self, strg): """ Will test that the function returns the expected result and that the StorageManager function as been called with the expected parameters """ strg.return_value = True output = self.sut().exists("filename") self.assertTrue(output) strg.assert_called_once_with("filename")
@patch("django.core.files.storage.FileSystemStorage.listdir")
[docs] def test_storage_manager_listdir(self, strg): """ Will test that the function returns the expected result and that the StorageManager function as been called with the expected parameters """ strg.return_value = (["folder1"], ["file1", "file2"]) output = self.sut().listdir("Apps/") self.assertTupleEqual((["folder1"], ["file1", "file2"]), output) strg.assert_called_once_with("Apps/")
@patch("django.core.files.storage.FileSystemStorage._open")
[docs] def test_storage_manager_open(self, strg): """ Will test that the function returns the expected result and that the StorageManager function as been called with the expected parameters """ strg.return_value = io.StringIO() output = self.sut().open("name", mode="xx") self.assertEqual(type(output), io.StringIO().__class__) strg.assert_called_once()
@patch("django.core.files.storage.FileSystemStorage.path")
[docs] def test_storage_manager_path(self, strg): """ Will test that the function returns the expected result and that the StorageManager function as been called with the expected parameters """ strg.return_value = "/opt/full/path/to/file" output = self.sut().path("file") self.assertEqual("/opt/full/path/to/file", output) strg.assert_called_once_with("file")
@patch("django.core.files.storage.FileSystemStorage.save")
[docs] def test_storage_manager_save(self, strg): """ Will test that the function returns the expected result and that the StorageManager function as been called with the expected parameters """ strg.return_value = True output = self.sut().save("file_name", "content") self.assertTrue(output) strg.assert_called_once()
@patch("django.core.files.storage.FileSystemStorage.size")
[docs] def test_storage_manager_size(self, strg): """ Will test that the function returns the expected result and that the StorageManager function as been called with the expected parameters """ strg.return_value = 1 output = self.sut().size("name") self.assertEqual(1, output) strg.assert_called_once_with("name")
# @patch('django.core.files.storage.FileSystemStorage.save') # @patch('django.core.files.storage.FileSystemStorage.path')
[docs] def test_storage_manager_replace_files_list(self): # , path, strg): """ Will test that the function returns the expected result and that the StorageManager function as been called with the expected parameters """ # path.return_value = '/opt/full/path/to/file' # strg.return_value = '/opt/full/path/to/file' old_files = ["/opt/full/path/to/file", "/opt/full/path/to/file"] new_files = [ os.path.join(f"{self.project_root}", "tests/data/test_sld.sld"), os.path.join(f"{self.project_root}", "tests/data/test_data.json"), ] dataset = create_single_dataset("storage_manager") dataset.files = old_files dataset.save() output = self.sut().replace(dataset, new_files) self.assertEqual(2, len(output["files"])) self.assertTrue("file.sld" in output["files"][0]) self.assertTrue("file.json" in output["files"][1])
@patch("django.core.files.storage.FileSystemStorage.save") @patch("django.core.files.storage.FileSystemStorage.path")
[docs] def test_storage_manager_replace_single_file(self, path, strg): """ Will test that the function returns the expected result and that the StorageManager function as been called with the expected parameters """ path.return_value = "/opt/full/path/to/file" strg.return_value = "/opt/full/path/to/file" expected = "/opt/full/path/to/file" dataset = create_single_dataset("storage_manager") dataset.files = ["/opt/full/path/to/file2"] dataset.save() with open("geonode/base/fixtures/test_sld.sld") as new_file: output = self.sut().replace(dataset, new_file) self.assertListEqual([expected], output["files"])
@override_settings(FILE_UPLOAD_DIRECTORY_PERMISSIONS=0o777) @override_settings(FILE_UPLOAD_PERMISSIONS=0o777)
[docs] def test_storage_manager_copy(self): """ Test that the copy works as expected and the permissions are corerct """ dataset = create_single_dataset(name="test_copy") dataset.files = [os.path.join(f"{self.project_root}", "tests/data/test_sld.sld")] dataset.save() output = self.sut().copy(dataset) self.assertTrue(os.path.exists(output.get("files")[0])) self.assertEqual(os.stat(os.path.exists(output.get("files")[0])).st_mode, 8592) os.remove(output.get("files")[0]) self.assertFalse(os.path.exists(output.get("files")[0]))
[docs] class TestDataRetriever(TestCase): @classmethod
[docs] def setUpClass(cls) -> None: super().setUpClass() cls.project_root = os.path.abspath(os.path.dirname(__file__))
[docs] def setUp(self): self.sut = StorageManager self.local_files_paths = { "base_file": f"{gisdata.GOOD_DATA}/vector/single_point.shp", "dbf_file": f"{gisdata.GOOD_DATA}/vector/single_point.dbf", "shx_file": f"{gisdata.GOOD_DATA}/vector/single_point.shx", "prj_file": f"{gisdata.GOOD_DATA}/vector/single_point.prj", } github_path = "https://github.com/GeoNode/gisdata/tree/master/gisdata/data/good/vector/" self.remote_files = { "base_file": f"{github_path}/single_point.shp", "dbf_file": f"{github_path}/single_point.dbf", "shx_file": f"{github_path}/single_point.shx", "prj_file": f"{github_path}/single_point.prj", } return super().setUp()
[docs] def test_file_are_not_transfered_local(self): _obj = self.sut(remote_files=self.local_files_paths) self.assertTrue(hasattr(_obj, "data_retriever")) self.assertIsNone(_obj.data_retriever.temporary_folder)
[docs] def test_clone_remote_files_local(self): storage_manager = self.sut(remote_files=self.local_files_paths) storage_manager.clone_remote_files() expected_file_set = {"single_point.shx", "single_point.prj", "single_point.dbf", "single_point.shp"} self.assertIsNotNone(storage_manager.data_retriever.temporary_folder) _files = os.listdir(storage_manager.data_retriever.temporary_folder) self.assertSetEqual(expected_file_set, set(_files))
[docs] def test_get_retrieved_paths_local(self): self.maxDiff = None storage_manager = self.sut(remote_files=self.local_files_paths) # get_retrieved_paths should not transfer the remote paths with self.assertRaises(DataRetrieverExcepion): storage_manager.get_retrieved_paths() # instead first is needed to clone the remove files and then take the paths storage_manager.clone_remote_files() files = storage_manager.get_retrieved_paths() expected_sorted_list = [ ("base_file", f"{storage_manager.data_retriever.temporary_folder}/single_point.shp"), ("dbf_file", f"{storage_manager.data_retriever.temporary_folder}/single_point.dbf"), ("prj_file", f"{storage_manager.data_retriever.temporary_folder}/single_point.prj"), ("shx_file", f"{storage_manager.data_retriever.temporary_folder}/single_point.shx"), ] self.assertIsNotNone(storage_manager.data_retriever.temporary_folder) self.assertListEqual(expected_sorted_list, sorted(files.items()))
[docs] def test_delete_retrieved_paths_local(self): self.maxDiff = None storage_manager = self.sut(remote_files=self.local_files_paths) # instead first is needed to clone the remove files and then take the paths storage_manager.clone_remote_files() expected_file_set = {"single_point.shx", "single_point.prj", "single_point.dbf", "single_point.shp"} _tmp_folder_path = storage_manager.data_retriever.temporary_folder self.assertIsNotNone(_tmp_folder_path) _files = os.listdir(storage_manager.data_retriever.temporary_folder) # check the file exists in the temporary folder so we can be sure that the delete works self.assertSetEqual(expected_file_set, set(_files)) storage_manager.delete_retrieved_paths(force=True) self.assertIsNone(storage_manager.data_retriever.temporary_folder) # the directory does not exists self.assertFalse(os.path.exists(_tmp_folder_path))
[docs] def test_file_are_not_transfered_remote(self): _obj = self.sut(remote_files=self.remote_files) self.assertTrue(hasattr(_obj, "data_retriever")) self.assertIsNone(_obj.data_retriever.temporary_folder)
[docs] def test_clone_remote_files_remote(self): storage_manager = self.sut(remote_files=self.remote_files) storage_manager.clone_remote_files() expected_file_set = {"single_point.shx", "single_point.prj", "single_point.dbf", "single_point.shp"} self.assertIsNotNone(storage_manager.data_retriever.temporary_folder) _files = os.listdir(storage_manager.data_retriever.temporary_folder) self.assertSetEqual(expected_file_set, set(_files))
[docs] def test_get_retrieved_paths_remote(self): self.maxDiff = None storage_manager = self.sut(remote_files=self.remote_files) # get_retrieved_paths should not transfer the remote paths with self.assertRaises(DataRetrieverExcepion): storage_manager.get_retrieved_paths() # instead first is needed to clone the remove files and then take the paths storage_manager.clone_remote_files() files = storage_manager.get_retrieved_paths() expected_sorted_list = [ ("base_file", f"{storage_manager.data_retriever.temporary_folder}/single_point.shp"), ("dbf_file", f"{storage_manager.data_retriever.temporary_folder}/single_point.dbf"), ("prj_file", f"{storage_manager.data_retriever.temporary_folder}/single_point.prj"), ("shx_file", f"{storage_manager.data_retriever.temporary_folder}/single_point.shx"), ] self.assertIsNotNone(storage_manager.data_retriever.temporary_folder) self.assertListEqual(expected_sorted_list, sorted(files.items()))
[docs] def test_delete_retrieved_paths_remote(self): self.maxDiff = None storage_manager = self.sut(remote_files=self.remote_files) # instead first is needed to clone the remove files and then take the paths storage_manager.clone_remote_files() expected_file_set = {"single_point.shx", "single_point.prj", "single_point.dbf", "single_point.shp"} _tmp_folder_path = storage_manager.data_retriever.temporary_folder self.assertIsNotNone(_tmp_folder_path) _files = os.listdir(storage_manager.data_retriever.temporary_folder) # check the file exists in the temporary folder so we can be sure that the delete works self.assertSetEqual(expected_file_set, set(_files)) storage_manager.delete_retrieved_paths(force=True) self.assertIsNone(storage_manager.data_retriever.temporary_folder) # the directory does not exists self.assertFalse(os.path.exists(_tmp_folder_path))
[docs] def test_storage_manager_rmtree(self): """ Will test that the rmtree function works as expected """ _dirs = ["subdir1", "subdir2", "subdir1/subsubdir1"] _files = ["subdir1/tmp_file1", "subdir1/tmp_file2", "subdir2/tmp_file2", "subdir1/subsubdir1/tmp_file11"] _tmpdir = mkdtemp() for _dir in _dirs: if not os.path.exists(os.path.join(_tmpdir, _dir)): os.makedirs(os.path.join(_tmpdir, _dir), exist_ok=True) self.assertTrue(os.path.exists(os.path.join(_tmpdir, _dir))) self.assertTrue(os.path.isdir(os.path.join(_tmpdir, _dir))) for _file in _files: with open(os.path.join(_tmpdir, _file), "wb+"): pass self.assertTrue(os.path.exists(os.path.join(_tmpdir, _file))) self.assertTrue(os.path.isfile(os.path.join(_tmpdir, _file))) self.sut().rmtree(_tmpdir) self.assertFalse(os.path.exists(_tmpdir))
[docs] def test_zip_file_should_correctly_recognize_main_extension_with_csv(self): # reinitiate the storage manager with the zip file storage_manager = self.sut( remote_files={"base_file": os.path.join(f"{self.project_root}", "tests/data/example.zip")} ) storage_manager.clone_remote_files() self.assertIsNotNone(storage_manager.data_retriever.temporary_folder) _files = storage_manager.get_retrieved_paths() # Selected base_file is not defined in case of multiple csv files self.assertTrue(_files.get("base_file").endswith(".csv"))
[docs] def test_zip_file_should_correctly_index_file_extensions(self): # reinitiate the storage manager with the zip file storage_manager = self.sut( remote_files={"base_file": os.path.join(f"{self.project_root}", "tests/data/example.zip")} ) storage_manager.clone_remote_files() self.assertIsNotNone(storage_manager.data_retriever.temporary_folder) _files = storage_manager.get_retrieved_paths() self.assertIsNotNone(_files.get("csv_file")) # extensions found more than once get indexed self.assertIsNotNone(_files.get("csv_file_1"))
@override_settings( SUPPORTED_DATASET_FILE_TYPES=[ {"id": "kmz", "label": "kmz", "format": "vector", "ext": ["kmz"]}, {"id": "kml", "label": "kml", "format": "vector", "ext": ["kml"]}, ] )
[docs] def test_zip_file_should_correctly_recognize_main_extension_with_kmz(self): # reinitiate the storage manager with the zip file storage_manager = self.sut( remote_files={"base_file": os.path.join(f"{self.project_root}", "tests/data/Italy.kmz")} ) storage_manager.clone_remote_files() self.assertIsNotNone(storage_manager.data_retriever.temporary_folder) _files = storage_manager.get_retrieved_paths() self.assertTrue("doc.kml" in _files.get("base_file"), msg=f"files available: {_files}")
[docs] def test_zip_file_should_correctly_recognize_main_extension_with_shp(self): # zipping files storage_manager = self.sut(remote_files=self.local_files_paths) storage_manager.clone_remote_files() storage_manager.data_retriever.temporary_folder output = shutil.make_archive( f"{storage_manager.data_retriever.temporary_folder}/output", "zip", storage_manager.data_retriever.temporary_folder, ) # reinitiate the storage manager with the zip file storage_manager = self.sut(remote_files={"base_file": output}) storage_manager.clone_remote_files() self.assertIsNotNone(storage_manager.data_retriever.temporary_folder) _files = storage_manager.get_retrieved_paths() self.assertTrue("single_point.shp" in _files.get("base_file"))