Source code for geonode.thumbs.background

#########################################################################
#
# Copyright (C) 2021 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import time
import ast
import typing
import logging
import math
import mercantile
import requests

from io import BytesIO
from pyproj import Transformer
from abc import ABC, abstractmethod
from math import ceil, floor, copysign
from PIL import Image, UnidentifiedImageError
from owslib.wmts import WebMapTileService

from django.conf import settings
from django.utils.html import strip_tags

from geonode.thumbs import utils
from geonode.utils import http_client
from geonode.thumbs.exceptions import ThumbnailError

[docs] logger = logging.getLogger(__name__)
[docs] class BaseThumbBackground(ABC): def __init__(self, thumbnail_width: int, thumbnail_height: int, max_retries: int = 3, retry_delay: int = 1): """ Base class for thumbnails background retrieval. :param thumbnail_width: target width of the background image in pixels :param thumbnail_height: target height of the background image in pixels :param max_retries: maximum number of retrieval retries before raising an exception :param retry_delay: number of seconds waited between consecutive retrieval retries """
[docs] self.thumbnail_width = thumbnail_width
[docs] self.thumbnail_height = thumbnail_height
[docs] self.max_retries = max_retries
[docs] self.retry_delay = retry_delay
@abstractmethod
[docs] def fetch(self, bbox: typing.List, *args, **kwargs) -> typing.Optional[Image.Image]: """ Function fetching background image, based on the given BBOX. On error should raise an exception or return None. :param bbox: a dataset compliant BBOX: [west, east, south, north, CRS] """ pass
[docs] class GenericWMSBackground(BaseThumbBackground): def __init__( self, thumbnail_width: int, thumbnail_height: int, max_retries: int = 3, retry_delay: int = 1, ): """ Generic WMS background generation class. Initialization options (valid in settings.THUMBNAIL_BACKGROUND['options']): :key service_url: dataset's provider (OGC server's location) :key dataset_name: name of a dataset to be used as the background :key format: retrieve image's format/mime-type (defautl 'image/png') :key version: WMS service version (default '1.3.0') :key styles: dataset's style (default None) :key srid: CRS which an image should be retrieved in (default 'EPSG:3857') """ super().__init__(thumbnail_width, thumbnail_height, max_retries, retry_delay)
[docs] options = settings.THUMBNAIL_BACKGROUND.get("options", {})
# WMS specific attributes (to be overwritten in specific background classes)
[docs] service_url = options.get("service_url", None)
self.service_url = f"{service_url}/" if service_url and not service_url.endswith("/") else service_url
[docs] self.dataset_name = options.get("dataset_name", None)
[docs] self.format = options.get("format", "image/png")
[docs] self.version = options.get("version", "1.3.0")
[docs] self.styles = options.get("styles", None)
[docs] srid = options.get("srid", "EPSG:3857")
self.srid = srid if "EPSG:" in srid else f"EPSG:{srid}" # ---
[docs] def bbox_to_projection(self, bbox: typing.List): """ Function converting BBOX to target projection system, keeping the order of the coordinates. To ensure no additional change is performed, conversion is based on top-left and bottom-right points conversion. :param bbox: a dataset compliant BBOX: [west, east, south, north, CRS] """ transformer = Transformer.from_crs(bbox[-1].lower(), self.srid.lower(), always_xy=True) left, top = transformer.transform(bbox[0], bbox[3]) right, bottom = transformer.transform(bbox[1], bbox[2]) return [left, right, bottom, top]
[docs] def fetch(self, bbox: typing.List, *args, **kwargs): """ Function fetching background image, based on the given BBOX. On error should raise an exception or return None. :param bbox: a dataset compliant BBOX: [west, east, south, north, CRS] :param *args*: not used, kept for API compatibility :param **kargs**: not used, kept for API compatibility """ if not self.service_url or not self.dataset_name: logger.error("Thumbnail background configured improperly: service URL and dataset name may not be empty") return background = Image.new("RGB", (self.thumbnail_width, self.thumbnail_height), (250, 250, 250)) img = utils.get_map( self.service_url, [self.dataset_name], self.bbox_to_projection(bbox) + [self.srid], wms_version=self.version, mime_type=self.format, styles=self.styles, width=self.thumbnail_width, height=self.thumbnail_height, max_retries=self.max_retries, retry_delay=self.retry_delay, ) try: content = BytesIO(img) with Image.open(content) as image: image.verify() # verify that it is, in fact an image image = Image.open(content) # "re-open" the file (required after running verify method) background.paste(image) except UnidentifiedImageError as e: logger.error(f"Thumbnail generation. Error occurred while fetching background image: {e}") raise e except Exception as e: logger.error(f"Thumbnail generation. Error occurred while fetching background image: {e}") logger.exception(e) return background
[docs] class GenericXYZBackground(BaseThumbBackground): def __init__( self, thumbnail_width: int, thumbnail_height: int, max_retries: int = 3, retry_delay: int = 1, ): """ Generic Slippy Maps background generation class for services EPSG:3857 compliant. Initialization options (valid in settings.THUMBNAIL_BACKGROUND['options']): :key url: XYZ url template with '{x}', '{y}' and '{z}' placeholders for x, y coordinates and zoom respectively :key tile_size: tile size in pixels (default 256) """ super().__init__(thumbnail_width, thumbnail_height, max_retries, retry_delay)
[docs] options = settings.THUMBNAIL_BACKGROUND.get("options", {})
# Slippy Maps specific attributes (to be overwritten in specific background classes)
[docs] self.url = options.get("url", None)
[docs] self.tile_size = options.get("tile_size", 256)
[docs] self.tms = False
try: self.tms = ast.literal_eval(str(options.get("tms"))) except Exception: pass # --- # class's internal attributes
[docs] self.crs = "EPSG:3857"
[docs] self._epsg3857_max_x = 20026376.39
[docs] self._epsg3857_max_y = 20048966.10
[docs] self._mercantile_bbox = None # BBOX compliant with mercantile lib: [west, south, east, north] bounds list
[docs] def point3857to4326(self, x, y): transformer = Transformer.from_crs("EPSG:3857", "EPSG:4326", always_xy=True) return transformer.transform(x, y)
[docs] def point4326to3857(self, x, y): transformer = Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True) return transformer.transform(x, y)
[docs] def bbox3857to4326(self, x_min, x_max, y_min, y_max): """ Function converting BBOX from EPSG:3857 to EPSG:4326, keeping the order of the coordinates. To ensure no additional change is performed, conversion is based on top-left and bottom-right points conversion. """ left, top = self.point3857to4326(x_min, y_max) right, bottom = self.point3857to4326(x_max, y_min) return [left, right, bottom, top]
[docs] def bbox4326to3857(self, x_min, x_max, y_min, y_max): """ Function converting BBOX from EPSG:4326 to EPSG:3857, keeping the order of the coordinates. To ensure no additional change is performed, conversion is based on top-left and bottom-right points conversion. """ left, top = self.point4326to3857(x_min, y_max) right, bottom = self.point4326to3857(x_max, y_min) return [left, right, bottom, top]
[docs] def fetch(self, bbox: typing.List, zoom: int = None, *args, **kwargs): """ The function fetching tiles from a Slippy Map provider, composing them into a single image, and cropping it to match the given BBOX. Retrieval of each tile is repeated self.max_retries times, waiting self.retry_delay seconds between consecutive requests. :param bbox: bounding box of the background image, dataset compliant format: [west, east, south, north, CRS] :param zoom: zoom with which to retrieve Slippy Map's tiles (by default, it's calculated based on width, height) :return: None if the CRS is different from self.tiles_crs, or background Image """ if not self.url: logger.error("Thumbnail background requires url to be configured.") raise ThumbnailError("Tiled background improperly configured.") if bbox[-1].lower() != self.crs.lower(): # background service is not available the requested CRS CRS logger.debug( f"Thumbnail background generation skipped. " f"Clashing CRSs: requested {bbox[-1]}, supported {self.crs}" ) return bbox = [float(coord) for coord in bbox[0:4]] # check if BBOX fits within the EPSG:3857 map, if not - return an empty background if bbox[2] > self._epsg3857_max_y or bbox[3] < -self._epsg3857_max_y: return Image.new("RGB", (self.thumbnail_width, self.thumbnail_height), (250, 250, 250)) bbox4326 = self.bbox3857to4326(*bbox) # change bbox from dataset (left, right, bottom, top) to mercantile (left, bottom, right, top) self._mercantile_bbox = [bbox4326[0], bbox4326[2], bbox4326[1], bbox4326[3]] # calculate zoom level if zoom is None: zoom = self.calculate_zoom() else: zoom = int(zoom) top_left_tile = mercantile.tile(bbox4326[0], bbox4326[3], zoom) bottom_right_tile = mercantile.tile(bbox4326[1], bbox4326[2], zoom) # rescaling factors - indicators of how west and east BBOX boundaries are offset in respect to the world's map; # east and west boundaries may exceed the maximum coordinate of the world in EPSG:3857. In such case additinal # number of tiles need to be fetched to compose the image and the boundary tiles' coordinates need to be # rescaled to ensure the proper image cropping. epsg3857_world_width = 2 * self._epsg3857_max_x west_rescaling_factor = 0 if abs(bbox[0]) > self._epsg3857_max_x: west_rescaling_factor = ceil((abs(bbox[0]) - self._epsg3857_max_x) / epsg3857_world_width) * copysign( 1, bbox[0] ) east_rescaling_factor = 0 if abs(bbox[1]) > self._epsg3857_max_x: east_rescaling_factor = ceil((abs(bbox[1]) - self._epsg3857_max_x) / epsg3857_world_width) * copysign( 1, bbox[1] ) map_row_tiles = 2**zoom - 1 # number of tiles in the Map's row for a certain zoom level map_worlds = int(east_rescaling_factor - west_rescaling_factor) # number maps in an image worlds_between = map_worlds - 1 # number of full maps in an image if top_left_tile.x > bottom_right_tile.x or bbox[1] - bbox[0] > epsg3857_world_width or map_worlds > 0: # BBOX crosses Slippy Map's border if worlds_between > 0: tiles_rows = ( list(range(top_left_tile.x, map_row_tiles + 1)) + worlds_between * list(range(map_row_tiles + 1)) + list(range(bottom_right_tile.x + 1)) ) else: tiles_rows = list(range(top_left_tile.x, map_row_tiles + 1)) + list(range(bottom_right_tile.x + 1)) else: # BBOx is contained by the Slippy Map if worlds_between > 0: tiles_rows = list(range(top_left_tile.x, bottom_right_tile.x + 1)) + worlds_between * list( range(map_row_tiles + 1) ) else: tiles_rows = list(range(top_left_tile.x, bottom_right_tile.x + 1)) tiles_cols = list(range(top_left_tile.y, bottom_right_tile.y + 1)) # if latitude boundaries extend world's height - add background's height, and set constant Y offset for tiles additional_height = 0 fixed_top_offset = 0 fixed_bottom_offset = 0 north_extension3857 = max(0, bbox[3] - self._epsg3857_max_y) south_extension3857 = abs(min(0, bbox[2] + self._epsg3857_max_y)) extension3857 = north_extension3857 + south_extension3857 if extension3857: # get single tile's height in ESPG:3857 tile_bounds = mercantile.bounds(tiles_rows[0], tiles_cols[0], zoom) _, south = self.point4326to3857(getattr(tile_bounds, "west"), getattr(tile_bounds, "south")) _, north = self.point4326to3857(getattr(tile_bounds, "west"), getattr(tile_bounds, "north")) tile_hight3857 = north - south additional_height = round(self.tile_size * extension3857 / tile_hight3857) # based on linear proportion if north_extension3857: fixed_top_offset = round(self.tile_size * north_extension3857 / tile_hight3857) if south_extension3857: fixed_bottom_offset = round(self.tile_size * south_extension3857 / tile_hight3857) background = Image.new( "RGB", (len(tiles_rows) * self.tile_size, len(tiles_cols) * self.tile_size + additional_height), (250, 250, 250), ) for offset_x, x in enumerate(tiles_rows): for offset_y, y in enumerate(tiles_cols): if self.tms: y = (2**zoom) - y - 1 imgurl = self.url.format(x=x, y=y, z=zoom) im = None for retries in range(self.max_retries): try: resp, content = http_client.request(imgurl) if resp.status_code > 400: retries = self.max_retries - 1 raise Exception(f"{strip_tags(content)}") im = BytesIO(content) Image.open(im).verify() # verify that it is, in fact an image break except Exception as e: logger.error(f"Thumbnail background fetching from {imgurl} failed {retries} time(s) with: {e}") if retries + 1 == self.max_retries: raise e time.sleep(self.retry_delay) continue if im: image = Image.open(im) # "re-open" the file (required after running verify method) # add the fetched tile to the background image, placing it under proper coordinates background.paste(image, (offset_x * self.tile_size, offset_y * self.tile_size + fixed_top_offset)) # get BBOX of the tiles top_left_bounds = mercantile.bounds(top_left_tile) bottom_right_bounds = mercantile.bounds(bottom_right_tile) tiles_bbox3857 = self.bbox4326to3857( getattr(top_left_bounds, "west"), getattr(bottom_right_bounds, "east"), getattr(bottom_right_bounds, "south"), getattr(top_left_bounds, "north"), ) # rescale tiles' boundaries - if space covered by the input BBOX extends the width of the world, # (e.g. two "worlds" are present on the map), translation between tiles' BBOX and image's pixel requires # additional rescaling, for tiles' BBOX coordinates to match input BBOX coordinates west_coord = tiles_bbox3857[0] + west_rescaling_factor * epsg3857_world_width east_coord = tiles_bbox3857[1] + east_rescaling_factor * epsg3857_world_width # prepare translating function from received BBOX to pixel values of the background image src_quad = (0, fixed_top_offset, background.size[0], background.size[1] - fixed_bottom_offset) to_src_px = utils.make_bbox_to_pixels_transf( [west_coord, tiles_bbox3857[2], east_coord, tiles_bbox3857[3]], src_quad ) # translate received BBOX to pixel values minx, miny = to_src_px(bbox[0], bbox[2]) maxx, maxy = to_src_px(bbox[1], bbox[3]) # max and min function for Y axis were introduced to mitigate rounding errors crop_box = ( ceil(minx), max(ceil(maxy) + fixed_top_offset, 0), floor(maxx), min(floor(miny) + fixed_top_offset, background.size[1]), ) if not all([0 <= crop_x <= background.size[0] for crop_x in [crop_box[0], crop_box[2]]]): raise ThumbnailError(f"Tiled background cropping error. Boundaries outside of the image: {crop_box}") # crop background image to the desired bbox and resize it background = background.crop(box=crop_box) background = background.resize((self.thumbnail_width, self.thumbnail_height)) if sum(background.convert("L").getextrema()) in (0, 2): # either all black or all white logger.error("Thumbnail background outside the allowed area.") raise ThumbnailError("Thumbnail background outside the allowed area.") return background
[docs] def calculate_zoom(self): # maximum number of needed tiles for thumbnail of given width and height max_tiles = (ceil(self.thumbnail_width / self.tile_size) + 1) * ( ceil(self.thumbnail_height / self.tile_size) + 1 ) # zoom for which there are less needed tiles than max_tiles zoom = 0 for z in range(1, 16): if len(list(mercantile.tiles(*self._mercantile_bbox, z))) > max_tiles: break else: zoom = max(zoom, z) return zoom
[docs] class WikiMediaTileBackground(GenericXYZBackground): def __init__( self, thumbnail_width: int, thumbnail_height: int, max_retries: int = 3, retry_delay: int = 1, ): """ Specific Wikimedia background generation class for thumbnails. """ super().__init__(thumbnail_width, thumbnail_height, max_retries, retry_delay)
[docs] self.url = "https://maps.wikimedia.org/osm-intl/{z}/{x}/{y}.png"
[docs] self.tile_size = 256
[docs] class OSMTileBackground(GenericXYZBackground): def __init__( self, thumbnail_width: int, thumbnail_height: int, max_retries: int = 3, retry_delay: int = 1, ): """ Specific OpenStreetMaps background generation class for thumbnails. """ super().__init__(thumbnail_width, thumbnail_height, max_retries, retry_delay)
[docs] self.url = "https://tile.openstreetmap.org/{z}/{x}/{y}.png"
[docs] self.tile_size = 256
[docs] WMTS_TILEMATRIXSET_LEVELS = None
[docs] class GenericWMTSBackground(BaseThumbBackground): def __init__(self, thumbnail_width: int, thumbnail_height: int, max_retries: int = 3, retry_delay: int = 1): super().__init__(thumbnail_width, thumbnail_height, max_retries, retry_delay)
[docs] self.options = settings.THUMBNAIL_BACKGROUND.get("options", {})
[docs] self.levels = self.get_levels_for_tilematrix()
[docs] self.thumbnail_width = thumbnail_width
[docs] self.thumbnail_height = thumbnail_height
[docs] def fetch(self, bbox: typing.List, *args, **kwargs): bbox = [bbox[0], bbox[2], bbox[1], bbox[3]] target_pixelspan = self.get_target_pixelspan(bbox) level = self.get_level_for_targetpixelspan(target_pixelspan) tilewidth = level["tilewidth"] tileheight = level["tileheight"] zoom = level["zoom"] pixelspan = level["pixelspan"] tilespanx = level["tilespanx"] tilespany = level["tilespany"] pixelspan_ratio = level["pixelspan"] / target_pixelspan tile_rowcols = self.get_tiles_coords(level, bbox) tiles_cols_list = set([tile_rowcol[0] for tile_rowcol in tile_rowcols]) tiles_mincol = min(tiles_cols_list) tiles_maxcol = max(tiles_cols_list) tiles_minx = level["bounds"][0] + (tiles_mincol * tilespanx) tiles_rows_list = set([tile_rowcol[1] for tile_rowcol in tile_rowcols]) tiles_minrow = min(tiles_rows_list) tiles_maxrow = max(tiles_rows_list) tiles_maxy = level["bounds"][3] - (tiles_minrow * tilespany) tiles_width = (tiles_maxcol - tiles_mincol + 1) * tilewidth tiles_height = (tiles_maxrow - tiles_minrow + 1) * tileheight background = Image.new("RGB", (tiles_width, tiles_height), (250, 250, 250)) for tile_coord in tile_rowcols: try: im = None imgurl = self.build_request([tile_coord[0], tile_coord[1], zoom]) resp = requests.get(imgurl) if resp.status_code > 400: raise Exception(f"{strip_tags(resp.content)}") im = BytesIO(resp.content) Image.open(im).verify() if im: offsetx = (tile_coord[0] - tiles_mincol) * tilewidth offsety = (tile_coord[1] - tiles_minrow) * tileheight image = Image.open(im) background.paste(image, (offsetx, offsety)) except Exception as e: logger.error(f"Error fetching {imgurl} for thumbnail: {e}") left = abs(tiles_minx - bbox[0]) / pixelspan right = left + self.thumbnail_width top = abs(tiles_maxy - bbox[3]) / pixelspan bottom = top + self.thumbnail_height background = background.crop((left, top, right, bottom)) width = round(self.thumbnail_width * pixelspan_ratio) height = round(self.thumbnail_height * pixelspan_ratio) background = background.resize((width, height)) background.crop((left, top, right, bottom)) return background
[docs] def build_kvp_request(self, baseurl, layer, style, xyz): return f"{baseurl}?&Service=WMTS&Request=GetTile&Version=1.0.0&Format=image/png&layer={layer}&style={style}\ &tilematrixset={self.options['tilematrixset']}&TileMatrix={xyz[2]}&TileRow={xyz[1]}&TileCol={xyz[0]}"
[docs] def build_request(self, xyz): request_encoding = self.options.get("requestencoding", "KVP") baseurl = self.options["url"] layer = self.options["layer"] style = self.options["style"] imgurl = None if request_encoding == "KVP": imgurl = self.build_kvp_request(baseurl, layer, style, xyz) return imgurl
[docs] def get_image_bbox_for_level(self, level, bbox): image_width = self.thumbnail_width image_height = self.thumbnail_height half_imagespanx = image_width * level["pixelspan"] / 2 half_imagespany = image_height * level["pixelspan"] / 2 ( boundsminx, boundsminy, boundsmaxx, boundsmaxy, ) = bbox bboxcentrex = boundsminx + ((boundsmaxx - boundsminx) / 2) bboxcentrey = boundsminy + ((boundsmaxy - boundsminy) / 2) image_minx = bboxcentrex - half_imagespanx image_maxx = bboxcentrex + half_imagespanx image_miny = bboxcentrey - half_imagespany image_maxy = bboxcentrey + half_imagespany return [image_minx, image_miny, image_maxx, image_maxy]
[docs] def get_tiles_coords(self, level, bbox): tile_coords = [] tilematrixminx = level["bounds"][0] tilematrixmaxy = level["bounds"][3] tilespanx = level["tilespanx"] tilespany = level["tilespany"] boundsminx, boundsminy, boundsmaxx, boundsmaxy = bbox tile_coord_minx = int(math.floor(boundsminx - tilematrixminx) / tilespanx) # min tile coord corresponds to the maxy coordinate tile_coord_miny = int(math.floor(tilematrixmaxy - boundsmaxy) / tilespany) tile_coord_maxx = int(math.floor(boundsmaxx - tilematrixminx) / tilespanx) # max tile coord corresponds to the miny coordinate tile_coord_maxy = int(math.floor(tilematrixmaxy - boundsminy) / tilespany) for x in range(tile_coord_minx, tile_coord_maxx + 1): for y in range(tile_coord_miny, tile_coord_maxy + 1): tile_coords.append([x, y]) return tile_coords
[docs] def get_level_for_targetpixelspan(self, target_pixelspan): level = None for _level in self.levels: is_level_under_minscaledenominator = False minscaledenominator = self.options.get("minscaledenominator") if minscaledenominator: is_level_under_minscaledenominator = _level["scaledenominator"] < self.options.get( "minscaledenominator" ) if _level["pixelspan"] < target_pixelspan or is_level_under_minscaledenominator: return level level = _level
[docs] def get_target_pixelspan(self, bbox): x_min, y_min, x_max, y_max = bbox return (x_max - x_min) / self.thumbnail_width
[docs] def get_levels_for_tilematrix(self): url = self.options["url"] tilematrixset = self.options["tilematrixset"] global WMTS_TILEMATRIXSET_LEVELS if not WMTS_TILEMATRIXSET_LEVELS: service = WebMapTileService(url=url) tilematrixsset = service.tilematrixsets[tilematrixset] levels = [] for index, tilematrix in tilematrixsset.tilematrix.items(): scaledenominator = tilematrix.scaledenominator * 1 # here we assume 3857 matrixheight = tilematrix.matrixheight matrixwidth = tilematrix.matrixwidth tileheight = tilematrix.tileheight tilewidth = tilematrix.tilewidth tilematrixminx = tilematrix.topleftcorner[0] # here we assume 3857 tilematrixmaxy = tilematrix.topleftcorner[1] # here we assume 3857 pixelspan = scaledenominator * 0.00028 # OGC standardized rendering pixel size tilespanx = tilewidth * pixelspan tilespany = tileheight * pixelspan tilematrixmaxx = tilematrixminx + tilespanx * matrixwidth tilematrixminy = tilematrixmaxy - tilespany * matrixheight levels.append( { "zoom": int(index), "bounds": [ tilematrixminx, tilematrixminy, tilematrixmaxx, tilematrixmaxy, ], "scaledenominator": scaledenominator, "tilewidth": tilewidth, "tileheight": tileheight, "pixelspan": pixelspan, "tilespanx": tilespanx, "tilespany": tilespany, } ) WMTS_TILEMATRIXSET_LEVELS = levels return WMTS_TILEMATRIXSET_LEVELS