最有可能的是,今天没有人质疑为什么他们需要收集服务指标。 下一步的逻辑步骤是为收集的指标配置警报,该警报将在您方便使用的渠道(邮件,Slack,电报)中通知您数据的任何偏差。 在
Ostrovok.ru酒店的在线预订服务中,我们服务的所有度量标准都已注入InfluxDB并显示在Grafana中,还在那里设置了基本警报。 对于诸如“您需要计算并进行比较”之类的任务,我们使用Kapacitor。
Kapacitor是TICK堆栈的一部分,可以处理来自InfluxDB的指标。 他可以将多个维度相互连接(联接),从接收到的数据中计算出有用的东西,将结果写回到InfluxDB,向Slack / Telegram / mail发送警报。
整个
文档库都有很酷且详细的
文档 ,但是总有一些有用的内容没有在手册中明确指定。 在本文中,我决定收集一些此类有用的非显而易见的技巧(
此处描述了基本的TICKscipt语法),并以解决我们的问题之一为例,展示了如何应用它们。
走吧
浮点数和整数,计算错误
绝对标准的问题,是通过种姓解决的:
var alert_float = 5.0 var alert_int = 10 data|eval(lambda: float("value") > alert_float OR float("value") < float("alert_int"))
使用默认值()
如果未填写标签/字段,则在计算中将发生错误:
|default() .tag('status', 'empty') .field('value', 0)
填写联接(内部与外部)
默认情况下,join将删除没有数据的点(内部)。
使用fill('null'),将执行外部联接,之后您需要执行default()并填充空值:
var data = res1 |join(res2) .as('res1', 'res2) .fill('null') |default() .field('res1.value', 0.0) .field('res2.value', 100.0)
仍然有细微差别。 如果在上面的示例中,序列之一(res1或res2)为空,则最终序列(数据)也将为空。 在github(
1633,1871,6967 )上有几张关于该主题的票证-我们正在等待修复,并且会稍有痛苦。
在计算中使用条件(如果在lambda中)
|eval(lambda: if("value" > 0, true, false)
在此期间,管道的最后五分钟
例如,您需要将最近五分钟的值与前一周的值进行比较。 您可以使用两个单独的batch'ami获取两个数据包,或者从较大的时期中提取部分数据:
|where(lambda: duration((unixNano(now()) - unixNano("time"))/1000, 1u) < 5m)
最后五分钟的替代方法是使用BarrierNode节点,该节点在指定时间之前切断数据:
|barrier() .period(5m)
在消息中使用Go'sh模式的示例
模板与
text.template包中的格式相对应,以下是一些常见任务。
如果-否则
我们将事情井井有条,我们不会再用文字触发人们:
|alert() ... .message( '{{ if eq .Level "OK" }}It is ok now{{ else }}Chief, everything is broken{{end}}' )
邮件中的小数点后两位
提高消息的可读性:
|alert() ... .message( 'now value is {{ index .Fields "value" | printf "%0.2f" }}' )
在消息中扩展变量
我们在消息中显示更多信息,以回答“为什么大喊大叫?”的问题。
var warnAlert = 10 |alert() ... .message( 'Today value less then '+string(warnAlert)+'%' )
唯一警报标识
当数据中存在多个组时,这是正确的,否则只会生成一个警报:
|alert() ... .id('{{ index .Tags "myname" }}/{{ index .Tags "myfield" }}')
自定义处理程序
大量的处理程序都有exec,它使您可以通过传递的参数(stdin)执行脚本-创造力等等!
我们的自定义工具之一是一个小的python脚本,用于向松弛发送通知。
首先,我们要从消息中受授权保护的grafana发送图片。 之后-将OK写入同一组中上一个警报的线程,而不是作为单独的消息。 稍后-将最近X分钟内最常见的错误附加到邮件中。
一个单独的主题是与其他服务的通信以及由警报启动的任何操作(仅当您的监视工作足够好时)。
描述处理程序的示例,其中slack_handler.py是我们自己编写的脚本:
topic: slack_graph id: slack_graph.alert match: level() != INFO AND changed() == TRUE kind: exec options: prog: /sbin/slack_handler.py args: ["-c", "CHANNELID", "--graph", "--search"]
如何首次亮相?
日志输出选项
|log() .level("error") .prefix("something")
观看(CLI):kapacitor -url
host-or-ip :9092日志lvl =错误
带有httpOut的变体
显示当前管道中的数据:
|httpOut('something')
观看(获取):
host-or-ip :9092 / kapacitor / v1 /任务/ task_name /其他
执行方案
我还能在哪里得到耙
回写时的influxdb时间戳
例如,我们为每小时的请求总数(groupBy(1h))设置了一个警报,并希望将发生的警报记录在influxdb中(以在grafana的图表上漂亮地显示问题的事实)。
influxDBOut()会将警报的时间值分别写入时间戳,图表上的点将在警报出现之前/之后进行记录。
当需要准确性时:我们通过调用自定义处理程序来绕过此问题,该处理程序将使用当前时间戳将数据写入influxdb。
码头工人,建立和部署
启动时,kapacitor可以从[load]块中config中指定的目录中加载任务,模板和处理程序。
要正确创建任务,需要执行以下操作:
- 文件名-扩展为ID /脚本名
- 类型-流/批次
- dbrp-指示脚本在哪个数据库+策略中工作的关键字(dbrp“供应商”。“ autogen”)
如果某个批处理任务中没有与dbrp匹配的行,则整个服务将拒绝启动,并如实地在日志中写出它。
相反,在chronograf中,此行不应被接受,它不会被接口接受并引发错误。
构建容器时会被黑客入侵:如果在//.+dbrp中有一行,则Dockerfile会以-1退出,这将在构建构建时立即了解文件的原因。
一对多加入
任务示例:您需要每周使用服务运行时间的95%,并将最后10分钟的每分钟与该值进行比较。
您无法按一组点加入一对,最后/均值/中位数将节点转换为流,将返回错误“无法添加子项不匹配的边:批处理->流”。
批处理的结果作为lambda表达式中的变量,也不会被替换。
有一个选项可以通过udf将第一批中的必要编号保存到文件中,并通过sideload加载该文件。
我们决定了什么?
我们大约有100家酒店供应商,每个供应商都可以有多个联系,我们称其为渠道。 这些通道大约有300个;每个通道都可能掉线。 在所有记录的指标中,我们将监视错误率(请求和错误)。
为什么不使用格拉法纳呢?
在grafan中配置的错误警报有几个缺点。 根据情况的不同,有些危急,有些可以闭上眼睛。
Grafana不知道如何在维度+警报之间进行计算,但是我们需要一个比率(请求-错误)/请求。
错误看起来很恶心:

与成功的请求一起查看时,不会那么恶毒

好的,我们可以在grafana之前预先计算服务中的费率,并且在某些情况下可以。 但不是我们的,因为 对于每个通道,其比率均被视为“正常”,并且警报根据静态值工作(我们用眼睛观察,如果经常警报,则将其更改)。
这些是不同渠道的“正常”示例:


我们忽略了上一段,并假设所有供应商的情况都是“正常”的。 现在一切都很好,我们可以通过grafana中的警报来度过难关吗?
我们可以,但实际上不希望这样做,因为我们必须选择以下选项之一:
a)为每个通道分别制作许多图形(并痛苦地陪伴它们)
b)在所有通道中保留一张图表(并迷失在彩色线条和已调整的警报中)

