#########################################################################
#
# Copyright (C) 2017 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import logging
import re
import pytz
from datetime import datetime, timedelta
from decimal import Decimal
from itertools import chain
from django.conf import settings
from django.db import models
from django.utils.html import strip_tags
from django.template.loader import get_template
from django.core.mail import EmailMultiAlternatives as EmailMessage
from django.utils.translation import ugettext_noop as _
from django.db.models import Max
from django.urls import resolve, Resolver404
from geonode.utils import raw_sql
from geonode.notifications_helper import send_notification
from geonode.monitoring import MonitoringAppConfig as AppConf
from geonode.monitoring.models import (
Metric,
MetricValue,
RequestEvent,
MonitoredResource,
ExceptionEvent,
EventType,
NotificationCheck,
BuiltIns,
)
from geonode.monitoring.utils import generate_periods, align_period_start, align_period_end
from geonode.monitoring.aggregation import (
aggregate_past_periods,
calculate_rate,
calculate_percent,
extract_resources,
extract_event_type,
extract_event_types,
extract_special_event_types,
get_resources_for_metric,
get_labels_for_metric,
get_metric_names,
)
from geonode.base.models import ResourceBase
from geonode.utils import parse_datetime
[docs]
# Module-level logger for the monitoring collector.
log = logging.getLogger(__name__)
[docs]
class CollectorAPI:
    """Collects and processes monitoring data into ``MetricValue`` rows.

    Entry points: :meth:`process` dispatches raw collected data per service
    kind; query helpers such as :meth:`get_metrics_for` read the stored
    metric values back out; notification helpers emit alerts.
    """

    def __init__(self):
        # Stateless service object; no configuration is held per instance.
        pass
[docs]
def _calculate_rate(self, metric_name, metric_label, current_value, valid_to):
    """Look up the previous value of a network metric and return the rate
    of change between it and ``current_value`` (delegates to the
    module-level ``calculate_rate`` helper)."""
    rate = calculate_rate(metric_name, metric_label, current_value, valid_to)
    return rate
[docs]
def _calculate_percent(self, metric_name, metric_label, current_value, valid_to):
    """Look up the previous value of a network metric and return the percent
    change against ``current_value`` (delegates to the module-level
    ``calculate_percent`` helper)."""
    percent = calculate_percent(metric_name, metric_label, current_value, valid_to)
    return percent
[docs]
def process_host_geoserver(self, service, data, valid_from, valid_to):
    """
    Generates metric values for system-level measurements reported by
    a GeoServer instance.

    :param service: monitored Service instance the data belongs to
    :param data: iterable of dicts with at least ``name`` and ``value`` keys
    :param valid_from: unused; the period is recomputed from collection time
    :param valid_to: unused; the period is recomputed from collection time
    """
    # Interface names arrive embedded in the description, e.g. "[eth0]".
    desc_re = re.compile(r"\[(\w+)\]")

    def get_iface_name(row):
        # Extract the "[iface]" token from the row description, or None.
        m = desc_re.search(row["description"])
        if m is None:
            return
        return m.groups()[0]

    def get_network_rate(row, value, metric_defaults, metric_name, valid_to):
        # Derive a "<metric>.rate" value from the previous stored sample.
        iface_label = get_iface_name(row)
        if not iface_label:
            # Fixed: the original passed positional args to a format string
            # with no placeholders, which raised during lazy log formatting.
            log.debug("no label %s %s", metric_name, row.get("description"))
            return
        rate = self._calculate_rate(metric_name, iface_label, value, valid_to)
        if rate is None:
            log.debug("no rate for %s", metric_name)
            return
        mdata = {
            "value": rate,
            "value_raw": rate,
            "value_num": rate,
            "label": iface_label,
            "metric": f"{metric_name}.rate",
        }
        mdata.update(metric_defaults)
        log.debug(MetricValue.add(**mdata))

    def get_mem_label(*args):
        # Memory values are reported in bytes.
        return "B"

    # gs metric -> (monitoring metric name, label function, postproc function)
    GS_METRIC_MAP = {
        "SYSTEM_UPTIME": ("uptime", None, None),
        "SYSTEM_AVERAGE_LOAD": ("load.1m", None, None),
        "CPU_LOAD": ("cpu.usage.percent", None, None),
        "MEMORY_USED": ("mem.usage.percent", get_mem_label, None),
        "MEMORY_TOTAL": ("mem.all", get_mem_label, None),
        "MEMORY_FREE": ("mem.free", get_mem_label, None),
        "NETWORK_INTERFACE_SEND": ("network.out", get_iface_name, get_network_rate),
        "NETWORK_INTERFACE_RECEIVED": ("network.in", get_iface_name, get_network_rate),
        "NETWORK_INTERFACES_SEND": ("network.out", None, get_network_rate),
        "NETWORK_INTERFACES_RECEIVED": ("network.in", None, get_network_rate),
    }
    utc = pytz.utc
    collected_at = datetime.utcnow().replace(tzinfo=utc)
    # NOTE(review): the caller-supplied valid_from/valid_to are deliberately
    # replaced with a period aligned to the collection time.
    valid_from = align_period_start(collected_at, service.check_interval)
    valid_to = align_period_end(collected_at, service.check_interval)
    mdefaults = {
        "valid_from": valid_from,
        "valid_to": valid_to,
        "resource": None,
        "samples_count": 1,
        "service": service,
    }
    # Replace any values already stored for this period before re-adding.
    metrics = [m[0] for m in GS_METRIC_MAP.values()]
    MetricValue.objects.filter(
        service_metric__metric__name__in=metrics, valid_from=valid_from, valid_to=valid_to, service=service
    ).delete()
    for metric_data in data:
        map_data = GS_METRIC_MAP.get(metric_data["name"])
        if not map_data:
            continue
        metric_name, label_function, processing_function = map_data
        if metric_name is None:
            continue
        value = metric_data["value"]
        if isinstance(value, str):
            # GeoServer may report decimals with a comma separator.
            value = value.replace(",", ".")
        mdata = {
            "value": value,
            "value_raw": value,
            "value_num": value,
            "label": label_function(metric_data) if callable(label_function) else None,
            "metric": metric_name,
        }
        mdata.update(mdefaults)
        log.debug(MetricValue.add(**mdata))
        if callable(processing_function):
            processing_function(metric_data, value, mdefaults, metric_name, valid_to)
