Cleaning Nginx Log Data with Hadoop (Hands-On Project)
This post cleans the Nginx log data collected in the previous article, Flume Cross-Server Log Monitoring (Hands-On Project).
The raw data looks like this:
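A representative line for illustration (the exact layout depends on the log_format in your nginx.conf; the parser below expects the combined format plus a trailing quoted "$http_x_forwarded_for" field, i.e. at least 8 quote-delimited segments):

192.168.10.1 - - [17/Mar/2022:17:25:46 +0800] "GET /index.html HTTP/1.1" 200 612 "http://www.example.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36" "-"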
Each line is split to extract the following fields (the breakdown after this list shows how they map onto the sample line above):
- IP address
- date
- request method
- status code
- user agent
- referer (the visited URL)
- HTTP protocol version
- requested page path
- operating system name
- browser name
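Splitting the sample line on the double-quote character, which is exactly what the Mapper does below, yields segments that map onto these fields (the indices are my own annotation):

[0] 192.168.10.1 - - [17/Mar/2022:17:25:46 +0800]  → IP address and date (split again on spaces)
[1] GET /index.html HTTP/1.1                       → request method, page path, HTTP protocol version
[2] 200 612                                        → status code (the bytes-sent count is discarded)
[3] http://www.example.com/                        → referer
[5] Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36  → user agent, from which the OS and browser names are derived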
Writing the Hadoop Code
1. Create a Maven project
Add the dependencies to pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>code</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <!-- Workaround so that local jars under lib/ are included in the package -->
        <resources>
            <resource>
                <directory>lib</directory>
                <targetPath>BOOT-INF/lib/</targetPath>
                <includes>
                    <include>**/*.jar</include>
                </includes>
            </resource>
        </resources>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <!-- Dependency used to extract browser/OS info from the user agent -->
        <dependency>
            <groupId>nl.bitwalker</groupId>
            <artifactId>UserAgentUtils</artifactId>
            <version>1.2.4</version>
            <!-- <scope>system</scope> -->
            <!-- <systemPath>${basedir}/src/main/resources/lib/commons-logging-1.1.1.jar</systemPath> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
    </dependencies>
</project>
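With the POM in place, build the job jar with the standard Maven invocation (the jar name follows from the artifactId and version above):

mvn clean package

This produces target/code-1.0-SNAPSHOT.jar, which is the jar used in the run step at the end.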
2. The LogBean class
package cn.awz.log;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @version 1.0
 * @Author: 杨杰
 * @Date: 2021/10/21 9:56
 */
public class LogBean implements Writable {
    private String ip;        // IP address
    private String date;      // date
    private String method;    // request method
    private String code;      // status code
    private String userAgent; // user agent
    private String staticUrl; // referer (visited URL)
    private String http;      // HTTP protocol version
    private String url;       // requested page path
    private String os;        // operating system name
    private String browser;   // browser name

    public LogBean() {
        super();
    }

    // write() and readFields() must serialize the fields in the same order
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.ip);
        dataOutput.writeUTF(this.date);
        dataOutput.writeUTF(this.method);
        dataOutput.writeUTF(this.code);
        dataOutput.writeUTF(this.userAgent);
        dataOutput.writeUTF(this.url);
        dataOutput.writeUTF(this.http);
        dataOutput.writeUTF(this.staticUrl);
        dataOutput.writeUTF(this.os);
        dataOutput.writeUTF(this.browser);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.ip = dataInput.readUTF();
        this.date = dataInput.readUTF();
        this.method = dataInput.readUTF();
        this.code = dataInput.readUTF();
        this.userAgent = dataInput.readUTF();
        this.url = dataInput.readUTF();
        this.http = dataInput.readUTF();
        this.staticUrl = dataInput.readUTF();
        this.os = dataInput.readUTF();
        this.browser = dataInput.readUTF();
    }

    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }
    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public String getMethod() { return method; }
    public void setMethod(String method) { this.method = method; }
    public String getCode() { return code; }
    public void setCode(String code) { this.code = code; }
    public String getUserAgent() { return userAgent; }
    public void setUserAgent(String userAgent) { this.userAgent = userAgent; }
    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
    public String getHttp() { return http; }
    public void setHttp(String http) { this.http = http; }
    public String getStaticUrl() { return staticUrl; }
    public void setStaticUrl(String staticUrl) { this.staticUrl = staticUrl; }
    public String getOs() { return os; }
    public void setOs(String os) { this.os = os; }
    public String getBrowser() { return browser; }
    public void setBrowser(String browser) { this.browser = browser; }

    public String toString() {
        return ip + '\t' + date + '\t' + method + '\t' + code + '\t' + userAgent + '\t'
                + staticUrl + '\t' + http + '\t' + url + '\t' + os + '\t' + browser;
    }
}
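A minimal round-trip check of the Writable implementation (this LogBeanTest class is my own sketch, not part of the original project). It matters because write() and readFields() must use the same field order, otherwise values silently land in the wrong fields:

package cn.awz.log;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class LogBeanTest {
    public static void main(String[] args) throws IOException {
        LogBean in = new LogBean();
        in.setIp("192.168.10.1");
        in.setDate("17/Mar/2022:17:25:46");
        in.setMethod("GET");
        in.setCode("200");
        in.setUserAgent("Mozilla/5.0 (Windows NT 10.0; Win64; x64)");
        in.setStaticUrl("http://www.example.com/");
        in.setHttp("HTTP/1.1");
        in.setUrl("/index.html");
        in.setOs("Windows");
        in.setBrowser("Chrome");

        // Serialize with write(), then deserialize with readFields();
        // every field must come back in the same slot it went out of.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buffer));

        LogBean out = new LogBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(out); // should print the same tab-separated record as 'in'
    }
}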
3. The LogMapper class
package cn.awz.log;

import nl.bitwalker.useragentutils.UserAgent;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @version 1.0
 * @Author: 杨杰
 * @Date: 2021/10/21 10:32
 */
public class LogMapper extends Mapper<LongWritable, Text, Text, LogBean> {

    private static Text logMapperKey = new Text();
    private static LogBean logMapperValue = new LogBean();

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String lines = value.toString();
        // Skip lines that cannot be parsed
        if (!parse(lines)) {
            return;
        }
        logMapperKey.set(logMapperValue.getIp());
        context.write(logMapperKey, logMapperValue);
    }

    public static boolean parse(String line) {
        if (line == null) {
            return false;
        }
        String[] lines = line.split("\"");
        // Lines that split into fewer than 8 quote-delimited segments are malformed; drop them
        if (lines.length < 8) return false;

        // Segment 5 is the user agent, from which OS and browser are derived
        logMapperValue.setUserAgent(lines[5]);
        logMapperValue.setBrowser(getBrowser(lines[5]));
        logMapperValue.setOs(getOS(lines[5]));

        String[] logBeanValue = lines[0].split(" ");    // IP, identities, timestamp
        String[] logBeanValueUrl = lines[1].split(" "); // method, path, protocol
        String[] logBeanStatus = lines[2].split(" ");   // status code, bytes sent

        logMapperValue.setIp(logBeanValue[0]);
        logMapperValue.setDate(logBeanValue[3].replace("[", ""));

        if (logBeanValueUrl.length == 1) {
            return false;
        }
        if (logBeanValueUrl.length < 3) {
            logMapperValue.setHttp(" ");
        } else {
            logMapperValue.setHttp(logBeanValueUrl[2]);
        }
        logMapperValue.setMethod(logBeanValueUrl[0]);
        logMapperValue.setUrl(logBeanValueUrl[1]);
        logMapperValue.setStaticUrl(lines[3]);
        logMapperValue.setCode(logBeanStatus[1]);
        return true;
    }

    // Parse the user agent string
    private static UserAgent getOperatingSystem(String userAgents) {
        return UserAgent.parseUserAgentString(userAgents);
    }

    // Operating system name
    public static String getOS(String userAgents) {
        return getOperatingSystem(userAgents).getOperatingSystem().getName();
    }

    // Browser name
    public static String getBrowser(String userAgents) {
        return getOperatingSystem(userAgents).getBrowser().getName();
    }
}
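A quick standalone check of parse() and the user-agent helpers (this ParseDemo class and the sample line are my own illustration, not part of the original project):

package cn.awz.log;

public class ParseDemo {
    public static void main(String[] args) {
        String ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36";
        String line = "192.168.10.1 - - [17/Mar/2022:17:25:46 +0800] \"GET /index.html HTTP/1.1\" 200 612 "
                + "\"http://www.example.com/\" \"" + ua + "\" \"-\"";

        System.out.println(LogMapper.parse(line));    // expected: true (8 quote-delimited segments)
        System.out.println(LogMapper.getOS(ua));      // OS name as reported by UserAgentUtils
        System.out.println(LogMapper.getBrowser(ua)); // browser name as reported by UserAgentUtils
    }
}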
4. The LogReduce class
package cn.awz.log;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @version 1.0
 * @Author: 杨杰
 * @Date: 2021/10/21 11:04
 */
public class LogReduce extends Reducer<Text, LogBean, LogBean, NullWritable> {
    protected void reduce(Text key, Iterable<LogBean> values, Context context) throws IOException, InterruptedException {
        // Emit every record grouped under this IP; the value slot is unused
        for (LogBean value : values) {
            context.write(value, NullWritable.get());
        }
    }
}
5. The LogDriver class
package cn.awz.log;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @version 1.0
 * @Author: 杨杰
 * @Date: 2021/10/21 11:17
 */
public class LogDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LogBean.class);
        job.setOutputKeyClass(LogBean.class);
        job.setOutputValueClass(NullWritable.class);

        // job.setPartitionerClass(LogPartitioner.class);
        // job.setNumReduceTasks(7);

        // Input and output paths come from the command line, e.g. for local testing:
        // D:\HadoopDevelopment\classDemo\data  out
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Run the job:
hadoop jar code-1.0-SNAPSHOT.jar cn.awz.log.LogDriver /flume/upload/20220317/17 /nginxLog
View the results:
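Assuming the default single reducer, the cleaned, tab-separated records can be inspected straight from HDFS:

hadoop fs -ls /nginxLog
hadoop fs -cat /nginxLog/part-r-00000 | head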
