vlambda博客
学习文章列表

Hadoop清洗Nginx日志数据(项目实战)

    对上一篇《Flume跨服务器监控日志数据(项目实战)》中获取的 Nginx 日志数据进行清洗。

原始数据是这样的


对这些数据进行切分,获取

ip地址
日期
请求方式
状态码
代理(User-Agent)
访问网址
http协议
请求页面地址
操作系统名称
浏览器名称

编写Hadoop代码


1,创建Maven项目

导入依赖

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<!-- Project coordinates of the log-cleaning job. -->
<groupId>org.example</groupId> <artifactId>code</artifactId> <version>1.0-SNAPSHOT</version>
<!-- Compile with Java 8 source and target levels. -->
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties>
<!-- NOTE(review): spring-boot-maven-plugin is declared without a version and this POM declares no parent; confirm the plugin version resolves as intended. -->
<!-- The resources section copies every jar found under lib/ into BOOT-INF/lib/ of the build output. -->
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> <!-- Works around local jars not being packaged --> <resources> <resource> <directory>lib</directory> <targetPath>BOOT-INF/lib/</targetPath> <includes> <include>**/*.jar</include> </includes> </resource> </resources> </build>


<!-- NOTE(review): junit uses the floating version RELEASE; consider pinning it for reproducible builds. -->
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.2</version> </dependency>
<!-- Dependency used to obtain browser information --> <dependency> <groupId>nl.bitwalker</groupId> <artifactId>UserAgentUtils</artifactId> <version>1.2.4</version><!-- <scope>System<scope>--><!-- <systemPath>${basedir}/src/main/resources/lib/commons-logging-1.1.1.jar</systemPath>--> </dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.49</version> </dependency>

</dependencies>

</project>


2,LogBean


package cn.awz.log;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
/**
 * Hadoop {@link Writable} holding the fields extracted from one access-log line.
 *
 * <p>{@link #write(DataOutput)} and {@link #readFields(DataInput)} process the fields in the
 * same fixed order (ip, date, method, code, userAgent, url, http, staticUrl, os, browser);
 * the two methods must be kept in sync.
 *
 * <p>Author: Yang Jie. Created 2021/10/21 9:56.
 *
 * @version 1.0
 */
public class LogBean implements Writable {

    // Each field sits on its own line so that a trailing line comment cannot
    // swallow the declaration that follows it.
    private String ip;        // client IP address
    private String date;      // request date
    private String method;    // request method
    private String code;      // status code
    private String userAgent; // raw user-agent string
    private String staticUrl; // visited site URL
    private String http;      // HTTP protocol
    private String url;       // requested page path
    private String os;        // operating system name
    private String browser;   // browser name

    /** No-argument constructor. */
    public LogBean() {
        super();
    }

