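Exploring Thrift's different server types
Thrift is a software framework for developing scalable, cross-language services. It combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml, and other languages.
I. Goals
1. Prepare the example code
2. Introduce the different server types, with an example of each
3. Show an asynchronous client call
4. Use the Nifty library, with server and client code examples
II. Example
2.1 Overview
In this example the user service defines three methods: save a user, find users by name, and delete a user.
For example: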
/**
* Saves a user.
*
* @param user
*/
public boolean save(com.xxx.tutorial.thrift.entity.User user) throws org.apache.thrift.TException;
/**
* Finds users by name.
*
* @param name
*/
public java.util.List<com.xxx.tutorial.thrift.entity.User> findUsersByName(java.lang.String name) throws org.apache.thrift.TException;
/**
* Deletes a user.
*
* @param userId
*/
public void deleteByUserId(int userId) throws com.xxx.tutorial.thrift.exception.UserNotFoundException, org.apache.thrift.TException;
2.2 Writing the thrift files
user.thrift defines the user struct:
namespace java com.xxx.tutorial.thrift.entity
/**
* User struct
*/
struct User {
1:i32 userId,
2:string name
}
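exception.thrift defines the custom exception, UserNotFoundException. The service definition includes both files and declares the UserService interface: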
include "user.thrift"
include "exception.thrift"
namespace java com.xxx.tutorial.thrift.service
/**
* User service
*/
service UserService {
/** Save a user */
bool save(1:user.User user),
/** Find users by name */
list<user.User> findUsersByName(1:string name),
/** Delete a user */
void deleteByUserId(1:i32 userId) throws (1: exception.UserNotFoundException e)
}
2.3 Generating the code
2.4 Service interface code
2.6 Service implementation code
UserServiceImpl.java is as follows:
package com.xxx.tutorial.thrift.service.impl;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.exception.UserNotFoundException;
import com.xxx.tutorial.thrift.service.UserService;
/**
* @author wangmengjun
*
*/
public class UserServiceImpl implements UserService.Iface {
private static final Logger logger = Logger.getLogger(UserServiceImpl.class.getName());
public boolean save(User user) throws TException {
logger.info("方法save的参数user的内容==>" + user.toString());
return true;
}
public List<User> findUsersByName(String name) throws TException {
logger.info("方法findUsersByName的参数name的内容==>" + name);
return Arrays.asList(new User(1, "Wang"), new User(2, "Mengjun"));
}
public void deleteByUserId(int userId) throws UserNotFoundException, TException {
/**
* Always throws the exception, to simulate a failure for testing
*/
logger.info("方法deleteByUserId的参数userId的内容==>" + userId);
throw new UserNotFoundException("1001", String.format("userId=%d的用户不存在", userId));
}
}
III. Thrift server types
3.1 Server types
TSimpleServer: a single-threaded server using standard blocking I/O
/**
* Simple singlethreaded server for testing.
*
*/
public class TSimpleServer extends TServer {
... ...
}
TThreadPoolServer: a multi-threaded server using standard blocking I/O
/**
* Server which uses Java's built in ThreadPool management to spawn off
* a worker pool that
*
*/
public class TThreadPoolServer extends TServer {
... ...
}
TNonblockingServer: a single-threaded server using non-blocking I/O
/**
* A nonblocking TServer implementation. This allows for fairness amongst all
* connected clients in terms of invocations.
*
* This server is inherently single-threaded. If you want a limited thread pool
* coupled with invocation-fairness, see THsHaServer.
*
* To use this server, you MUST use a TFramedTransport at the outermost
* transport, otherwise this server will be unable to determine when a whole
* method call has been read off the wire. Clients must also use TFramedTransport.
*/
public class TNonblockingServer extends AbstractNonblockingServer {
... ...
}
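The javadoc above requires TFramedTransport because a non-blocking server has to know where one method call ends before it can process it. The following is a minimal sketch of that idea, assuming the usual layout of a 4-byte big-endian length followed by the payload. FramingSketch and its methods are hypothetical names used for illustration; they are not part of Thrift.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustration only: FramingSketch is a hypothetical class, not part of Thrift.
final class FramingSketch {

    // Prefix one message with its length as a 4-byte big-endian integer.
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Frame several messages and concatenate them, as they would appear on the wire.
    static byte[] frameAll(byte[]... payloads) {
        int total = 0;
        for (byte[] payload : payloads) {
            total += 4 + payload.length;
        }
        ByteBuffer wire = ByteBuffer.allocate(total);
        for (byte[] payload : payloads) {
            wire.put(frame(payload));
        }
        return wire.array();
    }

    // Split the byte stream back into whole messages using the length prefixes.
    static List<byte[]> readFrames(byte[] wire) {
        List<byte[]> messages = new ArrayList<>();
        ByteBuffer in = ByteBuffer.wrap(wire);
        while (in.remaining() >= 4) {
            int length = in.getInt();
            if (length < 0 || length > in.remaining()) {
                break; // incomplete frame: a real server would wait for more bytes
            }
            byte[] payload = new byte[length];
            in.get(payload);
            messages.add(payload);
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] wire = frameAll(
                "findUsersByName".getBytes(StandardCharsets.UTF_8),
                "save".getBytes(StandardCharsets.UTF_8));
        for (byte[] message : readFrames(wire)) {
            System.out.println(new String(message, StandardCharsets.UTF_8));
        }
    }
}
```

Without the length prefix, the reader in this sketch would have no way to tell where the first message stops and the second begins, which is the situation the javadoc describes.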
THsHaServer: an extension of TNonblockingServer; a multi-threaded server using non-blocking I/O
/**
* An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
* Like TNonblockingServer, it relies on the use of TFramedTransport.
*/
public class THsHaServer extends TNonblockingServer {
... ...
}
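TThreadedSelectorServer: a multi-threaded server using non-blocking I/O
TThreadedSelectorServer builds on the non-blocking servers above: it separates the accept thread from the selector threads that handle reads and writes,
and adds a worker thread pool. It is also a Half-Sync/Half-Async server model.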
/**
* A Half-Sync/Half-Async server with a separate pool of threads to handle
* non-blocking I/O. Accepts are handled on a single thread, and a configurable
* number of nonblocking selector threads manage reading and writing of client
* connections. A synchronous worker thread pool handles processing of requests.
*
* Performs better than TNonblockingServer/THsHaServer in multi-core
* environments when the bottleneck is CPU on the single selector thread
* handling I/O. In addition, because the accept handling is decoupled from
* reads/writes and invocation, the server has better ability to handle back-
* pressure from new connections (e.g. stop accepting when busy).
*
* Like TNonblockingServer, it relies on the use of TFramedTransport.
*/
public class TThreadedSelectorServer extends AbstractNonblockingServer {
... ...
}
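The Half-Sync/Half-Async idea shared by THsHaServer and TThreadedSelectorServer is that the I/O side never runs business logic itself; it hands each complete request to a worker pool. The sketch below shows only that hand-off, using plain java.util.concurrent. It does not use selectors or sockets, and HalfSyncHalfAsyncSketch, handle, and dispatch are hypothetical names, not Thrift APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustration only: a hypothetical class showing the hand-off to a worker pool.
final class HalfSyncHalfAsyncSketch {

    // Stand-in for the synchronous business logic a service handler would run.
    static String handle(String request) {
        return "handled:" + request;
    }

    // The calling thread plays the I/O role: it only dispatches requests
    // and collects responses; handle() always runs on a worker thread.
    static List<String> dispatch(List<String> requests, int workerThreads) {
        ExecutorService workers = Executors.newFixedThreadPool(workerThreads);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String request : requests) {
                pending.add(workers.submit(() -> handle(request)));
            }
            List<String> responses = new ArrayList<>();
            for (Future<String> future : pending) {
                responses.add(future.get()); // responses keep the order of the requests
            }
            return responses;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(Arrays.asList("save", "findUsersByName"), 2));
    }
}
```

In the real servers the dispatching side is one or more selector threads doing non-blocking reads and writes, so a slow handler occupies a worker thread rather than stalling I/O for every connection.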
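3.2 Steps to create a server
(1) Create a transport object
(2) Create the input and output protocols for the transport
(3) Create a processor based on the input and output protocols
(4) Wait for incoming connections and hand them to the processor
For example: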
try {
/**
* 1. Create the Transport
*/
TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
TServer.Args tArgs = new TServer.Args(serverTransport);
/**
* 2. Create the Protocol for the Transport
*/
tArgs.protocolFactory(new TBinaryProtocol.Factory());
// tArgs.protocolFactory(new TCompactProtocol.Factory());
// tArgs.protocolFactory(new TJSONProtocol.Factory());
/**
* 3. Create the Processor for the Protocol
*/
TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
tArgs.processor(tprocessor);
/**
* 4. Create the Server and start it
*
* org.apache.thrift.server.TSimpleServer: a simple single-threaded server model, generally used for testing
*/
TServer server = new TSimpleServer(tArgs);
logger.info("UserService TSimpleServer start ....");
server.serve();
} catch (Exception e) {
logger.severe("Server start error!!!" + e.getLocalizedMessage());
e.printStackTrace();
}
IV. TSimpleServer
4.1 Server
package com.xxx.tutorial.thrift.server;
import java.util.logging.Logger;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import com.xxx.tutorial.thrift.service.UserService;
import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;
/**
* @author wangmengjun
*
*/
public class TSimpleServerExample {
private static final Logger logger = Logger.getLogger(TSimpleServerExample.class.getName());
private static final int SERVER_PORT = 9123;
public static void main(String[] args) {
try {
/**
* 1. Create the Transport
*/
TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
TServer.Args tArgs = new TServer.Args(serverTransport);
/**
* 2. Create the Protocol for the Transport
*/
tArgs.protocolFactory(new TBinaryProtocol.Factory());
// tArgs.protocolFactory(new TCompactProtocol.Factory());
// tArgs.protocolFactory(new TJSONProtocol.Factory());
/**
* 3. Create the Processor for the Protocol
*/
TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
tArgs.processor(tprocessor);
/**
* 4. Create the Server and start it
*
* org.apache.thrift.server.TSimpleServer: a simple single-threaded server model, generally used for testing
*/
TServer server = new TSimpleServer(tArgs);
logger.info("UserService TSimpleServer start ....");
server.serve();
} catch (Exception e) {
logger.severe("Server start error!!!" + e.getLocalizedMessage());
e.printStackTrace();
}
}
}
4.2 Client
package com.xxx.tutorial.thrift.client;
import java.util.List;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.exception.UserNotFoundException;
import com.xxx.tutorial.thrift.service.UserService;
/**
*
* @author wangmengjun
*
*/
public class UserClient {
private static final Logger logger = Logger.getLogger(UserClient.class.getName());
public static void main(String[] args) {
try {
TTransport transport = new TSocket("127.0.0.1", 9123);
TProtocol protocol = new TBinaryProtocol(transport);
UserService.Client client = new UserService.Client(protocol);
transport.open();
/**
* Query the list of users
*/
List<User> users = client.findUsersByName("wang");
logger.info("client.findUsersByName()方法結果 == >" + users);
/**
* Save a user
*/
boolean isUserSaved = client.save(new User(101, "WMJ"));
logger.info("user saved result == > " + isUserSaved);
/**
* Delete a user
*/
client.deleteByUserId(1002);
transport.close();
} catch (TTransportException e) {
logger.severe("TTransportException==>" + e.getLocalizedMessage());
} catch (UserNotFoundException e) {
logger.severe("UserNotFoundException==>" + e.getMessage());
} catch (TException e) {
logger.severe("TException==>" + e.getLocalizedMessage());
}
}
}
4.3 Test
Server console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
六月 09, 2017 8:29:14 下午 com.xxx.tutorial.thrift.server.TSimpleServerExample main
信息: UserService TSimpleServer start ....
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName
信息: 方法findUsersByName的参数name的内容==>wang
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save
信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId
信息: 方法deleteByUserId的参数userId的内容==>1002
Client console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received 1
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.client.UserClient main
信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]
Received 2
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.client.UserClient main
信息: user saved result == > true
Received 3
六月 09, 2017 8:29:27 下午 com.xxx.tutorial.thrift.client.UserClient main
严重: UserNotFoundException==>userId=1002的用户不存在
V. TThreadPoolServer
5.1 Server
package com.xxx.tutorial.thrift.server;
import java.util.logging.Logger;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import com.xxx.tutorial.thrift.service.UserService;
import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;
/**
*
* @author wangmengjun
*
*/
public class TThreadPoolServerExample {
private static final Logger logger = Logger.getLogger(TThreadPoolServerExample.class.getName());
private static final int SERVER_PORT = 9123;
public static void main(String[] args) {
try {
/**
* 1. Create the Transport
*/
TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(serverTransport);
/**
* 2. Create the Protocol for the Transport
*/
tArgs.protocolFactory(new TBinaryProtocol.Factory());
// tArgs.protocolFactory(new TCompactProtocol.Factory());
// tArgs.protocolFactory(new TJSONProtocol.Factory());
/**
* 3. Create the Processor for the Protocol
*/
TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
tArgs.processor(tprocessor);
/**
* 4. Create the Server and start it
*
* org.apache.thrift.server.TThreadPoolServer
*/
TServer server = new TThreadPoolServer(tArgs);
logger.info("UserService TThreadPoolServer start ....");
server.serve();
} catch (Exception e) {
logger.severe("Server start error!!!" + e.getLocalizedMessage());
e.printStackTrace();
}
}
}
5.2 Client
The client code can be the same as the client used with TSimpleServer:
package com.xxx.tutorial.thrift.client;
import java.util.List;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.exception.UserNotFoundException;
import com.xxx.tutorial.thrift.service.UserService;
/**
*
* @author wangmengjun
*
*/
public class UserClient {
private static final Logger logger = Logger.getLogger(UserClient.class.getName());
public static void main(String[] args) {
try {
TTransport transport = new TSocket("127.0.0.1", 9123);
TProtocol protocol = new TBinaryProtocol(transport);
UserService.Client client = new UserService.Client(protocol);
transport.open();
/**
* Query the list of users
*/
List<User> users = client.findUsersByName("wang");
logger.info("client.findUsersByName()方法結果 == >" + users);
/**
* Save a user
*/
boolean isUserSaved = client.save(new User(101, "WMJ"));
logger.info("user saved result == > " + isUserSaved);
/**
* Delete a user
*/
client.deleteByUserId(1002);
transport.close();
} catch (TTransportException e) {
logger.severe("TTransportException==>" + e.getLocalizedMessage());
} catch (UserNotFoundException e) {
logger.severe("UserNotFoundException==>" + e.getMessage());
} catch (TException e) {
logger.severe("TException==>" + e.getLocalizedMessage());
}
}
}
5.3 Test
Server console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
六月 09, 2017 8:31:44 下午 com.xxx.tutorial.thrift.server.TThreadPoolServerExample main
信息: UserService TThreadPoolServer start ....
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName
信息: 方法findUsersByName的参数name的内容==>wang
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save
信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId
信息: 方法deleteByUserId的参数userId的内容==>1002
Client console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received 1
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.client.UserClient main
信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]
Received 2
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.client.UserClient main
信息: user saved result == > true
Received 3
六月 09, 2017 8:32:05 下午 com.xxx.tutorial.thrift.client.UserClient main
严重: UserNotFoundException==>userId=1002的用户不存在
VI. TNonblockingServer
6.1 Server
package com.xxx.tutorial.thrift.server;
import java.util.logging.Logger;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import com.xxx.tutorial.thrift.service.UserService;
import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;
/**
*
* @author wangmengjun
*
*/
public class TNonblockingServerExample {
private static final Logger logger = Logger.getLogger(TNonblockingServerExample.class.getName());
private static final int SERVER_PORT = 9123;
public static void main(String[] args) {
try {
/**
* 1. Create the Transport
*/
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(SERVER_PORT);
TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverTransport);
/**
* 2. Create the Protocol for the Transport
*/
tArgs.transportFactory(new TFramedTransport.Factory());
tArgs.protocolFactory(new TBinaryProtocol.Factory());
// tArgs.protocolFactory(new TCompactProtocol.Factory());
// tArgs.protocolFactory(new TJSONProtocol.Factory());
/**
* 3. Create the Processor for the Protocol
*/
TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
tArgs.processor(tprocessor);
/**
* 4. Create the Server and start it
*/
TServer server = new TNonblockingServer(tArgs);
logger.info("UserService TNonblockingServer start ....");
server.serve();
} catch (Exception e) {
logger.severe("Server start error!!!" + e.getLocalizedMessage());
e.printStackTrace();
}
}
}
6.2 Client
package com.xxx.tutorial.thrift.client;
import java.util.List;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.exception.UserNotFoundException;
import com.xxx.tutorial.thrift.service.UserService;
public class UserClient2 {
private static final Logger logger = Logger.getLogger(UserClient2.class.getName());
public static void main(String[] args) {
try {
TTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 9123, 3000));
TProtocol protocol = new TBinaryProtocol(transport);
UserService.Client client = new UserService.Client(protocol);
transport.open();
/**
* Query the list of users
*/
List<User> users = client.findUsersByName("wang");
logger.info("client.findUsersByName()方法結果 == >" + users);
/**
* Save a user
*/
boolean isUserSaved = client.save(new User(101, "WMJ"));
logger.info("user saved result == > " + isUserSaved);
/**
* Delete a user
*/
client.deleteByUserId(1002);
transport.close();
} catch (TTransportException e) {
logger.severe("TTransportException==>" + e.getLocalizedMessage());
} catch (UserNotFoundException e) {
logger.severe("UserNotFoundException==>" + e.getLocalizedMessage());
} catch (TException e) {
logger.severe("TException==>" + e.getLocalizedMessage());
}
}
}
6.3 Test
Server console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
六月 09, 2017 8:34:38 下午 com.xxx.tutorial.thrift.server.TNonblockingServerExample main
信息: UserService TNonblockingServer start ....
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName
信息: 方法findUsersByName的参数name的内容==>wang
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save
信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId
信息: 方法deleteByUserId的参数userId的内容==>1002
Client console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received 1
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.client.UserClient2 main
信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]
Received 2
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.client.UserClient2 main
信息: user saved result == > true
Received 3
六月 09, 2017 8:34:59 下午 com.xxx.tutorial.thrift.client.UserClient2 main
严重: UserNotFoundException==>userId=1002的用户不存在
VII. THsHaServer
7.1 Server
package com.xxx.tutorial.thrift.server;
import java.util.logging.Logger;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import com.xxx.tutorial.thrift.service.UserService;
import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;
/**
* @author wangmengjun
*
*/
public class THsHaServerExample {
private static final Logger logger = Logger.getLogger(THsHaServerExample.class.getName());
private static final int SERVER_PORT = 9123;
public static void main(String[] args) {
try {
/**
* 1. Create the Transport
*/
//TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(SERVER_PORT);
THsHaServer.Args tArgs = new THsHaServer.Args(serverTransport);
/**
* 2. Create the Protocol for the Transport
*/
tArgs.transportFactory(new TFramedTransport.Factory());
tArgs.protocolFactory(new TBinaryProtocol.Factory());
// tArgs.protocolFactory(new TCompactProtocol.Factory());
// tArgs.protocolFactory(new TJSONProtocol.Factory());
/**
* 3. Create the Processor for the Protocol
*/
TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
tArgs.processor(tprocessor);
/**
* 4. Create the Server and start it
*
*/
// Half-Sync/Half-Async server model
TServer server = new THsHaServer(tArgs);
logger.info("UserService TSimpleServer start ....");
server.serve();
} catch (Exception e) {
logger.severe("Server start error!!!" + e.getLocalizedMessage());
e.printStackTrace();
}
}
}
7.2 Client
package com.xxx.tutorial.thrift.client;
import java.util.List;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.exception.UserNotFoundException;
import com.xxx.tutorial.thrift.service.UserService;
public class UserClient2 {
private static final Logger logger = Logger.getLogger(UserClient2.class.getName());
public static void main(String[] args) {
try {
TTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 9123, 3000));
TProtocol protocol = new TBinaryProtocol(transport);
UserService.Client client = new UserService.Client(protocol);
transport.open();
/**
* Query the list of users
*/
List<User> users = client.findUsersByName("wang");
logger.info("client.findUsersByName()方法結果 == >" + users);
/**
* Save a user
*/
boolean isUserSaved = client.save(new User(101, "WMJ"));
logger.info("user saved result == > " + isUserSaved);
/**
* Delete a user
*/
client.deleteByUserId(1002);
transport.close();
} catch (TTransportException e) {
logger.severe("TTransportException==>" + e.getLocalizedMessage());
} catch (UserNotFoundException e) {
logger.severe("UserNotFoundException==>" + e.getLocalizedMessage());
} catch (TException e) {
logger.severe("TException==>" + e.getLocalizedMessage());
}
}
}
7.3 Test
Server console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
六月 09, 2017 8:57:26 下午 com.xxx.tutorial.thrift.server.THsHaServerExample main
信息: UserService TSimpleServer start ....
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName
信息: 方法findUsersByName的参数name的内容==>wang
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl save
信息: 方法save的参数user的内容==>User(userId:101, name:WMJ)
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl deleteByUserId
信息: 方法deleteByUserId的参数userId的内容==>1002
Client console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received 1
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.client.UserClient2 main
信息: client.findUsersByName()方法結果 == >[User(userId:1, name:Wang), User(userId:2, name:Mengjun)]
Received 2
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.client.UserClient2 main
信息: user saved result == > true
Received 3
六月 09, 2017 8:57:43 下午 com.xxx.tutorial.thrift.client.UserClient2 main
严重: UserNotFoundException==>userId=1002的用户不存在
VIII. Nifty server type
Facebook has also open-sourced Nifty, a Netty-based implementation of Thrift servers and clients.
To use Nifty, add the nifty-core dependency:
<dependency>
<groupId>com.facebook.nifty</groupId>
<artifactId>nifty-core</artifactId>
<version>0.10.0</version>
</dependency>
8.1 Server
package com.xxx.tutorial.thrift.server;
import org.apache.thrift.TProcessor;
import com.facebook.nifty.core.NettyServerTransport;
import com.facebook.nifty.core.ThriftServerDef;
import com.facebook.nifty.core.ThriftServerDefBuilder;
import com.xxx.tutorial.thrift.service.UserService;
import com.xxx.tutorial.thrift.service.impl.UserServiceImpl;
public class NiftyServer {
public static void main(String[] args) {
// Create the handler
UserService.Iface userServiceImpl = new UserServiceImpl();
// Create the processor
TProcessor processor = new UserService.Processor<>(userServiceImpl);
// Build the server definition
ThriftServerDef serverDef = new ThriftServerDefBuilder().withProcessor(processor).listen(9123).build();
// Create the server transport
final NettyServerTransport server = new NettyServerTransport(serverDef);
// Create netty boss and executor thread pools
/* ExecutorService bossExecutor = Executors.newCachedThreadPool();
ExecutorService workerExecutor = Executors.newCachedThreadPool();
// Start the server
server.start(bossExecutor, workerExecutor);*/
System.out.println("启动~~~");
server.start();
// Arrange to stop the server at shutdown
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
server.stop();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
}
8.2 Client
The following example uses NiftyClient to make the call. NiftyClient requires an additional dependency:
<dependency>
<groupId>com.facebook.nifty</groupId>
<artifactId>nifty-client</artifactId>
<version>0.10.0</version>
</dependency>
package com.xxx.tutorial.thrift.client;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import com.facebook.nifty.client.NiftyClient;
import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.service.UserService;
public class ThriftNiftyClient {
private static final Logger logger = Logger.getLogger(ThriftNiftyClient.class.getName());
@SuppressWarnings({ "resource" })
public static void main(String[] args) {
NiftyClient niftyClient = new NiftyClient();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9123);
try {
TTransport transport = niftyClient.connectSync(address);
TBinaryProtocol tp = new TBinaryProtocol(transport);
UserService.Client userService = new UserService.Client(tp);
logger.info("userService.findUsersByName 方法调用~");
List<User> users = userService.findUsersByName("wang");
logger.info("userService.findUsersByName 调用结果~");
logger.info("users ==> " + users);
} catch (TTransportException e) {
logger.severe("TTransportException ==> " + e.getMessage());
} catch (InterruptedException e) {
logger.severe("InterruptedException ==> " + e.getMessage());
} catch (TException e) {
logger.severe("TException ==> " + e.getMessage());
}
}
}
8.3 Test
Server console output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
启动~~~
六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.service.impl.UserServiceImpl findUsersByName
信息: 方法findUsersByName的参数name的内容==>wang
Client console output
六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.client.ThriftNiftyClient main
信息: userService.findUsersByName 方法调用~
Received 1
六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.client.ThriftNiftyClient main
信息: userService.findUsersByName 调用结果~
六月 09, 2017 9:01:44 下午 com.xxx.tutorial.thrift.client.ThriftNiftyClient main
信息: users ==> [User(userId:1, name:Wang), User(userId:2, name:Mengjun)]
IX. Asynchronous client
package com.xxx.tutorial.thrift.client;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import com.xxx.tutorial.thrift.entity.User;
import com.xxx.tutorial.thrift.service.UserService;
public class UserAsynClient {
private static final Logger logger = Logger.getLogger(UserAsynClient.class.getName());
public static void main(String[] args) {
try {
TAsyncClientManager clientManager = new TAsyncClientManager();
TNonblockingTransport transport = new TNonblockingSocket("127.0.0.1", 9123, 3000);
TProtocolFactory tprotocol = new TBinaryProtocol.Factory();
UserService.AsyncClient asyncClient = new UserService.AsyncClient(tprotocol, clientManager, transport);
System.out.println("Client start .....");
CountDownLatch latch = new CountDownLatch(1);
/*AsynCallback_FindUsersByName callBack = new AsynCallback_FindUsersByName(latch);
asyncClient.findUsersByName("wang", callBack);*/
AsynCallback_SaveUser saveUserCallBack = new AsynCallback_SaveUser(latch);
asyncClient.save(new User(10001,"A name"), saveUserCallBack);
System.out.println("call method findUsersByName_call .... end");
boolean wait = latch.await(30, TimeUnit.SECONDS);
System.out.println("latch.await =:" + wait);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("startClient end.");
}
public static class AsynCallback_FindUsersByName implements AsyncMethodCallback<List<User>> {
private CountDownLatch latch;
public AsynCallback_FindUsersByName(CountDownLatch latch) {
this.latch = latch;
}
public void onComplete(List<User> response) {
logger.info("onComplete ==> findUsersByName_call");
try {
logger.info("findUsersByName_call response ==> " + response.toString());
} finally {
latch.countDown();
}
}
public void onError(Exception exception) {
logger.severe("onError ==> " + exception.getMessage());
latch.countDown();
}
}
public static class AsynCallback_SaveUser implements AsyncMethodCallback<Boolean> {
private CountDownLatch latch;
public AsynCallback_SaveUser(CountDownLatch latch) {
this.latch = latch;
}
public void onComplete(Boolean response) {
logger.info("onComplete ==> save_call");
try {
logger.info("save_call response ==> " + response.toString());
} finally {
latch.countDown();
}
}
public void onError(Exception exception) {
logger.severe("onError ==> " + exception.getMessage());
latch.countDown();
}
}
}
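The key line is the construction of the asynchronous client from a protocol factory, a client manager, and a non-blocking transport:
UserService.AsyncClient asyncClient = new UserService.AsyncClient(tprotocol, clientManager, transport);
Output: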
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Client start .....
call method findUsersByName_call .... end
Received 0
六月 12, 2017 10:52:44 上午 com.xxx.tutorial.thrift.client.UserAsynClient$AsynCallback_SaveUser onComplete
信息: onComplete ==> save_call
六月 12, 2017 10:52:44 上午 com.xxx.tutorial.thrift.client.UserAsynClient$AsynCallback_SaveUser onComplete
信息: save_call response ==> true
latch.await =:true
startClient end.
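The callback-plus-latch pattern used by the asynchronous client can be tried in isolation. The sketch below uses only the JDK: Callback is a hypothetical interface shaped like the two callback methods in the listing, and saveAsync is a stand-in that completes on another thread; neither is a Thrift API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Illustration only: a hypothetical class, not part of Thrift.
final class CallbackLatchSketch {

    // Hypothetical callback with the same two methods the listing implements.
    interface Callback<T> {
        void onComplete(T response);
        void onError(Exception exception);
    }

    // Stand-in for an asynchronous call: the result arrives later on another thread.
    static void saveAsync(String name, ExecutorService io, Callback<Boolean> callback) {
        io.execute(() -> {
            try {
                if (name.isEmpty()) {
                    throw new IllegalArgumentException("name must not be empty");
                }
                callback.onComplete(Boolean.TRUE);
            } catch (Exception e) {
                callback.onError(e);
            }
        });
    }

    // Starts the call, then blocks on the latch until a callback fires or the timeout passes.
    // Returns the response, or null after an error or a timeout.
    static Boolean callAndWait(String name, long timeoutSeconds) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Boolean> result = new AtomicReference<>();
        try {
            saveAsync(name, io, new Callback<Boolean>() {
                @Override
                public void onComplete(Boolean response) {
                    result.set(response);
                    latch.countDown();
                }

                @Override
                public void onError(Exception exception) {
                    latch.countDown(); // release the waiter on failure as well
                }
            });
            latch.await(timeoutSeconds, TimeUnit.SECONDS);
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("save result: " + callAndWait("A name", 5));
    }
}
```

Counting down in both callbacks matters: if only onComplete released the latch, a failed call would leave the main thread waiting for the full timeout.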