你是怎么做到的?
再次,该文档提供了一个很好的入门示例(
计算各个系列的比率 ),您可以窥视或作为类似任务的基础。
结果您做了什么:
- 在几个小时内加入两集,按频道分组;
- 如果没有数据,请按组填写系列;
- 比较最近10分钟的中位数和先前的数据;
- 如果发现什么我们就尖叫;
- 在influxdb中写入计算的速率和发生的警报;
- 发送有用的消息到松弛状态。
在我看来,我们尽可能地精美地管理了我们希望在输出中获得的所有内容(使用自定义处理程序甚至更多)。
在github.com上,您可以看到
示例代码和所得脚本的
最小图表(graphviz) 。
结果代码示例: dbrp "supplier"."autogen" var name = 'requests.rate' var grafana_dash = 'pczpmYZWU/mydashboard' var grafana_panel = '26' var period = 8h var todayPeriod = 10m var every = 1m var warnAlert = 15 var warnReset = 5 var reqQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."requests"' var errQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."errors"' var prevErr = batch |query(errQuery) .period(period) .every(every) .groupBy(1m, 'channel', 'supplier') var prevReq = batch |query(reqQuery) .period(period) .every(every) .groupBy(1m, 'channel', 'supplier') var rates = prevReq |join(prevErr) .as('req', 'err') .tolerance(1m) .fill('null') // , |default() .field('err.value', 0.0) .field('req.value', 0.0) // if lambda: , |eval(lambda: if("err.value" > 0, 100.0 * (float("req.value") - float("err.value")) / float("req.value"), 100.0)) .as('rate') // rates |influxDBOut() .quiet() .create() .database('kapacitor') .retentionPolicy('autogen') .measurement('rates') // 10 , var todayRate = rates |where(lambda: duration((unixNano(now()) - unixNano("time")) / 1000, 1u) < todayPeriod) |median('rate') .as('median') var prevRate = rates |median('rate') .as('median') var joined = todayRate |join(prevRate) .as('today', 'prev') |httpOut('join') var trigger = joined |alert() .warn(lambda: ("prev.median" - "today.median") > warnAlert) .warnReset(lambda: ("prev.median" - "today.median") < warnReset) .flapping(0.25, 0.5) .stateChangesOnly() // message .message( '{{ .Level }}: {{ index .Tags "channel" }} err/req ratio ({{ index .Tags "supplier" }}) {{ if eq .Level "OK" }}It is ok now{{ else }} '+string(todayPeriod)+' median is {{ index .Fields "today.median" | printf "%0.2f" }}%, by previous '+string(period)+' is {{ index .Fields "prev.median" | printf "%0.2f" }}%{{ end }} http://grafana.ostrovok.in/d/'+string(grafana_dash)+ '?var-supplier={{ index .Tags "supplier" }}&var-channel={{ index .Tags "channel" }}&panelId='+string(grafana_panel)+'&fullscreen&tz=UTC%2B03%3A00' ) .id('{{ index .Tags "name" }}/{{ index .Tags "channel" }}') .levelTag('level') .messageField('message') .durationField('duration') .topic('slack_graph') // "today.median" "value", (keep) trigger |eval(lambda: "today.median") .as('value') .keep() |influxDBOut() .quiet() .create() .database('kapacitor') .retentionPolicy('autogen') .measurement('alerts') .tag('alertName', name)
结论是什么?
Kapacitor非常擅长监视一组警报,根据已记录的指标执行其他计算,执行自定义操作并运行脚本(udf)。
进入门槛不是很高-如果grafana或其他工具不能完全满足您的愿望,请尝试。