#########################################################################
#
# Copyright (C) 2023 Open Source Geospatial Foundation - all rights reserved
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import logging
import json
from tastypie.test import TestApiClient
from uuid import uuid4
from django.contrib.auth import get_user_model
from django.http import JsonResponse
from django.test import RequestFactory
from django.urls import reverse
from geonode.base.models import (
Thesaurus,
ThesaurusLabel,
ThesaurusKeyword,
ThesaurusKeywordLabel,
ResourceBase,
Region,
TopicCategory,
HierarchicalKeyword,
)
from geonode.facets.models import facet_registry
from geonode.facets.providers.baseinfo import FeaturedFacetProvider
from geonode.facets.providers.category import CategoryFacetProvider
from geonode.facets.providers.keyword import KeywordFacetProvider
from geonode.facets.providers.region import RegionFacetProvider
from geonode.facets.views import ListFacetsView, GetFacetView
from geonode.tests.base import GeoNodeBaseTestSupport
[docs]
logger = logging.getLogger(__name__)
[docs]
class TestFacets(GeoNodeBaseTestSupport):
@classmethod
[docs]
def setUpClass(cls):
super().setUpClass()
cls.user = get_user_model().objects.create(username="user_00")
cls.admin = get_user_model().objects.get(username="admin")
cls._create_thesauri()
cls._create_regions()
cls._create_categories()
cls._create_keywords()
cls._create_resources()
cls.rf = RequestFactory()
@classmethod
[docs]
def tearDownClass(cls):
super().tearDownClass()
# remove_models(cls.get_obj_ids, type=cls.get_type, integration=cls.get_integration)
[docs]
def setUp(self):
super().setUp()
self.api_client = TestApiClient()
self.assertEqual(self.admin.username, "admin")
self.assertEqual(self.admin.is_superuser, True)
@classmethod
[docs]
def _create_thesauri(cls):
cls.thesauri = {}
cls.thesauri_k = {}
for tn in range(2):
t = Thesaurus.objects.create(identifier=f"t_{tn}", title=f"Thesaurus {tn}", order=100 + tn * 10)
cls.thesauri[tn] = t
for tl in ( # fmt: skip
"en",
"it",
):
ThesaurusLabel.objects.create(thesaurus=t, lang=tl, label=f"TLabel {tn} {tl}")
for tkn in range(10):
tk = ThesaurusKeyword.objects.create(thesaurus=t, alt_label=f"T{tn}_K{tkn}_ALT")
cls.thesauri_k[f"{tn}_{tkn}"] = tk
for tkl in ( # fmt: skip
"en",
"it",
):
ThesaurusKeywordLabel.objects.create(keyword=tk, lang=tkl, label=f"T{tn}_K{tkn}_{tkl}")
@classmethod
[docs]
def _create_regions(cls):
cls.regions = {}
for code, name in ( # fmt: skip
("R0", "Region0"),
("R1", "Region1"),
("R2", "Region2"),
):
cls.regions[code] = Region.objects.create(code=code, name=name)
@classmethod
[docs]
def _create_categories(cls):
cls.cats = {}
for code, name in ( # fmt: skip
("C0", "Cat0"),
("C1", "Cat1"),
("C2", "Cat2"),
("C3", "Cat3"),
):
cls.cats[code] = TopicCategory.objects.create(identifier=code, description=name, gn_description=name)
@classmethod
[docs]
def _create_keywords(cls):
cls.kw = {}
for code, name in ( # fmt: skip
("K0", "Keyword0"),
("K1", "Keyword1"),
("K2", "Keyword2"),
("K3", "Keyword3"),
):
cls.kw[code] = HierarchicalKeyword.objects.create(slug=code, name=name)
@classmethod
[docs]
def _create_resources(self):
public_perm_spec = {"users": {"AnonymousUser": ["view_resourcebase"]}, "groups": []}
for x in range(20):
d: ResourceBase = ResourceBase.objects.create(
title=f"dataset_{x:02}",
uuid=str(uuid4()),
owner=self.user,
abstract=f"Abstract for dataset {x:02}",
subtype="vector",
is_approved=True,
is_published=True,
)
# These are the assigned keywords to the Resources
# RB00 -> T1K0 R0,R1 FEAT K0 C0
# RB01 -> T0K0 T1K0 R0 FEAT K1
# RB02 -> T1K0 R1 FEAT K2 C0
# RB03 -> T0K0 T1K0 K0
# RB04 -> T1K0 K0K1 C0
# RB05 -> T0K0 T1K0 K0 K2 C1
# RB06 -> T1K0 FEAT
# RB07 -> T0K0 T1K0 R2 FEAT K3 C3
# RB08 -> T1K0 T1K1 R1,R2 FEAT K3 C3
# RB09 -> T0K0 T1K0 T1K1 R2 K3 C3
# RB10 -> T1K1 R2 K3 C3
# RB11 -> T0K0 T0K1 T1K1
# RB12 -> T1K1 FEAT
# RB13 -> T0K0 T0K1 R1 FEAT
# RB14 -> FEAT
# RB15 -> T0K0 T0K1 C1
# RB16 -> C1
# RB17 -> T0K0 T0K1
# RB18 -> FEAT C2
# RB19 -> T0K0 T0K1 FEAT C2
if x % 2 == 1:
logger.debug(f"ADDING KEYWORDS {self.thesauri_k['0_0']} to RB {d}")
d.tkeywords.add(self.thesauri_k["0_0"])
if x % 2 == 1 and x > 10:
logger.debug(f"ADDING KEYWORDS {self.thesauri_k['0_1']} to RB {d}")
d.tkeywords.add(self.thesauri_k["0_1"])
if x < 10:
logger.debug(f"ADDING KEYWORDS {self.thesauri_k['1_0']} to RB {d}")
d.tkeywords.add(self.thesauri_k["1_0"])
if 7 < x < 13:
d.tkeywords.add(self.thesauri_k["1_1"])
if (x % 6) in (0, 1, 2):
d.featured = True
for reg, idx in ( # fmt: skip
("R0", (0, 1)),
("R1", (0, 2, 8, 13)),
("R2", (7, 8, 9, 10)),
):
if x in idx:
d.regions.add(self.regions[reg])
for kw, idx in ( # fmt: skip
("K0", (0, 3, 4, 5)),
("K1", [1, 4]),
("K2", [2, 5]),
("K3", [7, 8, 9, 10]),
):
if x in idx:
d.keywords.add(self.kw[kw])
for cat, idx in ( # fmt: skip
("C0", [0, 2, 4]),
("C1", [5, 15, 16]),
("C2", [18, 19]),
("C3", [7, 8, 9, 10]),
):
if x in idx:
d.category = self.cats[cat]
d.save()
d.set_permissions(public_perm_spec)
@staticmethod
[docs]
def _facets_to_map(facets):
return {f["name"]: f for f in facets}
[docs]
def test_facets_base(self):
req = self.rf.get(reverse("list_facets"), data={"lang": "en"})
res: JsonResponse = ListFacetsView.as_view()(req)
obj = json.loads(res.content)
self.assertIn("facets", obj)
facets_list = obj["facets"]
self.assertEqual(8, len(facets_list))
fmap = self._facets_to_map(facets_list)
for name in ("category", "owner", "t_0", "t_1", "featured", "resourcetype", "keyword"):
self.assertIn(name, fmap)
[docs]
def test_facets_rich(self):
# make sure the resources are in
c = ResourceBase.objects.count()
self.assertEqual(20, c)
# make sure tkeywords have been assigned by checking a sample resource
rb = ResourceBase.objects.get(title="dataset_01")
self.assertEqual(2, rb.tkeywords.count())
# run the request
req = self.rf.get(reverse("list_facets"), data={"include_topics": 1, "lang": "en"})
res: JsonResponse = ListFacetsView.as_view()(req)
obj = json.loads(res.content)
facets_list = obj["facets"]
self.assertEqual(8, len(facets_list))
fmap = self._facets_to_map(facets_list)
for expected in ( # fmt: skip
{
"name": "category",
"topics": {
"total": 4,
"items": [
{"label": "Cat0", "count": 3},
{"label": "Cat1", "count": 3},
{"label": "Cat3", "count": 4},
{"label": "Cat2", "count": 2},
],
},
},
{
"name": "keyword",
"topics": {
"total": 4,
"items": [
{"label": "Keyword0", "count": 4},
{"label": "Keyword1", "count": 2},
{"label": "Keyword2", "count": 2},
{"label": "Keyword3", "count": 4},
],
},
},
{
"name": "owner",
"topics": {
"total": 1,
},
},
{
"name": "t_0",
"topics": {
"total": 2,
"items": [
{"label": "T0_K0_en", "count": 10},
{"label": "T0_K1_en", "count": 5},
],
},
},
{
"name": "t_1",
"topics": {
"total": 2,
"items": [
{"label": "T1_K0_en", "count": 10},
],
},
},
{
"name": "region",
"topics": {
"total": 3,
"items": [
{"label": "Region0", "key": "R0", "count": 2},
{"label": "Region1", "key": "R1", "count": 4},
{"label": "Region2", "key": "R2", "count": 4},
],
},
},
{
"name": "featured",
"topics": {
"total": 2,
"items": [
{"label": "True", "key": True, "count": 11},
{"label": "False", "key": False, "count": 9},
],
},
},
{
"name": "resourcetype",
"topics": {
"total": 1,
"items": [
{"label": "resourcebase", "key": "resourcebase", "count": 20},
],
},
},
):
name = expected["name"]
self.assertIn(name, fmap)
facet = fmap[name]
expected_topics = expected["topics"]
for topic_key in expected_topics:
if topic_key != "items":
self.assertEqual(
expected_topics[topic_key], facet["topics"][topic_key], f"Mismatching '{topic_key}' for {name}"
)
else:
items = facet["topics"]["items"]
expected_items = expected_topics["items"]
for exp_item in expected_items:
exp_label = exp_item["label"]
found = None
for item in items:
if item["label"] == exp_label:
found = item
break
self.assertIsNotNone(
found, f"topic not found '{exp_label}' for facet '{name}' -- found items {items}"
)
for exp_field in exp_item:
self.assertEqual(
exp_item[exp_field], found[exp_field], f"Mismatch item key:{exp_field} facet:{name}"
)
[docs]
def test_bad_lang(self):
# for thesauri, make sure that by requesting a non-existent language the faceting is still working,
# using the default labels
# run the request with a valid language
req = self.rf.get(reverse("get_facet", args=["t_0"]), data={"lang": "en"})
res: JsonResponse = GetFacetView.as_view()(req, "t_0")
obj = json.loads(res.content)
self.assertEqual(2, obj["topics"]["total"])
self.assertEqual(10, obj["topics"]["items"][0]["count"])
self.assertEqual("T0_K0_en", obj["topics"]["items"][0]["label"])
self.assertTrue(obj["topics"]["items"][0]["is_localized"])
# run the request with an INVALID language
req = self.rf.get(reverse("get_facet", args=["t_0"]), data={"lang": "ZZ"})
res: JsonResponse = GetFacetView.as_view()(req, "t_0")
obj = json.loads(res.content)
self.assertEqual(2, obj["topics"]["total"])
self.assertEqual(10, obj["topics"]["items"][0]["count"]) # make sure the count is still there
self.assertEqual("T0_K0_ALT", obj["topics"]["items"][0]["label"]) # check for the alternate label
self.assertFalse(obj["topics"]["items"][0]["is_localized"]) # check for the localization flag
[docs]
def test_prefiltering(self):
reginfo = RegionFacetProvider().get_info()
regfilter = reginfo["filter"]
t0filter = facet_registry.get_provider("t_0").get_info()["filter"]
t1filter = facet_registry.get_provider("t_1").get_info()["filter"]
for facet, filters, totals, count0 in (
("t_0", {}, 2, 10),
("t_0", {regfilter: "R0"}, 1, 1),
("t_1", {}, 2, 10),
("t_1", {regfilter: "R0"}, 1, 2),
("t_1", {regfilter: "R1"}, 2, 3),
(reginfo["name"], {}, 3, 4),
(reginfo["name"], {t0filter: self.thesauri_k["0_0"].id}, 3, 2),
(reginfo["name"], {t1filter: self.thesauri_k["1_0"].id}, 3, 3),
):
req = self.rf.get(reverse("get_facet", args=[facet]), data=filters)
res: JsonResponse = GetFacetView.as_view()(req, facet)
obj = json.loads(res.content)
self.assertEqual(
totals,
obj["topics"]["total"],
f"Bad totals for facet '{facet} and filter {filters}\nRESPONSE: {obj}",
)
self.assertEqual(
count0, obj["topics"]["items"][0]["count"], f"Bad count0 for facet '{facet}\nRESPONSE: {obj}"
)
[docs]
def test_prefiltering_tkeywords(self):
regname = RegionFacetProvider().name
featname = FeaturedFacetProvider().name
t1filter = facet_registry.get_provider("t_1").get_info()["filter"]
tkey_1_1 = self.thesauri_k["1_1"].id
expected_region = {"R1": 1, "R2": 3}
expected_feat = {True: 2, False: 3}
# Run the single requests
for facet, params, items in (
(regname, {t1filter: tkey_1_1}, expected_region),
(featname, {t1filter: tkey_1_1}, expected_feat),
):
req = self.rf.get(reverse("get_facet", args=[facet]), data=params)
res: JsonResponse = GetFacetView.as_view()(req, facet)
obj = json.loads(res.content)
self.assertEqual(
len(items),
len(obj["topics"]["items"]),
f"Bad count for items '{facet} \n PARAMS: {params} \n RESULT: {obj} \n EXPECTED: {items}",
)
# search item
for item in items.keys():
found = next((i for i in obj["topics"]["items"] if i["key"] == item), None)
self.assertIsNotNone(found, f"Topic '{item}' not found in facet {facet} -- {obj}")
self.assertEqual(items[item], found.get("count", None), f"Bad count for facet '{facet}:{item}")
# Run the single request
req = self.rf.get(reverse("list_facets"), data={"include_topics": 1, t1filter: tkey_1_1})
res: JsonResponse = ListFacetsView.as_view()(req)
obj = json.loads(res.content)
facets_list = obj["facets"]
fmap = self._facets_to_map(facets_list)
for name, items in ( # fmt: skip
(regname, expected_region),
(featname, expected_feat),
):
self.assertIn(name, fmap)
facet = fmap[name]
for item in items.keys():
found = next((i for i in facet["topics"]["items"] if i["key"] == item), None)
self.assertIsNotNone(found, f"Topic '{item}' not found in facet {facet} -- {facet}")
self.assertEqual(items[item], found.get("count", None), f"Bad count for facet '{facet}:{item}")
[docs]
def test_config(self):
for facet, type, order in (
("resourcetype", None, None),
("t_0", "select", 100),
("category", "select", 5),
("region", "select", 7),
("owner", "select", 8),
):
req = self.rf.get(reverse("get_facet", args=[facet]), data={"include_config": True})
res: JsonResponse = GetFacetView.as_view()(req, facet)
obj = json.loads(res.content)
self.assertIn("config", obj, "Config info not found in payload")
conf = obj["config"]
if type is None:
self.assertNotIn("type", conf)
else:
self.assertEqual(type, conf["type"], "Unexpected type")
if order is None:
self.assertNotIn("order", conf)
else:
self.assertEqual(order, conf["order"], "Unexpected order")
[docs]
def test_count0(self):
reginfo = RegionFacetProvider().get_info()
regflt = reginfo["filter"]
regname = reginfo["name"]
catinfo = CategoryFacetProvider().get_info()
catflt = catinfo["filter"]
catname = catinfo["name"]
kwinfo = KeywordFacetProvider().get_info()
kwflt = kwinfo["filter"]
kwname = kwinfo["name"]
t0flt = facet_registry.get_provider("t_0").get_info()["filter"]
t1flt = facet_registry.get_provider("t_1").get_info()["filter"]
def t(tk):
return self.thesauri_k[tk].id
for facet, params, items in ( # fmt: skip
# thesauri
("t_1", {regflt: "R0"}, {t("1_0"): 2}),
("t_1", {regflt: "R0", "key": [t("1_0")]}, {t("1_0"): 2}),
("t_1", {regflt: "R0", t0flt: t("0_1")}, {}),
("t_1", {regflt: "R0", t0flt: t("0_1"), "key": [t("1_0")]}, {t("1_0"): None}),
(
"t_1",
{regflt: "R0", t0flt: t("0_1"), "key": [t("1_1"), t("1_0")]},
{t("1_0"): None, t("1_1"): None},
),
("t_1", {"key": [t("0_1")]}, {}),
("t_1", {t0flt: t("0_0")}, {t("1_0"): 5, t("1_1"): 2}),
("t_1", {t0flt: t("0_1")}, {t("1_1"): 1}),
("t_1", {t0flt: [t("0_1"), t("0_0")]}, {t("1_0"): 5, t("1_1"): 2}),
("t_1", {catflt: ["C0"]}, {t("1_0"): 3}),
("t_1", {catflt: ["C0", "C1"]}, {t("1_0"): 4}),
# regions
(regname, {t1flt: t("1_0")}, {"R0": 2, "R1": 3, "R2": 3}),
(regname, {t1flt: t("1_1")}, {"R1": 1, "R2": 3}),
(regname, {t1flt: [t("1_1"), t("1_0")]}, {"R0": 2, "R1": 3, "R2": 4}),
(regname, {t1flt: t("1_1"), "key": ["R0", "R1"]}, {"R1": 1, "R0": None}),
(regname, {t1flt: t("1_1"), "key": ["R0"]}, {"R0": None}),
# category
(catname, {t1flt: t("1_0")}, {"C0": 3, "C1": 1, "C3": 3}),
(catname, {t1flt: t("1_0"), "key": ["C0", "C2"]}, {"C0": 3, "C2": None}),
(catname, {t1flt: [t("1_0"), t("1_1")]}, {"C0": 3, "C1": 1, "C3": 4}),
(catname, {kwflt: "K1"}, {"C0": 1}),
(catname, {kwflt: "K1", "key": ["C0", "C2"]}, {"C0": 1, "C2": None}),
# keyword
(kwname, {t0flt: t("0_0")}, {"K0": 2, "K1": 1, "K2": 1, "K3": 2}),
(kwname, {t0flt: t("1_0")}, {"K0": 4, "K1": 2, "K2": 2, "K3": 3}),
(kwname, {t0flt: [t("1_0"), t("1_1")]}, {"K0": 4, "K1": 2, "K2": 2, "K3": 4}),
(kwname, {t0flt: t("0_0"), regflt: "R0"}, {"K1": 1}),
(kwname, {t0flt: t("0_0"), regflt: "R0", "key": ["K0"]}, {"K0": None}),
):
req = self.rf.get(reverse("get_facet", args=[facet]), data=params)
req.user = self.admin
res: JsonResponse = GetFacetView.as_view()(req, facet)
obj = json.loads(res.content)
# self.assertEqual(totals, obj["topics"]["total"], f"Bad totals for facet '{facet} and params {params}")
self.assertEqual(
len(items),
len(obj["topics"]["items"]),
f"Bad count for items '{facet} \n PARAMS: {params} \n RESULT: {obj} \n EXPECTED: {items}",
)
# search item
for item in items.keys():
found = next((i for i in obj["topics"]["items"] if i["key"] == item), None)
self.assertIsNotNone(found, f"Topic '{item}' not found in facet {facet} -- {obj}")
self.assertEqual(items[item], found.get("count", None), f"Bad count for facet '{facet}:{item}")
[docs]
def test_user_auth(self):
# make sure the user authorization pre-filters the visible resources
# TODO test
pass
[docs]
def test_thesauri_reloading(self):
# Thesauri facets are cached.
# Make sure that when Thesauri or ThesauriLabel change the facets cache is invalidated
# TODO impl+test
pass