[docs]
def process_host_geonode(self, service, data, valid_from, valid_to):
    """
    Generates metric values for system-level measurements reported by a
    GeoNode host agent (network, memory, storage, load, uptime, cpu).

    The caller-supplied valid_from/valid_to are replaced with a period
    aligned to the payload's own ``timestamp``.
    """
    utc = pytz.utc
    import dateutil.parser

    # Round-trip through strftime drops sub-second precision before parsing.
    collected_at = parse_datetime(dateutil.parser.parse(data["timestamp"]).strftime("%Y-%m-%d %H:%M:%S")).replace(
        tzinfo=utc
    )
    valid_from = align_period_start(collected_at, service.check_interval)
    valid_to = align_period_end(collected_at, service.check_interval)
    # Shared kwargs merged into every MetricValue.add() call below.
    mdefaults = {
        "valid_from": valid_from,
        "valid_to": valid_to,
        "resource": None,
        "samples_count": 1,
        "service": service,
    }
    # Clear previously stored network values for this exact period.
    MetricValue.objects.filter(
        service_metric__metric__name__in=("network.in", "network.out"),
        valid_from=valid_from,
        valid_to=valid_to,
        service=service,
    ).delete()
    # Per-interface traffic counters, plus a derived ".rate" metric when a
    # previous sample exists to diff against.
    for ifname, ifdata in data["data"]["network"].items():
        for tx_label, tx_value in ifdata["traffic"].items():
            mdata = {
                "value": tx_value,
                "value_raw": tx_value,
                "value_num": tx_value,
                "label": ifname,
                "metric": f"network.{tx_label}",
            }
            mdata.update(mdefaults)
            # Rate must be computed before add(), from the *previous* value.
            rate = self._calculate_rate(mdata["metric"], ifname, tx_value, valid_to)
            log.debug(MetricValue.add(**mdata))
            if rate:
                mdata["metric"] = f"{mdata['metric']}.rate"
                mdata["value"] = rate
                mdata["value_num"] = rate
                mdata["value_raw"] = rate
                log.debug(MetricValue.add(**mdata))
    ldata = data["data"]["load"]
    # Load averages arrive as a 3-item list: 1, 5 and 15 minute windows.
    llabel = ["1", "5", "15"]
    memory_info = data["data"]["memory"]
    # Only store the memory sub-keys this service has "mem.*" metrics for.
    mkeys = [m.name[len("mem.") :] for m in service.get_metrics() if m.name.startswith("mem.")]
    for mkey in mkeys:
        mdata = memory_info.get(mkey)
        if not mdata:
            continue
        mdata = {
            "value": mdata,
            "value_raw": mdata,
            "value_num": mdata,
            "metric": f"mem.{mkey}",
            "label": "B",
        }
        mdata.update(mdefaults)
        # NOTE(review): deletes rows labelled "MB" while new rows use "B" —
        # presumably cleanup of a legacy unit; confirm against migrations.
        MetricValue.objects.filter(
            service_metric__metric__name=mdata["metric"],
            valid_from=mdata["valid_from"],
            valid_to=mdata["valid_to"],
            label__name="MB",
            service=service,
        ).delete()
        log.debug(MetricValue.add(**mdata))
    # Replace storage values for this period before re-adding per mountpoint.
    MetricValue.objects.filter(
        service_metric__metric__name__in=(
            "storage.total",
            "storage.used",
            "storage.free",
        ),
        valid_from=valid_from,
        valid_to=valid_to,
        service=service,
    ).delete()
    for df in data["data"]["disks"]:
        # dev = df['device']
        total = df["total"]
        used = df["used"]
        free = df["free"]
        # free_pct = df['percent']
        mount = df["mountpoint"]
        for metric, val in (
            ("storage.total", total,),
            ("storage.used", used,),
            ("storage.free", free,),
        ):
            mdata = {
                "value": val,
                "value_raw": val,
                "value_num": val,
                "metric": metric,
                "label": mount,
            }
            mdata.update(mdefaults)
            log.debug(MetricValue.add(**mdata))
    if ldata:
        for lidx, l in enumerate(ldata):
            mdata = {
                "value": l,
                "value_raw": l,
                "value_num": l,
                "metric": f"load.{llabel[lidx]}m",
                "label": "Value",
            }
            mdata.update(mdefaults)
            MetricValue.objects.filter(
                service_metric__metric__name=mdata["metric"],
                valid_from=mdata["valid_from"],
                valid_to=mdata["valid_to"],
                label__name="Value",
                service=service,
            ).delete()
            log.debug(MetricValue.add(**mdata))
    uptime = data["data"].get("uptime")
    if uptime is not None:
        mdata = {"value": uptime, "value_raw": uptime, "value_num": uptime, "metric": "uptime", "label": "Seconds"}
        mdata.update(mdefaults)
        MetricValue.objects.filter(
            service_metric__metric__name=mdata["metric"],
            valid_from=mdata["valid_from"],
            valid_to=mdata["valid_to"],
            label__name=mdata["label"],
            service=service,
        ).delete()
        log.debug(MetricValue.add(**mdata))
    if data["data"].get("cpu"):
        _l = data["data"]["cpu"]["usage"]
        mdata = {
            "value": _l,
            "value_raw": _l,
            "value_num": _l,
            "metric": "cpu.usage",
            "label": "Seconds",
        }
        mdata.update(mdefaults)
        MetricValue.objects.filter(
            service_metric__metric__name=mdata["metric"],
            valid_from=mdata["valid_from"],
            valid_to=mdata["valid_to"],
            label__name=mdata["label"],
            service=service,
        ).delete()
        log.debug(MetricValue.add(**mdata))
        # Derived metrics from the cumulative cpu.usage counter.
        rate = self._calculate_rate(mdata["metric"], mdata["label"], mdata["value"], mdata["valid_to"])
        if rate:
            rate_data = mdata.copy()
            rate_data["metric"] = f"{mdata['metric']}.rate"
            rate_data["value"] = rate
            rate_data["value_num"] = rate
            rate_data["value_raw"] = rate
            log.debug(MetricValue.add(**rate_data))
        percent = self._calculate_percent(mdata["metric"], mdata["label"], mdata["value"], mdata["valid_to"])
        if percent:
            percent_data = mdata.copy()
            percent_data["metric"] = f"{mdata['metric']}.percent"
            percent_data["value"] = percent
            percent_data["value_num"] = percent
            percent_data["value_raw"] = percent
            percent_data["label"] = "Value"
            log.debug(MetricValue.add(**percent_data))
        # NOTE(review): this appears to add the cpu.usage value a second
        # time (it was already added above after the delete); looks
        # redundant unless MetricValue.add upserts — verify before removing.
        mdata.update(mdefaults)
        log.debug(MetricValue.add(**mdata))
