Source code for geonode.monitoring.service_handlers

#########################################################################
#
# Copyright (C) 2017 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import logging
import pytz
from datetime import datetime, timedelta

import requests
from geonode.monitoring.utils import GeoServerMonitorClient
from geonode.monitoring.probes import get_probe
from geonode.monitoring.models import RequestEvent, ExceptionEvent

# Module-level logger used by the service handlers defined below.
log = logging.getLogger(__name__)
class BaseServiceExpose:
    """Base class for objects that expose monitoring data for a service type.

    Subclasses normally set ``NAME`` and must override :meth:`expose`.
    """

    # Explicit registry name. When left falsy, get_name() derives a name
    # from the class name instead.
    NAME = None

    def __init__(self, *args, **kwargs):
        """Keep the constructor arguments available on the instance."""
        self.args = args
        self.kwargs = kwargs

    def setup(self):
        """Initialisation hook; the base implementation does nothing."""
        pass

    def expose(self, *args, **kwargs):
        """Return the data to expose.

        The base class accepts the same ``*args, **kwargs`` as the concrete
        subclasses so that an un-overridden call fails with
        ``NotImplementedError`` rather than a ``TypeError``.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError

    @classmethod
    def get_name(cls):
        """Return the name under which this class is registered.

        ``NAME`` wins when it is set. Otherwise the lower-cased class name is
        used with a trailing ``"serviceexpose"`` removed, e.g.
        ``FooServiceExpose`` -> ``"foo"``. A class name that does not carry
        the suffix (or consists only of it) is returned lower-cased in full.
        """
        if cls.NAME:
            return cls.NAME
        suffix = "serviceexpose"
        lowered = cls.__name__.lower()
        # Strip the suffix from the END of the name; slicing from the front
        # would keep an arbitrary 13-character prefix instead.
        if lowered.endswith(suffix) and len(lowered) > len(suffix):
            return lowered[: -len(suffix)]
        return lowered
class HostGeoNodeServiceExpose(BaseServiceExpose):
    """Expose readings taken from the probe returned by ``get_probe()``."""

    NAME = "hostgeonode"

    def expose(self, *args, **kwargs):
        """Query the probe and return its readings as a dict.

        The returned dict has the keys ``uptime``, ``uname``, ``load``,
        ``cpu``, ``disks``, ``network`` and ``memory``.
        """
        host_probe = get_probe()

        # Readings are taken in the same order as before; only the local
        # names differ.
        uptime_info = host_probe.get_uptime()
        disk_info = host_probe.get_disk()
        load_info = host_probe.get_loadavg()
        memory_info = host_probe.get_mem()
        uname_info = host_probe.get_uname()
        cpu_info = host_probe.get_cpu()
        network_info = host_probe.get_network()

        return {
            "uptime": uptime_info,
            "uname": uname_info,
            "load": load_info,
            "cpu": cpu_info,
            "disks": disk_info,
            "network": network_info,
            "memory": memory_info,
        }
class GeoNodeServiceExpose(BaseServiceExpose):
    """Expose the exception events recently recorded for this service type."""

    NAME = "geonode"

    def expose(self, *args, **kwargs):
        """Return ``{"exceptions": [...]}`` for the last ten minutes.

        Each list entry is the result of calling ``expose()`` on an
        ``ExceptionEvent`` created within the window whose service type name
        equals ``NAME``.
        """
        window_start = datetime.utcnow().replace(tzinfo=pytz.utc) - timedelta(minutes=10)
        recent_events = ExceptionEvent.objects.filter(
            created__gte=window_start,
            service__service_type__name=self.NAME,
        )
        return {"exceptions": [event.expose() for event in recent_events]}
class BaseServiceHandler:
    """Shared collection workflow for a monitored service.

    Subclasses implement :meth:`_collect` (fetch raw data for a time window)
    and :meth:`handle_collected` (post-process what was fetched);
    :meth:`collect` ties the two together and enforces the check interval.
    """

    def __init__(self, service, force_check=False):
        """Remember the service and take a timezone-aware "now" snapshot.

        Args:
            service: the service object to collect for. This class reads its
                ``last_check`` and ``check_interval`` attributes and calls
                its ``save()`` method.
            force_check: when true, :meth:`collect` skips the
                "next check too soon" guard.
        """
        utc = pytz.utc
        self.service = service
        self.now = datetime.utcnow().replace(tzinfo=utc)
        self.check_since = service.last_check.astimezone(utc) if service.last_check else self.now
        self.force_check = force_check

    def setup(self):
        """Initialisation hook; the base implementation does nothing."""
        pass

    def get_last_request(self):
        """Return the newest ``RequestEvent`` of this service, or ``None``."""
        return RequestEvent.objects.filter(service=self.service).order_by("-created").first()

    def get_last_request_timestamp(self):
        """Return ``created`` of the newest request event, or ``None``."""
        last_request = self.get_last_request()
        return last_request.created if last_request else None

    def collect(self, since=None, until=None, **kwargs):
        """Collect data for the ``since``..``until`` window and post-process it.

        Args:
            since: start of the window; defaults to the service's
                ``last_check`` (converted to UTC) or, if unset, to "now".
            until: end of the window; defaults to "now".
            **kwargs: forwarded to :meth:`_collect`.

        Returns:
            Whatever :meth:`handle_collected` returns, or ``None`` when the
            check is skipped because the interval has not elapsed yet.
        """
        utc = pytz.utc
        now = self.now or datetime.utcnow().replace(tzinfo=utc)
        if since is None:
            since = self.service.last_check.astimezone(utc) if self.service.last_check else now
        if until is None:
            until = now
        if not self.service.last_check:
            # First run for this service: persist a starting point before
            # the interval guard below is evaluated.
            self.service.last_check = self.now
            self.service.save()
        if self.service.last_check and not self.force_check:
            last_check = self.service.last_check.astimezone(utc)
            if last_check + self.service.check_interval > now:
                log.warning("Next check too soon")
                return None
        collected = self._collect(since.astimezone(utc), until.astimezone(utc), **kwargs)
        return self.handle_collected(collected)

    def _collect(self, since, until, *args, **kwargs):
        """Fetch raw data for the window; must be overridden.

        Raises:
            NotImplementedError: always in the base class.
        """
        raise NotImplementedError

    def handle_collected(self, collected=None, *args, **kwargs):
        """Post-process the value returned by :meth:`_collect`.

        The base signature accepts the collected data (as :meth:`collect`
        passes it) so that a missing override surfaces as
        ``NotImplementedError`` instead of a ``TypeError``.

        Raises:
            NotImplementedError: always in the base class.
        """
        raise NotImplementedError

    def mark_as_checked(self):
        """Store this handler's "now" as the service's ``last_check`` and save."""
        self.service.last_check = self.now
        self.service.save()

    @classmethod
    def get_name(cls):
        """Return the lower-cased class name minus its last seven characters.

        Seven is ``len("service")``, so ``GeoNodeService`` yields
        ``"geonode"``.
        """
        class_name = cls.__name__
        return class_name[: -len("service")].lower()
class GeoNodeService(BaseServiceHandler):
    """Handler that reads ``RequestEvent`` rows already stored for the service."""

    def __init__(self, service, force_check=False):
        """Delegate to the base handler's constructor."""
        BaseServiceHandler.__init__(self, service, force_check)

    def _get_collected_set(self, since=None, until=None):
        """Return the ``RequestEvent`` queryset for this handler's service.

        Args:
            since: when given, only events with ``created`` strictly later
                than this value are returned.
            until: accepted for signature symmetry; not applied to the query.
        """
        filter_kwargs = {"service": self.service}
        if since:
            # Add the lower bound to the existing filter. Rebinding the dict
            # here would drop the service restriction and return events
            # belonging to every service.
            filter_kwargs["created__gt"] = since
        return RequestEvent.objects.filter(**filter_kwargs)

    def _collect(self, since=None, until=None, **kwargs):
        """Return the stored request events for the given window."""
        return self._get_collected_set(since=since, until=until)

    def handle_collected(self, requests, *args, **kwargs):
        """Return the collected events unchanged."""
        return requests
