和其他大数据系统类似,Flink 内置也提供 metric system 供我们监控 Flink 程序的运行情况,包括了JobManager、TaskManager、Job、Task以及Operator等组件的运行情况,大大方便我们调试监控我们的程序。 系统提供的一些监控指标名字有下面几个:
上面的<host>,<operator_name>以及<subtask_index>在程序运行的时候会替换成相应的值,比如<host>会替换成 ,<operator_name>会替换成FlatMapper等。 问题现在假设我们有如下的程序
我们把这个程序的metric信息发送到Graphite监控系统中:
当我们运行这个程序,相关的变量(<host>,<operator_name>以及<subtask_index>等)会替换成下面的值:
所以
注意看<task_name>变量的值为 解决这个问题主要两两种方法:
修改 |
metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.custom_task_name.<subtask_index> |
这样运行上面程序的时候,metrics.scope.task
属性最终的值如下:
flinkjobs..taskmanager.34d31d54c0031328a6ec8910e571a7f8.MyFlinkJobs.custom_task_name.0 |
这样整个指标名中都无特殊字符,我们就可以在Graphite系统中看到相应的指标。
GraphiteReporter
类的相关代码上面那种方法我们将 <task_name> 变量的值写死了,有时候这并不是我们想要的,所有这时候我们可以修改 GraphiteReporter
类。GraphiteReporter
类继承自 org.apache.flink.dropwizard.ScheduledDropwizardReporter
,ScheduledDropwizardReporter
类中有个 filterCharacters
函数,我们可以在这个函数里将特殊字符全部替换掉,下面是一种实现:
@Override public String filterCharacters(String str) { char [] chars = null ; final int strLen = str.length(); int pos = 0 ; for ( int i = 0 ; i < strLen; i++) { final char c = str.charAt(i); switch (c) { case '>' : case '<' : case '"' : // remove character by not moving cursor if (chars == null ) { chars = str.toCharArray(); } break ; case ' ' : if (chars == null ) { chars = str.toCharArray(); } chars[pos++] = '_' ; break ; case ',' : case '=' : case ';' : case ':' : case '?' : case '\'' : case '*' : if (chars == null ) { chars = str.toCharArray(); } chars[pos++] = '-' ; break ; default : if (chars != null ) { chars[pos] = c; } pos++; } } return chars == null ? str : new String(chars, 0 , pos); } |
我们把
<
,>
以及"
特殊字符直接去掉了,
,=
,;
, :
, ?
,\
以及*
特殊字符替换成-
_
然后编译相关的模块,并替换flink-metrics-graphite-1.3.1.jar,我们再运行上面的例子,这时候metrics.scope.task
属性最终的值如下:
flinkjobs..taskmanager.34d31d54c0031328a6ec8910e571a7f8.MyFlinkJobs.Source: Custom_Source_-_TestFlat_-_Map_-_Sink: es.0 |
这时候我们也可以在Graphite监控系统中看到相关的数据了。
|