[docs]
def get_labels_for_metric(self, metric_name, resource=None):
    """Return the labels recorded for *metric_name*, optionally narrowed
    to a single monitored resource."""
    labels = get_labels_for_metric(metric_name, resource)
    return labels
[docs]
def get_resources_for_metric(self, metric_name):
    """Return the monitored resources that have values for *metric_name*."""
    resources = get_resources_for_metric(metric_name)
    return resources
[docs]
def get_metric_names(self):
    """
    Returns list of tuples: (service type, list of metrics)
    """
    names = get_metric_names()
    return names
[docs]
def set_metric_values(self, metric_name, column_name, requests, service, **metric_values):
    """Aggregate *requests* per the metric's type and store the results.

    :param metric_name: name of the metric to store values for
    :param column_name: RequestEvent column to aggregate over
    :param requests: RequestEvent queryset for the current period
    :param service: service the values belong to
    :param metric_values: extra kwargs forwarded to ``MetricValue.add``
    :raises ValueError: if the metric's type is not supported
    """
    metric = Metric.get_for(metric_name, service=service)

    def _key(v):
        # Django aggregates can yield None (e.g. Sum over no rows / NULLs);
        # sorting None against numbers raises TypeError in Python 3, so
        # treat missing values as 0 for ordering purposes.
        value = v["value"]
        return 0 if value is None else value

    # we need list of three items:
    # * value - numeric value for given metric
    # * label - label value to be used
    # * samples count - number of samples for a metric
    if metric.is_rate:
        row = requests.aggregate(value=models.Avg(column_name))
        row["samples"] = requests.count()
        row["label"] = Metric.TYPE_RATE
        q = [row]
    elif metric.is_count:
        # One row per distinct column value, summed.
        q = []
        values = requests.distinct(column_name).values_list(column_name, flat=True)
        for v in values:
            rqs = requests.filter(**{column_name: v})
            row = rqs.aggregate(value=models.Sum(column_name), samples=models.Count(column_name))
            row["label"] = v
            q.append(row)
        q.sort(key=_key)
        q.reverse()
    elif metric.is_value:
        # One row per distinct column value, counted; user metrics carry
        # a (identifier, username) pair and filter on the identifier.
        q = []
        is_user_metric = column_name == "user_identifier"
        if is_user_metric:
            values = requests.distinct(column_name).values_list(column_name, "user_username")
        else:
            values = requests.distinct(column_name).values_list(column_name, flat=True)
        for v in values:
            if v is not None:
                value = v
                if is_user_metric:
                    value = v[0]
                rqs = requests.filter(**{column_name: value})
                row = rqs.aggregate(value=models.Count(column_name), samples=models.Count(column_name))
                row["label"] = v
                q.append(row)
        q.sort(key=_key)
        q.reverse()
    elif metric.is_value_numeric:
        row = requests.aggregate(value=models.Max(column_name), samples=models.Count(column_name))
        row["label"] = Metric.TYPE_VALUE_NUMERIC
        q = [row]
    else:
        raise ValueError(f"Unsupported metric type: {metric.type}")
    # Keep only the top 100 rows (sorted descending by value above).
    rows = q[:100]
    metric_values.update({"metric": metric_name, "service": service})
    for row in rows:
        label = row["label"]
        value = row["value"]
        samples = row["samples"]
        metric_values.update(
            {
                "value": value or 0,
                "label": label,
                "samples_count": samples,
                "value_raw": value or 0,
                # value_num must be numeric or NULL in the database.
                "value_num": value if isinstance(value, (float, Decimal, int)) else None,
            }
        )
        log.debug(MetricValue.add(**metric_values))
