# Source code for geonode.facets.views

#########################################################################
#
# Copyright (C) 2023 Open Source Geospatial Foundation - all rights reserved
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import logging
from urllib.parse import urlencode

from rest_framework.authentication import SessionAuthentication, BasicAuthentication
from rest_framework.views import APIView

from django.http import HttpResponseNotFound, JsonResponse
from django.urls import reverse
from django.conf import settings

from geonode.base.api.views import ResourceBaseViewSet
from geonode.base.models import ResourceBase
from geonode.facets.models import FacetProvider, DEFAULT_FACET_PAGE_SIZE, facet_registry
from geonode.security.utils import get_visible_resources

# Names of the query-string parameters understood by the facet endpoints.
PARAM_PAGE = "page"
PARAM_PAGE_SIZE = "page_size"
PARAM_LANG = "lang"
# Referenced by ListFacetsView and GetFacetView; the value follows the naming
# pattern of the sibling parameters -- TODO confirm against the published API.
PARAM_ADD_LINKS = "add_links"
PARAM_INCLUDE_TOPICS = "include_topics"
PARAM_INCLUDE_CONFIG = "include_config"
PARAM_TOPIC_CONTAINS = "topic_contains"

logger = logging.getLogger(__name__)
class BaseFacetingView(APIView):
    """
    Base class for the facet views: provides topic pagination, resource
    prefiltering and query-parameter parsing helpers.
    """

    authentication_classes = [SessionAuthentication, BasicAuthentication]

    @classmethod
    def _get_topics(
        cls,
        provider,
        queryset,
        page: int = 0,
        page_size: int = DEFAULT_FACET_PAGE_SIZE,
        lang: str = "en",
        topic_contains: str = None,
        keys: set = None,
        **kwargs,
    ):
        """
        Fetch one page of topics from a facet provider.

        :param provider: facet provider; its get_facet_items() and get_topics() are called
        :param queryset: resources the topics are computed on
        :param page: zero-based page index
        :param page_size: number of items per page
        :param lang: language passed to the provider
        :param topic_contains: optional filter, passed through to the provider
        :param keys: topic keys that must appear in the result even when they are
                     not part of the requested page; the collection is not modified
        :param kwargs: accepted and ignored
        :return: a dict with the keys "page", "page_size", "start", "total", "items"
        """
        # Work on a private copy so that neither a shared default nor the
        # caller's collection is ever mutated.
        requested_keys = set(keys) if keys else set()

        start = page * page_size
        end = start + page_size

        cnt, items = provider.get_facet_items(
            queryset, start=start, end=end, lang=lang, topic_contains=topic_contains, keys=requested_keys
        )

        if requested_keys:
            # Explicitly requested keys the provider did not return in this page
            missing = requested_keys - {str(t["key"]) for t in items}
            if missing:
                ext = provider.get_topics(missing, lang)
                items.extend(ext)
                cnt += len(ext)
                logger.debug("Extending facets to %d for %s", cnt, provider.name)

        return {"page": page, "page_size": page_size, "start": start, "total": cnt, "items": items}

    @classmethod
    def _prefilter_topics(cls, request):
        """
        Perform some prefiltering on resources, such as
          - auth visibility
          - filtering by other facets already applied

        :param request: the current request; query params whose name starts
                        with "filter{" are applied through ResourceBaseViewSet
        :return: a QuerySet on ResourceBase
        """
        logger.debug("Filtering by user '%s'", request.user)
        filters = {k: vlist for k, vlist in request.query_params.lists() if k.startswith("filter{")}
        # Diagnostic output only: logged at debug level with lazy formatting
        logger.debug("FILTERING BY %s", filters)

        if filters:
            viewset = ResourceBaseViewSet(request=request, format_kwarg={}, kwargs=filters)
            viewset.initial(request)
            return get_visible_resources(queryset=viewset.filter_queryset(viewset.get_queryset()), user=request.user)

        return get_visible_resources(ResourceBase.objects, request.user)

    @classmethod
    def _resolve_language(cls, request) -> "tuple[str, bool]":
        """
        Find out the language to be used for the current request.

        :param request: the current request
        :return: the resolved language, a boolean telling if the language was requested
        """
        # first try with an explicit request using params
        if lang := request.GET.get(PARAM_LANG, None):
            return lang, True
        # 2nd try: use the LANGUAGE_CODE attached to the request
        try:
            return request.LANGUAGE_CODE.split("-")[0], False
        except AttributeError:
            # No LANGUAGE_CODE on the request: fall back to the settings, stripping
            # the region the same way it is done for the request value
            return settings.LANGUAGE_CODE.split("-")[0], False

    @classmethod
    def _resolve_boolean(cls, request, name, fallback=None):
        """
        Parse boolean query params

        :param request: the current request
        :param name: name of the query param
        :param fallback: value returned when the param is missing or not recognized
        :return: True for values starting with "t" or "y" or equal to "1",
                 False for values starting with "f" or "n" or equal to "0"
                 (case insensitive), fallback otherwise
        """
        val = request.GET.get(name, None)
        if val is None:
            return fallback

        val = val.lower()
        if val.startswith(("t", "y")) or val == "1":
            return True
        if val.startswith(("f", "n")) or val == "0":
            return False
        return fallback
