Kapacitor中的度量处理技巧

最有可能的是,今天没有人质疑为什么他们需要收集服务指标。 下一步的逻辑步骤是为收集的指标配置警报,该警报将在您方便使用的渠道(邮件,Slack,电报)中通知您数据的任何偏差。 在Ostrovok.ru酒店的在线预订服务中,我们服务的所有度量标准都已注入InfluxDB并显示在Grafana中,还在那里设置了基本警报。 对于诸如“您需要计算并进行比较”之类的任务,我们使用Kapacitor。


Kapacitor是TICK堆栈的一部分,可以处理来自InfluxDB的指标。 他可以将多个维度相互连接(联接),从接收到的数据中计算出有用的东西,将结果写回到InfluxDB,向Slack / Telegram / mail发送警报。

整个文档库都有很酷且详细的文档 ,但是总有一些有用的内容没有在手册中明确指定。 在本文中,我决定收集一些此类有用的非显而易见的技巧( 此处描述了基本的TICKscipt语法),并以解决我们的问题之一为例,展示了如何应用它们。

走吧

浮点数和整数,计算错误


绝对标准的问题,是通过种姓解决的:

var alert_float = 5.0 var alert_int = 10 data|eval(lambda: float("value") > alert_float OR float("value") < float("alert_int")) 

使用默认值()


如果未填写标签/字段,则在计算中将发生错误:

 |default() .tag('status', 'empty') .field('value', 0) 

填写联接(内部与外部)


默认情况下,join将删除没有数据的点(内部)。
使用fill('null'),将执行外部联接,之后您需要执行default()并填充空值:

 var data = res1 |join(res2) .as('res1', 'res2) .fill('null') |default() .field('res1.value', 0.0) .field('res2.value', 100.0) 

仍然有细微差别。 如果在上面的示例中,序列之一(res1或res2)为空,则最终序列(数据)也将为空。 在github( 1633,1871,6967 )上有几张关于该主题的票证-我们正在等待修复,并且会稍有痛苦。

在计算中使用条件(如果在lambda中)


 |eval(lambda: if("value" > 0, true, false) 

在此期间,管道的最后五分钟


例如,您需要将最近五分钟的值与前一周的值进行比较。 您可以使用两个单独的batch'ami获取两个数据包,或者从较大的时期中提取部分数据:

  |where(lambda: duration((unixNano(now()) - unixNano("time"))/1000, 1u) < 5m) 

最后五分钟的替代方法是使用BarrierNode节点,该节点在指定时间之前切断数据:

 |barrier() .period(5m) 

在消息中使用Go'sh模式的示例


模板与text.template包中的格式相对应,以下是一些常见任务。

如果-否则


我们将事情井井有条,我们不会再用文字触发人们:

 |alert() ... .message( '{{ if eq .Level "OK" }}It is ok now{{ else }}Chief, everything is broken{{end}}' ) 

邮件中的小数点后两位


提高消息的可读性:

 |alert() ... .message( 'now value is {{ index .Fields "value" | printf "%0.2f" }}' ) 

在消息中扩展变量


我们在消息中显示更多信息,以回答“为什么大喊大叫?”的问题。

 var warnAlert = 10 |alert() ... .message( 'Today value less then '+string(warnAlert)+'%' ) 

唯一警报标识


当数据中存在多个组时,这是正确的,否则只会生成一个警报:

 |alert() ... .id('{{ index .Tags "myname" }}/{{ index .Tags "myfield" }}') 

自定义处理程序


大量的处理程序都有exec,它使您可以通过传递的参数(stdin)执行脚本-创造力等等!

我们的自定义工具之一是一个小的python脚本,用于向松弛发送通知。
首先,我们要从消息中受授权保护的grafana发送图片。 之后-将OK写入同一组中上一个警报的线程,而不是作为单独的消息。 稍后-将最近X分钟内最常见的错误附加到邮件中。

一个单独的主题是与其他服务的通信以及由警报启动的任何操作(仅当您的监视工作足够好时)。
描述处理程序的示例,其中slack_handler.py是我们自己编写的脚本:

 topic: slack_graph id: slack_graph.alert match: level() != INFO AND changed() == TRUE kind: exec options: prog: /sbin/slack_handler.py args: ["-c", "CHANNELID", "--graph", "--search"] 

如何首次亮相?


日志输出选项


 |log() .level("error") .prefix("something") 

观看(CLI):kapacitor -url host-or-ip :9092日志lvl =错误

带有httpOut的变体


显示当前管道中的数据:

 |httpOut('something') 

观看(获取): host-or-ip :9092 / kapacitor / v1 /任务/ task_name /其他

执行方案


  • 每个任务都以graphviz格式返回带有有用数字的运行树。
  • 块。
  • 我们插入查看器中, 欣赏


我还能在哪里得到耙


回写时的influxdb时间戳


例如,我们为每小时的请求总数(groupBy(1h))设置了一个警报,并希望将发生的警报记录在influxdb中(以在grafana的图表上漂亮地显示问题的事实)。

influxDBOut()会将警报的时间值分别写入时间戳,图表上的点将在警报出现之前/之后进行记录。

当需要准确性时:我们通过调用自定义处理程序来绕过此问题,该处理程序将使用当前时间戳将数据写入influxdb。

码头工人,建立和部署


启动时,kapacitor可以从[load]块中config中指定的目录中加载任务,模板和处理程序。

要正确创建任务,需要执行以下操作:

  1. 文件名-扩展为ID /脚本名
  2. 类型-流/批次
  3. dbrp-指示脚本在哪个数据库+策略中工作的关键字(dbrp“供应商”。“ autogen”)

如果某个批处理任务中没有与dbrp匹配的行,则整个服务将拒绝启动,并如实地在日志中写出它。

相反,在chronograf中,此行不应被接受,它不会被接口接受并引发错误。

构建容器时会被黑客入侵:如果在//.+dbrp中有一行,则Dockerfile会以-1退出,这将在构建构建时立即了解文件的原因。

一对多加入


任务示例:您需要每周使用服务运行时间的95%,并将最后10分钟的每分钟与该值进行比较。

您无法按一组点加入一对,最后/均值/中位数将节点转换为流,将返回错误“无法添加子项不匹配的边:批处理->流”。

批处理的结果作为lambda表达式中的变量,也不会被替换。

有一个选项可以通过udf将第一批中的必要编号保存到文件中,并通过sideload加载该文件。

我们决定了什么?


我们大约有100家酒店供应商,每个供应商都可以有多个联系,我们称其为渠道。 这些通道大约有300个;每个通道都可能掉线。 在所有记录的指标中,我们将监视错误率(请求和错误)。

为什么不使用格拉法纳呢?


在grafan中配置的错误警报有几个缺点。 根据情况的不同,有些危急,有些可以闭上眼睛。

Grafana不知道如何在维度+警报之间进行计算,但是我们需要一个比率(请求-错误)/请求。

错误看起来很恶心:



与成功的请求一起查看时,不会那么恶毒



好的,我们可以在grafana之前预先计算服务中的费率,并且在某些情况下可以。 但不是我们的,因为 对于每个通道,其比率均被视为“正常”,并且警报根据静态值工作(我们用眼睛观察,如果经常警报,则将其更改)。

这些是不同渠道的“正常”示例:





我们忽略了上一段,并假设所有供应商的情况都是“正常”的。 现在一切都很好,我们可以通过grafana中的警报来度过难关吗?
我们可以,但实际上不希望这样做,因为我们必须选择以下选项之一:
a)为每个通道分别制作许多图形(并痛苦地陪伴它们)
b)在所有通道中保留一张图表(并迷失在彩色线条和已调整的警报中)