[docs]
def process(self, service, data, valid_from, valid_to, *args, **kwargs):
if service.is_hostgeonode:
return self.process_host_geonode(service, data, valid_from, valid_to, *args, **kwargs)
elif service.is_hostgeoserver:
return self.process_host_geoserver(service, data, valid_from, valid_to, *args, **kwargs)
else:
return self.process_requests(service, data, valid_from, valid_to, *args, **kwargs)
[docs]
def process_requests(self, service, requests, valid_from, valid_to):
    """Split *requests* into the service's check-interval periods and
    process each period's batch separately."""
    interval = service.check_interval
    for period_start, period_end in generate_periods(valid_from, interval, valid_to):
        batch = requests.filter(created__gte=period_start, created__lt=period_end)
        self.process_requests_batch(service, batch, period_start, period_end)
[docs]
def set_error_values(self, requests, valid_from, valid_to, service=None, resource=None, event_type=None):
    """Store error metrics for *requests*: an overall error count plus a
    per-error-type breakdown. No-op when no request has exceptions."""
    failed = requests.exclude(exceptions=None)
    if not failed.exists():
        return
    error_types = (
        ExceptionEvent.objects.filter(request__in=failed).distinct().values_list("error_type", flat=True)
    )
    defaults = {
        "valid_from": valid_from,
        "valid_to": valid_to,
        "resource": resource,
        "event_type": event_type,
        "metric": "response.error.count",
        "samples_count": requests.count(),
        "label": "count",
        "service": service,
    }
    total = failed.count()
    log.debug(MetricValue.add(value=total, value_num=total, value_raw=total, **defaults))
    # Per-error-type counts reuse the same defaults with a swapped metric.
    defaults["metric"] = "response.error.types"
    for error_type in error_types:
        type_count = failed.filter(exceptions__error_type=error_type).count()
        defaults["label"] = error_type
        defaults["samples_count"] = type_count
        log.debug(MetricValue.add(value=type_count, value_num=type_count, value_raw=type_count, **defaults))
[docs]
def process_requests_batch(self, service, requests, valid_from, valid_to):
    """
    Processes requests information into metric values for one period:
    overall stats, then per-resource and per-event-type breakdowns.
    """
    log.debug("Processing batch of %s requests from %s to %s", requests.count(), valid_from, valid_to)
    if not requests.count():
        return
    event_all = EventType.objects.get(name=EventType.EVENT_ALL)
    # Mutated in place below as we iterate resources/event types.
    metric_defaults = {
        "valid_from": valid_from,
        "valid_to": valid_to,
        "event_type": event_all,
        "resource": None,
        "service": service,
    }
    # Replace anything previously computed for this period.
    MetricValue.objects.filter(valid_from__gte=valid_from, valid_to__lte=valid_to, service=service).delete()
    requests = requests.filter(service=service)
    # NOTE(review): extract_resources/extract_event_types/
    # extract_special_event_types are not defined in this chunk —
    # presumably methods delegating to the imported aggregation helpers.
    resources = self.extract_resources(requests)

    def push_metric_values(srequests, **mdefaults):
        # Store request.count, per-path counts and the standard per-column
        # metrics for the given request subset.
        count = srequests.count()
        count_mdefaults = mdefaults.copy()
        count_mdefaults["value"] = count
        count_mdefaults["label"] = "Count"
        count_mdefaults["value_num"] = count
        count_mdefaults["value_raw"] = count
        count_mdefaults["samples_count"] = count
        log.debug(MetricValue.add("request.count", **count_mdefaults))
        paths = srequests.distinct("request_path").values_list("request_path", flat=True)
        for path in paths:
            count = srequests.filter(request_path=path).count()
            count_mdefaults["value"] = count
            count_mdefaults["label"] = path
            count_mdefaults["value_num"] = count
            count_mdefaults["value_raw"] = count
            count_mdefaults["samples_count"] = count
            log.debug(MetricValue.add("request.path", **count_mdefaults))
        # (metric name, RequestEvent column) pairs aggregated per metric type.
        for mname, cname in (
            ("request.ip", "client_ip",),
            ("request.users", "user_identifier",),
            ("request.country", "client_country"),
            ("request.city", "client_city",),
            ("request.region", "client_region"),
            ("request.ua", "user_agent",),
            ("request.ua.family", "user_agent_family",),
            ("response.time", "response_time",),
            ("response.size", "response_size",),
            ("response.status", "response_status",),
            ("request.method", "request_method",),
        ):
            # calculate overall stats
            self.set_metric_values(mname, cname, srequests, **mdefaults)
        self.set_error_values(srequests, valid_from, valid_to, service=service, resource=None)

    push_metric_values(requests, **metric_defaults)
    # for each resource we should calculate another set of stats
    for resource, _requests in [
        (
            None,
            requests,
        )
    ] + resources:
        metric_defaults["resource"] = resource
        metric_defaults["event_type"] = event_all
        push_metric_values(_requests, **metric_defaults)
        # for each event type we need separate metrics set
        event_types = self.extract_event_types(_requests)
        for event_type in event_types:
            event_type_requests = _requests.filter(event_type=event_type)
            metric_defaults["event_type"] = event_type
            push_metric_values(event_type_requests, **metric_defaults)
        # combined event types: ows and non-ows
        for evt, rq in self.extract_special_event_types(_requests):
            metric_defaults["event_type"] = evt
            push_metric_values(rq, **metric_defaults)
