vlambda博客
学习文章列表

1.hdfs文件夹大小监控

前言

我们公司在各地都有分公司,分公司下面基本都有一套属于自己的hadoop集群。这时候需要一个监控,来采集一些我们需要的信息,在发生异常时候可以告知我们

hdfs

本篇主要讲解下怎么监控hdfs文件夹大小,这个需求是因为我们会把采集的数据写进hive,有时候前一天的数据会特别少,可能只有几M,这时候就属于异常情况,我们不可能每天都登录服务器去执行命令查看,这时候怎么设计一款监控程序能够告诉我们前一天的数据到底是否正常。

实现思路

我们要监控的是hadoop生态圈的各种组件,不只是hdfs。所以我们要有多模块功能,并且这个模块是根据我们的需要来决定是否调用。然后我的目录结构设这样的

目录 说明
config 存放了程序需要的配置文件
deploy 通过playbook脚本自动构建并上传到服务器
.vscode 里面定义了快速执行playbook脚本
hadoop go package ,里面用于存放本项目的各种包

入口程序

hadoop-go.go

package mainimport ( "fmt" "hadoop-go/hadoop" "io/ioutil" "log" "os" "time" "github.com/urfave/cli" "gopkg.in/yaml.v2")var ( hadoopConfig map[string]hadoop.HadoopConfig config_path string ding hadoop.Ding)func init() {}func load_config() { log.Print("读取配置文件:", config_path) f, err := os.OpenFile(config_path, os.O_RDONLY, 0444) if err != nil { log.Panic("读取配置文件失败 ", err) } data, _ := ioutil.ReadAll(f) err = yaml.Unmarshal(data, &hadoopConfig) if err != nil { log.Panic("格式化配置文件失败", err) }}func action(c *cli.Context) error { load_config() // fix: error caused when modifying the configuration path if len(c.Args()) == 0 { println("请选择一个配置") i := 0 for k := range hadoopConfig { i++ println(i, k) } return nil } config, err := hadoopConfig[c.Args()[0]] if !err { log.Panic("没有", c.Args()[0], "的配置") } // 初始化dingding监听 if config.Dingding != "" { log.Println("启用钉钉,token:", config.Dingding) ding = hadoop.Ding{Token: config.Dingding, Msg: make(chan string, 999)} go ding.Send() } // 循环遍历任务加到定时任务里面 for n, t := range config.Tasks { log.Println("任务:", n) t.Ding = &ding switch t.Module { case "hdfs": hadoop.Hdfs{}.Select(t) default: log.Println("没有对应的模块") } } time.Sleep(10 * time.Second) return nil}func main() { app := cli.NewApp() app.Name = "hadoop-go" app.Version = "1.0.0" app.Usage = "hadoop监控" app.Flags = []cli.Flag{ cli.StringFlag{ Name: "c", Usage: "配置文件路径。default: ./config/hadoop.yml", Value: "./config/hadoop.yml", Destination: &config_path, }, } app.Action = action err := app.Run(os.Args) if err != nil { fmt.Println(err) }}

配置文件

可以看到的是,在入口文件我们读取了一个yaml格式的文件,然后通过yaml.Unmarshal将配置文件转换成go struct(结构体)

key 描述
chengdu/xian 对应的分公司名称
dingding 这里可以不设置,设置的话代表启用dingding
tasks 在当前地域执行哪些任务
v_report 任务名,可以自定义
module 要调用哪个模块,这次用的是hdfs,还会有hive,hbase等
type 调用模块的什么方法
args 传给模块方法的参数
crond 是否启用定时任务,秒 分 时 日 月 周
desc 任务描述

yaml文件如下:

chengdu: dingding: dingding-token tasks: v_report:  module: hdfs type: CompareSize args: /user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}} gt 40282905455 crond: 00 00 09 * * * desc: 成都{{.Yesterday}} v_report状态xian: dingding: dingding-token tasks: v_report:  module: hdfs type: CompareSize args: /user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}} gt 20282905455 crond: 00 00 09 * * * desc: 西安{{.Yesterday}} v_report状态

go结构体

type HadoopConfig struct { Dingding string Tasks map[string]Task}type Task struct { Module string Type string Args string Crond string Desc string Ding *Ding}

CompareSize方法

在执行命令的时候会对命令进行一次render,主要就是替换掉里面的变量。我们比较的是昨天的,所以要将/user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}}中的{.Yesterday}}替换成昨天的日期,这里写了一个方法,以后可以通过修改这个方法来添加变量

代码

func TempFunc() TempArgs { ta := TempArgs{} ta.Yesterday = time.Now().AddDate(0, 0, -1).Format("2006-01-02") return ta}func CompareSizeTemp(s string) string { /* 用来生成对应的执行命令 */ // 用来存放生成后的模板 var buff bytes.Buffer // 利用模板动态解析 ta := TempFunc() tmpl, err := template.New("test").Parse(s) if err != nil { log.Panic(err) } err = tmpl.Execute(&buff, ta) if err != nil { log.Panic("模板解析错误", err) } rs := buff.String() log.Print("Parse before:", s, "Parse after:", rs) return rs}

CompareSize Code

func (h Hdfs) CompareSize(t Task) { var rs bool var color string cmd := strings.Split(CompareSizeTemp(t.Args), " ") r := "hadoop fs -du -s " + cmd[0] o, err := Exec(r) if err != nil { log.Println("执行 ", r, " 失败.", err) } if len(cmd) == 3 { log.Println("执行命令: ", r, "运算符:", cmd[1], "对比值:", cmd[2]) v, err := strconv.ParseInt(cmd[2], 10, 64) if err != nil { log.Panic("CompareSize类型转换错误:", cmd[2], "->", err) } rs, color = FormatCompareInfo(o, cmd[1], v) } else { log.Println("执行命令: /opt/zcsy/hadoop/bin/hadoop fs -du -s ", cmd[0]) rs, color = FormatCompareInfo(o, "", 0) } title := CompareSizeTemp(t.Desc) s := `# <font color=#` + color + `>` + title + `正常</font> - 返回信息: ` + o + `` if !rs { s = `# <font color=#` + color + `>` + title + `异常</font> - 返回信息: ` + o + `` } // 通过chan来进行异步发送 md_msg := Msg_MD(title, s) t.Ding.Msg <- md_msg}

Exec方法是调用bash去执行render后的命令

func Exec(cmd string) (string, error) { log.Print("执行命令: ", cmd) c := exec.Command("bash", "-c", cmd) c.Stderr = os.Stderr o, err := c.Output() if err != nil { return "", err } return string(o), nil}

FormatCompareInfo是为了格式化调用Exec方法执行后的结果为预期格式。

func FormatCompareInfo(s string, c string, v int64) (bool, string) { ss := strings.Split(s, " ") color := "00A600" r := true log.Println("分割后的比较信息", ss) i, err := strconv.ParseInt(ss[0], 10, 64) if err != nil { log.Panic("类型转换错误:", ss[0], "->", err) } log.Println("进行比较", i, c, v) switch c { case "gt": if i < v { color = "FF0000" r = false } case "lt": if i > v { color = "FF0000" r = false } case "eq": if i > v { color = "FF0000" r = false } } return r, color}

这些都完成之后我们通过channel 将消息发给dingding 协程。完成本次发送。最后的实现效果

异常情况

图1: 异常情况钉钉通知

正常情况:

图2: 正常情况钉钉通知

这样就实现了一个对hdfs的文件夹大小监控了。十分的简单方便。后续我会更新怎么监控hadoop其他组件,以及定时任务实现。就不在本篇介绍了。