Source code for geonode.base.management.commands.load_thesaurus

#########################################################################
#
# Copyright (C) 2016 OSGeo
# Copyright (C) 2022 King's College London
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

from typing import List

from django.conf import settings
from django.core.files.uploadedfile import UploadedFile
from django.core.management.base import BaseCommand, CommandError
from rdflib import Graph, Literal
from rdflib.namespace import RDF, RDFS, SKOS, DC, DCTERMS
from rdflib.util import guess_format

from geonode.base.models import Thesaurus, ThesaurusKeyword, ThesaurusKeywordLabel, ThesaurusLabel


class Command(BaseCommand):
    """Management command that loads a SKOS thesaurus from an RDF file.

    The command reads a single ``skos:ConceptScheme`` and its
    ``skos:Concept`` resources and turns them into ``Thesaurus``,
    ``ThesaurusLabel``, ``ThesaurusKeyword`` and ``ThesaurusKeywordLabel``
    objects. With ``--dry-run`` the file is parsed and reported on ``stderr``
    but nothing is saved.
    """

    help = "Load a thesaurus in RDF format into DB"

    def add_arguments(self, parser):
        """Register the ``--dry-run``, ``--name`` and ``--file`` options."""
        # Named (optional) arguments
        parser.add_argument(
            "-d",
            "--dry-run",
            action="store_true",
            dest="dryrun",
            default=False,
            help="Only parse and print the thesaurus file, without perform insertion in the DB.",
        )

        parser.add_argument("--name", dest="name", help="Identifier name for the thesaurus in this GeoNode instance.")

        parser.add_argument("--file", dest="file", help="Full path to a thesaurus in RDF format.")

    def handle(self, **options):
        """Validate the options and dispatch to the matching loader.

        Raises:
            CommandError: If ``--file`` or ``--name`` is missing.
        """
        input_file = options.get("file")
        name = options.get("name")
        dryrun = options.get("dryrun")

        if not input_file:
            raise CommandError("Missing thesaurus rdf file path (--file)")

        if not name:
            raise CommandError("Missing identifier name for the thesaurus (--name)")

        # Names starting with "fake" create the hard-coded sample thesaurus
        # instead of reading the file.
        if name.startswith("fake"):
            self.create_fake_thesaurus(name)
        else:
            self.load_thesaurus(input_file, name, not dryrun)

    def load_thesaurus(self, input_file, name, store):
        """Parse an RDF/SKOS file and build (optionally persist) the thesaurus.

        Args:
            input_file: Path of the RDF file, or an ``UploadedFile`` holding it.
            name: Identifier assigned to the created ``Thesaurus``.
            store: When false nothing is saved; the content is only parsed and
                reported on ``stderr``.

        Raises:
            CommandError: If the file contains no ``skos:ConceptScheme`` or the
                scheme has no ``dc:title`` literal.
        """
        g = Graph()

        # If input_file is an UploadedFile object rather than a file path, the
        # Graph.parse() method may not have enough info to correctly guess the
        # type; in this case supply the name, which should include the
        # extension, to guess_format manually.
        rdf_format = None
        if isinstance(input_file, UploadedFile):
            self.stderr.write(self.style.WARNING(f"Guessing RDF format from {input_file.name}..."))
            rdf_format = guess_format(input_file.name)

        g.parse(input_file, format=rdf_format)

        # An error will be thrown here if there is more than one scheme in the file
        scheme = g.value(None, RDF.type, SKOS.ConceptScheme, any=False)
        if scheme is None:
            raise CommandError("ConceptScheme not found in file")

        default_lang = getattr(settings, "THESAURUS_DEFAULT_LANG", None)

        available_titles = [t for t in g.objects(scheme, DC.title) if isinstance(t, Literal)]
        if not available_titles:
            # Fail with an explicit message instead of an IndexError raised
            # while picking a title from an empty list.
            raise CommandError("ConceptScheme has no dc:title literal")

        thesaurus_title = value_for_language(available_titles, default_lang)
        description = g.value(scheme, DC.description, None, default=thesaurus_title)
        date_issued = g.value(scheme, DCTERMS.issued, None, default="")

        self.stderr.write(
            self.style.SUCCESS(f'Thesaurus "{thesaurus_title}", desc: {description} issued at {date_issued}')
        )

        thesaurus = Thesaurus()
        thesaurus.identifier = name
        thesaurus.description = description
        thesaurus.title = thesaurus_title
        thesaurus.about = str(scheme)
        thesaurus.date = date_issued

        if store:
            thesaurus.save()

        # One ThesaurusLabel per language-tagged title; untagged titles are skipped.
        for title in available_titles:
            if title.language is None:
                continue

            thesaurus_label = ThesaurusLabel()
            thesaurus_label.lang = title.language
            thesaurus_label.label = title.value
            thesaurus_label.thesaurus = thesaurus

            if store:
                thesaurus_label.save()

        for concept in g.subjects(RDF.type, SKOS.Concept):
            about = str(concept)
            all_labels = preferredLabel(g, concept)

            # Label shown in the progress output: prefer the default language,
            # then any language, and finally the concept URI, so that a concept
            # lacking a label in the default language does not abort the import.
            display_labels = preferredLabel(g, concept, default_lang) or all_labels
            pref = display_labels[0][1] if display_labels else about

            alt_label = g.value(concept, SKOS.altLabel, object=None, default=None)
            if alt_label is not None:
                alt_label = str(alt_label)
            else:
                available_labels = [t for t in g.objects(concept, SKOS.prefLabel) if isinstance(t, Literal)]
                if available_labels:
                    alt_label = value_for_language(available_labels, default_lang)
                else:
                    # Neither skos:altLabel nor a skos:prefLabel literal exists:
                    # reuse the display label rather than failing.
                    alt_label = str(pref)

            self.stderr.write(self.style.SUCCESS(f"Concept {str(pref)}: {alt_label} ({about})"))

            tk = ThesaurusKeyword()
            tk.thesaurus = thesaurus
            tk.about = about
            tk.alt_label = alt_label

            if store:
                tk.save()

            # NOTE(review): labels without a language tag are created with
            # lang=None here, whereas untagged titles are skipped above;
            # confirm that the model accepts a null lang.
            for _, pref_label in all_labels:
                lang = pref_label.language
                label = str(pref_label)
                self.stderr.write(self.style.SUCCESS(f" Label {lang}: {label}"))

                tkl = ThesaurusKeywordLabel()
                tkl.keyword = tk
                tkl.lang = lang
                tkl.label = label

                if store:
                    tkl.save()

    def create_fake_thesaurus(self, name):
        """Create and save a small hard-coded thesaurus.

        Three keywords are saved ("aaa", "bbb", "ccc"), each with labels in
        "it", "en" and "es".
        """
        thesaurus = Thesaurus()
        thesaurus.identifier = name
        thesaurus.title = f"Title: {name}"
        thesaurus.description = "SAMPLE FAKE THESAURUS USED FOR TESTING"
        thesaurus.date = "2016-10-01"
        thesaurus.save()

        for keyword in ["aaa", "bbb", "ccc"]:
            tk = ThesaurusKeyword()
            tk.thesaurus = thesaurus
            tk.about = f"{keyword}_about"
            tk.alt_label = f"{keyword}_alt"
            tk.save()

            for _l in ["it", "en", "es"]:
                tkl = ThesaurusKeywordLabel()
                tkl.keyword = tk
                tkl.lang = _l
                tkl.label = f"{keyword}_l_{_l}_t_{name}"
                tkl.save()