    /**
     * Serializes all fields as modified-UTF-8 strings.
     *
     * <p>{@code DataOutput.writeUTF} throws {@code NullPointerException} for a null argument,
     * so unset fields are written as the empty string instead of failing the task.
     *
     * @param dataOutput sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(nullToEmpty(this.ip));
        dataOutput.writeUTF(nullToEmpty(this.date));
        dataOutput.writeUTF(nullToEmpty(this.method));
        dataOutput.writeUTF(nullToEmpty(this.code));
        dataOutput.writeUTF(nullToEmpty(this.userAgent));
        dataOutput.writeUTF(nullToEmpty(this.url));
        dataOutput.writeUTF(nullToEmpty(this.http));
        dataOutput.writeUTF(nullToEmpty(this.staticUrl));
        dataOutput.writeUTF(nullToEmpty(this.os));
        dataOutput.writeUTF(nullToEmpty(this.browser));
    }

    /**
     * Restores all fields, reading them in the order used by {@link #write(DataOutput)}.
     *
     * @param dataInput source to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.ip = dataInput.readUTF();
        this.date = dataInput.readUTF();
        this.method = dataInput.readUTF();
        this.code = dataInput.readUTF();
        this.userAgent = dataInput.readUTF();
        this.url = dataInput.readUTF();
        this.http = dataInput.readUTF();
        this.staticUrl = dataInput.readUTF();
        this.os = dataInput.readUTF();
        this.browser = dataInput.readUTF();
    }

    /** Maps {@code null} to the empty string; any other value is returned unchanged. */
    private static String nullToEmpty(String value) {
        return value == null ? "" : value;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getUserAgent() {
        return userAgent;
    }

    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getHttp() {
        return http;
    }

    public void setHttp(String http) {
        this.http = http;
    }

    public String getStaticUrl() {
        return staticUrl;
    }

    public void setStaticUrl(String staticUrl) {
        this.staticUrl = staticUrl;
    }

    public String getOs() {
        return os;
    }

    public void setOs(String os) {
        this.os = os;
    }

    public String getBrowser() {
        return browser;
    }

    public void setBrowser(String browser) {
        this.browser = browser;
    }

    /**
     * Renders the record as one tab-separated line in the order
     * ip, date, method, code, userAgent, staticUrl, http, url, os, browser.
     *
     * @return the tab-separated representation
     */
    @Override
    public String toString() {
        return ip + '\t' + date + '\t' + method + '\t' + code + '\t' + userAgent + '\t'
                + staticUrl + '\t' + http + '\t' + url + '\t' + os + '\t' + browser;
    }
}



3,LogMapper类


package cn.awz.log;

import nl.bitwalker.useragentutils.UserAgent;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper that parses raw access-log lines into {@link LogBean} records keyed by the IP field.
 *
 * <p>Lines that cannot be parsed are skipped silently. The output key and value objects are
 * static and reused for every record, and {@link #parse(String)} writes into that shared value,
 * so {@code parse} must not be called concurrently from several threads.
 *
 * <p>Author: Yang Jie. Created 2021/10/21 10:32.
 *
 * @version 1.0
 */
public class LogMapper extends Mapper<LongWritable, Text, Text, LogBean> {

    /** Lines yielding fewer pieces than this when split on a double quote are rejected. */
    private static final int MIN_QUOTED_SEGMENTS = 8;

    private static final Text logMapperKey = new Text();
    private static final LogBean logMapperValue = new LogBean();

    /**
     * Parses one input line and, if it is valid, emits (ip, bean).
     *
     * @param key     input key supplied by the framework (not used)
     * @param value   one raw log line
     * @param context task context receiving the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (!parse(line)) {
            return;
        }
        logMapperKey.set(logMapperValue.getIp());
        context.write(logMapperKey, logMapperValue);
    }

    /**
     * Splits a log line and stores the extracted fields in the shared output value.
     *
     * <p>The line is first split on the double-quote character. Piece 0 is split on spaces to
     * obtain the IP (token 0) and the date (token 3, with the leading '[' removed); piece 1 is
     * split on spaces into method, URL and optional protocol; token 1 of piece 2 is the status
     * code; piece 3 is stored as the static URL and piece 5 as the user-agent string.
     *
     * <p>All validation happens before the shared value is touched, so a rejected line never
     * leaves it half-populated.
     *
     * @param line raw log line, may be {@code null}
     * @return {@code true} if the line was accepted and the shared value now holds its fields,
     *         {@code false} if the line is {@code null} or malformed
     */
    public static boolean parse(String line) {
        if (line == null) {
            return false;
        }

        String[] lines = line.split("\"");
        // Lines with fewer than 8 pieces are treated as invalid requests and dropped.
        if (lines.length < MIN_QUOTED_SEGMENTS) {
            return false;
        }

        String[] logBeanValue = lines[0].split(" ");
        String[] logBeanValueUrl = lines[1].split(" ");
        String[] logBeanStatus = lines[2].split(" ");

        // Guard every index used below. A request piece consisting only of spaces splits into
        // a zero-length array, so "fewer than 2" is checked rather than "exactly 1".
        if (logBeanValue.length < 4 || logBeanValueUrl.length < 2 || logBeanStatus.length < 2) {
            return false;
        }

        // Parse the user agent once and reuse the result for both derived fields.
        String userAgentText = lines[5];
        UserAgent agent = getOperatingSystem(userAgentText);

        logMapperValue.setUserAgent(userAgentText);
        logMapperValue.setBrowser(agent.getBrowser().getName());
        logMapperValue.setOs(agent.getOperatingSystem().getName());

        logMapperValue.setIp(logBeanValue[0]);
        logMapperValue.setDate(logBeanValue[3].replace("[", ""));

        // The protocol token may be missing; a single space is stored in that case.
        logMapperValue.setHttp(logBeanValueUrl.length < 3 ? " " : logBeanValueUrl[2]);
        logMapperValue.setMethod(logBeanValueUrl[0]);
        logMapperValue.setUrl(logBeanValueUrl[1]);

        logMapperValue.setStaticUrl(lines[3]);
        logMapperValue.setCode(logBeanStatus[1]);
        return true;
    }

    /** Parses a raw user-agent string into a {@link UserAgent}. */
    private static UserAgent getOperatingSystem(String userAgents) {
        return UserAgent.parseUserAgentString(userAgents);
    }

    /**
     * Returns the operating system name reported for a user-agent string.
     *
     * @param userAgents raw user-agent string
     * @return operating system name
     */
    public static String getOS(String userAgents) {
        return getOperatingSystem(userAgents).getOperatingSystem().getName();
    }

    /**
     * Returns the browser name reported for a user-agent string.
     *
     * @param userAgents raw user-agent string
     * @return browser name
     */
    public static String getBrowser(String userAgents) {
        return getOperatingSystem(userAgents).getBrowser().getName();
    }
}


4,LogReduce类


package cn.awz.log;
import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer that forwards every {@link LogBean} it receives as an output key, with no value.
 *
 * <p>Author: Yang Jie. Created 2021/10/21 11:04.
 *
 * @version 1.0
 */
public class LogReduce extends Reducer<Text, LogBean, LogBean, NullWritable> {

    /**
     * Emits each bean grouped under {@code key} unchanged.
     *
     * @param key     grouping key produced by the mapper (not written to the output)
     * @param values  beans grouped under {@code key}
     * @param context task context receiving the output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<LogBean> values, Context context)
            throws IOException, InterruptedException {
        for (LogBean value : values) {
            // Use the NullWritable singleton rather than a raw null so the emitted value
            // matches the declared output value type.
            context.write(value, NullWritable.get());
        }
    }
}


 5,LogDriver类


package cn.awz.log;
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Entry point that configures and submits the log-cleaning MapReduce job.
 *
 * <p>Usage: {@code LogDriver [inputPath [outputPath]]}. When an argument is omitted, the
 * built-in default for that path is used.
 *
 * <p>Author: Yang Jie. Created 2021/10/21 11:17.
 *
 * @version 1.0
 */
public class LogDriver {

    /** Input path used when no first argument is given (backslashes escaped for Java). */
    private static final String DEFAULT_INPUT_PATH = "D:\\HadoopDevelopment\\classDemo\\data";

    /** Output path used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "out";

    /**
     * Builds the job, waits for it to finish and exits with status 1 if it failed.
     *
     * @param args optional input path followed by optional output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if interrupted while waiting for completion
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT_PATH;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LogBean.class);

        job.setOutputKeyClass(LogBean.class);
        job.setOutputValueClass(NullWritable.class);

        // A custom partitioner and an explicit reducer count were left disabled in the
        // original; the framework defaults are used.

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Report job failure to the calling shell instead of discarding the result.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            System.exit(1);
        }
    }
}


运行

hadoop jar code-1.0-SNAPSHOT.jar cn/awz/log/LogDriver /flume/upload/20220317/17 /nginxLog

查看结果