Source code for geonode.documents.tasks

#########################################################################
#
# Copyright (C) 2017 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import os
import io

from PIL import Image
import fitz

from celery.utils.log import get_task_logger

from geonode.celery_app import app
from geonode.storage.manager import StorageManager

from ..base.models import ResourceBase
from .models import Document

[docs] logger = get_task_logger(__name__)
[docs] class DocumentRenderer:
[docs] FILETYPES = ["pdf"]
# See https://pillow.readthedocs.io/en/stable/reference/ImageOps.html#PIL.ImageOps.fit
[docs] CROP_CENTERING = {"pdf": (0.0, 0.0)}
def __init__(self) -> None: pass
[docs] def supports(self, filename): return self._get_filetype(filename) in self.FILETYPES
[docs] def render(self, filename): content = None if self.supports(filename): filetype = self._get_filetype(filename) render = getattr(self, f"render_{filetype}") content = render(filename) return content
[docs] def render_pdf(self, filename): try: doc = fitz.open(filename) pix = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) return pix.pil_tobytes(format="PNG") except Exception as e: logger.warning(f"Cound not generate thumbnail for {filename}: {e}") return None
[docs] def preferred_crop_centering(self, filename): return self.CROP_CENTERING.get(self._get_filetype(filename))
[docs] def _get_filetype(self, filname): return os.path.splitext(filname)[1][1:]
[docs] doc_renderer = DocumentRenderer()
@app.task( bind=True, name="geonode.documents.tasks.create_document_thumbnail", queue="geonode", expires=30, time_limit=600, acks_late=False, autoretry_for=(Exception,), retry_kwargs={"max_retries": 5}, retry_backoff=3, retry_backoff_max=30, retry_jitter=False, )
[docs] def create_document_thumbnail(self, object_id): """ Create thumbnail for a document. """ logger.debug(f"Generating thumbnail for document #{object_id}.") storage_manager = StorageManager() try: document = Document.objects.get(id=object_id) except Document.DoesNotExist: logger.error(f"Document #{object_id} does not exist.") raise image_file = None thumbnail_content = None remove_tmp_file = False centering = (0.5, 0.5) doc_path = None if document.files: doc_path = storage_manager.path(document.files[0]) elif document.doc_url: doc_path = document.doc_url remove_tmp_file = True if document.is_image: try: image_file = storage_manager.open(doc_path) except Exception as e: logger.debug(f"Could not generate thumbnail from remote document {document.doc_url}: {e}") if image_file: try: image = Image.open(image_file) with io.BytesIO() as output: image.save(output, format="PNG") thumbnail_content = output.getvalue() output.close() except Exception as e: logger.debug(f"Could not generate thumbnail: {e}") finally: if image_file is not None: image_file.close() if remove_tmp_file: storage_manager.delete(doc_path) elif doc_renderer.supports(doc_path): # in case it's a remote document we want to retrieve it first if document.doc_url: doc_path = storage_manager.open(doc_path).name remove_tmp_file = True try: thumbnail_content = doc_renderer.render(doc_path) preferred_centering = doc_renderer.preferred_crop_centering(doc_path) if preferred_centering is not None: centering = preferred_centering except Exception as e: print(e) finally: if remove_tmp_file: storage_manager.delete(doc_path) if not thumbnail_content: logger.warning(f"Thumbnail for document #{object_id} empty.") ResourceBase.objects.filter(id=document.id).update(thumbnail_url=None) else: filename = f"document-{document.uuid}-thumb.jpg" document.save_thumbnail(filename, thumbnail_content, centering=centering) logger.debug(f"Thumbnail for document #{object_id} created.")
@app.task( bind=True, name="geonode.documents.tasks.delete_orphaned_document_files", queue="cleanup", expires=30, time_limit=600, acks_late=False, autoretry_for=(Exception,), retry_kwargs={"max_retries": 5}, retry_backoff=3, retry_backoff_max=30, retry_jitter=False, )
[docs] def delete_orphaned_document_files(self): from geonode.documents.utils import delete_orphaned_document_files delete_orphaned_document_files()
@app.task( bind=True, name="geonode.documents.tasks.delete_orphaned_thumbnails", queue="cleanup", expires=30, time_limit=600, acks_late=False, autoretry_for=(Exception,), retry_kwargs={"max_retries": 5}, retry_backoff=3, retry_backoff_max=30, retry_jitter=False, )
[docs] def delete_orphaned_thumbnails(self): from geonode.base.utils import delete_orphaned_thumbs delete_orphaned_thumbs()