vlambda博客
学习文章列表

Java语言中Method Reference操作的一个坑

我,又遇到了一个深坑!

众所周知,在Java 8中,Java语言增加了lambda表达式,用于支持函数式编程。与此同时,Java中也增加了一个名为Method Reference的方法,即::。

因此,我们有了如下的写法:

public static void main(String[] args) {
    String[] str = {"Hello""World""Is""Beautiful"};
    Arrays.stream(str).forEach(System.out::println);
}

多么顺畅的编码方式!

然后,因为我们公司使用了CAT做全链路监控。我呢,又不想在每个需要打点的方法上很丑陋地hard coding对CAT的调用。所以,理所当然地要用AOP来实现啦。

于是就有了一个CAT监控的AOP切面类,基于Spring实现的,主要如下:

@Component
@Aspect
public class CatTransactionAspect {
    protected final String DEFAULT_TYPE = "UnknownType";
    protected final String DEFAULT_NAME = "UnknownName";

    @Around("@annotation(com.XXX.XXX.....CatTransactional)")
    public Object callWithCatTransaction(ProceedingJoinPoint pjp) throws Throwable {
        Transaction transaction = Cat.newTransaction(getCatTransactionType(pjp), getCatTransactionName(pjp));
        try {
            transaction.setStatus(Transaction.SUCCESS);
            return pjp.proceed();
        } catch (NoResultException ex) {
            // 这个表示没查到数据,不作为异常处理
            throw ex;
        }
        catch (Throwable ex) {
            transaction.setStatus(ex);
            Cat.logError(ex);
            throw ex;
        } finally {
            transaction.complete();
        }
    }
}

知道的人都知道,这个类切面切的是所有使用@CatTransactional方法注解的方法。所以,我就可以使用下面的方式来处理CAT监控打点上报的逻辑了:

    @CatTransactional
    public void syncDataIntoUserProperty(String message) {
        try {
            // 有双引号的话,EFK抓取日志解析的时候可能会出问题
            log.info("Sync Data from kafka: {}", message.replaceAll("\"""`"));
            UserPropertyEntity entity = proxy.apply(messageConverter::convert, message, MessageConversionException.class);
            proxy.consume(service::update, entity, InsightException.class);
        } catch (MessageConversionException ex) {
            // 如果是消息转换异常,则打印告警日志,继续处理下一条消息
            log.warn("Failed to convert kafka message[{}] to UserPropertyEntity object, ex[{}]", message, ex);
        } catch (InsightException ex) {
            // 如果系统内部出现异常,则打印告警日志,继续处理下一条消息
            log.warn("Failed to update message[{}] into USER_PROPERTY, ex[{}]", message, ex);
        }
    }

但是啊,这个不够啊!

这样的编码方式,只能支持我自己写的方法。如果需要使用CAT打点上报的不是我自己写的方法,而是对别人方法的调用呢?我总不能把别人写的方法的源码下载下来,然后强行在它们的方法头上加@CatTransactional注解吧!

所以,我又创建了一个类,用来对外部方法的调用进行封装:

/**
 * 由于Spring AOP只能切Bean的方法,设计这个类用于代理所有需要使用CAT打点的非bean对象方法的调用。
 * @author jingxuan
 * @date 2020/12/12 11:03 上午
 */

@Component
public class CatTransactionProxy {
    @CatTransactional
    public int getAsInt(IntSupplier supplier) {
        return supplier.getAsInt();
    }

    @CatTransactional
    public <T> void consume(Consumer<T> consumer, T value) {
        consumer.accept(value);
    }

    @CatTransactional
    public void run(Runnable runnable) {
        runnable.run();
    }

    @CatTransactional
    public <T, R> apply(Function<T, R> function, T t) {
        return function.apply(t);
    }

    @CatTransactional
    public <T, U, R> apply(BiFunction<T, U, R> function, T t, U u) {
        return function.apply(t, u);
    }

    @CatTransactional
    public <T> get(Supplier<T> supplier) {
        return supplier.get();
    }
}

有了它,我就可以这样子愉快地编程了:

    @Autowired
    protected CatTransactionProxy proxy = null;
    
    @Override
    public UserPropertyEntity replace(UserPropertyEntity entity) throws InsightException {
        Assert.notNull(entity, "The input entity should not be null.");
        EntityManager entityManager = entityManagerFactory.createEntityManager();
        try {
            EntityTransaction transaction = entityManager.getTransaction();

            proxy.run(transaction::begin);
            UserPropertyEntity result = proxy.apply(entityManager::merge, entity);
            proxy.run(transaction::commit);

            return result;
        } catch (Throwable ex) {
            throw new ReadRecordException("Fail to replace record " + ex.getMessage(), ex);
        } finally {
            entityManager.close();
        }
    }

凡是需要使用CAT监控打点的地方,我都改写成proxy.apply(entityManager::merge, entity)这样的方式,便可以自动进行上报了!

然后,CAT上终将看到如下这样的链路打点记录:

不过啊,作为一个特别讲究的程序员,我觉得这个applyconsume这样的展示不太好。如果能使用proxy.apply(entityManager::merge, entity)里面的entityManager::merge替代它们的话,那就更直观了。

然后呢,请看下图:

