# Source code for geonode.upload.models

#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import os
import base64
import pickle
import logging

from gsimporter.api import NotFound
from django.utils.timezone import now

from django.db import models
from django.urls import reverse
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.template.defaultfilters import filesizeformat
from django.utils.translation import ugettext_lazy as _

from geonode import GeoNodeException
from geonode.base import enumerations
from geonode.base.models import ResourceBase
from geonode.storage.manager import storage_manager
from geonode.geoserver.helpers import gs_uploader, ogc_server_settings

# Module-level logger, named after this module per the stdlib logging convention.
logger = logging.getLogger(__name__)
class UploadManager(models.Manager):
    """Custom manager for :class:`Upload` with importer-session helpers.

    NOTE: the previous redundant ``__init__`` (which only delegated to
    ``models.Manager.__init__`` with no arguments) has been removed — it was
    byte-for-byte equivalent to the inherited default.
    """

    def invalidate_from_session(self, upload_session):
        """Mark every Upload matching the session's user and import id as INVALID.

        :param upload_session: an importer upload session object exposing
            ``user`` and ``import_session.id``.
        :return: the number of rows updated (per ``QuerySet.update``).
        """
        return self.filter(
            user=upload_session.user,
            import_id=upload_session.import_session.id,
        ).update(state=enumerations.STATE_INVALID)

    def update_from_session(self, upload_session, resource: ResourceBase = None):
        """Find the Upload for *upload_session* and refresh it from the session.

        Resolution order: a row tied to *resource* first, then the most recent
        row matching the session's user and name.

        :return: the deserialized session of the updated Upload, or ``None``
            when no matching Upload exists.
        """
        _upload = None
        if resource:
            try:
                _upload = self.get(resource=resource)
            except Upload.DoesNotExist:
                _upload = None
        if not _upload:
            # Fall back to the newest upload by the same user with the same name.
            _upload = (
                self.filter(user=upload_session.user, name=upload_session.name)
                .order_by("-date")
                .first()
            )
        if _upload:
            return _upload.update_from_session(upload_session, resource=resource)
        return None

    def get_incomplete_uploads(self, user):
        """Return *user*'s uploads that are neither PROCESSED nor WAITING."""
        return (
            self.filter(user=user)
            .exclude(state=enumerations.STATE_PROCESSED)
            .exclude(state=enumerations.STATE_WAITING)
        )
class UploadSizeLimitManager(models.Manager):
    """Manager with helpers that seed default upload-size limit rows."""

    def create_default_limit(self):
        """Create and return the default 'dataset_upload_size' limit row."""
        return self.create(
            slug="dataset_upload_size",
            description="The sum of sizes for the files of a dataset upload.",
            max_size=settings.DEFAULT_MAX_UPLOAD_SIZE,
        )

    def create_default_limit_with_slug(self, slug):
        """Create and return a default-sized limit row under *slug*."""
        return self.create(
            slug=slug,
            description="Size limit.",
            max_size=settings.DEFAULT_MAX_UPLOAD_SIZE,
        )
class UploadParallelismLimitManager(models.Manager):
    """Manager with a helper that seeds the default parallelism limit row."""

    def create_default_limit(self):
        """Create and return the default per-user parallel-uploads limit row."""
        return self.create(
            slug="default_max_parallel_uploads",
            description="The default maximum parallel uploads per user.",
            max_number=settings.DEFAULT_MAX_PARALLEL_UPLOADS_PER_USER,
        )
