Tars Java 客户端源码分析
作者:vivo 互联网服务器团队-Ke Shengkai
一、基本RPC框架简介
服务提供者(RPC Server),运行在服务端,提供服务接口定义与服务实现类,并对外暴露服务接口。
注册中心(Registry),运行在服务端,负责记录服务提供者的服务对象,并提供远程服务信息的查询服务和变更通知服务。
服务消费者(RPC Client),运行在客户端,通过远程代理对象调用远程服务。
1)客户端调用客户端桩模块。该调用是本地过程调用,其中参数以正常方式推入堆栈。
2)客户端桩模块将参数打包到消息中,并进行系统调用以发送消息。打包参数称为编组。
3)客户端的本地操作系统将消息从客户端计算机发送到服务器计算机。
4)服务器计算机上的本地操作系统将传入的数据包传递到服务器桩模块。
5)服务器桩模块从消息中解包出参数。解包参数称为解组。
6)最后,服务器桩模块执行服务器程序流程。回复是沿相反的方向执行相同的步骤。
二、Tars Java客户端设计介绍
// 先初始化基本Tars配置
CommunicatorConfig cfg = new CommunicatorConfig();
// 通过上述的CommunicatorConfig配置生成一个Communicator对象。
Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);
// 指定Tars远程服务的服务对象名、IP和端口生成一个远程服务代理对象。
HelloPrx proxy = communicator.stringToProxy(HelloPrx.class, "TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 18601 -t 60000");
//同步调用,阻塞直到远程服务对象的方法返回结果
String ret = proxy.hello(3000, "Hello World");
System.out.println(ret);
//异步调用,不关注异步调用最终的情况
proxy.async_hello(null, 3000, "Hello World");
//异步调用,注册一个实现TarsAbstractCallback接口的回执处理对象,该实现类分别处理调用成功,调用超时和调用异常的情况。
proxy.async_hello(new HelloPrxCallback() {
public void callback_expired() { //超时事件处理
}
public void callback_exception(Throwable ex) { //异常事件处理
}
public void callback_hello(String ret) { //调用成功事件处理
Main.logger.info("invoke async method successfully {}", ret);
}
}, 1000, "Hello World");
public final class ObjectProxy<T> implements ServantProxy, InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
InvokeContext context = this.protocolInvoker.createContext(proxy, method, args);
try {
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return this.toString();
} else if
//***** 省略代码 *****
} else {
// 在负载均衡器选取一个远程调用类,进行应用层协议的封装,最后调用TCP传输层进行发送。
Invoker invoker = this.loadBalancer.select(context);
return invoker.invoke(context);
}
} catch (Throwable var8) {
// ***** 省略代码 *****
}
}
}
class ServantProxyFactory {
private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap();
// ***** 省略代码 *****
public <T> Object getServantProxy(Class<T> clazz, String objName, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) {
Object proxy = this.cache.get(objName);
if (proxy == null) {
this.lock.lock(); // 加锁,保证只生成一个远程服务代理对象。
try {
proxy = this.cache.get(objName);
if (proxy == null) {
// 创建实现JDK的java.lang.reflect.InvocationHandler接口的对象
ObjectProxy<T> objectProxy = this.communicator.getObjectProxyFactory().getObjectProxy(clazz, objName, servantProxyConfig, loadBalance, protocolInvoker);
// 使用JDK的java.lang.reflect.Proxy来生成实际的代理对象
this.cache.putIfAbsent(objName, this.createProxy(clazz, objectProxy));
proxy = this.cache.get(objName);
}
} finally {
this.lock.unlock();
}
}
return proxy;
}
/** 使用JDK自带的Proxy.newProxyInstance生成代理对象 */
private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clazz, ServantProxy.class}, objectProxy);
}
// ***** 省略代码 *****
}
public interface LoadBalance<T> {
/** 根据负载均衡策略,挑选invoker */
Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException;
/** 通知invoker列表的更新 */
void refresh(Collection<Invoker<T>> invokers);
}
调用acceptor.handleConnectEvent(key)方法来处理客户端连接成功事件,
或acceptor.handleAcceptEvent(key)方法来处理服务器接受连接成功事件,
或调用acceptor.handleReadEvent(key)方法从Socket里读取数据,
或acceptor.handleWriteEvent(key)方法来写数据到Socket 。
public final class Reactor extends Thread {protected volatile Selector selector = null;private Acceptor acceptor = null;//***** 省略代码 *****public void run() {try {while (!Thread.interrupted()) {// 阻塞直到有网络事件发生。selector.select();//***** 省略代码 *****while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (!key.isValid()) continue;try {//***** 省略代码 *****// 分发传输层协议TCP或UDP网络事件dispatchEvent(key);//***** 省略代码 *****}}//***** 省略代码 *****}//***** 省略代码 *****private void dispatchEvent(final SelectionKey key) throws IOException {if (key.isConnectable()) {acceptor.handleConnectEvent(key);} else if (key.isAcceptable()) {acceptor.handleAcceptEvent(key);} else if (key.isReadable()) {acceptor.handleReadEvent(key);} else if (key.isValid() && key.isWritable()) {acceptor.handleWriteEvent(key);}}}
private void initCommunicator(CommunicatorConfig config) throws CommunicatorConfigException {//***** 省略代码 *****this.threadPoolExecutor = ClientPoolManager.getClientThreadPoolExecutor(config);//***** 省略代码 *****}public class ClientPoolManager {public static ThreadPoolExecutor getClientThreadPoolExecutor(CommunicatorConfig communicatorConfig) {//***** 省略代码 *****clientThreadPoolMap.put(communicatorConfig, createThreadPool(communicatorConfig));//***** 省略代码 *****return clientPoolExecutor;}private static ThreadPoolExecutor createThreadPool(CommunicatorConfig communicatorConfig) {int corePoolSize = communicatorConfig.getCorePoolSize();int maxPoolSize = communicatorConfig.getMaxPoolSize();int keepAliveTime = communicatorConfig.getKeepAliveTime();int queueSize = communicatorConfig.getQueueSize();TaskQueue taskqueue = new TaskQueue(queueSize);String namePrefix = "tars-client-executor-";TaskThreadPoolExecutor executor = new TaskThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, taskqueue, new TaskThreadFactory(namePrefix));taskqueue.setParent(executor);return executor;}}
public class TCPSession extends Session {
public void write(Request request) throws IOException {
try {
IoBuffer buffer = selectorManager.getProtocolFactory().getEncoder().encodeRequest(request, this);
write(buffer);
//***** 省略代码 *****
}
protected void write(IoBuffer buffer) throws IOException {
//***** 省略代码 *****
if (!this.queue.offer(buffer.buf())) {
throw new IOException("The session queue is full. [ queue size:" + queue.size() + " ]");
}
if (key != null) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
key.selector().wakeup();
}
}
protected synchronized int doWrite() throws IOException {
int writeBytes = 0;
while (true) {
ByteBuffer wBuf = queue.peek();
//***** 省略代码 *****
int bytesWritten = ((SocketChannel) channel).write(wBuf);
//***** 省略代码 *****
return writeBytes;
}
}
public class ServantClient {
public <T extends ServantResponse> T invokeWithSync(ServantRequest request) throws IOException {
//***** 省略代码 *****
ticket = TicketManager.createTicket(request, session, this.syncTimeout);
Session current = session;
current.write(request);
if (!ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)) {
//***** 省略代码 *****
response = ticket.response();
//***** 省略代码 *****
return response;
//***** 省略代码 *****
return response;
}
}
public final class WorkThread implements Runnable {
public void run() {
try {
//***** 省略代码 *****
Ticket<Response> ticket = TicketManager.getTicket(resp.getTicketNumber());
//***** 省略代码 *****
ticket.notifyResponse(resp);
ticket.countDown();
TicketManager.removeTicket(ticket.getTicketNumber());
}
//***** 省略代码 *****
}
}
public class TicketManager {//***** 省略代码 *****static {executor.scheduleAtFixedRate(new Runnable() {long currentTime = -1;public void run() {Collection<Ticket<?>> values = tickets.values();currentTime = System.currentTimeMillis();for (Ticket<?> t : values) {if ((currentTime - t.startTime) > t.timeout) {removeTicket(t.getTicketNumber());t.expired();}}}}, 500, 500, TimeUnit.MILLISECONDS);}}
三、总结
四、参考文献
TARS基金会是Linux基金会下的非营利性、微服务基金会,致力于建设一个强大而灵活的微服务生态系统。无论你在哪个行业,无论你使用什么技术栈,这里能助你快速实现你的创意。
点“在看”让TARS小姐姐变好看