[docs]
def get_metrics_for(
    self,
    metric_name,
    valid_from=None,
    valid_to=None,
    interval=None,
    service=None,
    label=None,
    user=None,
    resource=None,
    event_type=None,
    service_type=None,
    group_by=None,
    resource_type=None,
):
    """
    Returns metric data for given metric. Returned dataset contains list
    of periods and values in that periods.

    When no interval is given, defaults to 60s; if the resulting window
    spans more than 24h, the default widens to 1h. Timestamps default to
    the last interval ending now (UTC).
    """
    utc = pytz.utc
    default_interval = False
    now = datetime.utcnow().replace(tzinfo=utc)
    if not interval:
        default_interval = True
        interval = timedelta(seconds=60)
    if not isinstance(interval, timedelta):
        # Callers may pass seconds as an int.
        interval = timedelta(seconds=interval)
    valid_from = valid_from or (now - interval)
    valid_to = valid_to or now
    # Widen a defaulted interval for windows longer than a day.
    if (not interval or default_interval) and (valid_to - valid_from).total_seconds() > 24 * 3600:
        default_interval = True
        interval = timedelta(seconds=3600)
    if not isinstance(interval, timedelta):
        interval = timedelta(seconds=interval)
    metric = Metric.objects.get(name=metric_name)
    out = {
        "metric": metric.name,
        "input_valid_from": valid_from.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "input_valid_to": valid_to.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "interval": interval.total_seconds(),
        "label": label.name if label else None,
        "type": metric.type,
        "axis_label": metric.unit,
        "data": [],
    }
    # One data entry per period; values come from get_metrics_data().
    periods = generate_periods(valid_from, interval, valid_to, align=False)
    for pstart, pend in periods:
        pdata = self.get_metrics_data(
            metric_name,
            pstart,
            pend,
            interval=interval,
            service=service,
            label=label,
            user=user,
            event_type=event_type,
            service_type=service_type,
            resource=resource,
            resource_type=resource_type,
            group_by=group_by,
        )
        out["data"].append(
            {
                "valid_from": pstart.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "valid_to": pend.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "data": pdata,
            }
        )
    return out
[docs]
def get_aggregate_function(self, column_name, metric_name, service=None):
    """
    Returns string with metric value column name surrounded by aggregate
    function based on metric type (which tells how to interpret value -
    is it a counter, rate or something else).

    :raises ValueError: if *metric_name* is unknown
    """
    metric = Metric.get_for(metric_name, service=service)
    if not metric:
        raise ValueError(f"Invalid metric {metric_name}")
    aggregate = metric.get_aggregate_name()
    # Fall back to the bare column when the metric has no aggregate.
    return aggregate or column_name
