1.hdfs文件夹大小监控
前言
我们公司在各地都有分公司,分公司下面基本都有一套属于自己的hadoop集群。这时候需要一个监控,来采集一些我们需要的信息,在发生异常时候可以告知我们
hdfs
本篇主要讲解下怎么监控hdfs文件夹大小,这个需求是因为我们会把采集的数据写进hive,有时候前一天的数据会特别少,可能只有几M,这时候就属于异常情况,我们不可能每天都登录服务器去执行命令查看,这时候怎么设计一款监控程序能够告诉我们前一天的数据到底是否正常。
实现思路
我们要监控的是hadoop生态圈的各种组件,不只是hdfs。所以我们要有多模块功能,并且这个模块是根据我们的需要来决定是否调用。然后我的目录结构设这样的
目录 | 说明 |
config | 存放了程序需要的配置文件 |
deploy | 通过playbook脚本自动构建并上传到服务器 |
.vscode | 里面定义了快速执行playbook脚本 |
hadoop | go package ,里面用于存放本项目的各种包 |
入口程序
hadoop-go.go
package main
import (
"fmt"
"hadoop-go/hadoop"
"io/ioutil"
"log"
"os"
"time"
"github.com/urfave/cli"
"gopkg.in/yaml.v2"
)
var (
hadoopConfig map[string]hadoop.HadoopConfig
config_path string
ding hadoop.Ding
)
func init() {
}
func load_config() {
log.Print("读取配置文件:", config_path)
f, err := os.OpenFile(config_path, os.O_RDONLY, 0444)
if err != nil {
log.Panic("读取配置文件失败 ", err)
}
data, _ := ioutil.ReadAll(f)
err = yaml.Unmarshal(data, &hadoopConfig)
if err != nil {
log.Panic("格式化配置文件失败", err)
}
}
func action(c *cli.Context) error {
load_config()
// fix: error caused when modifying the configuration path
if len(c.Args()) == 0 {
println("请选择一个配置")
i := 0
for k := range hadoopConfig {
i++
println(i, k)
}
return nil
}
config, err := hadoopConfig[c.Args()[0]]
if !err {
log.Panic("没有", c.Args()[0], "的配置")
}
// 初始化dingding监听
if config.Dingding != "" {
log.Println("启用钉钉,token:", config.Dingding)
ding = hadoop.Ding{Token: config.Dingding, Msg: make(chan string, 999)}
go ding.Send()
}
// 循环遍历任务加到定时任务里面
for n, t := range config.Tasks {
log.Println("任务:", n)
t.Ding = &ding
switch t.Module {
case "hdfs":
hadoop.Hdfs{}.Select(t)
default:
log.Println("没有对应的模块")
}
}
time.Sleep(10 * time.Second)
return nil
}
func main() {
app := cli.NewApp()
app.Name = "hadoop-go"
app.Version = "1.0.0"
app.Usage = "hadoop监控"
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "c",
Usage: "配置文件路径。default: ./config/hadoop.yml",
Value: "./config/hadoop.yml",
Destination: &config_path,
},
}
app.Action = action
err := app.Run(os.Args)
if err != nil {
fmt.Println(err)
}
}
配置文件
可以看到的是,在入口文件我们读取了一个yaml格式的文件,然后通过yaml.Unmarshal将配置文件转换成go struct(结构体)
key | 描述 |
chengdu/xian | 对应的分公司名称 |
dingding | 这里可以不设置,设置的话代表启用dingding |
tasks | 在当前地域执行哪些任务 |
v_report | 任务名,可以自定义 |
module | 要调用哪个模块,这次用的是hdfs,还会有hive,hbase等 |
type | 调用模块的什么方法 |
args | 传给模块方法的参数 |
crond | 是否启用定时任务,秒 分 时 日 月 周 |
desc | 任务描述 |
yaml文件如下:
chengdu:
dingding: dingding-token
tasks:
v_report:
module: hdfs
type: CompareSize
args: /user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}} gt 40282905455
crond: 00 00 09 * * *
desc: 成都{{.Yesterday}} v_report状态
xian:
dingding: dingding-token
tasks:
v_report:
module: hdfs
type: CompareSize
args: /user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}} gt 20282905455
crond: 00 00 09 * * *
desc: 西安{{.Yesterday}} v_report状态
go结构体
type HadoopConfig struct {
Dingding string
Tasks map[string]Task
}
type Task struct {
Module string
Type string
Args string
Crond string
Desc string
Ding *Ding
}
CompareSize方法
在执行命令的时候会对命令进行一次render,主要就是替换掉里面的变量。我们比较的是昨天的,所以要将/user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}}中的{.Yesterday}}替换成昨天的日期,这里写了一个方法,以后可以通过修改这个方法来添加变量
代码
func TempFunc() TempArgs {
ta := TempArgs{}
ta.Yesterday = time.Now().AddDate(0, 0, -1).Format("2006-01-02")
return ta
}
func CompareSizeTemp(s string) string {
/* 用来生成对应的执行命令 */
// 用来存放生成后的模板
var buff bytes.Buffer
// 利用模板动态解析
ta := TempFunc()
tmpl, err := template.New("test").Parse(s)
if err != nil {
log.Panic(err)
}
err = tmpl.Execute(&buff, ta)
if err != nil {
log.Panic("模板解析错误", err)
}
rs := buff.String()
log.Print("Parse before:", s, "Parse after:", rs)
return rs
}
CompareSize Code
func (h Hdfs) CompareSize(t Task) {
var rs bool
var color string
cmd := strings.Split(CompareSizeTemp(t.Args), " ")
r := "hadoop fs -du -s " + cmd[0]
o, err := Exec(r)
if err != nil {
log.Println("执行 ", r, " 失败.", err)
}
if len(cmd) == 3 {
log.Println("执行命令: ", r, "运算符:", cmd[1], "对比值:", cmd[2])
v, err := strconv.ParseInt(cmd[2], 10, 64)
if err != nil {
log.Panic("CompareSize类型转换错误:", cmd[2], "->", err)
}
rs, color = FormatCompareInfo(o, cmd[1], v)
} else {
log.Println("执行命令: /opt/zcsy/hadoop/bin/hadoop fs -du -s ", cmd[0])
rs, color = FormatCompareInfo(o, "", 0)
}
title := CompareSizeTemp(t.Desc)
s := `# <font color=#` + color + `>` + title + `正常</font>
- 返回信息: ` + o + ``
if !rs {
s = `# <font color=#` + color + `>` + title + `异常</font>
- 返回信息: ` + o + ``
}
// 通过chan来进行异步发送
md_msg := Msg_MD(title, s)
t.Ding.Msg <- md_msg
}
Exec方法是调用bash去执行render后的命令
func Exec(cmd string) (string, error) {
log.Print("执行命令: ", cmd)
c := exec.Command("bash", "-c", cmd)
c.Stderr = os.Stderr
o, err := c.Output()
if err != nil {
return "", err
}
return string(o), nil
}
FormatCompareInfo是为了格式化调用Exec方法执行后的结果为预期格式。
func FormatCompareInfo(s string, c string, v int64) (bool, string) {
ss := strings.Split(s, " ")
color := "00A600"
r := true
log.Println("分割后的比较信息", ss)
i, err := strconv.ParseInt(ss[0], 10, 64)
if err != nil {
log.Panic("类型转换错误:", ss[0], "->", err)
}
log.Println("进行比较", i, c, v)
switch c {
case "gt":
if i < v {
color = "FF0000"
r = false
}
case "lt":
if i > v {
color = "FF0000"
r = false
}
case "eq":
if i > v {
color = "FF0000"
r = false
}
}
return r, color
}
这些都完成之后我们通过channel 将消息发给dingding 协程。完成本次发送。最后的实现效果
异常情况
正常情况:
这样就实现了一个对hdfs的文件夹大小监控了。十分的简单方便。后续我会更新怎么监控hadoop其他组件,以及定时任务实现。就不在本篇介绍了。