Cleaning Nginx Log Data with Hadoop (Hands-On Project)
This post cleans the Nginx log data collected in the previous one, Flume Cross-Server Log Monitoring (Hands-On Project).
The raw data looks like this:
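For reference, a line in the default Nginx "main" access-log format, which the parsing code below assumes (at least eight quote-delimited fields), looks roughly like the following illustrative example; the actual data collected in the previous post may differ slightly:
192.168.31.1 - - [17/Mar/2022:17:25:33 +0800] "GET /index.html HTTP/1.1" 200 612 "http://192.168.31.100/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36" "-"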
Each line is split to extract the following fields:
IP address
date
request method
status code
user agent
accessed URL
HTTP protocol
requested page URL
operating system name
browser name
Writing the Hadoop Code
1. Create a Maven project
Import the dependencies (pom.xml):
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>code</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
<!-- Work around local jars not being included in the package -->
<resources>
<resource>
<directory>lib</directory>
<targetPath>BOOT-INF/lib/</targetPath>
<includes>
<include>**/*.jar</include>
</includes>
</resource>
</resources>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
<!-- Dependency for parsing browser/OS info from the User-Agent -->
<dependency>
<groupId>nl.bitwalker</groupId>
<artifactId>UserAgentUtils</artifactId>
<version>1.2.4</version>
<!-- <scope>system</scope> -->
<!-- <systemPath>${basedir}/src/main/resources/lib/commons-logging-1.1.1.jar</systemPath>-->
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
</dependencies>
</project>
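With the pom in place, the project can be packaged with the standard Maven command; the resulting jar name, code-1.0-SNAPSHOT.jar, follows the artifactId and version above and is the jar submitted to the cluster at the end of this post:
mvn clean package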
2. The LogBean class
package cn.awz.log;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @version 1.0
* @Author:杨杰
* @Date:2021/10/21 9:56
*/
public class LogBean implements Writable {
private String ip; // IP address
private String date; // date
private String method; // request method
private String code; // status code
private String userAgent; // user agent
private String staticUrl; // accessed URL
private String http; // HTTP protocol
private String url; // requested page URL
private String os; // operating system name
private String browser; // browser name
public LogBean() {super();}
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.ip);
dataOutput.writeUTF(this.date);
dataOutput.writeUTF(this.method);
dataOutput.writeUTF(this.code);
dataOutput.writeUTF(this.userAgent);
dataOutput.writeUTF(this.url);
dataOutput.writeUTF(this.http);
dataOutput.writeUTF(this.staticUrl);
dataOutput.writeUTF(this.os);
dataOutput.writeUTF(this.browser);
}
public void readFields(DataInput dataInput) throws IOException {
this.ip = dataInput.readUTF();
this.date = dataInput.readUTF();
this.method = dataInput.readUTF();
this.code = dataInput.readUTF();
this.userAgent = dataInput.readUTF();
this.url = dataInput.readUTF();
this.http = dataInput.readUTF();
this.staticUrl = dataInput.readUTF();
this.os = dataInput.readUTF();
this.browser = dataInput.readUTF();
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getUserAgent() {
return userAgent;
}
public void setUserAgent(String userAgent) {
this.userAgent = userAgent;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getHttp() {
return http;
}
public void setHttp(String http) {
this.http = http;
}
public String getStaticUrl() {
return staticUrl;
}
public void setStaticUrl(String staticUrl) {
this.staticUrl = staticUrl;
}
public String getOs() {
return os;
}
public void setOs(String os) {
this.os = os;
}
public String getBrowser() {
return browser;
}
public void setBrowser(String browser) {
this.browser = browser;
}
public String toString() {
return ip +'\t'+ date + '\t'+ method + '\t' +
code + '\t' + userAgent + '\t' + staticUrl + '\t' +
http + '\t' + url + '\t' + os + '\t'+ browser;
}
}
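Hadoop uses write() and readFields() to serialize LogBean between the map and reduce phases, so both methods must handle the fields in exactly the same order. A quick local sanity check is to round-trip a bean through a byte stream; this is a minimal sketch placed alongside LogBean in cn.awz.log, not part of the original project:
package cn.awz.log;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class LogBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        LogBean in = new LogBean();
        in.setIp("192.168.31.1");
        in.setDate("17/Mar/2022:17:25:33");
        in.setMethod("GET");
        in.setCode("200");
        in.setUserAgent("Mozilla/5.0");
        in.setStaticUrl("http://192.168.31.100/");
        in.setHttp("HTTP/1.1");
        in.setUrl("/index.html");
        in.setOs("Windows");
        in.setBrowser("Chrome");
        // Serialize with write(), then deserialize into a fresh bean with readFields()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));
        LogBean out = new LogBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        // The two lines match only if the field order in write() and readFields() is consistent
        System.out.println(in);
        System.out.println(out);
    }
}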
3. The LogMapper class
package cn.awz.log;
import nl.bitwalker.useragentutils.UserAgent;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @version 1.0
* @Author:杨杰
* @Date:2021/10/21 10:32
*/
public class LogMapper extends Mapper<LongWritable, Text,Text,LogBean> {
private static Text logMapperKey = new Text();
private static LogBean logMapperValue = new LogBean();
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String lines = value.toString();
if (!parse(lines)) {
return;
}
logMapperKey.set(logMapperValue.getIp());
context.write(logMapperKey,logMapperValue);
}
public static boolean parse(String line) {
if (line == null) {
return false;
}
String[] lines = line.split("\"");
// Lines with fewer than 8 quote-delimited fields are malformed requests; reject them
if (lines.length < 8) return false;
if (lines.length < 6) {
logMapperValue.setUserAgent("");
} else {
logMapperValue.setUserAgent(lines[5]);
logMapperValue.setBrowser(getBrowser(lines[5]));
logMapperValue.setOs(getOS(lines[5]));
}
String[] logBeanValue = lines[0].split(" ");
String[] logBeanValueUrl = lines[1].split(" ");
String[] logBeanStatus = lines[2].split(" ");
logMapperValue.setIp(logBeanValue[0]);
logMapperValue.setDate(logBeanValue[3].replace("[",""));
if (logBeanValueUrl.length == 1) {
return false;
}
if (logBeanValueUrl.length < 3) {
logMapperValue.setHttp(" ");
} else {
logMapperValue.setHttp(logBeanValueUrl[2]);
}
logMapperValue.setMethod(logBeanValueUrl[0]);
logMapperValue.setUrl(logBeanValueUrl[1]);
logMapperValue.setStaticUrl(lines[3]);
logMapperValue.setCode(logBeanStatus[1]);
return true;
}
// Parse the User-Agent string
private static UserAgent getOperatingSystem(String userAgents) {
return UserAgent.parseUserAgentString(userAgents);
}
// Get the operating system name
public static String getOS(String userAgents) {
return getOperatingSystem(userAgents).getOperatingSystem().getName();
}
// Get the browser name
public static String getBrowser(String userAgents) {
return getOperatingSystem(userAgents).getBrowser().getName();
}
}
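Because parse(), getOS(), and getBrowser() are plain static methods, the quote-splitting and User-Agent handling can be tried without starting a MapReduce job. A minimal sketch, where the log line and class name are illustrative rather than taken from the original project:
package cn.awz.log;

public class LogMapperParseCheck {
    public static void main(String[] args) {
        String line = "192.168.31.1 - - [17/Mar/2022:17:25:33 +0800] "
                + "\"GET /index.html HTTP/1.1\" 200 612 \"http://192.168.31.100/\" "
                + "\"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                + "(KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36\" \"-\"";
        // A well-formed line with at least 8 quote-delimited fields should return true
        System.out.println(LogMapper.parse(line));
        // OS and browser names as reported by UserAgentUtils
        String ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/96.0.4664.110 Safari/537.36";
        System.out.println(LogMapper.getOS(ua));
        System.out.println(LogMapper.getBrowser(ua));
    }
}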
4. The LogReduce class
package cn.awz.log;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @version 1.0
* @Author:杨杰
* @Date:2021/10/21 11:04
*/
public class LogReduce extends Reducer<Text,LogBean,LogBean, NullWritable> {
protected void reduce(Text key, Iterable<LogBean> values, Context context) throws IOException, InterruptedException {
for (LogBean value : values) {
// Write NullWritable.get() rather than null so any output format can handle the value
context.write(value, NullWritable.get());
}
}
}
5. The LogDriver class
package cn.awz.log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @version 1.0
* @Author:杨杰
* @Date:2021/10/21 11:17
*/
public class LogDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogDriver.class);
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LogBean.class);
job.setOutputKeyClass(LogBean.class);
job.setOutputValueClass(NullWritable.class);
// job.setPartitionerClass(LogPartitioner.class);
// job.setNumReduceTasks(7);
// Local test paths; note the escaped backslashes in the Windows path
FileInputFormat.setInputPaths(job, new Path("D:\\HadoopDevelopment\\classDemo\\data"));
FileOutputFormat.setOutputPath(job, new Path("out"));
job.waitForCompletion(true);
}
}
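As written, the driver hardcodes a local input directory and a relative out directory, which is handy for testing from the IDE. For the hadoop jar command below to actually use the HDFS paths it passes, the two path lines can instead read from the program arguments; a minimal variant sketch:
// Take input and output paths from the command line instead of hardcoding them,
// so "hadoop jar ... cn.awz.log.LogDriver <input> <output>" works as expected
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));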
Run the job:
hadoop jar code-1.0-SNAPSHOT.jar cn/awz/log/LogDriver /flume/upload/20220317/17 /nginxLog
View the results:
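If the job wrote its output to /nginxLog as above, the cleaned records can be inspected directly from HDFS (the part file name may vary):
hdfs dfs -ls /nginxLog
hdfs dfs -cat /nginxLog/part-r-00000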