[docs]
def get_metrics_data(
    self,
    metric_name,
    valid_from,
    valid_to,
    interval,
    service=None,
    label=None,
    user=None,
    resource=None,
    resource_type=None,
    event_type=None,
    service_type=None,
    group_by=None,
):
    """
    Returns metric values for metric within given time span.

    Builds a raw SQL query over monitoring_metricvalue incrementally:
    select/from/where/group/order fragments are assembled as lists and
    joined at the end, with %(name)s placeholders bound via ``params``.
    """
    utc = pytz.utc
    params = {}
    col = "mv.value_num"
    agg_f = self.get_aggregate_function(col, metric_name, service)
    # When no aggregate applies, grouping by resource/label is skipped.
    has_agg = agg_f != col
    # Per-group_by SQL fragments; "select" appends to the base select,
    # "select_only" replaces it entirely.
    group_by_map = {
        "resource": {
            "select": ["mr.id", "mr.type", "mr.name", "mr.resource_id"],
            "from": ["join monitoring_monitoredresource mr on (mv.resource_id = mr.id)"],
            "where": ["and mv.resource_id is not NULL"],
            "order_by": None,
            "grouper": ["resource", "name", "type", "id", "resource_id"],
        },
        # for each resource get the number of unique labels
        "resource_on_label": {
            "select_only": [
                "mr.id",
                "mr.type",
                "mr.name",
                "mr.resource_id",
                "count(distinct(ml.name)) as val",
                "count(1) as metric_count",
                "sum(samples_count) as samples_count",
                "sum(mv.value_num), min(mv.value_num)",
                "max(mv.value_num)",
            ],
            "from": [("join monitoring_monitoredresource mr " "on (mv.resource_id = mr.id)")],
            "where": ["and mv.resource_id is not NULL"],
            "order_by": ["val desc"],
            "group_by": ["mr.id", "mr.type", "mr.name"],
            "grouper": ["resource", "name", "type", "id", "resource_id"],
        },
        # for each resource get the number of unique users
        "resource_on_user": {
            "select_only": [
                "mr.id",
                "mr.type",
                "mr.name",
                "mr.resource_id",
                "count(distinct(ml.user)) as val",
                "count(1) as metric_count",
                "sum(samples_count) as samples_count",
                "sum(mv.value_num), min(mv.value_num)",
                "max(mv.value_num)",
            ],
            "from": [("join monitoring_monitoredresource mr " "on (mv.resource_id = mr.id)")],
            "where": ["and mv.resource_id is not NULL"],
            "order_by": ["val desc"],
            "group_by": ["mr.id", "mr.type", "mr.name"],
            "grouper": ["resource", "name", "type", "id", "resource_id"],
        },
        # resource count
        "count_on_resource": {
            "select_only": [
                (
                    "count(distinct(mr.id)) as val, "
                    "count(1) as metric_count, "
                    "sum(samples_count) as samples_count, "
                    "sum(mv.value_num), min(mv.value_num), "
                    "max(mv.value_num)"
                )
            ],
            "from": [("join monitoring_monitoredresource mr " "on (mv.resource_id = mr.id)")],
            "where": ["and mr.id is not NULL"],
            "order_by": ["val desc"],
            "group_by": [],
            "grouper": [],
        },
        "event_type": {
            "select_only": [
                "ev.name as event_type",
                "sum(mv.value_num) as val",
                "count(1) as metric_count",
                "sum(samples_count) as samples_count",
                "sum(mv.value_num), min(mv.value_num)",
                "max(mv.value_num)",
            ],
            "from": [
                "join monitoring_eventtype ev on (ev.id = mv.event_type_id)",
                ("join monitoring_monitoredresource mr " "on (mv.resource_id = mr.id)"),
            ],
            "where": [],
            "order_by": ["val desc"],
            "group_by": ["ev.name"],
            "grouper": [],
        },
        # for each event the unique label count
        "event_type_on_label": {
            "select_only": [
                "ev.name as event_type",
                "count(distinct(ml.name)) as val",
                "count(1) as metric_count",
                "sum(samples_count) as samples_count",
                "sum(mv.value_num), min(mv.value_num)",
                "max(mv.value_num)",
            ],
            "from": [
                "join monitoring_eventtype ev on (ev.id = mv.event_type_id)",
                ("join monitoring_monitoredresource mr " "on (mv.resource_id = mr.id)"),
            ],
            "where": [],
            "order_by": ["val desc"],
            "group_by": ["ev.name"],
            "grouper": [],
        },
        # for each event the unique user count
        "event_type_on_user": {
            "select_only": [
                "ev.name as event_type",
                "count(distinct(ml.user)) as val",
                "count(1) as metric_count",
                "sum(samples_count) as samples_count",
                "sum(mv.value_num), min(mv.value_num)",
                "max(mv.value_num)",
            ],
            "from": [
                "join monitoring_eventtype ev on (ev.id = mv.event_type_id)",
                ("join monitoring_monitoredresource mr " "on (mv.resource_id = mr.id)"),
            ],
            "where": [],
            "order_by": ["val desc"],
            "group_by": ["ev.name"],
            "grouper": [],
        },
        # group by user: number of unique user
        "user": {
            "select_only": [
                (
                    "count(distinct(ml.user)) as val, "
                    "count(1) as metric_count, sum(samples_count) as samples_count, "
                    "sum(mv.value_num), min(mv.value_num), max(mv.value_num)"
                )
            ],
            "from": [("join monitoring_monitoredresource mr " "on (mv.resource_id = mr.id)")],
            # 'from': [], do we want to retrieve also events not related to a monitored resource?
            "where": ["and ml.user is not NULL"],
            "order_by": ["val desc"],
            "group_by": [],
            "grouper": [],
        },
        # number of labels for each user
        "user_on_label": {
            "select_only": [
                "ml.user as user, count(distinct(ml.name)) as val, " "count(1) as metric_count",
                "sum(samples_count) as samples_count",
                "sum(mv.value_num), min(mv.value_num)",
                "max(mv.value_num)",
            ],
            "from": [("join monitoring_monitoredresource mr " "on (mv.resource_id = mr.id)")],
            "where": ["and ml.user is not NULL"],
            "order_by": ["val desc"],
            "group_by": ["ml.user"],
            "grouper": [],
        },
        # group by label
        "label": {
            "select_only": [
                (
                    "count(distinct(ml.name)) as val, "
                    "count(1) as metric_count, sum(samples_count) as samples_count, "
                    "sum(mv.value_num), min(mv.value_num), max(mv.value_num)"
                )
            ],
            "from": [("join monitoring_monitoredresource mr " "on (mv.resource_id = mr.id)")],
            "where": [],  # ["and mv.resource_id is NULL or (mr.type = '')"],
            "order_by": ["val desc"],
            "group_by": [],
            "grouper": [],
        },
    }
    # Base query fragments; group_by config may extend or replace them.
    q_from = [
        "from monitoring_metricvalue mv",
        "join monitoring_servicetypemetric mt on (mv.service_metric_id = mt.id)",
        "join monitoring_metric m on (m.id = mt.metric_id)",
        "join monitoring_metriclabel ml on (mv.label_id = ml.id) ",
    ]
    # Values whose validity window starts or ends inside [valid_from, valid_to).
    q_where = [
        "where",
        " ((mv.valid_from >= TIMESTAMP %(valid_from)s AT TIME ZONE 'UTC' ",
        "and mv.valid_to < TIMESTAMP %(valid_to)s AT TIME ZONE 'UTC') ",
        "or (mv.valid_from > TIMESTAMP %(valid_from)s AT TIME ZONE 'UTC' ",
        "and mv.valid_to <= TIMESTAMP %(valid_to)s AT TIME ZONE 'UTC')) ",
        "and m.name = %(metric_name)s",
    ]
    if metric_name == "uptime":
        # uptime is not period-bound: ignore the time window entirely.
        q_where = ["where", "m.name = %(metric_name)s"]
    q_group = ["ml.name"]
    params.update(
        {
            "metric_name": metric_name,
            "valid_from": valid_from.replace(tzinfo=utc).isoformat(),
            "valid_to": valid_to.replace(tzinfo=utc).isoformat(),
        }
    )
    q_order_by = ["val desc"]
    q_select = [
        (
            f"select ml.name as label, {agg_f} as val, "
            "count(1) as metric_count, sum(samples_count) as samples_count, "
            "sum(mv.value_num), min(mv.value_num), max(mv.value_num)"
        )
    ]
    if service and service_type:
        raise ValueError("Cannot use service and service type in the same query")
    if service:
        q_where.append("and mv.service_id = %(service_id)s")
        params["service_id"] = service.id
    elif service_type:
        q_from.append(
            "join monitoring_service ms on "
            "(ms.id = mv.service_id and ms.service_type_id = %(service_type_id)s ) "
        )
        params["service_type_id"] = service_type.id
    # Default to the catch-all event type unless grouping by event type.
    if event_type is None and group_by not in ("event_type", "event_type_on_label", "event_type_on_user"):
        event_type = EventType.get(EventType.EVENT_ALL)
    # Host metrics and error counts are not tied to an event type.
    exclude_ev_type = BuiltIns.host_metrics + ("response.error.count",)
    if event_type and metric_name not in exclude_ev_type:
        q_where.append(" and mv.event_type_id = %(event_type)s ")
        params["event_type"] = event_type.id
    if label:
        q_where.append(" and ml.id = %(label)s")
        params["label"] = label.id
    # if not group_by and not resource:
    #     resource = MonitoredResource.get('', '', or_create=True)
    if resource and has_agg:
        q_group.append("mr.name")
    # group returned columns into a dict
    # config in grouping map: target_column = {source_column1: val, ...}
    if label and has_agg:
        q_group.extend(["ml.name"])
    grouper = None
    if group_by:
        group_by_cfg = group_by_map[group_by]
        g_sel = group_by_cfg.get("select")
        if g_sel:
            q_select.append(f", {(', '.join(g_sel))}")
        g_sel = group_by_cfg.get("select_only")
        if g_sel:
            q_select = [f"select {(', '.join(g_sel))}"]
        q_from.extend(group_by_cfg["from"])
        q_where.extend(group_by_cfg["where"])
        if group_by_cfg.get("group_by") is not None:
            q_group = group_by_cfg["group_by"]
        else:
            q_group.extend(group_by_cfg["select"])
        grouper = group_by_cfg["grouper"]
    if resource_type and not resource:
        # Join the resource table only if a group_by didn't already.
        if not [mr for mr in q_from if "monitoring_monitoredresource" in mr]:
            q_from.append("join monitoring_monitoredresource mr on mv.resource_id = mr.id ")
        q_where.append(" and mr.type = %(resource_type)s ")
        params["resource_type"] = resource_type
    if resource and group_by in (
        "resource",
        "resource_on_label",
        "resource_on_user",
    ):
        raise ValueError("Cannot use resource and group by resource at the same time")
    elif resource:
        if not [mr for mr in q_from if "monitoring_monitoredresource" in mr]:
            q_from.append("join monitoring_monitoredresource mr on mv.resource_id = mr.id ")
        q_where.append(" and mr.id = %(resource_id)s ")
        params["resource_id"] = resource.id
        if "ml.name" in q_group:
            q_select.append(", max(ml.user) as user")
            # q_group.extend(['ml.user']) not needed
    if user:
        q_where.append(" and ml.user = %(user)s ")
        params["user"] = user
    if q_group:
        q_group = [" group by ", ",".join(q_group)]
    if q_order_by:
        q_order_by = f"order by {(','.join(q_order_by))}"
    q = " ".join(chain(q_select, q_from, q_where, q_group, [q_order_by]))

    def postproc(row):
        # Fold the grouper's source columns into one nested dict per row and
        # attach an "href" for resources that resolve to a URL or ResourceBase.
        if grouper:
            t = {}
            tcol = grouper[0]
            for scol in grouper[1:]:
                if scol == "resource_id":
                    if scol in row:
                        r_id = row.pop(scol)
                        if "type" in t and t["type"] != MonitoredResource.TYPE_URL:
                            try:
                                rb = ResourceBase.objects.get(id=r_id)
                                t["href"] = rb.detail_url
                            except Exception:
                                t["href"] = ""
                else:
                    t[scol] = row.pop(scol)
                    if scol == "type" and scol in t and t[scol] == MonitoredResource.TYPE_URL:
                        try:
                            resolve(t["name"])
                            t["href"] = t["name"]
                        except Resolver404:
                            t["href"] = ""
            row[tcol] = t
        return row

    def check_row(r):
        is_ok = True
        # Avoid Count label for countries
        # (it has been already fixed in "set_metric_values"
        # but the following line avoid showing the label in case of existing dirty db)
        if metric_name == "request.country" and r["label"] == "count":
            is_ok = False
        return is_ok

    return [postproc(row) for row in raw_sql(q, params) if check_row(row)]
