Source code for geonode.people.tests

#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
from django.test.utils import override_settings
from mock import MagicMock, PropertyMock, patch
from geonode.tests.base import GeoNodeBaseTestSupport

from django.core import mail
from django.urls import reverse
from django.db import transaction
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.contrib.sites.models import Site
from allauth.account.forms import SignupForm

from geonode.layers import utils
from geonode.layers.models import Dataset
from geonode.people import profileextractors

from geonode.base.populate_test_data import all_public, create_models, remove_models


[docs] class PeopleAndProfileTests(GeoNodeBaseTestSupport):
[docs] fixtures = ["initial_data.json", "group_test_data.json", "default_oauth_apps.json"]
@classmethod
[docs] def setUpClass(cls): super().setUpClass() create_models(type=cls.get_type, integration=cls.get_integration) all_public()
@classmethod
[docs] def tearDownClass(cls): super().tearDownClass() remove_models(cls.get_obj_ids, type=cls.get_type, integration=cls.get_integration)
[docs] def setUp(self): super().setUp() self.layers = Dataset.objects.all()[:3] self.dataset_ids = [layer.pk for layer in self.layers] self.user_ids = ",".join(str(element.pk) for element in get_user_model().objects.all()[:3]) self.permission_type = ("view", "download", "edit") self.groups = Group.objects.all()[:3] self.group_ids = ",".join(str(element.pk) for element in self.groups)
[docs] def test_redirect_on_get_request(self): """ Test that an immediate redirect occurs back to the admin page of origin when no IDS are supplied """ self.client.login(username="admin", password="admin") response = self.client.get(reverse("set_user_dataset_permissions")) self.assertEqual(response.status_code, 302)
[docs] def test_admin_only_access(self): """ Test that only admin users can access the routes """ self.client.logout() response = self.client.get(reverse("set_user_dataset_permissions")) self.assertEqual(response.status_code, 302)
@patch("geonode.base.views.UserAndGroupPermissionsForm.is_valid") @patch("geonode.base.views.UserAndGroupPermissionsForm.errors", new_callable=PropertyMock)
[docs] def test_invalid_form_return_the_expected_message(self, form_error, form_valid): """ The form should retrun a pre-defined message if the dataset is not part of the choices """ error_obj = MagicMock(data=[MagicMock(code="invalid_choice")]) form_valid.return_value = False form_error.return_value = {"layers": error_obj} self.client.login(username="admin", password="admin") response = self.client.post( path=reverse("set_user_dataset_permissions"), data={"ids": [99999], "permission_type": "view", "mode": "set"}, follow=True, ) self.assertEqual(response.status_code, 200) message = list(response.context.get("messages"))[0] self.assertEqual(message.tags, "error") self.assertTrue("The following dataset ID selected are not part of the available choices" in message.message)
@override_settings(ASYNC_SIGNALS=False)
[docs] def test_set_unset_user_dataset_permissions(self): """ Test that user permissions are set for layers """ self.client.login(username="admin", password="admin") response = self.client.post( reverse("set_user_dataset_permissions"), data={ "ids": self.user_ids, "layers": self.dataset_ids, "permission_type": self.permission_type, "mode": "set", }, ) self.assertEqual(response.status_code, 302) with transaction.atomic(): for permissions_name in self.permission_type: utils.set_datasets_permissions( permissions_name, [ resource.name for resource in Dataset.objects.filter(id__in=[int(_id) for _id in self.dataset_ids]) ], [user.username for user in get_user_model().objects.filter(id__in=self.user_ids.split(","))], [], False, verbose=True, ) for layer in self.layers: user = get_user_model().objects.first() perm_spec = layer.get_all_level_info() self.assertFalse(user in perm_spec["users"], f"{layer} - {user}")
@override_settings(ASYNC_SIGNALS=False)
[docs] def test_set_unset_group_dataset_permissions(self): """ Test that group permissions are set for layers """ self.client.login(username="admin", password="admin") response = self.client.post( reverse("set_group_dataset_permissions"), data={ "ids": self.group_ids, "layers": self.dataset_ids, "permission_type": self.permission_type, "mode": "set", }, ) self.assertEqual(response.status_code, 302) with transaction.atomic(): for permissions_name in self.permission_type: utils.set_datasets_permissions( permissions_name, [ resource.name for resource in Dataset.objects.filter(id__in=[int(_id) for _id in self.dataset_ids]) ], [], [group.name for group in Group.objects.filter(id__in=self.group_ids.split(","))], False, verbose=True, ) for layer in self.layers: perm_spec = layer.get_all_level_info() self.assertTrue(self.groups[0] in perm_spec["groups"])
@override_settings(ASYNC_SIGNALS=False)
[docs] def test_unset_group_dataset_perms(self): """ Test that group permissions are unset for layers """ user = get_user_model().objects.first() for layer in self.layers: layer.set_permissions( { "users": { user.username: [ "change_dataset_data", "view_resourcebase", "download_resourcebase", "change_resourcebase_metadata", ] } } ) self.client.login(username="admin", password="admin") response = self.client.post( reverse("set_user_dataset_permissions"), data={ "ids": self.user_ids, "layers": self.dataset_ids, "permission_type": self.permission_type, "mode": "unset", }, ) self.assertEqual(response.status_code, 302) with transaction.atomic(): for permissions_name in self.permission_type: utils.set_datasets_permissions( permissions_name, [ resource.name for resource in Dataset.objects.filter(id__in=[int(_id) for _id in self.dataset_ids]) ], [user.username for user in get_user_model().objects.filter(id__in=self.user_ids.split(","))], [], True, verbose=True, ) for layer in self.layers: perm_spec = layer.get_all_level_info() self.assertTrue(user not in perm_spec["users"])
[docs] def test_forgot_username(self): url = reverse("forgot_username") # page renders response = self.client.get(url) self.assertEqual(response.status_code, 200) # and responds for a bad email response = self.client.post(url, data={"email": "foobar@doesnotexist.com"}) self.assertContains(response, "No user could be found with that email address.") norman = get_user_model().objects.get(username="norman") norman.email = "contact@admin.admin" norman.save() response = self.client.post(url, data={"email": norman.email}) # and sends a mail for a good one self.assertEqual(len(mail.outbox), 1) site = Site.objects.get_current() # Verify that the subject of the first message is correct. self.assertEqual(mail.outbox[0].subject, f"Your username for {site.name}")
[docs] def test_get_profile(self): admin = get_user_model().objects.get(username="admin") norman = get_user_model().objects.get(username="norman") bobby = get_user_model().objects.get(username="bobby") bobby.voice = "+245-897-7889" bobby.save() url = reverse("profile_detail", args=["bobby"]) # Get user's profile as anonymous response = self.client.get(url) self.assertEqual(response.status_code, 302) # Get user's profile by another authenticated user self.assertTrue(self.client.login(username="norman", password="norman")) self.assertTrue(norman.is_authenticated) response = self.client.get(url, user=norman) self.assertEqual(response.status_code, 200) # Returns limitted info about a user content = response.content if isinstance(content, bytes): content = content.decode("UTF-8") self.assertIn("Profile of bobby", content) self.assertNotIn(bobby.voice, content) # Get user's profile as owner self.assertTrue(self.client.login(username="bobby", password="bob")) self.assertTrue(bobby.is_authenticated) response = self.client.get(url) self.assertEqual(response.status_code, 200) # Returns all profile info content = response.content if isinstance(content, bytes): content = content.decode("UTF-8") self.assertIn("Profile of bobby", content) self.assertIn(bobby.voice, content) # Get user's profile as admin self.assertTrue(self.client.login(username="admin", password="admin")) self.assertTrue(admin.is_authenticated) response = self.client.get(url) self.assertEqual(response.status_code, 200) # Returns all profile info content = response.content if isinstance(content, bytes): content = content.decode("UTF-8") self.assertIn("Profile of bobby", content) self.assertIn(bobby.voice, content)
[docs] def _facebook_extractor_init(self): data = { "email": "phony_mail", "first_name": "phony_first_name", "last_name": "phony_last_name", "cover": "phony_cover", } extractor = profileextractors.FacebookExtractor() return extractor, data
[docs] def test_extract_area(self): extractor, data = self._facebook_extractor_init() with self.assertRaises(NotImplementedError): extractor.extract_area(data)
[docs] def test_extract_city(self): extractor, data = self._facebook_extractor_init() with self.assertRaises(NotImplementedError): extractor.extract_city(data)
[docs] def test_extract_country(self): extractor, data = self._facebook_extractor_init() with self.assertRaises(NotImplementedError): extractor.extract_country(data)
[docs] def test_extract_delivery(self): extractor, data = self._facebook_extractor_init() with self.assertRaises(NotImplementedError): extractor.extract_delivery(data)
[docs] def test_fb_extract_email(self): extractor, data = self._facebook_extractor_init() result = extractor.extract_email(data) self.assertEqual(result, data["email"])
[docs] def test_extract_fax(self): extractor, data = self._facebook_extractor_init() with self.assertRaises(NotImplementedError): extractor.extract_fax(data)
[docs] def test_fb_extract_first_name(self): extractor, data = self._facebook_extractor_init() result = extractor.extract_first_name(data) self.assertEqual(result, data["first_name"])
[docs] def test_fb_extract_last_name(self): extractor, data = self._facebook_extractor_init() result = extractor.extract_last_name(data) self.assertEqual(result, data["last_name"])
[docs] def test_extract_organization(self): extractor, data = self._facebook_extractor_init() with self.assertRaises(NotImplementedError): extractor.extract_organization(data)
[docs] def test_extract_position(self): extractor, data = self._facebook_extractor_init() with self.assertRaises(NotImplementedError): extractor.extract_position(data)
[docs] def test_extract_profile(self): extractor, data = self._facebook_extractor_init() result = extractor.extract_profile(data) self.assertEqual(result, data["cover"])
[docs] def test_extract_voice(self): extractor, data = self._facebook_extractor_init() with self.assertRaises(NotImplementedError): extractor.extract_voice(data)
[docs] def test_extract_zipcode(self): extractor, data = self._facebook_extractor_init() with self.assertRaises(NotImplementedError): extractor.extract_zipcode(data)
[docs] def _linkedin_extractor_init(self): data = { "id": "REDACTED", "firstName": {"localized": {"en_US": "Tina"}, "preferredLocale": {"country": "US", "language": "en"}}, "lastName": {"localized": {"en_US": "Belcher"}, "preferredLocale": {"country": "US", "language": "en"}}, "profilePicture": {"displayImage": "urn:li:digitalmediaAsset:B54328XZFfe2134zTyq"}, "elements": [ {"handle": "urn:li:emailAddress:3775708763", "handle~": {"emailAddress": "hsimpson@linkedin.com"}} ], } extractor = profileextractors.LinkedInExtractor() return extractor, data
[docs] def test_ln_extract_email(self): extractor, data = self._linkedin_extractor_init() result = extractor.extract_email(data) self.assertEqual(result, data["elements"][0]["handle~"]["emailAddress"])
[docs] def test_ln_extract_first_name(self): extractor, data = self._linkedin_extractor_init() result = extractor.extract_first_name(data) self.assertEqual(result, data["firstName"]["localized"]["en_US"])
[docs] def test_ln_extract_last_name(self): extractor, data = self._linkedin_extractor_init() result = extractor.extract_last_name(data) self.assertEqual(result, data["lastName"]["localized"]["en_US"])
@override_settings( AUTH_PASSWORD_VALIDATORS=[ { "NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator", }, { "NAME": "django.contrib.auth.password_validation.MinimumLengthValidator", "OPTIONS": { "min_length": 14, }, }, { "NAME": "django.contrib.auth.password_validation.CommonPasswordValidator", }, { "NAME": "geonode.people.password_validators.UppercaseValidator", }, { "NAME": "geonode.people.password_validators.NumberValidator", "OPTIONS": { "min_digits": 1, }, }, { "NAME": "geonode.people.password_validators.LowercaseValidator", }, { "NAME": "geonode.people.password_validators.SpecialCharsValidator", }, ] )
[docs] def test_password_validators(self): data = {"username": "username", "email": "user@example.com", "password1": "qwerty", "password2": "qwerty"} error_codes = [ "password_too_short", "password_too_common", "password_no_upper", "password_no_number", "password_no_upper", ] form = SignupForm(data, email_required=True) self.assertFalse(form.is_valid()) for errors in form.errors.values(): for error in errors.data: self.assertTrue(error.code in error_codes) data = { "username": "username", "email": "user@example.com", "password1": "@!2XJSL_S&V^0nt", "password2": "@!2XJSL_S&V^0nt", } form = SignupForm(data, email_required=True) self.assertTrue(form.is_valid())
[docs] def test_new_user_is_assigned_automatically_to_contributors(self): """ By default the contributors group is assigned to each new user """ new_user = get_user_model().objects.create(username="random_username") self.assertTrue("contributors" in [x.name for x in new_user.groups.iterator()])
@override_settings(AUTO_ASSIGN_REGISTERED_MEMBERS_TO_CONTRIBUTORS=False)
[docs] def test_new_user_is_no_assigned_automatically_to_contributors_if_disabled(self): """ If AUTO_ASSIGN_REGISTERED_MEMBERS_TO_CONTRIBUTORS is false, each new user is not automatically assinged to the contributors group """ new_user = get_user_model().objects.create(username="random_username") self.assertFalse("contributors" in [x.name for x in new_user.groups.iterator()])