#########################################################################
#
# Copyright (C) 2021 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
from rest_framework import permissions, status, views, viewsets, mixins
from rest_framework.decorators import action
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from geonode.base.api.pagination import GeoNodeApiPagination
from geonode.management_commands_http.mixins import CreateJobMixin
from geonode.management_commands_http.models import ManagementCommandJob
from geonode.management_commands_http.filters import ManagementCommandJobFilterSet
from geonode.management_commands_http.serializers import (
ManagementCommandJobSerializer,
ManagementCommandJobListSerializer,
ManagementCommandJobCreateSerializer,
)
from geonode.management_commands_http.utils.commands import (
get_management_command_details,
get_management_commands,
)
from geonode.management_commands_http.utils.jobs import (
start_task,
stop_task,
get_celery_task_meta,
)
[docs]
class ManagementCommandView(views.APIView, CreateJobMixin):
"""
Handle the exposed management commands usage:
- GET: List of exposed commands
- GET detail: Help for a specific command
- POST: Create a job (and automatic runs) for a specific command.
"""
[docs]
permission_classes = [permissions.IsAdminUser]
[docs]
allowed_methods = ["GET", "POST"]
[docs]
def retrieve_details(self, cmd_name):
# Object not found
if cmd_name not in self.available_commands:
return Response({"success": False, "error": "Command not found"}, status=status.HTTP_404_NOT_FOUND)
# Object Details: fetch help text of the Command
cmd_details = get_management_command_details(cmd_name)
return Response({"success": True, "error": None, "data": cmd_details})
[docs]
def list(self):
return Response({"success": True, "error": None, "data": self.available_commands})
[docs]
def get(self, request, cmd_name=None):
self.available_commands = get_management_commands()
# List
if cmd_name is None:
return self.list()
# Retrieve
return self.retrieve_details(cmd_name)
[docs]
def post(self, request, cmd_name=None):
"""
Creates and runs a management command job.
Expects application/json content type in the following shape:
.. code-block:: json
{
"args": ["<arg1>", "<arg2>"],
"kwargs": {"<key1": "<val1>", "<key2>": "<val2>"},
"autostart": true
}
By default, autostart is set to true.
"""
return self.create(request)
[docs]
def get_serializer(self, *args, **kwargs):
kwargs["context"] = {"request": self.request, "view": self}
return ManagementCommandJobCreateSerializer(*args, **kwargs)
[docs]
class ManagementCommandJobViewSet(CreateJobMixin, mixins.RetrieveModelMixin, viewsets.GenericViewSet):
"""
Create, List, Retrieve, Start, Stop and Get Status of a Management Command Job.
"""
[docs]
permission_classes = [permissions.IsAdminUser]
[docs]
queryset = ManagementCommandJob.objects.all().order_by("-created_at")
[docs]
serializer_class = ManagementCommandJobSerializer
[docs]
filter_class = ManagementCommandJobFilterSet
[docs]
filter_backends = (DjangoFilterBackend,)
[docs]
pagination_class = GeoNodeApiPagination
[docs]
def get_queryset(self):
queryset = super().get_queryset()
cmd_name = self.kwargs.pop("cmd_name", None)
if cmd_name:
queryset = queryset.filter(command=cmd_name)
return queryset
[docs]
def get_serializer_class(self):
if self.action == "list":
serializer = ManagementCommandJobListSerializer
elif self.action in ("create", "metadata"):
serializer = ManagementCommandJobCreateSerializer
else:
serializer = super().get_serializer_class()
return serializer
[docs]
def list(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset())
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response({"data": serializer.data})
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@action(detail=True, methods=["PATCH"])
[docs]
def start(self, request, pk=None, **kwargs):
job = self.get_object()
try:
start_task(job)
except ValueError as exc:
error_message = str(exc)
response = {"success": False, "error": error_message, "data": self.get_serializer().data}
return Response(response, status=status.HTTP_400_BAD_REQUEST)
job.refresh_from_db()
serializer = self.get_serializer(instance=job)
response = {"success": True, "error": None, "data": serializer.data}
return Response(response)
@action(detail=True, methods=["PATCH"])
[docs]
def stop(self, request, pk=None, **kwargs):
job = self.get_object()
stop_task(job)
serializer = self.get_serializer(instance=job)
response = {"success": True, "error": None, "data": serializer.data}
return Response(response)
@action(detail=True, methods=["GET"])
[docs]
def status(self, request, pk=None, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance=instance)
celery_task_meta = get_celery_task_meta(instance)
celery_data = {
"celery_task_meta": {
"date_done": celery_task_meta.get("date_done"),
"status": celery_task_meta.get("status"),
"traceback": celery_task_meta.get("traceback"),
"worker": celery_task_meta.get("worker"),
}
}
return Response({**serializer.data, **celery_data})