#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
from geonode.tests.base import GeoNodeBaseTestSupport
import os
import time
import base64
import pickle
import requests
from urllib.parse import urlencode, urlsplit
from urllib.request import (
urljoin,
urlopen,
build_opener,
install_opener,
HTTPCookieProcessor,
HTTPPasswordMgrWithDefaultRealm,
HTTPBasicAuthHandler,
)
from urllib.error import HTTPError, URLError
from urllib3.exceptions import ProtocolError
from requests.exceptions import ConnectionError
import logging
import contextlib
from io import IOBase
from bs4 import BeautifulSoup
from requests.packages.urllib3.util.retry import Retry
from requests_toolbelt.multipart.encoder import MultipartEncoder
from django.core import mail
from django.urls import reverse
from django.conf import settings
from django.contrib.auth import get_user_model
from django.test.client import Client as DjangoTestClient
from geonode.maps.models import Dataset
from geonode.notifications_helper import has_notifications, notifications
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def upload_step(step=None):
    """Return the absolute URL of the ``data_upload`` view.

    Args:
        step: Optional upload step passed as the single positional URL
            argument. A falsy value resolves the view without arguments.

    Returns:
        The reversed ``data_upload`` path joined onto ``settings.SITEURL``.
    """
    route_args = [step] if step else []
    relative_url = reverse("data_upload", args=route_args)
    return urljoin(settings.SITEURL, relative_url)