class Upload(models.Model):
    """Tracks one GeoServer importer session and the resource it produces.

    The importer session object is pickled into ``session`` so the upload can
    be inspected/resumed across requests; ``state`` holds one of the
    ``geonode.base.enumerations`` STATE_* values.
    """

    objects = UploadManager()

    # Id of the corresponding GeoServer importer session (null until known).
    import_id = models.BigIntegerField(null=True)
    user = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, on_delete=models.SET_NULL)
    # One of enumerations.STATE_* (READY/PENDING/RUNNING/PROCESSED/...).
    state = models.CharField(max_length=16)
    create_date = models.DateTimeField("create_date", default=now)
    date = models.DateTimeField("date", default=now)
    resource = models.ForeignKey(ResourceBase, null=True, on_delete=models.SET_NULL)
    # Directory holding the uploaded files; may be null for legacy rows.
    upload_dir = models.TextField(null=True)
    store_spatial_files = models.BooleanField(default=True)
    name = models.CharField(max_length=64, null=False, blank=False)
    complete = models.BooleanField(default=False)
    # hold our serialized session object
    session = models.TextField(null=True, blank=True)
    # hold a dict of any intermediate Dataset metadata - not used for now
    metadata = models.TextField(null=True)
    mosaic = models.BooleanField(default=False)
    append_to_mosaic_opts = models.BooleanField(default=False)
    append_to_mosaic_name = models.CharField(max_length=128, null=True)
    mosaic_time_regex = models.CharField(max_length=128, null=True)
    mosaic_time_value = models.CharField(max_length=128, null=True)
    mosaic_elev_regex = models.CharField(max_length=128, null=True)
    mosaic_elev_value = models.CharField(max_length=128, null=True)
    resume_url = models.CharField(max_length=256, null=True, blank=True)

    class Meta:
        ordering = ["-date"]

    @property
    def get_session(self):
        """Deserialize and return the stored importer session, or ``None``.

        SECURITY NOTE: ``pickle.loads`` must only ever see data written by
        :meth:`update_from_session`; never feed this field untrusted input.
        """
        if self.session:
            return pickle.loads(base64.decodebytes(self.session.encode("UTF-8")))

    def update_from_session(self, upload_session, resource: ResourceBase = None):
        """Refresh this row from *upload_session*, optionally binding *resource*.

        Serializes the session, syncs name/user/import id, attaches uploaded
        files to the resource when it has none yet, and advances ``state``.

        :raises GeoNodeException: when *resource* is neither a ResourceBase
            nor something exposing ``resourcebase_ptr``.
        :return: the freshly deserialized session (``self.get_session``).
        """
        self.session = base64.encodebytes(pickle.dumps(upload_session)).decode("UTF-8")
        self.import_id = upload_session.import_session.id
        self.name = upload_session.name
        self.user = upload_session.user
        self.date = now()
        if not self.upload_dir:
            self.upload_dir = os.path.dirname(upload_session.base_file)
        if resource:
            if not self.resource:
                # Accept concrete resource subclasses via their ResourceBase pointer.
                if not isinstance(resource, ResourceBase) and hasattr(resource, "resourcebase_ptr"):
                    self.resource = resource.resourcebase_ptr
                elif not isinstance(resource, ResourceBase):
                    # Typo fix: "plase" -> "please" in the user-facing message.
                    raise GeoNodeException("Invalid resource uploaded, please select one of the available")
                else:
                    self.resource = resource
            if upload_session.base_file and len(self.resource.files) == 0:
                uploaded_files = upload_session.base_file[0]
                aux_files = uploaded_files.auxillary_files
                sld_files = uploaded_files.sld_files
                xml_files = uploaded_files.xml_files
                if self.store_spatial_files and self.resource and not self.resource.files:
                    files_to_upload = aux_files + sld_files + xml_files + [uploaded_files.base_file]
                    if len(files_to_upload):
                        ResourceBase.objects.upload_files(resource_id=self.resource.id, files=files_to_upload)
                        self.resource.refresh_from_db()
        if self.resource:
            if self.resource.processed:
                self.state = enumerations.STATE_PROCESSED
            else:
                self.state = enumerations.STATE_RUNNING
        elif self.state in (enumerations.STATE_READY, enumerations.STATE_PENDING):
            # No resource yet: mirror the importer session's own state.
            self.state = upload_session.import_session.state
            if self.state == enumerations.STATE_COMPLETE:
                self.complete = True
        self.save()
        return self.get_session

    @property
    def progress(self):
        """Coarse progress percentage derived from ``state`` (0.0 - 100.0).

        May opportunistically promote ``state`` to PROCESSED (and save) when
        the linked resource has finished processing.
        """
        if self.state in (enumerations.STATE_READY, enumerations.STATE_INVALID, enumerations.STATE_INCOMPLETE):
            return 0.0
        elif self.state == enumerations.STATE_PENDING:
            return 33.0
        elif self.state == enumerations.STATE_WAITING:
            return 50.0
        elif self.state == enumerations.STATE_PROCESSED:
            if (self.resource and self.resource.processed) or self.complete:
                return 100.0
            return 80.0
        elif self.state in (enumerations.STATE_COMPLETE, enumerations.STATE_RUNNING):
            if self.resource and self.resource.state == enumerations.STATE_PROCESSED:
                self.state = enumerations.STATE_PROCESSED
                self.save()
                return 90.0
            return 80.0

    def set_resume_url(self, resume_url):
        """Persist *resume_url* (queryset update avoids touching other fields)."""
        if self.resume_url != resume_url:
            self.resume_url = resume_url
            Upload.objects.filter(id=self.id).update(resume_url=resume_url)

    def get_resume_url(self):
        """Return the resume URL only while the upload is WAITING on an import."""
        if self.state == enumerations.STATE_WAITING and self.import_id:
            return self.resume_url
        return None

    def get_delete_url(self):
        """Return the deletion URL, or ``None`` once the upload is PROCESSED."""
        if self.state != enumerations.STATE_PROCESSED:
            return reverse("data_upload_delete", args=[self.id])
        return None

    def get_import_url(self):
        """Return the GeoServer REST URL of the import session, or ``None``."""
        session = None
        try:
            if not self.import_id:
                raise NotFound
            session = self.get_session.import_session
            if not session or session.state != enumerations.STATE_COMPLETE:
                session = gs_uploader.get_session(self.import_id)
        except Exception:
            # Was `except (NotFound, Exception)` — NotFound is redundant since
            # Exception subsumes it; behavior is unchanged.
            if not session and self.state not in (enumerations.STATE_COMPLETE, enumerations.STATE_PROCESSED):
                logger.warning(f"Import session was not found for upload with ID: {self.pk}")
        if session and self.state != enumerations.STATE_INVALID:
            return f"{ogc_server_settings.LOCATION}rest/imports/{session.id}"
        else:
            return None

    def get_detail_url(self):
        """Return the resource detail URL once the resource is processed."""
        if self.resource and self.resource.processed:
            return getattr(self.resource, "detail_url", None)
        else:
            return None

    def delete(self, *args, **kwargs):
        """Delete the row, then best-effort remove its upload directory."""
        super().delete(*args, **kwargs)
        # Guard: upload_dir is nullable; rmtree(None) would raise instead of
        # being suppressed by ignore_errors.
        if self.upload_dir:
            storage_manager.rmtree(self.upload_dir, ignore_errors=True)

    def set_processing_state(self, state):
        """Set ``state`` here (via queryset update) and on the linked resource."""
        if self.state != state:
            self.state = state
            Upload.objects.filter(id=self.id).update(state=state)
        if self.resource:
            self.resource.set_processing_state(state)

    def __str__(self):
        return f"Upload [{self.pk}] gs{self.import_id} - {self.name}, {self.user}"
