【你好Hystrix】六:Hystrix结果缓存-HystrixCachedObservable
ReplaySubject
ReplaySubject
会发射所有来自原始Observable
的数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject
,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable
发射的)。
通过一个demo来说明
ReplaySubject<Long> objectReplaySubject = ReplaySubject.create();
//创建一个原始的发射器
Observable<Long> just = Observable.interval(1 , TimeUnit.SECONDS);
//使用ReplaySubject 来订阅 原始的发射器
Subscription subscribe = just.subscribe(objectReplaySubject);
//订阅 ReplaySubject 就像我们直接订阅just 一样。
objectReplaySubject.subscribe(integer -> System.out.println(integer));
TimeUnit.SECONDS.sleep(10);
//可以通过 ReplaySubject 订阅的结果来取消 订阅ReplaySubject的 订阅者
subscribe.unsubscribe();
//10s后 重新订阅ReplaySubject
objectReplaySubject.subscribe(integer -> System.out.println("====" + integer));
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
上面注释比较详细。基本上就是一个简单的缓存模型。
打印结果
0
1
2
3
4
5
6
7
8
9
====0
====1
====2
====3
====4
====5
====6
====7
====8
====9
订阅 ReplaySubject
顺利打印出 0-9 10s之后取消订阅 再次订阅同样打印出 0-9!
HystrixCachedObservable
该类是最基础的对Observable
的缓存,和上面的demo思路几乎是一致的。下面的代码可以结合上面的例子来理解。
//ReplaySubject对原始的Observable订阅结果 用于取消订阅
protected final Subscription originalSubscription;
//其实就是ReplaySubject对象 这里把Subject当做一个Observable
protected final Observable<R> cachedObservable;
//订阅计数器 用于取消对originalObservable的订阅
private volatile int outstandingSubscriptions = 0;
//重点在构造方法里面 使用ReplaySubject来订阅Observable 利用ReplaySubject的回放能力来达到
//缓存的目的
protected HystrixCachedObservable(final Observable<R> originalObservable) {
ReplaySubject<R> replaySubject = ReplaySubject.create();
this.originalSubscription = originalObservable
.subscribe(replaySubject);
this.cachedObservable = replaySubject
.doOnUnsubscribe(() -> {
outstandingSubscriptions--;
if (outstandingSubscriptions == 0) {
originalSubscription.unsubscribe();
}
})
.doOnSubscribe(() -> outstandingSubscriptions++);
}
//下面两个from方法提供给使用者 用来构造一个HystrixCachedObservable
public static <R> HystrixCachedObservable<R> from(Observable<R> o, AbstractCommand<R> originalCommand) {
return new HystrixCommandResponseFromCache<R>(o, originalCommand);
}
public static <R> HystrixCachedObservable<R> from(Observable<R> o) {
return new HystrixCachedObservable<R>(o);
}
//返回缓存的Observable 实际是一个ReplaySubject对象
public Observable<R> toObservable() {
return cachedObservable;
}
//取消订阅 这里取消之后所有对ReplaySubject的订阅都会取消订阅
public void unsubscribe() {
originalSubscription.unsubscribe();
}
Hystrix
对缓存的实现其实和上面的demo的思路是一样的,都是借助ReplaySubject
的回放能力来达到缓存的目的。
HystrixRequestVariableHolder
这个类本应该和 之前的HystrixRequestContext
一起介绍的 在Hystrix合并请求的内容也会使用到它。所以我们就放在使用到它的地方顺带说一下 这样还能了解其使用场景。
前面讲到 Hystrix解决跨线程数据传递的解决方案 之后 这个类理解起来其实是很简单。简单回顾一下: HystrixRequestContext
里面保存着一个很重要的数据结构用来存储我们需要在线程间或线程内传递的值-ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<>()
。所以HystrixRequestVariableDefault
其实只是提供了一个获取值的方法 它并不存储任何具体的数据,存储的任务是交给 LazyInitializer
来完成的。而state
是HystrixRequetContext
的实例变量(HystrixRequetContext
实例是保存在ThreadLocal
里面) 。所以state
保存的值是对一次请求有效。
使用 demo
HystrixRequestVariableHolder<Map<String , String>> requestVariableHolder
= new HystrixRequestVariableHolder<>(new HystrixRequestVariableLifecycle<Map<String, String>>() {
@Override
public Map<String, String> initialValue() {
return new HashMap<>();
}
@Override
public void shutdown(Map<String, String> value) {
}
});
@Test
public void func1() throws InterruptedException {
Thread thread = new Thread(() -> {
testRequestHolder("coredy" , "111111");
});
thread.start();
Thread thread1 = new Thread(() -> {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
testRequestHolder("kiete" , "22222");
});
thread1.start();
thread1.join();
}
private void testRequestHolder(String key , String value){
HystrixRequestContext hystrixRequestContext = HystrixRequestContext.initializeContext();
Map<String, String> firstMap = requestVariableHolder.get(HystrixPlugins.getInstance().getConcurrencyStrategy());
System.out.println(Thread.currentThread().getName() + ":" + firstMap);
firstMap.put(key , value );
Map<String, String> secondMap = requestVariableHolder.get(HystrixPlugins.getInstance().getConcurrencyStrategy());
System.out.println(Thread.currentThread().getName() + ":" + secondMap);
hystrixRequestContext.shutdown();
}
输出
Thread-1:{}
Thread-1:{coredy=111111}
Thread-2:{}
Thread-2:{kiete=22222}
上面的demo实例化一个HystrixRequestVariableHolder
,启动了两个线程 每一个都调用testRequestHolder
方法,我们发现两个线程的数据是独立的互不干扰。所以使用HystrixRequestVariableHolder
保存的数据是非共享的。web环境来说 就是对一次请求有效。
源码
private static ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>> requestVariableInstance = new ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>>();
public T get(HystrixConcurrencyStrategy concurrencyStrategy) {
//构造一个key 注意这里使用不当可能会出现内存泄露
RVCacheKey key = new RVCacheKey(this, concurrencyStrategy);
//获取一个 HystrixRequestVariable对象
HystrixRequestVariable<?> rvInstance = requestVariableInstance.get(key);
if (rvInstance == null) {
requestVariableInstance.putIfAbsent(key, concurrencyStrategy.getRequestVariable(lifeCycleMethods));
//这里Hystrix很友好的提供了一个日志 来避免你的程序错误使用导致的内存泄露
if (requestVariableInstance.size() > 100) {
logger.warn("Over 100 instances of HystrixRequestVariable are being stored. This is likely the sign of a memory leak caused by using unique instances of HystrixConcurrencyStrategy instead of a single instance.");
}
}
return (T) requestVariableInstance.get(key).get();
}
RVCacheKey
是由HystrixRequestVariableHolder
和HystrixConcurrencyStrategy
组成的 这个key决定了requestVariableInstance
的大小。如果没有必要 不要一直new HystrixRequestVariableHolder
然后去get
。上面的例子 我们把实例化HystrixRequestVariableHolder
的操作放在各自的线程中那么requestVariableInstance
中的就会有两条记录。get(HystrixConcurrencyStrategy concurrencyStrategy)
: 该类没有提供类似set的方法 所以 实例化的时候必须要传入一个HystrixRequestVariableLifecycle
来初始化一个引用类型的对象。错误的使用可能会导致程序内存泄露 所以hystrix提供了一个提醒日志 当
requestVariableInstance
的大小超过100 会出现一个警告日志,实际生产中需要注意
HystrixRequestCache
HystrixRequestVariableHolder
了解之后 HystrixRequestCache
就很好理解了。
实例化HystrixRequestVariableHolder
private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>> requestVariableForCache
= new HystrixRequestVariableHolder<>
(new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>() {
@Override
public ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> initialValue() {
return new ConcurrentHashMap<>();
}
@Override
public void shutdown(ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> value) {
}
});
上面的requestVariableForCache
就是实例化HystrixRequestVariableHolder
的过程
ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>
缓存到HystrixRequestContext
中。ValueCacheKey
由RequestCacheKey
和HystrixCommandKey
组成。而RequestCacheKey
是由三部分组成:type
1(Command)或者2(Collapser),key
自定义缓存key和concurrencyStrategy
并发策略
获取一个HystrixCachedObservable
对应的代码和我们上面的demo很相似
<T> HystrixCachedObservable<T> get(String cacheKey) {
ValueCacheKey key = getRequestCacheKey(cacheKey);
if (key != null) {
ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
if (cacheInstance == null) {
throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
}
/* look for the stored value */
return (HystrixCachedObservable<T>) cacheInstance.get(key);
}
return null;
}
缓存一个HystrixCachedObservable
<T> HystrixCachedObservable<T> putIfAbsent(String cacheKey, HystrixCachedObservable<T> f) {
ValueCacheKey key = getRequestCacheKey(cacheKey);
if (key != null) {
ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
if (cacheInstance == null) {
throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
}
HystrixCachedObservable<T> alreadySet = (HystrixCachedObservable<T>) cacheInstance.putIfAbsent(key, f);
if (alreadySet != null) {
return alreadySet;
}
}
return null;
}
上面缓存的过程 有一点需要注意 如果抛出 IllegalStateException
异常要检查是否已经执行HystrixRequestContext.initializeContext()
进行初始化操作。
HystrixRequestCache
只是对 HystrixRequestVariableHolder
进行包装让其缓存和Observable
有关的内容 并且对外提供缓存操作。
使用demo
public class HystrixRequestCacheTest extends HystrixCommand<String> {
private String name;
protected HystrixRequestCacheTest(String name) {
super(HystrixCommandGroupKey.Factory.asKey(name));
this.name = name;
}
@Override
protected String run() throws Exception {
return Math.random() + "";
}
@Override
protected String getCacheKey() {
return this.name;
}
public static void main(String[] args) throws InterruptedException {
HystrixRequestContext hystrixRequestContext = HystrixRequestContext.initializeContext();
HystrixRequestCacheTest firstCommand= new HystrixRequestCacheTest("coredy");
firstCommand.toObservable().subscribe(System.out::println);
HystrixRequestCacheTest secondCommand= new HystrixRequestCacheTest("coredy");
secondCommand.toObservable().subscribe(System.out::println);
hystrixRequestContext.shutdown();
TimeUnit.SECONDS.sleep(2);
}
}
0.0021000423483946706
0.0021000423483946706
默认情况下Hystrix
对缓存的配置是开启的。所以我们只需要实现getCacheKey
方法即可。
SpringCloud环境使用Hystrix缓存
在web环境下使用@CacheResult、@CacheKey
来模拟。
UserRest
在控制器中连续调用两次userService.list
@RestController
@RequestMapping("/user")
public class UserRest {
@Autowired
private UserService userService;
@RequestMapping("/list")
public Object list() throws InterruptedException {
HystrixRequestContext.initializeContext();
List list = new ArrayList();
list.add(userService.list("default"));
list.add(userService.list("default"));
return list;
}
}
UserService
在Hystrix的基本用法上面加了两个注解。并且返回的时候增加一个随机值来区别缓存。
@Service
public class UserService {
@HystrixCommand(commandKey = "user_list" , fallbackMethod = "fallback" ,
commandProperties = {
@HystrixProperty(name="execution.isolation.thread.timeoutInMilliseconds" , value = "2000"),
@HystrixProperty(name="execution.isolation.strategy" , value = "THREAD")
})
@CacheResult
public Object list(@CacheKey String group) throws InterruptedException {
return new ArrayList<String>(){{add("coredy");add("lili");add(Math.random() + "");}};
}
public Object fallback(String group){
return new ArrayList<String>(){{add("fallback");add(Math.random() + "");}};
}
}
返回值
[["coredy","lili","0.43455577682806457"],["coredy","lili","0.43455577682806457"]]
总结
Hystrix
请求结果缓存完全是通过Rxjava来实现的,所以需要对响应式编程有一点的了解。文章使用了4个demo分别来对ReplaySubject
、HystrixRequestVariableHolder
、编程式使用Hystrix缓存以及SpringCloud环境下的缓存使用。先会使用然后再看源码的大致实现,也还是很容易理解的。