class Client(DjangoTestClient):
    """Client for making HTTP requests against a GeoNode site.

    Requests go through a ``requests.Session`` mounted with a retrying
    ``HTTPAdapter``. The CSRF token and the cookies returned by the login
    request are remembered on the instance and replayed on later requests.
    """

    # How many times a single request is attempted when the connection drops.
    _MAX_ATTEMPTS = 3
    # Seconds to wait between two attempts.
    _RETRY_DELAY = 1.0

    def __init__(self, url, user, passwd, *args, **kwargs):
        """Create the client and make sure the test user exists.

        Args:
            url: Base URL prepended to every relative path given to
                :meth:`make_request`.
            user: Username used for registration and login.
            passwd: Password set on that user and used for login.
            *args: Forwarded to ``django.test.client.Client``.
            **kwargs: Accepted for call compatibility; not forwarded.
        """
        super().__init__(*args)
        # Connection settings and credentials read by the other methods.
        self.url = url
        self.user = user
        self.passwd = passwd
        # Filled in by login(); None means "not logged in yet".
        self.csrf_token = None
        self.response_cookies = None
        self._session = requests.Session()
        self._retry = Retry(
            total=3,
            read=3,
            connect=3,
            backoff_factor=0.3,
            status_forcelist=(104, 500, 502, 503, 504),
        )
        self._adapter = requests.adapters.HTTPAdapter(max_retries=self._retry, pool_maxsize=10, pool_connections=10)
        self._register_user()

    def _register_user(self):
        """Create (or update) the user named ``self.user`` as an active account."""
        u, _ = get_user_model().objects.get_or_create(username=self.user)
        u.is_active = True
        u.email = "admin@geonode.org"
        u.set_password(self.passwd)
        u.save()

    def _send_with_retry(self, method, url, data=None):
        """Send one request, retrying when the connection is dropped.

        Args:
            method: Name of the session method to call (``"get"`` or ``"post"``).
            url: Absolute URL to request.
            data: Request body, or ``None``.

        Returns:
            The ``requests`` response of the first attempt that completes.

        Raises:
            ProtocolError, ConnectionError, ConnectionResetError: the error of
                the last attempt when every attempt fails.
        """
        self._session.mount(f"{urlsplit(url).scheme}://", self._adapter)
        self._session.verify = False
        self._action = getattr(self._session, method, None)
        last_error = None
        for attempt in range(1, self._MAX_ATTEMPTS + 1):
            try:
                # NOTE(review): a streamed multipart body may already be partly
                # consumed when a retry happens - confirm this is acceptable.
                return self._action(url=url, data=data, headers=self._session.headers, timeout=10, stream=False)
            except (ProtocolError, ConnectionError, ConnectionResetError) as ex:
                last_error = ex
                if attempt < self._MAX_ATTEMPTS:
                    time.sleep(self._RETRY_DELAY)
        # Surface the real connection failure instead of continuing without a response.
        raise last_error

    def make_request(self, path, data=None, ajax=False, debug=True, force_login=False):
        """Send a GET (no ``data``) or multipart POST (with ``data``) request.

        Args:
            path: Absolute URL, or a path appended to ``self.url``.
            data: Optional mapping of form fields. File objects are sent as
                ``(basename, file)`` uploads. The mapping is not modified.
            ajax: When true, add ``force_ajax=true`` to the query string and
                set the ``X_REQUESTED_WITH`` header.
            debug: When true, log details of a failed request if the error
                carries a ``message`` attribute.
            force_login: When true, append the session cookie held by the
                session to the cookies sent with the request.

        Returns:
            The ``requests`` response.

        Raises:
            urllib.error.HTTPError: if the response has an error status.
            ProtocolError, ConnectionError, ConnectionResetError: if every
                attempt fails at the connection level.
        """
        url = path if path.startswith("http") else self.url + path
        logger.error(f" make_request ----------> url: {url}")

        if ajax:
            url += f"{('&' if '?' in url else '?')}force_ajax=true"
            self._session.headers["X_REQUESTED_WITH"] = "XMLHttpRequest"

        cookie_value = self._session.cookies.get(settings.SESSION_COOKIE_NAME)
        if force_login and cookie_value:
            session_cookie = f"{settings.SESSION_COOKIE_NAME}={cookie_value}"
            if not self.response_cookies:
                # Nothing stored yet (no login, or login returned no Set-Cookie).
                self.response_cookies = session_cookie
            elif session_cookie not in self.response_cookies:
                # Avoid growing the header with the same cookie on every call.
                self.response_cookies += f"; {session_cookie}"

        if self.csrf_token:
            self._session.headers["X-CSRFToken"] = self.csrf_token

        if self.response_cookies:
            self._session.headers["cookie"] = self.response_cookies

        if data:
            # Build a separate mapping so the caller keeps its file objects
            # and can still close them afterwards.
            fields = {
                name: (os.path.basename(value.name), value) if isinstance(value, IOBase) else value
                for name, value in data.items()
            }
            encoder = MultipartEncoder(fields=fields)
            self._session.headers["Content-Type"] = encoder.content_type
            response = self._send_with_retry("post", url, encoder)
        else:
            response = self._send_with_retry("get", url, None)

        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as ex:
            message = ""
            if hasattr(ex, "message"):
                if debug:
                    logger.error(f"error in request to {path}")
                    logger.error(ex.message)
                message = ex.message[ex.message.index(":") + 2 :]
            else:
                message = str(ex)
            message = f"{message} (Content: {response.content.decode()}"
            raise HTTPError(url, response.status_code, message, response.headers, None) from ex

        logger.error(f" make_request ----------> response: {response}")
        return response

    def get(self, path, debug=True):
        """Send a GET request to ``path`` and return the response."""
        return self.make_request(path, debug=debug)

    def login(self):
        """Log in to the GeoNode site.

        Checks the credentials with Django's ``authenticate``, posts the login
        form, then stores a fresh CSRF token and the ``Set-Cookie`` header of
        the login response on the instance.
        """
        from django.contrib.auth import authenticate

        assert authenticate(username=self.user, password=self.passwd)
        self.csrf_token = self.get_csrf_token()
        params = {"csrfmiddlewaretoken": self.csrf_token, "login": self.user, "next": "/", "password": self.passwd}
        response = self.make_request(urljoin(settings.SITEURL, reverse("account_login")), data=params)
        self.csrf_token = self.get_csrf_token()
        self.response_cookies = response.headers.get("Set-Cookie")

    def upload_file(self, _file, perms=None):
        """Upload a file, or a shapefile together with its sidecar files.

        Logs in first when no CSRF token is held yet.

        Args:
            _file: Path of the file to upload. For a ``.shp`` file, the
                ``.dbf``, ``.shx`` and ``.prj`` files next to it are sent
                too when they exist.
            perms: Optional permissions JSON string. Defaults to anonymous
                view access.

        Returns:
            ``(response, payload)`` where ``payload`` is the decoded JSON
            body, or the raw content when the body is not JSON.
        """
        if not self.csrf_token:
            self.login()
        spatial_files = ("dbf_file", "shx_file", "prj_file")
        base, ext = os.path.splitext(_file)
        params = {
            # make public if perms not provided since wms client doesn't do authentication
            "permissions": perms or '{ "users": {"AnonymousUser": ["view_resourcebase"]} , "groups":{}}',
            "csrfmiddlewaretoken": self.csrf_token,
            "time": "true",
            "charset": "UTF-8",
        }

        # The stack closes every opened file, even when the request raises.
        with contextlib.ExitStack() as open_files:
            # deal with shapefiles
            if ext.lower() == ".shp":
                for spatial_file in spatial_files:
                    sidecar_ext, _ = spatial_file.split("_")
                    file_path = f"{base}.{sidecar_ext}"
                    # sometimes a shapefile is missing an extra file,
                    # allow for that
                    if os.path.exists(file_path):
                        params[spatial_file] = open_files.enter_context(open(file_path, "rb"))

            params["base_file"] = open_files.enter_context(open(_file, "rb"))
            resp = self.make_request(upload_step(), data=params, ajax=True, force_login=True)

        try:
            return resp, resp.json()
        except ValueError:
            logger.exception(ValueError(f"probably not json, status {resp.status_code}"))
            return resp, resp.content

    def get_html(self, path, debug=True):
        """GET ``path`` and return ``(response, BeautifulSoup document)``.

        The response content is parsed with the ``lxml`` feature set.
        """
        resp = self.get(path, debug)
        return resp, BeautifulSoup(resp.content, features="lxml")

    def get_json(self, path):
        """GET ``path`` and return ``(response, decoded JSON body)``."""
        resp = self.get(path)
        return resp, resp.json()

    def get_csrf_token(self, last=False):
        """Return the ``csrftoken`` cookie held by the session.

        Args:
            last: When false (the default), the home page is requested first
                so the cookie jar is refreshed. When true, the value left by
                the previous response is returned.
        """
        if not last:
            self.get("/")
        return self._session.cookies.get("csrftoken")
