线上 48G Java 应用内存 OOM 真实案例分析(第 2 篇)
背景
现象:在第一版优化之后,会发现小应用内存会 OOM ,而且占用的 TCP 的文件句柄并未释放。
寻找问题
直接 dump 一份内存文件下来。如下图:
观察发现,怎么 byte[] 数组占用内存又升上来了???一脸懵逼。
仔细分析,其中 NioSocketSession 已经有 18w 了,说明此时的连接数已经升到了 18w,比正常运行情况下的连接数整整多了 10w。那么可以大胆猜测:是不是业务处理消息全部堆积在内存中,处理不过来了?
为了验证我们的猜想,继续往下分析:byte[] 数组 outgoing 进去后,发现又出现了大量 16921 位的 byte 数组,字节在 15.83 kb。
而且值是类似 < /stream:stream> ,占用的字节基本也是确定的。
继续跟踪是什么对象引用了这种类型的数组,如下图,可以发现是 HeapByteBuf 对象。这里就不得不提 Mina 的缓存机制和业务处理机制。
源码分析
XmppServer 的代码片段。
int processors = Config.getInt("server.processors", -1);
if (processors < 1) {
acceptor = new NioSocketAcceptor();
} else {
acceptor = new NioSocketAcceptor(processors);
}
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
// 业务处理的线程池,这里默认的线程池大小为 16 个,使用阻塞同步队列
chain.addLast("executor", new ExecutorFilter());
chain.addLast("codec", new ProtocolCodecFilter(new XmppCodecFactory()));
SocketSessionConfig cfg = acceptor.getSessionConfig();
cfg.setReceiveBufferSize(8096 * 2);
// 这里设置的 16192 的读取缓存数组,也就是 mat 分析中的数组大小
cfg.setReadBufferSize(8096 * 2);
cfg.setSendBufferSize(8096 * 2);
cfg.setKeepAlive(true);
cfg.setSoLinger(0);
XmppIoHandler xmppHandler = new XmppIoHandler();
acceptor.setHandler(xmppHandler);
acceptor.setReuseAddress(true);
acceptor.bind(new InetSocketAddress(PORT));
log.info("XmppServer started: " + serverName);
log.info("Androidpn Server v" + version);
started = true;
ExecutorFilter 代码片段。默认的线程池创建使用的核心线程数是 0 ,最大线程数是 16 ,线程闲置时间是 30 秒。
public class ExecutorFilter extends IoFilterAdapter {
private EnumSet<IoEventType> eventTypes;
private Executor executor;
private boolean manageableExecutor;
private static final int DEFAULT_MAX_POOL_SIZE = 16;
private static final int BASE_THREAD_NUMBER = 0;
private static final long DEFAULT_KEEPALIVE_TIME = 30L;
private static final boolean MANAGEABLE_EXECUTOR = true;
private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
private static IoEventType[] DEFAULT_EVENT_SET;
// 默认构造器
public ExecutorFilter() {
Executor executor = this.createDefaultExecutor(0, 16, 30L, TimeUnit.SECONDS, Executors.defaultThreadFactory(), (IoEventQueueHandler)null);
this.init(executor, true);
}
// 创建线程池
private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler);
return executor;
}
mina 的 OrderedThreadPoolExecutor
public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
...
public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {
// 这里使用了 SynchronousQueue 同步阻塞队列
super(0, 1, keepAliveTime, unit, new SynchronousQueue(), threadFactory, new AbortPolicy());
this.TASKS_QUEUE = new AttributeKey(this.getClass(), "tasksQueue");
this.waitingSessions = new LinkedBlockingQueue();
this.workers = new HashSet();
this.idleWorkers = new AtomicInteger();
if (corePoolSize < 0) {
throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
} else if (maximumPoolSize != 0 && maximumPoolSize >= corePoolSize) {
super.setCorePoolSize(corePoolSize);
super.setMaximumPoolSize(maximumPoolSize);
if (eventQueueHandler == null) {
this.eventQueueHandler = IoEventQueueHandler.NOOP;
} else {
this.eventQueueHandler = eventQueueHandler;
}
} else {
throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
}
}
private static final IoSession EXIT_SIGNAL = new DummySession();
private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
private final Set<Worker> workers = new HashSet<Worker>();
private final AtomicInteger idleWorkers = new AtomicInteger();
private long completedTaskCount;
private volatile boolean shutdown;
...
private SessionTasksQueue getSessionTasksQueue(IoSession session) {
SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);
if (queue == null) {
queue = new SessionTasksQueue();
SessionTasksQueue oldQueue =
(SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
if (oldQueue != null) {
queue = oldQueue;
}
}
return queue;
}
private void addWorker() {
synchronized (workers) {
if (workers.size() >= super.getMaximumPoolSize()) {
return;
}
// Create a new worker, and add it to the thread pool
Worker worker = new Worker();
Thread thread = getThreadFactory().newThread(worker);
// As we have added a new thread, it's considered as idle.
idleWorkers.incrementAndGet();
// Now, we can start it.
thread.start();
workers.add(worker);
if (workers.size() > largestPoolSize) {
largestPoolSize = workers.size();
}
}
}
private class SessionTasksQueue {
private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
private boolean processingCompleted = true;
}
...
}
Mina 的业务线程池策略图,如下:
解题思路
如上分析,既然内存里面堆积了很多的业务消息,那么我们要想办法处理掉这些消息,或者说加快处理的速度。那么怎么加快速度呢 ?
1、调整线程数配置;如将线程数上升至 128 ,但是对于大量的堆积,即使升高了线程数可能也是杯水车薪。
2、调整线程策略。如直接使用 CachedThreadPoolExecutor ,但是这个会使消息乱序。目前是不允许的。
另外一个问题是,即使内存堆积了,是否可以调整数组大小,当内存堆积时,少占用内存。
1、直接调整 mina 缓存的读取数组大小,不限制成 16192 ,考虑设置成活动的大小,比如最小是 256 ,最大占用了 16912。
上线
先在 4G 的小应用上进行改版测试、观察效果。
改版内存大小后,发现连接数正常运转上线可以支撑到 13.9W,相比之前的 8W 连接多了 5W 。然后观察在此时的深堆中,没有 org.apache.mina.filter.executor.OrderedThreadPoolExecutor$SessionTasksQueue 这个对象,说明此时内存中还未出现消息堆积。
题外话,这里又发现内存中堆积了很多 8192 的 byte 数组。之后再分析,这里是 redis 的读取数组。
OOM 再现
这里是一次内存 OOM 后的结果,发现在 17W 左右的内存数时,第二次优化后的程序也 OOM 了。不过此时 OOM ,仔细观察发现,byte 数组和 session 所占用内存都不大(与第一次相比),所以此时的问题和第一次 OOM 是不一样的。继续摸索。
注意左边黄框中的红框,大家会发现,加载的 class 居然到了 82145 个,同时加载的 class loader 也到了 74993 个,所以这里的内存 OOM 应该是 jdk7 中的永久区挂了。这就和代码以及永久去配置大小有关了,这个后文继续分享。