class UploadSizeLimit(models.Model):
    """Per-slug upper bound, in bytes, on the size of an upload."""

    objects = UploadSizeLimitManager()

    # Slug doubles as the primary key; at least 3 characters.
    slug = models.SlugField(
        primary_key=True,
        max_length=255,
        unique=True,
        null=False,
        blank=False,
        validators=[MinLengthValidator(limit_value=3)],
    )
    description = models.TextField(
        max_length=255,
        default=None,
        null=True,
        blank=True,
    )
    max_size = models.PositiveBigIntegerField(
        help_text=_("The maximum file size allowed for upload (bytes)."),
        default=settings.DEFAULT_MAX_UPLOAD_SIZE,
    )

    @property
    def max_size_label(self):
        """Human-readable rendering of ``max_size`` (e.g. '10.0 MB')."""
        return filesizeformat(self.max_size)

    def __str__(self):
        return f'UploadSizeLimit for "{self.slug}" (max_size: {self.max_size_label})'

    class Meta:
        ordering = ("slug",)
class UploadParallelismLimit(models.Model):
    """Per-slug cap on how many uploads may run in parallel for a user."""

    objects = UploadParallelismLimitManager()

    # Slug doubles as the primary key; at least 3 characters.
    slug = models.SlugField(
        primary_key=True,
        max_length=255,
        unique=True,
        null=False,
        blank=False,
        validators=[MinLengthValidator(limit_value=3)],
    )
    description = models.TextField(
        max_length=255,
        default=None,
        null=True,
        blank=True,
    )
    max_number = models.PositiveSmallIntegerField(
        help_text=_("The maximum number of parallel uploads (0 to 32767)."),
        default=settings.DEFAULT_MAX_PARALLEL_UPLOADS_PER_USER,
    )

    def __str__(self):
        return f'UploadParallelismLimit for "{self.slug}" (max_number: {self.max_number})'

    class Meta:
        ordering = ("slug",)