def value_for_language(available: List[Literal], default_lang: str) -> str:
    """Pick the text of the literal that best matches ``default_lang``.

    Literals are examined in order of their language tag, with untagged
    literals first. The first literal that is untagged, or whose primary
    language subtag (the part before the first "-") equals ``default_lang``,
    wins. If none qualifies, the first literal of ``available`` is used.

    Args:
        available: Candidate literals; each exposes a ``language`` attribute.
        default_lang: Preferred primary language subtag, e.g. "en". ``None``
            matches no tagged literal.

    Returns:
        The selected literal converted to ``str``, or an empty string when
        ``available`` is empty.
    """
    if not available:
        # Nothing to choose from: return an empty value instead of failing
        # with an IndexError on ``available[0]``.
        return ""

    # Untagged literals sort first because their key is the empty string.
    by_language = sorted(available, key=lambda literal: literal.language or "")
    for item in by_language:
        if item.language is None or item.language.split("-")[0] == default_lang:
            return str(item)

    return str(available[0])


def preferredLabel(
    g,
    subject,
    lang=None,
    default=None,
    label_properties=(SKOS.prefLabel, RDFS.label),
):
    """
    Return the preferred labels of ``subject`` found in graph ``g``.

    The properties in ``label_properties`` are tried in order (by default
    skos:prefLabel, then rdfs:label); the labels of the first property that
    yields at least one match are returned.

    ``lang`` restricts the labels that are considered: ``None`` accepts every
    label, the empty string accepts only labels without a language tag, and
    any other value accepts only labels carrying exactly that tag.

    The result is a list of ``(labelProp, label)`` pairs, or ``default``
    (an empty list when not given) if no label matches.

    The behaviour follows the implementation shipped with rdflib 6.1.1.
    """
    fallback = [] if default is None else default

    # Choose the language predicate once, before scanning the properties
    if lang is None:

        def accepts(candidate):
            # no language requested: language tags are ignored
            return True

    elif lang == "":

        def accepts(candidate):
            # only labels without a language tag
            return candidate.language is None

    else:

        def accepts(candidate):
            return candidate.language == lang

    for prop in label_properties:
        matches = [obj for obj in g.objects(subject, prop) if accepts(obj)]
        if matches:
            return [(prop, obj) for obj in matches]

    return fallback