Java开源框架中的设计模式以及应用场景
public class ConfigManager {
private static final ConfigManager configManager = new ConfigManager();
private ConfigManager() {}
public static ConfigManager getInstance() {
return configManager;
}
}
public class MQClientManager {
private static MQClientManager instance = new MQClientManager();
private MQClientManager() {}
public static MQClientManager getInstance() {
return instance;
}
}
public class ConfigurationFactory{
private static volatile Configuration CONFIG_INSTANCE = null;
public static Configuration getInstance() {
if (CONFIG_INSTANCE == null) {
synchronized (Configuration.class) {
if (CONFIG_INSTANCE == null) {
CONFIG_INSTANCE = buildConfiguration();
}
}
}
return CONFIG_INSTANCE;
}
}
public class DefaultRMHandler extends AbstractRMHandler{
protected DefaultRMHandler() {
initRMHandlers();
}
private static class SingletonHolder {
private static AbstractRMHandler INSTANCE = new DefaultRMHandler();
}
public static AbstractRMHandler get() {
return DefaultRMHandler.SingletonHolder.INSTANCE;
}
}
public abstract class AbstractUndoExecutor{
//生成撤销SQL
protected abstract String buildUndoSQL();
}
public class UndoExecutorFactory {
public static AbstractUndoExecutor getUndoExecutor(String dbType, SQLUndoLog sqlUndoLog) {
switch (sqlUndoLog.getSqlType()) {
case INSERT:
return new MySQLUndoInsertExecutor(sqlUndoLog);
case UPDATE:
return new MySQLUndoUpdateExecutor(sqlUndoLog);
case DELETE:
return new MySQLUndoDeleteExecutor(sqlUndoLog);
default:
throw new ShouldNeverHappenException();
}
}
}
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(),sqlUndoLog);
undoExecutor.executeOn(conn);
public interface Cache {
void put(Object key, Object value);
Object get(Object key);
}
public interface CacheFactory {
Cache getCache(URL url, Invocation invocation);
}
public abstract class AbstractCacheFactory implements CacheFactory {
//具体的缓存实现类
private final ConcurrentMap<String, Cache> caches = new ConcurrentHashMap<String, Cache>();
@Override
public Cache getCache(URL url, Invocation invocation) {
url = url.addParameter(Constants.METHOD_KEY, invocation.getMethodName());
String key = url.toFullString();
Cache cache = caches.get(key);
if (cache == null) {
//创建缓存实现类,交给子类实现
caches.put(key, createCache(url));
cache = caches.get(key);
}
return cache;
}
//抽象方法,交给子类实现
protected abstract Cache createCache(URL url);
}
ExpiringCacheFactory、JCacheFactory、LruCacheFactory、ThreadLocalCacheFactory。
这些工厂类,只有一个方法,就是创建具体的缓存实现类。
public class ThreadLocalCacheFactory extends AbstractCacheFactory {
@Override
protected Cache createCache(URL url) {
return new ThreadLocalCache(url);
}
}
public class ThreadLocalCache implements Cache {
private final ThreadLocal<Map<Object, Object>> store;
public ThreadLocalCache(URL url) {
this.store = new ThreadLocal<Map<Object, Object>>() {
@Override
protected Map<Object, Object> initialValue() {
return new HashMap<Object, Object>();
}
};
}
@Override
public void put(Object key, Object value) {
store.get().put(key, value);
}
@Override
public Object get(Object key) {
return store.get().get(key);
}
}
public static void main(String[] args) {
URL url = URL.valueOf("http://localhost:8080/cache=jacache&.cache.write.expire=1");
Invocation invocation = new RpcInvocation();
CacheFactory cacheFactory = new ThreadLocalCacheFactory();
Cache cache = cacheFactory.getCache(url, invocation);
cache.put("java","java");
System.out.println(cache.get("java"));
}
工厂方法模式只有一个抽象产品类,具体工厂类只能创建一个具体产品类的实例;
抽象工厂模式有多个抽象产品类,具体工厂类可以创建多个具体产品类的实例。
public interface DataBase {
void insert(Object tableName, Object record);
Object select(Object tableName);
}
public class MysqlDataBase implements DataBase{
Map<Object,Object> mysqlDb = new HashMap<>();
@Override
public void insert(Object tableName, Object record) {
mysqlDb.put(tableName,record);
}
@Override
public Object select(Object tableName) {
return mysqlDb.get(tableName);
}
}
public class OracleDataBase implements DataBase {
Map<Object,Object> oracleDb = new HashMap<>();
@Override
public void insert(Object tableName, Object record) {
oracleDb.put(tableName,record);
}
@Override
public Object select(Object tableName) {
return oracleDb.get(tableName);
}
}
public interface DataAccessFactory {
Cache getCache(URL url);
DataBase getDb();
}
public class DataAccessFactory1 implements DataAccessFactory {
@Override
public Cache getCache(URL url) {
return new ThreadLocalCache(url);
}
@Override
public DataBase getDb() {
return new MysqlDataBase();
}
}
public class DataAccessFactory2 implements DataAccessFactory {
@Override
public Cache getCache(URL url) {
return new LruCache(url);
}
@Override
public DataBase getDb() {
return new OracleDataBase();
}
}
public interface LoadBalance {
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.isEmpty()) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
return doSelect(invokers, url, invocation);
}
//抽象方法,由子类选择一个Invoker
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
}
基于权重随机算法的 RandomLoadBalance
基于最少活跃调用数算法的 LeastActiveLoadBalance
基于 hash 一致性的 ConsistentHashLoadBalance
基于加权轮询算法的 RoundRobinLoadBalance
public class RandomLoadBalance extends AbstractLoadBalance {
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
//省略逻辑....
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
}
public final class MappedStatement {
private String resource;
private Configuration configuration;
private String id;
private SqlSource sqlSource;
private ParameterMap parameterMap;
private List<ResultMap> resultMaps;
//.....省略大部分属性
}
public static class Builder {
private MappedStatement mappedStatement = new MappedStatement();
public Builder(Configuration configuration, String id, SqlSource sqlSource, SqlCommandType sqlCommandType) {
mappedStatement.configuration = configuration;
mappedStatement.id = id;
mappedStatement.sqlSource = sqlSource;
mappedStatement.statementType = StatementType.PREPARED;
mappedStatement.resultSetType = ResultSetType.DEFAULT;
//.....省略大部分过程
}
}
public static class Builder {
public Builder parameterMap(ParameterMap parameterMap) {
mappedStatement.parameterMap = parameterMap;
return this;
}
public Builder resultMaps(List<ResultMap> resultMaps) {
mappedStatement.resultMaps = resultMaps;
for (ResultMap resultMap : resultMaps) {
mappedStatement.hasNestedResultMaps = mappedStatement.hasNestedResultMaps || resultMap.hasNestedResultMaps();
}
return this;
}
public Builder statementType(StatementType statementType) {
mappedStatement.statementType = statementType;
return this;
}
public Builder resultSetType(ResultSetType resultSetType) {
mappedStatement.resultSetType = resultSetType == null ? ResultSetType.DEFAULT : resultSetType;
return this;
}
}
public MappedStatement build() {
assert mappedStatement.configuration != null;
assert mappedStatement.id != null;
assert mappedStatement.sqlSource != null;
assert mappedStatement.lang != null;
mappedStatement.resultMaps = Collections.unmodifiableList(mappedStatement.resultMaps);
return mappedStatement;
}
MappedStatement.Builder statementBuilder = new MappedStatement.Builder(configuration, id, sqlSource, sqlCommandType)
.resource(resource)
.fetchSize(fetchSize)
.timeout(timeout)
.statementType(statementType)
.keyGenerator(keyGenerator)
.keyProperty(keyProperty)
.keyColumn(keyColumn)
.databaseId(databaseId)
.lang(lang)
.resultOrdered(resultOrdered)
.resultSets(resultSets)
.resultMaps(getStatementResultMaps(resultMap, resultType, id))
.resultSetType(resultSetType)
.flushCacheRequired(valueOrDefault(flushCache, !isSelect))
.useCache(valueOrDefault(useCache, isSelect))
.cache(currentCache);
ParameterMap statementParameterMap = getStatementParameterMap(parameterMap, parameterType, id);
MappedStatement statement = statementBuilder.build();
configuration.addMappedStatement(statement);
return statement;
@SPI
public interface Codec2 {
@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
@Adaptive({Constants.CODEC_KEY})
Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
enum DecodeResult {
NEED_MORE_INPUT, SKIP_SOME_INPUT
}
}
final public class NettyCodecAdapter {
private final ChannelHandler encoder = new InternalEncoder();
private final ChannelHandler decoder = new InternalDecoder();
private final Codec2 codec;
private final URL url;
private final org.apache.dubbo.remoting.ChannelHandler handler;
public NettyCodecAdapter(Codec2 codec, URL url, org.apache.dubbo.remoting.ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
}
public ChannelHandler getEncoder() {
return encoder;
}
public ChannelHandler getDecoder() {
return decoder;
}
private class InternalEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
org.apache.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
codec.encode(channel, buffer, msg);
}
}
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
//解码对象
codec.decode(channel, message);
//省略部分代码...
}
}
}
//通过SPI方式获取编解码器的实现类,比如这里是DubboCountCodec
Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
URL url = new URL("dubbo", "localhost", 22226);
//创建适配器
NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, NettyClient.this);
//向ChannelPipeline中添加编解码处理器
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel channel) {
channel.pipeline()
.addLast(new ChannelHandler1())
.addLast(new ChannelHandler2())
.addLast(new ChannelHandler3());
}
});
public interface ChannelHandler {
//当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
//当从 ChannelPipeline 中移除 ChannelHandler 时被调用
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
//当处理过程中在 ChannelPipeline 中有错误产生时被调用
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
public interface ChannelInboundHandler extends ChannelHandler {
//当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
//当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调用
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
//当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪
void channelActive(ChannelHandlerContext ctx) throws Exception;
//当 Channel 离开活动状态并且不再连接它的远程节点时被调用
void channelInactive(ChannelHandlerContext ctx) throws Exception;
当从 Channel 读取数据时被调用
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
//当 Channel上的一个读操作完成时被调用
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
public interface ChannelPipeline{
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline remove(ChannelHandler handler);
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
@Override
ChannelPipeline fireChannelRegistered();
@Override
ChannelPipeline fireChannelActive();
@Override
ChannelPipeline fireExceptionCaught(Throwable cause);
@Override
ChannelPipeline fireUserEventTriggered(Object event);
@Override
ChannelPipeline fireChannelRead(Object msg);
@Override
ChannelPipeline flush();
//省略部分方法.....
}
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
public interface ChannelHandlerContext{
Channel channel();
EventExecutor executor();
ChannelHandler handler();
ChannelPipeline pipeline();
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();
@Override
ChannelHandlerContext fireChannelRead(Object msg);
@Override
ChannelHandlerContext read();
@Override
ChannelHandlerContext flush();
//省略部分方法……
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
//查找下一个Inbound类型的处理器,左 > 右
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
//查找下一个Outbound类型的处理器,右 > 左
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.prev;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
}
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
public final void read() {
//从Channel中获取对应的ChannelPipeline
final ChannelPipeline pipeline = pipeline();
//数据载体
ByteBuf byteBuf = allocHandle.allocate(allocator);
//传递数据
pipeline.fireChannelRead(byteBuf);
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
//找到下一个ChannelHandler并执行
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
}
public class ChannelHandler1 extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg){
System.out.println( "ChannelHandler1:"+msg);
ctx.fireChannelRead(msg);
}
}
public interface NettyRequestProcessor {
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws Exception;
boolean rejectRequest();
}
//默认的消息处理器
public class DefaultRequestProcessor implements NettyRequestProcessor {}
//发送消息的处理器
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {}
//拉取消息的处理器
public class PullMessageProcessor implements NettyRequestProcessor {}
//查询消息的处理器
public class QueryMessageProcessor implements NettyRequestProcessor {}
//消费者端管理的处理器
public class ConsumerManageProcessor implements NettyRequestProcessor {}
public class BrokerController {
public void registerProcessor() {
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE,this.pullMessageProcessor,this.pullMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
//省略部分注册过程.....
}
}
public abstract class NettyRemotingAbstract {
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
//根据请求类型找到对应的策略类
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
//如果没有找到就使用默认的
final Pair<NettyRequestProcessor, ExecutorService> pair =
null == matched ? this.defaultRequestProcessor : matched;
//执行策略
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
//省略大部分代码......
}
}
面向接口代理;
屏蔽调用底层细节。
public interface StorageDubboService {
int decreaseStorage(StorageDTO storage);
}
@Reference
StorageDubboService storageDubboService;
public class ReferenceBean<T>{
@Override
public Object getObject() {
return get();
}
public synchronized T get() {
if (ref == null) {
init();
}
return ref;
}
}
public interface UserMapper {
List<User> getUserList();
}
protected T newInstance(SqlSession sqlSession) {
final MapperProxy<T> mapperProxy = new MapperProxy<>(sqlSession, mapperInterface, methodCache);
return (T) Proxy.newProxyInstance(mapperInterface.getClassLoader(), new Class[] { mapperInterface }, mapperProxy);
}
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
//默认的执行器
executorType = executorType == null ? defaultExecutorType : executorType;
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
Executor executor;
if (ExecutorType.BATCH == executorType) {
executor = new BatchExecutor(this, transaction);
} else if (ExecutorType.REUSE == executorType) {
executor = new ReuseExecutor(this, transaction);
} else {
executor = new SimpleExecutor(this, transaction);
}
//使用缓存执行器来装饰
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}
public class CachingExecutor implements Executor {
@Override
public <E> List<E> query()throws SQLException {
Cache cache = ms.getCache();
if (cache != null) {
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
}
@Component
public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
System.out.println("干一些系统初始化的事情....");
ApplicationContext context = event.getApplicationContext();
String[] names = context.getBeanDefinitionNames();
for (String beanName:names){
System.out.println("----------"+beanName+"---------");
}
}
}
public abstract class AbstractApplicationContext extends DefaultResourceLoader implements ConfigurableApplicationContext {
//观察者容器
private final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();
//被观察者
private ApplicationEventMulticaster applicationEventMulticaster;
}
public abstract class AbstractApplicationContext{
protected void registerListeners() {
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
}
protected void finishRefresh() {
publishEvent(new ContextRefreshedEvent(this));
}
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
listener.onApplicationEvent(event);
}
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
ApplicationEventPublisherAware {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
}
public class ServiceBean<T>{
public void export() {
super.export();
publishExportEvent();
}
private void publishExportEvent() {
ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this);
applicationEventPublisher.publishEvent(exportEvent);
}
}
public class ServiceBeanExportedEvent extends ApplicationEvent {
public ServiceBeanExportedEvent(ServiceBean serviceBean) {
super(serviceBean);
}
public ServiceBean getServiceBean() {
return (ServiceBean) super.getSource();
}
}
@Component
public class ServiceBeanListener implements ApplicationListener<ServiceBeanExportedEvent> {
@Override
public void onApplicationEvent(ServiceBeanExportedEvent event) {
ServiceBean serviceBean = event.getServiceBean();
String beanName = serviceBean.getBeanName();
Service service = serviceBean.getService();
System.out.println(beanName+":"+service);
}
}
public class OrderServiceHystrixCommand extends HystrixCommand<Object> {
//接收者,处理业务逻辑
private OrderService orderService;
public OrderServiceHystrixCommand(OrderService orderService) {
super(setter());
this.orderService = orderService;
}
//设置Hystrix相关参数
public static Setter setter() {
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("orderGroup");
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("orderService");
HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter().withCoreSize(1)
.withQueueSizeRejectionThreshold(1);
HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter();
return Setter.withGroupKey(groupKey)
.andCommandKey(commandKey)
.andThreadPoolPropertiesDefaults(threadPoolProperties)
.andCommandPropertiesDefaults(commandProperties);
}
@Override
protected Object run() throws InterruptedException {
Thread.sleep(500);
return orderService.orders();
}
@Override
protected Object getFallback() {
System.out.println("-------------------------------");
return new ArrayList();
}
}
@RestController
public class OrderController {
@Autowired
OrderService orderService;
@RequestMapping("/orders")
public Object orders(){
OrderServiceHystrixCommand command = new OrderServiceHystrixCommand(orderService);
return command.execute();
}
}