class ListFacetsView(BaseFacetingView):
    """
    Return the info about all the facets exposed by the registered providers.
    """

    def get(self, request, *args, **kwargs):
        """
        Build a JSON response of the form {"facets": [...]} with one entry per
        registered provider.

        Boolean query params (all defaulting to false):
          - PARAM_ADD_LINKS: add to each facet a "link" to its own endpoint
          - PARAM_INCLUDE_TOPICS: add to each facet the first page of its topics
          - PARAM_INCLUDE_CONFIG: add to each facet the provider configuration
        The language is taken from PARAM_LANG when present.
        """
        lang, lang_requested = self._resolve_language(request)
        add_links = self._resolve_boolean(request, PARAM_ADD_LINKS, False)
        include_topics = self._resolve_boolean(request, PARAM_INCLUDE_TOPICS, False)
        include_config = self._resolve_boolean(request, PARAM_INCLUDE_CONFIG, False)

        facets = []
        prefiltered = None  # computed lazily, at most once for all the providers

        for provider in facet_registry.get_providers():
            logger.debug("Fetching data from provider %r", provider)
            info = provider.get_info(lang=lang)
            if include_config:
                info["config"] = provider.config
            if add_links:
                link_args = {PARAM_ADD_LINKS: True}
                if lang_requested:  # only add lang param if specified in current call
                    link_args[PARAM_LANG] = lang
                info["link"] = f"{reverse('get_facet', args=[info['name']])}?{urlencode(link_args)}"

            if include_topics:
                # Compare against None instead of relying on truthiness: evaluating a
                # QuerySet in a boolean context runs the query, and an empty result
                # would trigger the prefiltering again for every provider.
                if prefiltered is None:
                    prefiltered = self._prefilter_topics(request)
                info["topics"] = self._get_topics(provider, queryset=prefiltered, lang=lang)

            facets.append(info)

        logger.debug("Returning facets %r", facets)
        return JsonResponse({"facets": facets})
class GetFacetView(BaseFacetingView):
    """
    Return the info and a page of topics for a single facet.
    """

    def get(self, request, facet):
        """
        Build the JSON response for the requested facet.

        Query params:
          - PARAM_LANG: requested language
          - PARAM_ADD_LINKS: boolean, add "prev" and "next" page links (None when
            the page does not exist)
          - PARAM_INCLUDE_CONFIG: boolean, add the provider configuration
          - PARAM_TOPIC_CONTAINS: filter passed to the provider
          - "key": may be repeated; topics that must be included in the result
          - PARAM_PAGE, PARAM_PAGE_SIZE: non-negative integers

        :param request: the current request
        :param facet: name of the requested facet
        :return: the facet info as JSON; 404 if no provider handles the facet;
                 400 if the paging params are not non-negative integers
        """
        logger.debug("get_facet -> %r for user '%r'", facet, request.user.username)

        # retrieve provider for the requested facet
        provider: FacetProvider = facet_registry.get_provider(facet)
        if not provider:
            return HttpResponseNotFound()

        # parse some query params
        lang, lang_requested = self._resolve_language(request)
        add_link = self._resolve_boolean(request, PARAM_ADD_LINKS, False)
        include_config = self._resolve_boolean(request, PARAM_INCLUDE_CONFIG, False)
        topic_contains = request.GET.get(PARAM_TOPIC_CONTAINS, None)
        keys = set(request.query_params.getlist("key"))

        # Malformed paging values are a client error, not a server failure
        paging_error = f"'{PARAM_PAGE}' and '{PARAM_PAGE_SIZE}' must be non-negative integers"
        try:
            page = int(request.GET.get(PARAM_PAGE, 0))
            page_size = int(request.GET.get(PARAM_PAGE_SIZE, DEFAULT_FACET_PAGE_SIZE))
        except ValueError:
            return JsonResponse({"message": paging_error}, status=400)
        if page < 0 or page_size < 0:
            # negative values would turn into a negative slice start
            return JsonResponse({"message": paging_error}, status=400)

        info = provider.get_info(lang)
        if include_config:
            info["config"] = provider.config

        qs = self._prefilter_topics(request)
        topics = self._get_topics(
            provider, queryset=qs, page=page, page_size=page_size, lang=lang, topic_contains=topic_contains, keys=keys
        )

        if add_link:
            exist_prev = page > 0
            exist_next = topics["total"] > (page + 1) * page_size
            link = reverse("get_facet", args=[info["name"]])
            for exist, link_name, p in (
                (exist_prev, "prev", page - 1),
                (exist_next, "next", page + 1),
            ):
                link_param = {PARAM_PAGE: p, PARAM_PAGE_SIZE: page_size}
                if lang_requested:  # only add lang param if specified in current call
                    link_param[PARAM_LANG] = lang
                link_param[PARAM_ADD_LINKS] = True
                if topic_contains:
                    link_param[PARAM_TOPIC_CONTAINS] = topic_contains
                info[link_name] = f"{link}?{urlencode(link_param)}" if exist else None

        if topic_contains:
            # in the payload let's remember this is a filtered output
            info["topic_contains"] = topic_contains

        info["topics"] = topics

        return JsonResponse(info)