Source code for geonode.messaging.consumer

#########################################################################
#
# Copyright (C) 2017 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import logging
import time
import json
from datetime import datetime

# from django.conf import settings
from kombu.mixins import ConsumerMixin
from geonode.security.views import send_email_consumer
from geonode.layers.views import dataset_view_counter
from geonode.layers.models import Dataset
from geonode.geoserver.helpers import gs_slurp

from .queues import (
    queue_email_events,
    queue_geoserver_events,
    queue_notifications_events,
    queue_all_events,
    queue_geoserver_catalog,
    queue_geoserver_data,
    queue_geoserver,
    queue_dataset_viewers,
)

[docs] logger = logging.getLogger(__package__)
[docs] class Consumer(ConsumerMixin): def __init__(self, connection, messages_limit=None):
[docs] self.last_message = None
[docs] self.connection = connection
[docs] self.messages_limit = messages_limit
[docs] def get_consumers(self, Consumer, channel): return [ Consumer(queue_all_events, callbacks=[self.on_message]), Consumer(queue_email_events, callbacks=[self.on_email_messages]), Consumer(queue_geoserver_events, callbacks=[self.on_geoserver_messages]), Consumer(queue_notifications_events, callbacks=[self.on_notifications_messages]), Consumer(queue_geoserver_catalog, callbacks=[self.on_geoserver_catalog]), Consumer(queue_geoserver_data, callbacks=[self.on_geoserver_data]), Consumer(queue_geoserver, callbacks=[self.on_geoserver_all]), Consumer(queue_dataset_viewers, callbacks=[self.on_dataset_viewer]), ]
[docs] def _check_message_limit(self): if self.messages_limit is not None: self.messages_limit -= 1 if self.messages_limit < 1: self.should_stop = True return True
[docs] def on_consume_end(self, connection, channel): super().on_consume_end(connection, channel) logger.debug("finished.")
[docs] def on_message(self, body, message): logger.debug(f"broadcast: RECEIVED MSG - body: {body}") message.ack() self._check_message_limit()
[docs] def on_email_messages(self, body, message): logger.debug(f"on_email_messages: RECEIVED MSG - body: {body}") dataset_uuid = body.get("dataset_uuid") user_id = body.get("user_id") send_email_consumer(dataset_uuid, user_id) # Not sure if we need to send ack on this fanout version. message.ack() logger.debug("on_email_messages: finished") self._check_message_limit()
[docs] def on_geoserver_messages(self, body, message): logger.debug(f"on_geoserver_messages: RECEIVED MSG - body: {body}") # dataset_id = body.get("id") # try: # layer = _wait_for_dataset(dataset_id) # except Dataset.DoesNotExist as err: # logger.debug(err) # return # geoserver_post_save_local(layer) # Not sure if we need to send ack on this fanout version. message.ack() logger.debug("on_geoserver_messages: finished") self._check_message_limit()
[docs] def on_notifications_messages(self, body, message): logger.debug(f"on_notifications_message: RECEIVED MSG - body: {body}") body.get("id") body.get("app_label") body.get("model") body.get("created") # notification_post_save_resource2(instance_id, app_label, model, created) message.ack() logger.debug("on_notifications_message: finished") self._check_message_limit()
[docs] def on_geoserver_all(self, body, message): logger.debug(f"on_geoserver_all: RECEIVED MSG - body: {body}") message.ack() logger.debug("on_geoserver_all: finished") # TODO:Adding consurmer's producers. self._check_message_limit()
[docs] def on_geoserver_catalog(self, body, message): logger.debug(f"on_geoserver_catalog: RECEIVED MSG - body: {body}") try: _update_dataset_data(body, self.last_message) self.last_message = json.loads(body) except Exception: logger.debug(f"Could not encode message {body}") message.ack() logger.debug("on_geoserver_catalog: finished") self._check_message_limit()
[docs] def on_geoserver_data(self, body, message): logger.debug(f"on_geoserver_data: RECEIVED MSG - body: {body}") try: _update_dataset_data(body, self.last_message) self.last_message = json.loads(body) except Exception: logger.debug(f"Could not encode message {body}") message.ack() logger.debug("on_geoserver_data: finished") self._check_message_limit()
[docs] def on_consume_ready(self, connection, channel, consumers, **kwargs): logger.debug(">>> Ready:") logger.debug(connection) logger.debug(f"{len(consumers)} consumers:") for i, consumer in enumerate(consumers, start=1): logger.debug(f"{i} {consumer}") super().on_consume_ready(connection, channel, consumers, **kwargs)
[docs] def on_dataset_viewer(self, body, message): logger.debug(f"on_dataset_viewer: RECEIVED MSG - body: {body}") viewer = body.get("viewer") # owner_dataset = body.get("owner_dataset") dataset_id = body.get("dataset_id") dataset_view_counter(dataset_id, viewer) # TODO Disabled for now. This should be handeld through Notifications # if settings.EMAIL_ENABLE: # send_email_owner_on_view(owner_dataset, viewer, dataset_id) message.ack() logger.debug("on_dataset_viewer: finished") self._check_message_limit()
[docs] def _update_dataset_data(body, last_message): message = json.loads(body) workspace = message["source"]["workspace"] if "workspace" in message["source"] else None store = message["source"]["store"] if "store" in message["source"] else None filter = message["source"]["name"] update_dataset = False if not last_message: last_message = message update_dataset = True last_workspace = message["source"]["workspace"] if "workspace" in message["source"] else None last_store = message["source"]["store"] if "store" in message["source"] else None last_filter = last_message["source"]["name"] if (last_workspace, last_store, last_filter) != (workspace, store, filter): update_dataset = True else: timestamp_t1 = datetime.strptime(last_message["timestamp"], "%Y-%m-%dT%H:%MZ") timestamp_t2 = datetime.strptime(message["timestamp"], "%Y-%m-%dT%H:%MZ") timestamp_delta = timestamp_t2 - timestamp_t1 if timestamp_t2 > timestamp_t1 and timestamp_delta.seconds > 60: update_dataset = True if update_dataset: gs_slurp(True, workspace=workspace, store=store, filter=filter, remove_deleted=True, execute_signals=True)
[docs] def _wait_for_dataset(dataset_id, num_attempts=5, wait_seconds=1): """Blocks execution while the Dataset instance is not found on the database This is a workaround for the fact that the ``geonode.geoserver.signals.geoserver_post_save_local`` function might try to access the layer's ``id`` parameter before the layer is done being saved in the database. """ for current in range(1, num_attempts + 1): try: instance = Dataset.objects.get(id=dataset_id) logger.debug(f"Attempt {current}/{num_attempts} - Found layer in the " "database") break except Dataset.DoesNotExist: time.sleep(wait_seconds) logger.debug(f"Attempt {current}/{num_attempts} - Could not find layer " "instance") else: logger.debug(f"Reached maximum attempts and layer {dataset_id} is still not " "saved. Exiting...") raise Dataset.DoesNotExist return instance