#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import os
import logging
from django.conf import settings
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import render, redirect
from django.contrib.auth import get_user_model
from django.views.decorators.csrf import csrf_exempt
from pycsw import server
from guardian.shortcuts import get_objects_for_user
from geonode.catalogue.backends.pycsw_local import CONFIGURATION
from geonode.base.models import ResourceBase
from geonode.layers.models import Dataset
from geonode.base.auth import get_or_create_token
from geonode.base.models import SpatialRepresentationType
from geonode.groups.models import GroupProfile
from geonode.utils import resolve_object
from django.db import connection
from django.core.exceptions import ObjectDoesNotExist
from geonode.people import Roles
def _collect_group_ids(entries):
    """
    Return the group ids found in ``entries``.

    Each entry is either a dict carrying a ``"group"`` key (the shape
    yielded by ``.values("group")``) or an object exposing ``.id``.
    Dicts without a ``"group"`` key are skipped.
    """
    group_ids = []
    for entry in entries:
        if isinstance(entry, dict):
            if "group" in entry:
                group_ids.append(entry["group"])
        else:
            group_ids.append(entry.id)
    return group_ids


@csrf_exempt
def csw_global_dispatch(request, dataset_filter=None, config_updater=None):
    """
    pycsw wrapper

    Build a pycsw configuration restricted to the records the requesting
    user may see, dispatch the request to pycsw and return its response.

    :param request: the incoming Django request.
    :param dataset_filter: optional callable; it receives the queryset of
        ``ResourceBase`` objects the user can view and returns the iterable
        of resources (objects exposing ``.id``) that should stay visible.
    :param config_updater: optional callable; it receives the merged pycsw
        configuration dict and returns the configuration to use.
    :return: an ``HttpResponseRedirect`` to the configured catalogue URL when
        the catalogue engine is not ``pycsw_local``; otherwise an
        ``HttpResponse`` carrying the pycsw output and content type.
    """
    # this view should only operate if pycsw_local is the backend
    # else, redirect to the URL of the non-pycsw_local backend
    if settings.CATALOGUE["default"]["ENGINE"] != "geonode.catalogue.backends.pycsw_local":
        return HttpResponseRedirect(settings.CATALOGUE["default"]["URL"])

    # NOTE: dict(a, **b) is a shallow merge, so the nested "repository" dict
    # may be shared with the source configuration. That is why the filter is
    # saved below and restored in the ``finally`` clause.
    mdict = dict(settings.PYCSW["CONFIGURATION"], **CONFIGURATION)
    mdict = config_updater(mdict) if config_updater else mdict

    access_token = None
    if request and request.user:
        access_token = get_or_create_token(request.user)
        if access_token and access_token.is_expired():
            access_token = None

    absolute_uri = f"{request.build_absolute_uri()}"
    query_string = f"{request.META['QUERY_STRING']}"
    env = request.META.copy()

    if access_token and not access_token.is_expired():
        env.update({"access_token": access_token.token})
        if "access_token" not in query_string:
            token_param = f"access_token={access_token.token}"
            # Pick the separator from what is already there, so a request
            # without a query string does not end up as ".../csw&access_token=..."
            uri_separator = "&" if "?" in absolute_uri else "?"
            absolute_uri = f"{absolute_uri}{uri_separator}{token_param}"
            query_string = f"{query_string}&{token_param}" if query_string else token_param

    env.update({"local.app_root": os.path.dirname(__file__), "REQUEST_URI": absolute_uri, "QUERY_STRING": query_string})

    # Save original filter before doing anything
    mdict_filter = mdict["repository"]["filter"]

    try:
        # Filter out Layers not accessible to the User
        authorized_ids = []
        if request.user:
            profiles = get_user_model().objects.filter(username=str(request.user))
        else:
            profiles = get_user_model().objects.filter(username="AnonymousUser")
        if profiles:
            authorized = list(get_objects_for_user(profiles[0], "base.view_resourcebase").values("id"))
            layers = ResourceBase.objects.filter(id__in=[d["id"] for d in authorized])
            if dataset_filter and layers:
                layers = dataset_filter(layers)
            if layers:
                authorized_ids = [d.id for d in layers]

        if authorized_ids:
            authorized_datasets = f"({', '.join(str(e) for e in authorized_ids)})"
            authorized_datasets_filter = f"id IN {authorized_datasets}"
            mdict["repository"]["filter"] += f" AND {authorized_datasets_filter}"
            if request.user and request.user.is_authenticated:
                # The whole disjunction is parenthesised: SQL gives AND a
                # higher precedence than OR, so without the outer parentheses
                # every " AND ..." clause appended below would constrain only
                # the right-hand operand of the OR.
                mdict["repository"]["filter"] = f"(({mdict['repository']['filter']}) OR ({authorized_datasets_filter}))"
        else:
            # No visible resource: use an id that matches nothing.
            authorized_datasets_filter = "id = -9999"
            mdict["repository"]["filter"] += f" AND {authorized_datasets_filter}"

        # Filter out Documents and Maps
        if "ALTERNATES_ONLY" in settings.CATALOGUE["default"] and settings.CATALOGUE["default"]["ALTERNATES_ONLY"]:
            mdict["repository"]["filter"] += " AND alternate IS NOT NULL"

        # Filter out Layers belonging to specific Groups
        is_admin = request.user.is_superuser if request.user else False

        if not is_admin and settings.GROUP_PRIVATE_RESOURCES:
            groups_ids = []
            if request.user and request.user.is_authenticated:
                groups_ids.extend(group.id for group in request.user.groups.all())
                try:
                    group_list_all = request.user.group_list_all().values("group")
                except Exception:
                    # Best effort: carry on with the groups collected so far,
                    # but leave a trace instead of hiding the failure.
                    logging.getLogger(__name__).warning(
                        "Could not retrieve the group list of the requesting user", exc_info=True
                    )
                    group_list_all = []
                groups_ids.extend(_collect_group_ids(group_list_all))

            public_groups = GroupProfile.objects.exclude(access="private").values("group")
            groups_ids.extend(_collect_group_ids(public_groups))

            if groups_ids:
                groups = f"({', '.join(str(e) for e in groups_ids)})"
                groups_filter = f"(group_id IS NULL OR group_id IN {groups})"
            else:
                groups_filter = "group_id IS NULL"
            mdict["repository"]["filter"] += f" AND {groups_filter}"

        csw = server.Csw(mdict, env, version="2.0.2")
        content = csw.dispatch_wsgi()

        # pycsw 2.0 has an API break:
        # - pycsw < 2.0: content = xml_response
        # - pycsw >= 2.0: content = [http_status_code, content]
        # deal with the API break
        if isinstance(content, list):  # pycsw 2.0+
            content = content[1]
    finally:
        # Restore the original filter before returning (or propagating an error)
        mdict["repository"]["filter"] = mdict_filter

    return HttpResponse(content, content_type=csw.contenttype)