class GeoServerService(BaseServiceHandler):
    """Handler that pulls request records through ``GeoServerMonitorClient``."""

    def __init__(self, service, force_check=False):
        """Initialise the base handler, then build the monitor client."""
        BaseServiceHandler.__init__(self, service, force_check)
        self.setup()

    def setup(self):
        """Create ``self.gs_monitor`` for the service URL.

        Raises:
            ValueError: if the service has no URL.
        """
        service_url = self.service.url
        if not service_url:
            raise ValueError(f"Monitoring is not configured to fetch from {self.service.name}")
        self.gs_monitor = GeoServerMonitorClient(self.service.url)

    def _collect(self, since, until, format=None, **kwargs):
        """Return the monitor client's requests for the window as a list.

        ``format`` falls back to ``"json"`` when it is falsy.
        """
        chosen_format = format or "json"
        fetched = self.gs_monitor.get_requests(format=chosen_format, since=since, until=until)
        return list(fetched)

    def handle_collected(self, requests):
        """Store each fetched record and return the events just received.

        Every record is passed to ``RequestEvent.from_geoserver`` with a
        single shared ``received`` timestamp; the return value is the
        queryset of this service's events carrying that timestamp.
        """
        received_at = datetime.utcnow().replace(tzinfo=pytz.utc)
        for record in requests:
            RequestEvent.from_geoserver(self.service, record, received=received_at)
        return RequestEvent.objects.filter(service=self.service, received=received_at)
