Source code for geonode.messaging.producer

#########################################################################
#
# Copyright (C) 2017 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import logging
import traceback

from decorator import decorator
from kombu import BrokerConnection
from kombu.common import maybe_declare
from .queues import queue_email_events, queue_geoserver_events, queue_notifications_events, queue_dataset_viewers

from . import url, producers, connection, broker_socket_timeout, task_serializer
from .consumer import Consumer

[docs] logger = logging.getLogger(__name__)
[docs] LOCAL_STARTED = False
@decorator
[docs] def sync_if_local_memory(func, *args, **kwargs): """ Runs locally, synchronous if connection is memory:// This will create in-memory transport if messaging is not configured. This allows to run synchronous queue in deployments which don't have external broker deployed """ try: return func(*args, **kwargs) finally: connection = BrokerConnection(url, connect_timeout=broker_socket_timeout) if getattr(connection.connection, "driver_name", None) == "memory": # hack explained: # when using memory://, first run usually contains only message for # specific queue. Subsequent runs will deliver the same message # in topic queue and broadcast one, which is weird. # that's why for first run we stop after first message, and then after # two (so we catch topic and broadcast) # This may change in the future global LOCAL_STARTED max_messages = 1 if not LOCAL_STARTED else 2 LOCAL_STARTED = True worker = Consumer(connection, max_messages) try: worker.run(timeout=broker_socket_timeout) except Exception: tb = traceback.format_exc() msg = f"Exception while publishing message: {tb}" logger.error(msg) raise Exception(msg) elif not getattr(connection.connection, "driver_name", None): msg = f"Exception while getting connection to {url}" logger.error(msg) raise Exception(msg)
@sync_if_local_memory
[docs] def send_email_producer(dataset_uuid, user_id): with producers[connection].acquire(block=True, timeout=broker_socket_timeout) as producer: maybe_declare(queue_email_events, producer.channel) payload = {"dataset_uuid": dataset_uuid, "user_id": user_id} producer.publish( payload, exchange="geonode", serializer=task_serializer, routing_key="email", timeout=broker_socket_timeout ) producer.release()
@sync_if_local_memory
[docs] def geoserver_upload_dataset(payload): with producers[connection].acquire(block=True, timeout=broker_socket_timeout) as producer: maybe_declare(queue_geoserver_events, producer.channel) producer.publish( payload, exchange="geonode", serializer=task_serializer, routing_key="geonode.geoserver", timeout=broker_socket_timeout, ) producer.release()
@sync_if_local_memory
[docs] def notifications_send(payload, created=None): with producers[connection].acquire(block=True, timeout=broker_socket_timeout) as producer: maybe_declare(queue_notifications_events, producer.channel) payload["created"] = created producer.publish( payload, exchange="geonode", serializer=task_serializer, routing_key="notifications", timeout=broker_socket_timeout, ) producer.release()
@sync_if_local_memory
[docs] def viewing_dataset(user, owner, dataset_id): with producers[connection].acquire(block=True, timeout=broker_socket_timeout) as producer: maybe_declare(queue_dataset_viewers, producer.channel) payload = {"viewer": user, "owner_dataset": owner, "dataset_id": dataset_id} producer.publish( payload, exchange="geonode", serializer=task_serializer, routing_key="geonode.viewer", timeout=broker_socket_timeout, ) producer.release()