def get_web_page(url, username=None, password=None, login_url=None):
    """Fetch ``url`` and return its raw body, optionally authenticating first.

    Authentication mode is chosen from the arguments:

    * ``login_url`` given: log in through the form at that URL, using the
      ``csrftoken`` cookie it hands out.
    * otherwise, ``username`` given: use HTTP basic auth for ``url``.
    * otherwise: plain request.

    In the first two modes the opener that was built is installed globally
    with ``install_opener``.

    Args:
        url: Address of the page to fetch.
        username: Login name for form or basic authentication.
        password: Password matching ``username``.
        login_url: Address of the login form, if form login is wanted.

    Returns:
        The response body as bytes, or the tuple ``(False, "no csrftoken")``
        when the login page did not set a ``csrftoken`` cookie.

    Raises:
        urllib.error.HTTPError: the server answered with an error status.
        urllib.error.URLError: the URL could not be opened.
        In both cases ``args`` is replaced by a one-element tuple holding a
        descriptive message.
    """
    if login_url:
        # Login via a form
        cookies = HTTPCookieProcessor()
        opener = build_opener(cookies)
        install_opener(opener)
        # This first request only collects the CSRF cookie; release it at once.
        with contextlib.closing(opener.open(login_url)):
            pass
        tokens = [cookie.value for cookie in cookies.cookiejar if cookie.name == "csrftoken"]
        if not tokens:
            return False, "no csrftoken"
        params = dict(
            username=username,
            password=password,
            this_is_the_login_form=True,
            csrfmiddlewaretoken=tokens[0],
        )
        # urllib requires the POST body to be bytes, not str.
        encoded_params = urlencode(params).encode("ascii")
        with contextlib.closing(opener.open(login_url, encoded_params)) as f:
            f.read()
    elif username is not None:
        # Login using basic auth
        # Create password manager
        passman = HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, url, username, password)
        # create the handler
        authhandler = HTTPBasicAuthHandler(passman)
        opener = build_opener(authhandler)
        install_opener(opener)

    try:
        pagehandle = urlopen(url)
    except HTTPError as e:
        # HTTPError exposes the status as ``code``.
        msg = f"The server couldn't fulfill the request. Error code: {e.code}"
        e.args = (msg,)
        raise
    except URLError as e:
        msg = f'Could not open URL "{url}": {e}'
        e.args = (msg,)
        raise

    # Close the handle once the body has been read.
    with contextlib.closing(pagehandle):
        return pagehandle.read()