红色框框圈出来的部分所显示的内容让人绝望!Method Reference对象在运行时会直接解释成lambda表达式!而众所周知的,lambda编译完之后会变成invokedynamic字节码,这玩意儿是不带任何能说明自己从哪儿创建的元数据信息的。所以在运行期想要通过lambda表达式获知entityManager::merge这个方法名,基本是歇菜了。

怎么办呢?我突然想到了ThreadLocal!感觉只能由ThreadLocal来拯救我了。

首先,我们创建一个类,用于管理transactionName

/**
 * 基于{@link ThreadLocal}实现的用于管理`transactionName`的对象
 * @author jingxuan
 * @date 2021/1/13 7:51 下午
 */

@Component
public class CatTransactionNameCache {
    protected ThreadLocal<String> transactionName = new ThreadLocal<>();
    public void setTransactionName(String transactionName) {
        this.transactionName.set(transactionName);
    }
    
    public String getTransactionName() {
        return this.transactionName.get();
    }

    public void remove() {
        this.transactionName.remove();
    }
}

同时,修改一下原来用于获取transactionName名称的方法,增加逻辑:

    @Autowired
    protected CatTransactionNameCache transactionNameCache = null;
    
    protected String getCatTransactionName(ProceedingJoinPoint pjp) {
        if (transactionNameCache.getTransactionName() != null) {
            return transactionNameCache.getTransactionName();
        }
        
        if (pjp instanceof MethodInvocationProceedingJoinPoint) {
            return getCatTransactionName((MethodInvocationProceedingJoinPoint)pjp);
        }
        return DEFAULT_NAME;
    }

在原来的切面方法的final段,增加清除transactionName的逻辑:

    @Around("@annotation(com.xiaohongshu.risk.platform.insight.monitor.CatTransactional)")
    public Object callWithCatTransaction(ProceedingJoinPoint pjp) throws Throwable {
        Transaction transaction = Cat.newTransaction(getCatTransactionType(pjp), getCatTransactionName(pjp));
        try {
            transaction.setStatus(Transaction.SUCCESS);
            return pjp.proceed();
        } catch (NoResultException ex) {
            // 这个表示没查到数据,不作为异常处理
            throw ex;
        }
        catch (Throwable ex) {
            transaction.setStatus(ex);
            Cat.logError(ex);
            throw ex;
        } finally {
            transactionNameCache.remove();
            transaction.complete();
        }
    }

再增加一个类,要求调用方法时,必须传入transactionName参数:

/**
 * @author jingxuan
 * @date 2021/1/13 8:13 下午
 */

@Component
public class NamedCatTransactionProxy {
    @Autowired
    protected CatTransactionNameCache transactionNameCache = null;

    @Autowired
    protected CatTransactionProxy proxy = null;

    public int getAsInt(String transactionName, IntSupplier supplier) {
        this.transactionNameCache.setTransactionName(transactionName);
        return this.proxy.getAsInt(supplier);
    }

    public <T> void consume(String transactionName, Consumer<T> consumer, T value) {
        this.transactionNameCache.setTransactionName(transactionName);
        this.proxy.consume(consumer, value);
    }

    public void run(String transactionName, Runnable runnable) {
        this.transactionNameCache.setTransactionName(transactionName);
        this.proxy.run(runnable);
    }

    public <T, R> apply(String transactionName, Function<T, R> function, T t) {
        this.transactionNameCache.setTransactionName(transactionName);
        return this.proxy.apply(function, t);
    }

    public <T, U, R> apply(String transactionName, BiFunction<T, U, R> function, T t, U u) {
        this.transactionNameCache.setTransactionName(transactionName);
        return this.proxy.apply(function, t, u);
    }

    public <T> get(String transactionName, Supplier<T> supplier) {
        this.transactionNameCache.setTransactionName(transactionName);
        return this.proxy.get(supplier);
    }
}

在方法调用处,修改下proxy类型为NamedCatTransactionProxy,同时为当前的CatTransaction起一个名字。这地方看起来就比较丑陋了,但没办法,Java真不支持更简单的写法。比较悲伤……

    @Autowired
    protected NamedCatTransactionProxy proxy;
    
    @CatTransactional
    public void syncDataIntoUserProperty(String message) {
        try {
            // 有双引号的话,EFK抓取日志解析的时候可能会出问题
            log.info("Sync Data from kafka: {}", message.replaceAll("\"""`"));
            UserPropertyEntity entity = proxy.apply("messageConverter::convert", messageConverter::convert, message, MessageConversionException.class);
            proxy.consume("service::update", service::update, entity, InsightException.class);
        } catch (MessageConversionException ex) {
            // 如果是消息转换异常,则打印告警日志,继续处理下一条消息
            log.warn("Failed to convert kafka message[{}] to UserPropertyEntity object, ex[{}]", message, ex);
        } catch (InsightException ex) {
            // 如果系统内部出现异常,则打印告警日志,继续处理下一条消息
            log.warn("Failed to update message[{}] into USER_PROPERTY, ex[{}]", message, ex);
        }
    }

然后,我们就能得到如下的打点记录了:

好了,填坑之旅结束了,感觉最终的效果总体而言,还是可以的。

不过个人认为,java在后续版本还是要参考下其他语言,增加对Method Reference相关的元数据管理,这才是王道啊。