class HostGeoServerService(BaseServiceHandler):
    """Handler that reads the system-status endpoint below the service URL."""

    # Path appended to the service URL (after trailing slashes are stripped).
    PATH = "/rest/about/system-status.json"

    def __init__(self, service, force_check=False):
        """Delegate to the base handler's constructor."""
        BaseServiceHandler.__init__(self, service, force_check)

    def _collect(self, *args, **kwargs):
        """GET the status document and return its ``["metrics"]["metric"]`` value.

        Raises:
            ValueError: if the service has no URL or the response status is
                not 200.
        """
        base_url = self.service.url
        if not base_url:
            raise ValueError(f"Service {self.service.name} should have url provided")

        url = base_url.rstrip("/") + self.PATH
        # NOTE(review): verify=False turns off TLS certificate verification
        # for this request - confirm this is intended for the deployment.
        response = requests.get(url, timeout=10, verify=False)
        if response.status_code != 200:
            raise ValueError(f"Error response from api: ({url}) {response}")

        payload = response.json()
        return payload["metrics"]["metric"]

    def handle_collected(self, data, *args, **kwargs):
        """Return the collected data unchanged."""
        return data
class HostGeoNodeService(BaseServiceHandler):
    """Handler that reads the monitoring beacon endpoint below the service URL."""

    def __init__(self, service, force_check=False):
        """Delegate to the base handler's constructor."""
        BaseServiceHandler.__init__(self, service, force_check)

    def _collect(self, since, until, *args, **kwargs):
        """GET the beacon for the service's type and return the decoded JSON.

        ``since`` and ``until`` are accepted but not sent with the request.

        Raises:
            ValueError: if the service has no URL or the response status is
                not 200.
        """
        base_url = self.service.url
        if not base_url:
            raise ValueError(f"Service {self.service.name} should have url provided")

        root = base_url.rstrip("/")
        url = f"{root}/monitoring/api/beacon/{self.service.service_type.name}/"
        # NOTE(review): verify=False turns off TLS certificate verification
        # for this request - confirm this is intended for the deployment.
        response = requests.get(url, timeout=10, verify=False)
        if response.status_code != 200:
            raise ValueError(f"Error response from api: ({url}) {response}")

        return response.json()

    def handle_collected(self, data, *args, **kwargs):
        """Return the collected data unchanged."""
        return data
# Registry of handler classes, keyed by the name each class reports through
# its get_name() classmethod.
services = {
    handler_cls.get_name(): handler_cls
    for handler_cls in (
        GeoNodeService,
        GeoServerService,
        HostGeoNodeService,
        HostGeoServerService,
    )
}
def get_for_service(sname):
    """Look up the handler class registered in ``services`` under ``sname``.

    Raises:
        KeyError: if no handler is registered under that name.
    """
    handler_cls = services[sname]
    return handler_cls
# Registry of expose classes, keyed by the name each class reports through
# its get_name() classmethod.
exposes = {
    expose_cls.get_name(): expose_cls
    for expose_cls in (
        GeoNodeServiceExpose,
        HostGeoNodeServiceExpose,
    )
}
def exposes_for_service(sname):
    """Look up the expose class registered in ``exposes`` under ``sname``.

    Raises:
        KeyError: if no expose class is registered under that name.
    """
    expose_cls = exposes[sname]
    return expose_cls