[docs]
def aggregate_past_periods(self, metric_data_q=None, periods=None, **kwargs):
    """
    Aggregate past metric data into longer periods
    """
    result = aggregate_past_periods(metric_data_q, periods, **kwargs)
    return result
[docs]
def clear_old_data(self):
    """Delete monitoring records older than ``settings.MONITORING_DATA_TTL``.

    :raises TypeError: if MONITORING_DATA_TTL is not a ``timedelta``
    """
    utc = pytz.utc
    threshold = settings.MONITORING_DATA_TTL
    if not isinstance(threshold, timedelta):
        # Fixed typo in the error message: "datatime" -> "datetime".
        raise TypeError(
            "MONITORING_DATA_TTL should be an instance of " f"datetime.timedelta, not {threshold.__class__}"
        )
    cutoff = datetime.utcnow().replace(tzinfo=utc) - threshold
    # Remove raw events first, then the derived metric values.
    ExceptionEvent.objects.filter(created__lte=cutoff).delete()
    RequestEvent.objects.filter(created__lte=cutoff).delete()
    MetricValue.objects.filter(valid_to__lte=cutoff).delete()
[docs]
def compose_notifications(self, ndata, when=None):
    """Build the template context for alert notifications; *when* defaults
    to the current UTC time."""
    timestamp = when or datetime.utcnow().replace(tzinfo=pytz.utc)
    return {"alerts": ndata, "when": timestamp, "host": settings.SITEURL}
