Source code for geonode.people.models

#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

from uuid import uuid4
import logging

from allauth.account.adapter import get_adapter
from django.conf import settings

from django.db import models
from django.db.models import signals
from django.db.models.deletion import ProtectedError

from django.urls import reverse
from django.contrib.sites.models import Site
from django.utils.translation import ugettext_lazy as _
from django.contrib.auth.models import AbstractUser, Permission, UserManager
from django.contrib.auth.signals import user_logged_in, user_logged_out

from taggit.managers import TaggableManager

from geonode.base.enumerations import COUNTRIES
from geonode.base.models import Configuration, ResourceBase
from geonode.groups.models import GroupProfile
from geonode.security.permissions import PERMISSIONS, READ_ONLY_AFFECTED_PERMISSIONS

from allauth.account.signals import user_signed_up
from allauth.socialaccount.signals import social_account_added

from .utils import format_address
from .signals import do_login, do_logout, profile_post_save, update_user_email_addresses, notify_admins_new_signup
from .languages import LANGUAGES
from .timezones import TIMEZONES

[docs] logger = logging.getLogger(__name__)
[docs] class ProfileUserManager(UserManager):
[docs] def get_by_natural_key(self, username): return self.get(username=username)
[docs] class Profile(AbstractUser): """Fully featured Geonode user"""
[docs] organization = models.CharField( _("Organization Name"), max_length=255, blank=True, null=True, help_text=_("name of the responsible organization"), )
[docs] profile = models.TextField(_("Profile"), null=True, blank=True, help_text=_("introduce yourself"))
[docs] position = models.CharField( _("Position Name"), max_length=255, blank=True, null=True, help_text=_("role or position of the responsible person"), )
[docs] voice = models.CharField( _("Voice"), max_length=255, blank=True, null=True, help_text=_("telephone number by which individuals can speak to the responsible organization or individual"), )
[docs] fax = models.CharField( _("Facsimile"), max_length=255, blank=True, null=True, help_text=_("telephone number of a facsimile machine for the responsible organization or individual"), )
[docs] delivery = models.CharField( _("Delivery Point"), max_length=255, blank=True, null=True, help_text=_("physical and email address at which the organization or individual may be contacted"), )
[docs] city = models.CharField(_("City"), max_length=255, blank=True, null=True, help_text=_("city of the location"))
[docs] area = models.CharField( _("Administrative Area"), max_length=255, blank=True, null=True, help_text=_("state, province of the location") )
[docs] zipcode = models.CharField( _("Postal Code"), max_length=255, blank=True, null=True, help_text=_("ZIP or other postal code") )
[docs] country = models.CharField( _("Country"), choices=COUNTRIES, max_length=3, blank=True, null=True, help_text=_("country of the physical address"), )
[docs] keywords = TaggableManager( _("keywords"), blank=True, help_text=_( "commonly used word(s) or formalised word(s) or phrase(s) used to describe the subject \ (space or comma-separated" ), )
[docs] language = models.CharField(_("language"), max_length=10, choices=LANGUAGES, default=settings.LANGUAGE_CODE)
[docs] timezone = models.CharField( _("Timezone"), max_length=100, default="", choices=TIMEZONES, blank=True, )
def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] self._previous_active_state = self.is_active
[docs] def get_absolute_url(self): return reverse( "profile_detail", args=[ self.username, ], )
[docs] def __str__(self): return str(self.username)
@staticmethod
[docs] def class_name(value): return value.__class__.__name__
[docs] objects = ProfileUserManager()
[docs] USERNAME_FIELD = "username"
[docs] def group_list_public(self): return GroupProfile.objects.exclude(access="private").filter(groupmember__user=self)
[docs] def group_list_all(self): return GroupProfile.objects.filter(groupmember__user=self).distinct()
[docs] def is_member_of_group(self, group_slug): """ Returns if the Profile belongs to a group of a given slug. """ return self.groups.filter(name=group_slug).exists()
[docs] def keyword_list(self): """ Returns a list of the Profile's keywords. """ return [kw.name for kw in self.keywords.all()]
@property
[docs] def name_long(self): if self.first_name and self.last_name: return f"{self.first_name} {self.last_name} ({self.username})" elif (not self.first_name) and self.last_name: return f"{self.last_name} ({self.username})" elif self.first_name and (not self.last_name): return f"{self.first_name} ({self.username})" else: return self.username
@property
[docs] def full_name_or_nick(self): if self.first_name and self.last_name: return f"{self.first_name} {self.last_name}" else: return self.username
@property
[docs] def first_name_or_nick(self): return self.first_name if self.first_name else self.username
@property
[docs] def location(self): return format_address(self.delivery, self.zipcode, self.city, self.area, self.country)
@property
[docs] def perms(self): if self.is_superuser or self.is_staff: # return all permissions for admins perms = PERMISSIONS.values() else: user_groups = self.groups.values_list("name", flat=True) group_perms = ( Permission.objects.filter(group__name__in=user_groups).distinct().values_list("codename", flat=True) ) # return constant names defined by GeoNode perms = [PERMISSIONS[db_perm] for db_perm in group_perms] # check READ_ONLY mode config = Configuration.load() if config.read_only: # exclude permissions affected by readonly perms = [perm for perm in perms if perm not in READ_ONLY_AFFECTED_PERMISSIONS] return perms
[docs] def save(self, *args, **kwargs): super().save(*args, **kwargs) self._notify_account_activated() self._previous_active_state = self.is_active
[docs] def delete(self, using=None, keep_parents=False): resources = ResourceBase.objects.filter(owner=self) if resources: default_owner = ( Profile.objects.filter(username="admin").first() or Profile.objects.filter(is_superuser=True).first() ) if default_owner: resources.update(owner=default_owner) else: raise ProtectedError return super().delete(using=using, keep_parents=keep_parents)
[docs] def _notify_account_activated(self): """Notify user that its account has been activated by a staff member""" became_active = self.is_active and not self._previous_active_state if became_active and self.last_login is None: try: # send_notification(users=(self,), label="account_active") from invitations.adapters import get_invitations_adapter current_site = Site.objects.get_current() ctx = { "username": self.username, "current_site": current_site, "site_name": current_site.name, "email": self.email, "inviter": self, "LOGIN_URL": settings.LOGIN_URL, } email_template = "pinax/notifications/account_active/account_active" adapter = get_invitations_adapter() adapter.send_invitation_email(email_template, self.email, ctx) except Exception as e: logger.exception(e)
[docs] def send_mail(self, template_prefix, context): if self.email: get_adapter().send_mail(template_prefix, self.email, context)
[docs] def get_anonymous_user_instance(user_model): return user_model(pk=-1, username="AnonymousUser")
""" Connect relevant signals to their corresponding handlers. """ user_logged_in.connect(do_login) user_logged_out.connect(do_logout) social_account_added.connect(update_user_email_addresses, dispatch_uid=str(uuid4()), weak=False) user_signed_up.connect(notify_admins_new_signup, dispatch_uid=str(uuid4()), weak=False) signals.post_save.connect(profile_post_save, sender=settings.AUTH_USER_MODEL)