
Node.js Load Balancing (Part 1): Server Load Balancing

Load balancing is a computer technique for distributing load across multiple computers (a cluster), network links, CPUs, disk drives, or other resources, in order to optimize resource usage, maximize throughput, minimize response time, and avoid overloading any single resource. Using multiple components with load balancing instead of a single component also improves reliability through redundancy. Load-balancing services are usually provided by dedicated software or hardware. The main job is to spread a large amount of work sensibly across multiple processing units, solving the high-concurrency and high-availability problems of internet architecture. - Wikipedia

Load balancing (Load Balance) is built on the layers of the network protocol stack: the protocol handling at a given layer is used to spread the workload sensibly across multiple processing units.

So there are different load-balancing strategies for different protocol layers (layer 2/3/4/7), and implementations split into software and hardware:

  • Software load balancing: low cost, flexible, and simple, but limited by server performance

  • Hardware load balancing: better performance than software, but expensive

What can Node.js do?

First look at the request chain below (just one example; there are many possible implementations, strategies, and architectures):

  1. Load balancing for DNS, VIP, and the Nginx service is handled by the underlying (cloud) services or set up by ops, so Node developers rarely need to worry about it

  2. Nginx balances load across the web server cluster; different strategies can be configured with the upstream module

  3. The key one: load balancing within a single Node service, where the master process dispatches work to multiple child processes; this is software load balancing

  4. If the Node service calls other remote services over RPC, the RPC calls should be spread across the nodes of those services so that no single node is overloaded

Conclusion: from the above, items 3 and 4 are what a Node.js service can do itself, namely server load balancing and RPC load balancing.
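As a sketch of step 2, an Nginx upstream block might look like this (the host addresses, port, and the least_conn strategy are illustrative placeholders, not from the original article):

```nginx
# Define the web cluster; server addresses are placeholders.
upstream node_web {
    least_conn;            # pick the server with the fewest active connections
    server 10.0.0.1:8000;
    server 10.0.0.2:8000;
}

server {
    listen 80;
    location / {
        proxy_pass http://node_web;  # balance requests across the cluster
    }
}
```

Omitting the `least_conn` line falls back to Nginx's default round-robin strategy.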

Server Load Balancing

First, get to know the Node.js cluster module. Below is the official cluster example:

app.js 

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}
  1. Start app.js; the initial process is the master

  2. It then forks as many worker processes as there are CPUs

  3. By default each worker executes the process.argv[1] file, i.e. app.js

  4. Each non-master process starts an http server, one per worker

1. How can multiple processes listen on the same port?

First question: why can multiple server processes listen on the same port?

The first one (and the default one on all platforms except Windows), is the round-robin approach, where the master process listens on a port, accepts new connections and distributes them across the workers in a round-robin fashion, with some built-in smarts to avoid overloading a worker process.

The second approach is where the master process creates the listen socket and sends it to interested workers. The workers then accept incoming connections directly.

The second approach should, in theory, give the best performance. In practice however, distribution tends to be very unbalanced due to operating system scheduler vagaries. Loads have been observed where over 70% of all connections ended up in just two processes, out of a total of eight.

Node officially supports these two approaches. In both, the master is responsible for the listening socket: it either accepts connections itself and distributes them to the workers round-robin, or passes the listen handle to the workers so that they accept connections directly.

2. How do processes communicate?

Second question: how do the processes communicate with each other?

1. Master and worker processes
The master and worker processes communicate via IPC

app.js 

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });

  cluster.on('listening', (worker) => {
    // Send to the worker
    worker.send({ message: 'from master' });
  });

  for (const id in cluster.workers) {
    cluster.workers[id].on('message', (data) => {
      // Received from the worker
      console.log('master message: ', data);
    });
  }
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);

  // Send to the master
  process.send({ message: 'from worker' });

  process.on('message', (data) => {
    // Received from the master
    console.log('worker message', data);
  });
}

The example above uses Node's native IPC channel, but there are many IPC mechanisms:

  • Node's native IPC channel

  • shell stdin/stdout

  • socket

  • pipe

  • message queues

2. Worker to worker

  • One-to-many: relay messages through the parent process

  • One-to-one: communicate directly via IPC
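The one-to-many case can be sketched as a relay in the master, where workers address each other by id and the master forwards envelopes (the relay function and envelope shape here are illustrative assumptions, not part of the cluster API):

```javascript
// Route a message envelope coming from worker `fromId`.
// `workers` maps worker ids to objects with a send() method,
// like cluster.workers does in the master process.
function relay(workers, fromId, envelope) {
  if (envelope.to === '*') {
    // One-to-many: broadcast to every worker except the sender
    for (const id of Object.keys(workers)) {
      if (id !== String(fromId)) workers[id].send(envelope.data);
    }
  } else if (workers[envelope.to]) {
    // One-to-one, routed through the master
    workers[envelope.to].send(envelope.data);
  }
}
```

In the master, each worker's 'message' handler would then call relay(cluster.workers, worker.id, data).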

3. How is load balanced across processes?

Third question: how is load balanced across the processes?

Load balancing across the server cluster is already handled by the layers above (Nginx, DNS, VIP, etc.), so what does the Node service itself do? cluster uses the round-robin strategy to distribute HTTP requests across the worker processes. Load-balancing algorithms are covered in the next article, "Node.js Load Balancing (Part 2): RPC Load Balancing".
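The core idea of round-robin is tiny; a minimal sketch of the rotation (not cluster's actual internals, which additionally track worker availability):

```javascript
// Return a function that hands out workers in rotation.
function roundRobin(workers) {
  let i = 0;
  return () => workers[i++ % workers.length];
}

const pick = roundRobin(['w1', 'w2', 'w3']);
console.log(pick(), pick(), pick(), pick()); // w1 w2 w3 w1
```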

4. What if the service exits abnormally?

Fourth question: what happens when the service exits abnormally?

  1. Errors can usually be caught with try/catch, but in Node a missed exception can bring down the whole process

  2. Is try/catch enough? An uncaught exception bubbles up to the event loop and triggers the uncaughtException event, where you can stop the program from exiting

  3. By default, an uncaught exception prints to stderr and exits with code 1, triggering the exit event

  4. On an abnormal exit, the master observes the worker's death and can refork a new worker

Tips: besides exit, there are also the Signal Events
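When reforking dead workers (point 4), it is worth guarding against a refork storm when workers crash in a loop; a sketch of such a guard (the function name and limits are illustrative, not from the example repo):

```javascript
// Allow at most `max` worker deaths per `windowMs` milliseconds;
// beyond that, stop reforking and let alerting/ops take over.
function createReforkGuard(max = 5, windowMs = 60000) {
  const deaths = [];
  return function shouldRefork(now = Date.now()) {
    deaths.push(now);
    // Drop deaths that fell outside the window
    while (deaths.length && now - deaths[0] > windowMs) deaths.shift();
    return deaths.length <= max;
  };
}
```

In the master's 'exit' handler, cluster.fork() would only be called when shouldRefork() returns true.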

Now look at a rough implementation of graceful.js; the complete code appears in the next section, and the full example is on GitHub at graceful-shutdown-example.

'use strict';

module.exports = options => {
  const { processKillTimeout = 3000, server } = options;

  let throwErrorTimes = 0;

  process.on('uncaughtException', function(err) {
    throwErrorTimes += 1;
    console.log('====uncaughtException====');
    console.error(err);

    if (throwErrorTimes > 1) {
      return;
    }

    close();
  });

  function close() {
    server.close(() => {
      // ...do something
    });
  }
};

5. How to exit gracefully?

Fifth question: how do we exit gracefully?

When deploying, releasing machines in groups keeps the service reachable, but:

  • If a user is hitting a machine that is going offline, how do we wait for the in-flight request to finish before taking it down?

  • If a worker exits abnormally, how do we restart a worker smoothly?

A rough graceful-exit flow:

  1. Fork workers

  2. Watch worker state

  3. Refork on abnormal worker exit

  4. Listen for the master's exit signals

  5. Before the master exits, kill all workers

  6. Before a worker exits, close its server and its own child processes

// master.js
'use strict';

const cluster = require('cluster');
const killTree = require('./kill-tree');
const numCPUs = require('os').cpus().length;
// const numCPUs = 1;

let stopping = false;

console.log(`Master ${process.pid} is running`);

cluster.setupMaster({
  exec: 'worker.js',
  // silent: true,
});

// Fork workers.
for (let i = 0; i < numCPUs; i++) {
  cluster.fork();
}

cluster.on('fork', worker => {
  worker.on('message', data => {
    // Received from the worker
    console.log(`${worker.process.pid} master message: `, data);
  });
});

// Kill all workers
async function onMasterSignal() {
  if (stopping) return;
  stopping = true;

  const killsCall = Object.keys(cluster.workers).map(id => {
    const worker = cluster.workers[id];

    return killTree(worker.process.pid);
  });

  await Promise.all(killsCall);
}

// kill(2) Ctrl-C
// kill(3) Ctrl-\
// kill(15) default
// Master exit
['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(signal => {
  process.once(signal, onMasterSignal);
});

// Terminate the master process
process.once('exit', () => {
  console.log(`Master about to exit`);
});

// Worker is listening
cluster.on('listening', (worker, address) => {
  // Send to the worker
  worker.send({ message: 'from master' });
});

cluster.on('disconnect', worker => {
  console.log(`${worker.id} disconnect`);
});

// Worker died
cluster.on('exit', (worker, code, signal) => {
  console.log(
    `Worker ${worker.process.pid} died, code: ${code}, signal: ${signal}`
  );

  worker.removeAllListeners();

  // killTree(worker.process.pid, function(err) {
  //   console.log(err)
  // });

  // Server is stopping; do not refork
  if (stopping) return;

  console.log('====Refork====');
  // Refork a new worker
  cluster.fork();
});

setTimeout(() => {
  cluster.workers[1].send({
    action: 'throw error',
  });
}, 600);
// worker.js
'use strict';

const http = require('http');
const { fork } = require('child_process');
const graceful = require('./graceful');

fork('./child');

// Workers can share any TCP connection
// In this case it is an HTTP server
const server = http
  .createServer((req, res) => {
    // Service exception
    try {
      throw new Error('Happened error');
    } catch (err) {
      res.writeHead(200);
      res.end(`${err.stack.toString()}`);
    }
    // console.log(res)
    // res.setHeader('Content-Type', 'application/json');
    // res.setHeader('Access-Control-Allow-Origin', '*');
    // res.writeHead(200);
    // res.end(JSON.stringify({ success: true }));
  })
  .listen(8000);

graceful({
  server,
});

// Send to the master
process.send({
  message: 'from worker',
  // server
});

process.on('message', data => {
  // Received from the master
  if (data.action && data.action === 'throw error') {
    // The process throws an uncaught exception
    throw new Error('Kill myself');
  }
  console.log('Worker message', data);
});


// graceful.js
'use strict';

const cluster = require('cluster');
const killTree = require('./kill-tree');

module.exports = options => {
  const { processKillTimeout = 3000, server } = options;

  let throwErrorTimes = 0;

  process.on('SIGTERM', function onSigterm() {
    console.info(`Only graceful shutdown, worker ${process.pid}`);
    close();
  });

  process.on('uncaughtException', function(err) {
    throwErrorTimes += 1;
    console.log('====uncaughtException====');
    console.error(err);

    if (throwErrorTimes > 1) {
      return;
    }

    close();
  });

  function close() {
    server.on('request', (req, res) => {
      // Stop keeping connections alive for in-flight requests
      req.shouldKeepAlive = false;
      res.shouldKeepAlive = false;
      if (!res._header) {
        // Ask the client to close the socket connection
        res.setHeader('Connection', 'close');
      }
    });

    if (processKillTimeout) {
      const timer = setTimeout(() => {
        // Kill all child processes
        killTree(process.pid, () => {
          // Force the worker process to exit
          process.exit(1);
        });
      }, processKillTimeout);

      timer.unref && timer.unref();
    }

    const worker = cluster.worker;
    if (worker) {
      try {
        server.close(() => {
          try {
            worker.send({ message: 'disconnect' });
            worker.disconnect();
          } catch (err) {
            console.error('Error on worker disconnect');
          }
        });
      } catch (err) {
        console.error('Error on server close');
      }
    }
  }
};

See graceful-shutdown-example for the complete example.

6. What if the daemon or master process dies?

Sixth question: what if the daemon or master process dies?

To avoid a single point of failure, provide primary/standby servers.

7. Stopping the service deliberately

  1. Get the current Node process information via a system command

  2. Filter out the stop script's own process and find the start script's process

  3. kill the master process by sending SIGTERM

  4. The master receives SIGTERM, starts killing the workers, and stops the server

// stop.js

const main = async () => {
  const command = isWin
    ? 'wmic Path win32_process Where "Name = \'node.exe\'" Get CommandLine,ProcessId'
    : // command, cmd are alias of args, not POSIX standard, so we use args
      'ps -eo "pid,args" | grep node';
};

// ...
main().then((result) => {
  result.forEach((item) => {
    process.kill(item.pid, 'SIGTERM');
    // killTree(item.pid)
  });
});
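Between running the command and killing the master, the ps output has to be parsed into pid/args pairs and filtered for the start script (steps 1 and 2); a sketch, assuming the helper name and the script-name filter (neither is from the example repo):

```javascript
// Parse `ps -eo "pid,args"` style output into { pid, args } records,
// keeping only lines whose command mentions the start script.
function parsePsOutput(stdout, scriptName) {
  return stdout
    .split('\n')
    .map(line => line.trim())
    .filter(Boolean)
    .map(line => {
      const firstSpace = line.indexOf(' ');
      return {
        pid: Number(line.slice(0, firstSpace)),
        args: line.slice(firstSpace + 1).trim(),
      };
    })
    .filter(item => item.args.includes(scriptName));
}
```

This also filters out the `grep node` line itself, since its args do not contain the start script's name.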

// master.js

// kill(2) Ctrl-C
// kill(3) Ctrl-\
// kill(15) default
// Master exit
['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(signal => {
process.once(signal, onMasterSignal);
});

See graceful-shutdown-example on GitHub for the complete example. To build a truly sound Node load-balancing framework there is more to do: worker management and the IPC mechanism, compatibility across operating systems, Docker, sticky sessions, and so on.

The next article, "Node.js Load Balancing (Part 2): RPC Load Balancing", covers the RPC side.