你是怎么做到的?


再次,该文档提供了一个很好的入门示例( 计算各个系列的比率 ),您可以窥视或作为类似任务的基础。

结果您做了什么:

  • 在几个小时内加入两集,按频道分组;
  • 如果没有数据,请按组填写系列;
  • 比较最近10分钟的中位数和先前的数据;
  • 如果发现什么我们就尖叫;
  • 在influxdb中写入计算的速率和发生的警报;
  • 发送有用的消息到松弛状态。

在我看来,我们尽可能地精美地管理了我们希望在输出中获得的所有内容(使用自定义处理程序甚至更多)。

在github.com上,您可以看到示例代码和所得脚本的最小图表(graphviz)

结果代码示例:
 dbrp "supplier"."autogen" var name = 'requests.rate' var grafana_dash = 'pczpmYZWU/mydashboard' var grafana_panel = '26' var period = 8h var todayPeriod = 10m var every = 1m var warnAlert = 15 var warnReset = 5 var reqQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."requests"' var errQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."errors"' var prevErr = batch |query(errQuery) .period(period) .every(every) .groupBy(1m, 'channel', 'supplier') var prevReq = batch |query(reqQuery) .period(period) .every(every) .groupBy(1m, 'channel', 'supplier') var rates = prevReq |join(prevErr) .as('req', 'err') .tolerance(1m) .fill('null') //   ,     |default() .field('err.value', 0.0) .field('req.value', 0.0) // if  lambda:  ,     |eval(lambda: if("err.value" > 0, 100.0 * (float("req.value") - float("err.value")) / float("req.value"), 100.0)) .as('rate') //      rates |influxDBOut() .quiet() .create() .database('kapacitor') .retentionPolicy('autogen') .measurement('rates') //     10 ,   var todayRate = rates |where(lambda: duration((unixNano(now()) - unixNano("time")) / 1000, 1u) < todayPeriod) |median('rate') .as('median') var prevRate = rates |median('rate') .as('median') var joined = todayRate |join(prevRate) .as('today', 'prev') |httpOut('join') var trigger = joined |alert() .warn(lambda: ("prev.median" - "today.median") > warnAlert) .warnReset(lambda: ("prev.median" - "today.median") < warnReset) .flapping(0.25, 0.5) .stateChangesOnly() //   message      .message( '{{ .Level }}: {{ index .Tags "channel" }} err/req ratio ({{ index .Tags "supplier" }}) {{ if eq .Level "OK" }}It is ok now{{ else }} '+string(todayPeriod)+' median is {{ index .Fields "today.median" | printf "%0.2f" }}%, by previous '+string(period)+' is {{ index .Fields "prev.median" | printf "%0.2f" }}%{{ end }} http://grafana.ostrovok.in/d/'+string(grafana_dash)+ '?var-supplier={{ index .Tags "supplier" }}&var-channel={{ index .Tags "channel" }}&panelId='+string(grafana_panel)+'&fullscreen&tz=UTC%2B03%3A00' ) .id('{{ index .Tags "name" }}/{{ index .Tags "channel" }}') .levelTag('level') .messageField('message') .durationField('duration') .topic('slack_graph') // "today.median"   "value",        (keep) trigger |eval(lambda: "today.median") .as('value') .keep() |influxDBOut() .quiet() .create() .database('kapacitor') .retentionPolicy('autogen') .measurement('alerts') .tag('alertName', name) 


结论是什么?


Kapacitor非常擅长监视一组警报,根据已记录的指标执行其他计算,执行自定义操作并运行脚本(udf)。

进入门槛不是很高-如果grafana或其他工具不能完全满足您的愿望,请尝试。

Source: https://habr.com/ru/post/zh-CN478702/


All Articles