网络编程-简单实现Hadoop RPC
一、我们应该如何去阅读一个大数据开源框架的源码
1. 阅读源码的思路
1.1 掌握其网络通信架构
我们应该都知道在大数据领域中,包含了很多大数据框架,例如Spark,Hadoop,Kafka,Zookeeper,Flink等,在这些组件当中,他们都是分布式的,我们想要阅读他们源码的时候,必须要明白分布式系统之间,他们是如何交互的,例如Spark之前采用的是akka,现在采用的Netty,kafka采用的是NIO等,也就不一一列举了,我们阅读源码的时候,如果不了解他们内部是如何通信的,那么我们根本无法知道他们的内部是如何工作的,所以掌握其通信架构是必须的,也是必然的
1.2 场景驱动
为什么要说场景驱动这个问题呢,因为我们在阅读一个开源框架源码的时候,例如Hadoop源码几百万行代码,我们一个个类去看的话,我相信,看不了一会,你就放弃了,根本不知道在看一些什么东西,但是场景驱动的方式可以帮助我们更加有效的去阅读,因为我们只关注其中的某一点,例如我就看NameNode启动的流程,DataNode的注册和心跳,这样我们就可以抛弃不看的,不重要的,只看我们需要的地方,这样我们既有兴趣,又能坚持的下来
二、Hadoop RPC
RPC是什么?
RPC(Remote Procedure Call)远程过程调用,用人话说,就是我们编写分布式系统的时候,可以本地调用远端的方法,这样我们编写代码的时候就和编写单机程序没有什么区别.也就是说客户端调用服务端的方法,方法的执行在服务端
1. 环境准备
Maven依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.0</version>
</dependency>
2. 代码实现
2.1 协议
/**
* Copyright (c) 2019 bigdata ALL Rights Reserved
* Project: learning
* Package: com.bigdata.hdfs.app
* Version: 1.0
*
* @author qingzhi.wu
* @date 2020/6/14 20:21
*/
public interface ClientProtocol {
long versionID = 1234;
void makeDir(String path);
}
2.2 服务端代码
/**
* Copyright (c) 2019 bigdata ALL Rights Reserved
* Project: learning
* Package: com.bigdata.hdfs.app
* Version: 1.0
*
* @author qingzhi.wu
* @date 2020/6/14 20:22
*/
public class NameNodeRpcServer implements ClientProtocol {
public static void main(String[] args) throws IOException {
RPC.Server server = new RPC.Builder(new Configuration())
.setBindAddress("localhost")
.setPort(9999)
.setProtocol(ClientProtocol.class)
.setInstance(new NameNodeRpcServer())
.build();
System.out.println("服务端启动");
server.start();
}
public void makeDir(String path) {
System.out.println("服务端:"+path);
}
}
2.3 客户端
/**
* Copyright (c) 2019 bigdata ALL Rights Reserved Project: learning Package: com.bigdata.hdfs.app
* Version: 1.0
*
* @author qingzhi.wu
* @date 2020/6/14 20:26
*/
public class DFSClient {
public static void main(String[] args) throws IOException {
ClientProtocol namenode =
RPC.getProxy(
ClientProtocol.class,
1234L,
new InetSocketAddress("localhost", 9999),
new Configuration());
namenode.makeDir("/usr/add");
System.out.println("已经向服务端发送请求");
}
}
2.4 测试运行
2.4.1 运行服务端
控制台打印的日志
2.4.2 运行客户端
客户端控制台打印的日志
服务端控制台打印的日志
3. Hadoop RPC 总结
不同进程间的调用,客户端调用服务端的方法,方法的执行是在服务器
协议是什么呢,其实就是一个接口,当然这个接口里面必须有versionID字段(避免版本问题)
服务端是真正实现协议的一方
如何创建一个服务端
RPC.Server server = new RPC.Builder(new Configuration())
.setBindAddress("localhost")
.setPort(9999)
.setProtocol(ClientProtocol.class)
.setInstance(new NameNodeRpcServer())
.build();
客户端如何调用远端的方法
ClientProtocol namenode =
RPC.getProxy(
ClientProtocol.class,
1234L,
new InetSocketAddress("localhost", 9999),
new Configuration());
namenode.makeDir("/usr/add");
三、我们能不能自己写一个呢?
当然可以啦,我们是不是已经记住知道Rpc 就是客户端调用服务端的方法,方法的执行在服务端。
那么我们就可以根据这个来进行编写了呢。
个人技术实力有限,就是写着玩玩,勿喷!
ClientProtocol
package com.bigdata.tcp.hadooprpc.common;
/**
* Copyright (c) 2020 bigdata ALL Rights Reserved
* Project: learning
* Package: com.bigdata.tcp.hadooprpc.common
* Version: 1.0
*
* @author qingzhi.wu 2021/2/7 14:25
*/
public interface ClientProtocol extends Protocol{
void makeDir(String path);
}
Protocol
package com.bigdata.tcp.hadooprpc.common;
/**
* Copyright (c) 2020 bigdata ALL Rights Reserved
* Project: learning
* Package: com.bigdata.tcp.hadooprpc.common
* Version: 1.0
*
* @author qingzhi.wu 2021/2/7 14:33
*/
public interface Protocol {
}
Client
package com.bigdata.tcp.hadooprpc;
import com.bigdata.tcp.hadooprpc.common.ClientProtocol;
import com.bigdata.tcp.hadooprpc.server.RPC;
/**
* Copyright (c) 2020 bigdata ALL Rights Reserved
* Project: learning
* Package: com.bigdata.tcp.hadooprpc
* Version: 1.0
*
* @author qingzhi.wu 2021/2/7 14:27
*/
public class Client {
public static void main(String[] args) {
ClientProtocol clientProtocol = (ClientProtocol)
RPC.getProxy(ClientProtocol.class, "127.0.0.1", 9999);
clientProtocol.makeDir("/data");
}
}
ClientProtocolServiceProxy
package com.bigdata.tcp.hadooprpc;
import com.bigdata.tcp.hadooprpc.common.ClientProtocol;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.Socket;
/**
* Copyright (c) 2020 bigdata ALL Rights Reserved
* Project: learning
* Package: com.bigdata.tcp.hadooprpc
* Version: 1.0
*
* @author qingzhi.wu 2021/2/7 15:22
*/
public class ClientProtocolServiceProxy implements ClientProtocol {
private Socket socket = null ;
public ClientProtocolServiceProxy(String ip,int port){
try {
socket = new Socket(ip,port);
System.out.println(socket);
} catch (IOException e) {
e.printStackTrace();
}
}
public void makeDir(String path) {
OutputStream outputStream = null;
ObjectOutputStream oos = null;
try {
outputStream = socket.getOutputStream();
oos = new ObjectOutputStream(outputStream);
Message message = new Message();
message.setMethodName("makeDir");
message.setParam(path);
message.setClassName(ClientProtocol.class.getName());
oos.writeObject(message);
outputStream.close();
oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Message
package com.bigdata.tcp.hadooprpc;
import java.io.Serializable;
/**
* Copyright (c) 2020 bigdata ALL Rights Reserved
* Project: learning
* Package: com.bigdata.tcp.hadooprpc
* Version: 1.0
*
* @author qingzhi.wu 2021/2/7 15:08
*/
public class Message implements Serializable {
private String className;
private String methodName;
private String param;
public Message() {
}
public Message(String className, String methodName, String param) {
this.className = className;
this.methodName = methodName;
this.param = param;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public String getParam() {
return param;
}
public void setParam(String param) {
this.param = param;
}
}
NamenodeRpcServer
package com.bigdata.tcp.hadooprpc;
import com.bigdata.tcp.hadooprpc.common.ClientProtocol;
import com.bigdata.tcp.hadooprpc.server.RPC;
/**
* Copyright (c) 2020 bigdata ALL Rights Reserved
* Project: learning
* Package: com.bigdata.tcp.hadooprpc
* Version: 1.0
*
* @author qingzhi.wu 2021/2/7 14:25
*/
public class NameNodeRpcServer implements ClientProtocol {
public static void main(String[] args) throws Exception {
RPC.Server server = new RPC.Builder().setBindAddress("127.0.0.1")
.setPort(9999)
.setProtocolClazz(ClientProtocol.class)
.setInstance(new NameNodeRpcServer()).build();
System.out.println("服务端启动");
server.start();
}
public void makeDir(String path) {
System.out.println(path);
}
}
RPC
package com.bigdata.tcp.hadooprpc.server;
import com.bigdata.tcp.hadooprpc.ClientProtocolServiceProxy;
import com.bigdata.tcp.hadooprpc.Message;
import com.bigdata.tcp.hadooprpc.common.ClientProtocol;
import com.bigdata.tcp.hadooprpc.common.Protocol;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
/**
* Copyright (c) 2020 bigdata ALL Rights Reserved
* Project: learning
* Package: com.bigdata.tcp.hadooprpc.server
* Version: 1.0
*
* @author qingzhi.wu 2021/2/7 14:30
*/
public class RPC {
public static Object getProxy(Class clazz,String ip,int port){
System.out.println(clazz);
System.out.println(ClientProtocol.class);
if (clazz.equals(ClientProtocol.class)){
return new ClientProtocolServiceProxy(ip,port);
}
return null;
}
public static class Server {
private String bindAddress;
private int port;
private Class protocolClazz;
private ClientProtocol instance;
private ServerSocket serverSocket;
private Server(Builder builder){
this.bindAddress = builder.bindAddress;
this.port = builder.port;
this.protocolClazz = builder.protocolClazz;
this.instance = (ClientProtocol) builder.instance;
}
public void start() throws Exception{
System.out.println(port);
serverSocket = new ServerSocket(port);
while (true){
Socket accept = serverSocket.accept();
InputStream inputStream = accept.getInputStream();
ObjectInputStream object = new ObjectInputStream(inputStream);
Message message = (Message)object.readObject();
System.out.println(message);
String className = message.getClassName();
//假装匹配一下
System.out.println(className);
System.out.println(protocolClazz.getName());
Class aClass = Class.forName(className);
if(instance instanceof ClientProtocol){
Method method = protocolClazz.getMethod(message.getMethodName(), String.class);
method.invoke(instance,message.getParam());
}
object.close();
inputStream.close();
}
}
}
public static class Builder{
private String bindAddress;
private int port;
private Class protocolClazz;
private Protocol instance;
public Builder setBindAddress(String addr){
this.bindAddress = addr;
return this;
}
public Builder setPort(int port) {
this.port = port;
return this;
}
public Builder setProtocolClazz(Class protocolClazz) {
this.protocolClazz = protocolClazz;
return this;
}
public Builder setInstance(ClientProtocol instance) {
this.instance = instance;
return this;
}
public Server build(){
return new Server(this);
}
}
}
谢谢大家的观看!