def check_dataset(uploaded):
    """Assert that ``uploaded`` is a ``Dataset`` with a non-empty name.

    Raises:
        AssertionError: if ``uploaded`` is not a ``Dataset`` instance, or
            its ``name`` is empty.
    """
    assert isinstance(uploaded, Dataset), f"Was expecting dataset object, got {type(uploaded)}"
    assert len(uploaded.name) > 0, f"The dataset does not have a valid name: {uploaded.name}"
if has_notifications:
    from pinax.notifications.tests import get_backend_id
    from pinax.notifications.engine import send_all
    from pinax.notifications.models import NoticeQueueBatch

    class NotificationsTestsHelper(GeoNodeBaseTestSupport):
        """Test-case base offering helpers for notification tests.

        Provides methods to enable e-mail notices for a user, to flush the
        notification queue, and to check whether a notice went out.
        """

        def setup_notifications_for(self, notifications_list, user):
            """Enable e-mail delivery of each listed notice type for ``user``.

            Args:
                notifications_list: Iterable of 3-tuples. Only the first item
                    is used, as the ``label`` of the notice type to look up.
                user: User the notice settings are created for.

            Returns:
                List of the created ``NoticeSetting`` objects.
            """
            email_medium = get_backend_id("email")
            create_setting = notifications.models.NoticeSetting.objects.create
            lookup_type = notifications.models.NoticeType.objects.get
            return [
                create_setting(user=user, notice_type=lookup_type(label=type_name), medium=email_medium, send=True)
                for type_name, _label, _desc in notifications_list
            ]

        def clear_notifications_queue(self):
            """Send every queued notification, then empty the test mail outbox."""
            send_all()
            mail.outbox = []

        def check_notification_out(self, notification_name, user):
            """Return True if ``user`` received the notification.

            With ``settings.PINAX_NOTIFICATIONS_QUEUE_ALL`` enabled, the queued
            batches are searched for a notice with the given label addressed
            to the user. Otherwise everything is sent and the mail outbox is
            searched for a message addressed to the user's e-mail.
            """
            if not settings.PINAX_NOTIFICATIONS_QUEUE_ALL:
                send_all()
                # Nothing was mailed at all.
                if not mail.outbox:
                    return False
                user.noticesetting_set.get(notice_type__label=notification_name)
                # The subject comes from a different template, so the notice
                # description cannot be matched against it; only the
                # recipient is checked.
                return any(user.email in message.to for message in mail.outbox)

            # Queued notifications make the notice type easy to detect.
            self.assertTrue(NoticeQueueBatch.objects.all().exists())
            user.noticesetting_set.get(notice_type__label=notification_name)
            # Look for the specific notice type / user combination.
            for batch in NoticeQueueBatch.objects.all():
                # NOTE(review): pickle.loads is only safe on trusted data;
                # presumably these rows are written by pinax itself - confirm.
                queued = pickle.loads(base64.b64decode(batch.pickled_data))
                if any(
                    label == notification_name and user_id == user.pk
                    for user_id, label, _extra_context, _sender in queued
                ):
                    return True
            # Not found: clear the notifications queue before reporting failure.
            send_all()
            return False