@csrf_exempt
def opensearch_dispatch(request):
    """
    OpenSearch wrapper

    Render the ``catalogue/opensearch_description.xml`` template with values
    read from the ``metadata:main`` section of the pycsw configuration and
    from ``settings.SITEURL``.
    """
    main_metadata = settings.PYCSW["CONFIGURATION"]["metadata:main"]

    # Only http(s)-style site URLs lose their trailing slash(es).
    site_url = settings.SITEURL
    if site_url.startswith("http"):
        site_url = site_url.rstrip("/")

    ctx = {
        "shortname": main_metadata["identification_title"],
        "description": main_metadata["identification_abstract"],
        "developer": main_metadata["contact_name"],
        "contact": main_metadata["contact_email"],
        "attribution": main_metadata["provider_name"],
        # keywords are stored comma separated; the template gets them space separated
        "tags": main_metadata["identification_keywords"].replace(",", " "),
        "url": site_url,
    }
    return render(
        request,
        "catalogue/opensearch_description.xml",
        context=ctx,
        content_type="application/opensearchdescription+xml",
    )
# turns the rows of an executed SQL cursor into one dict per row
def dictfetchall(cursor):
    """
    Generate all rows from a cursor as a dict

    Each yielded dict maps a column name (first item of every entry in
    ``cursor.description``) to the value found at the same position in the
    row. When two columns share a name, the later column wins.

    :param cursor: a DB-API cursor on which a query has been executed.
    """
    # Column names do not change from row to row, so compute them once.
    columns = [col[0] for col in cursor.description]
    for row in cursor.fetchall():
        yield dict(zip(columns, row))
# special characters used when producing CSV output
def get_CSV_spec_char():
    """
    Return a new dict holding the CSV field separator (``"separator"``)
    and the line terminator (``"carriage_return"``).
    """
    return dict(separator=";", carriage_return="\r\n")
# format a value as a str that does not contain the CSV separator
def fst(value):
    """
    Return ``str(value)`` cleaned up for use as a single CSV field.

    The CSV separator is replaced by a comma; a literal backslash followed
    by ``n`` and every ``"\\r\\n"`` pair are replaced by a space. A lone
    line feed is left untouched.
    """
    separator = get_CSV_spec_char()["separator"]
    text = str(value)
    # Applied one after the other, in this order.
    for needle, substitute in ((separator, ","), ("\\n", " "), ("\r\n", " ")):
        text = text.replace(needle, substitute)
    return text
# from a resource object, build the corresponding metadata dict
# the aim is to give every output format (csv, html or pdf) the same structure
def build_md_dict(resource):
    """
    Return the metadata dict of ``resource``.

    Every entry maps an internal key to a ``{"label": ..., "value": ...}``
    dict; the uuid and the title of the resource are exported.
    """
    md_dict = {}
    md_dict["r_uuid"] = {"label": "uuid", "value": resource.uuid}
    md_dict["r_title"] = {"label": "titre", "value": resource.title}
    return md_dict
def get_keywords(resource):
    """
    Return the keywords tagged on ``resource`` as one string.

    The ``name`` entry of every row returned by the taggit query is cleaned
    with ``fst`` and the results are joined with ``", "``. The string starts
    with a single space when at least one row is found and is empty
    otherwise.

    :param resource: object whose ``id`` is matched against
        ``taggit_taggeditem.object_id``.
    """
    # NOTE(review): the query matches on object_id only; it looks like tags
    # attached to another model with the same id would match too - confirm.
    # The context manager closes the cursor even if the query fails.
    with connection.cursor() as cursor:
        cursor.execute(
            "SELECT a.*,b.* FROM taggit_taggeditem as a,taggit_tag"
            " as b WHERE a.object_id = %s AND a.tag_id=b.id",
            [resource.id],
        )
        # Consume the rows while the cursor is still open.
        names = [fst(row["name"]) for row in dictfetchall(cursor)]

    if not names:
        return ""
    return " " + ", ".join(names)
# from a resource uuid, redirect the client to the matching resource
@csrf_exempt
def resolve_uuid(request, uuid):
    """
    Look up the ``ResourceBase`` identified by ``uuid`` through
    ``resolve_object`` and hand it to Django's ``redirect``.
    """
    lookup = {"uuid": uuid}
    return redirect(resolve_object(request, ResourceBase, lookup))