[docs]
def emit_notifications(self, for_timestamp=None):
    """Send pending alert notifications (in-app and email) for checks that
    are currently allowed to send; each check is marked as sent even when
    delivery raises."""
    for check, ndata in self.get_notifications(for_timestamp):
        if not check.can_send:
            continue
        try:
            recipients = check.get_users()
            content = self.compose_notifications(ndata, when=for_timestamp)
            send_notification(users=recipients, label=AppConf.NOTIFICATION_NAME, extra_context=content)
            self.send_mails(check, check.get_emails(), ndata, for_timestamp)
        finally:
            # Always record the send attempt so the check isn't retried
            # immediately, even if delivery failed.
            check.mark_send()
[docs]
def send_mails(self, notification, emails, ndata, when=None):
    """Render the alert template once per recipient and send it as a
    multipart (plain + HTML) email."""
    base_ctx = self.compose_notifications(ndata, when=when)
    subject = _(f"GeoNode Monitoring on {base_ctx['host']} " f"reports errors: {notification.notification_subject}")
    template = get_template("pinax/notifications/monitoring_alert/full.txt")
    for email in emails:
        context = {"recipient": {"username": email}}
        context.update(base_ctx)
        html_body = template.render(context)
        # Plain-text alternative is derived from the rendered HTML.
        plain_body = strip_tags(html_body)
        message = EmailMessage(subject, plain_body, to=(email,))
        message.attach_alternative(html_body, "text/html")
        message.send()
[docs]
def get_last_usable_timestamp(self):
    """Return the newest ``valid_to`` among metric values whose metric is
    referenced by at least one notification check."""
    watched_metrics = Metric.objects.filter(notification_checks__isnull=False).distinct()
    aggregated = MetricValue.objects.filter(service_metric__metric__in=watched_metrics).aggregate(Max("valid_to"))
    return aggregated["valid_to__max"]
[docs]
def get_notifications(self, for_timestamp=None):
    """Evaluate active notification checks at *for_timestamp* (defaulting
    to the last usable metric timestamp) and return only those with data."""
    if for_timestamp is None:
        for_timestamp = self.get_last_usable_timestamp()
    checks = NotificationCheck.check_for(for_timestamp=for_timestamp, active=True)
    return [item for item in checks if item[1]]