pm2 reload 命令代码阅读

调研了一下 pm2 的 reload 功能。pm2 的很多功能都没有在文档中体现,这一点不太好。

需要对照着代码看,源码阅读文章都只是个记录,只看文章肯定看不懂。pm2 版本 4.5.6

TLDR:直接看本文的 hardReload 部分

CLI

命令行工具使用 commander 来实现,调用的是 PM2 类中的 reload 方法:

1
2
3
4
5
commander.command('reload <id|name|namespace|all>')
.description('reload processes (note that its for app using HTTP/HTTPS)')
.action(function(pm2_id) {
pm2.reload(pm2_id, commander);
});

PM2.reload

PM2 类在文件 lib/API.js 中。

检查目前是否已经在 reload 了

1
2
3
4
5
var delay = Common.lockReload();
if (delay > 0 && opts.force != true) {
Common.printError('xxx);
return cb ? cb(new Error('Reload in progress')) : that.exitCli(conf.ERROR_EXIT);
}

如果第二个参数是配置文件,会进行配置文件的解析

1
2
3
4
5
6
7
if (Common.isConfigFile(process_name))
that._startJson(process_name, opts, 'reloadProcessId', function(err, apps) {
Common.unlockReload();
if (err)
return cb ? cb(err) : that.exitCli(conf.ERROR_EXIT);
return cb ? cb(null, apps) : that.exitCli(conf.SUCCESS_EXIT);
});

由于我们重点关注 reload 过程,解析部分就不看了,直接到 else 部分。

调用 _operate 方法,action_name 为 reloadProcessId

1
2
3
4
5
6
7
that._operate('reloadProcessId', process_name, opts, function(err, apps) {
Common.unlockReload();

if (err)
return cb ? cb(err) : that.exitCli(conf.ERROR_EXIT);
return cb ? cb(null, apps) : that.exitCli(conf.SUCCESS_EXIT);
});

所以其实主要的操作都是在 _operate 中进行的。

_operate

开头处理了一堆环境变量,可以先跳过。

之后创建了一个 processIds 函数,该函数就是真正执行 action 的地方。

后面有几个 if 判断,是为了兼容 pm2 reload 后面跟正则、名称等情况,最终都是要转成 processId,再调用 processIds 函数处理。

所以我们直接看 processIds 函数。

processIds 里面使用了 async/eachLimit 来遍历执行所有的 id,并且限制了同时执行的并发数。

1
2
3
4
5
6
7
8
9
10
11
12
13
function processIds(ids, cb) {
Common.printOut(conf.PREFIX_MSG + 'Applying action %s on app [%s](ids: %s)', action_name, process_name, ids);

if (ids.length <= 2)
concurrent_actions = 1;

if (action_name == 'deleteProcessId')
concurrent_actions = 10;

eachLimit(ids, concurrent_actions, function(id, next) {
}, function(err) {
});
}

这里面的 concurrent_actions 代码的就是同时有几个进程在重启。

eachLimit 第一个参数为需要遍历的数组,第二个参数为并发数,第三个参数为要执行的函数,最后一个参数为执行完毕后的回调。

所以直接看第三个参数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function (id, next) {
var opts;

// These functions need extra param to be passed
if (action_name == 'restartProcessId' ||
action_name == 'reloadProcessId' ||
action_name == 'softReloadProcessId') {
}
else {
}

that.Client.executeRemote(action_name, opts, function (err, res) {


return next();
});
}

前面又是一些参数的处理,直接看 that.Client.executeRemote

executeRemote 用于 Client 给 God 发消息,pm2 中 God 是最终控制子进程的类

executeRemote 方法在 lib/Client.js 中定义,做了一些初始化操作,直接关注重点,就是这句:

1
return self.client.call(method, app_conf, fn);

这里面的 client 指的是 rpc 的 client,容易混淆。接收消息的自然就是 rpc 的 server,这个 server 在 lib/Daemon.js 中进行定义,暴露了 God 上的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
server.expose({
// ...

startProcessId : God.startProcessId,
stopProcessId : God.stopProcessId,
restartProcessId : God.restartProcessId,
deleteProcessId : God.deleteProcessId,

sendLineToStdin : God.sendLineToStdin,
softReloadProcessId : God.softReloadProcessId,
reloadProcessId : God.reloadProcessId,

// ...
});

现在看 God.reloadProcessId 就可以了!

God.reloadProcessId

其实到这里才真正开始

God.reloadProcessId 在 lib/God/Reload.js中进行定义。

这里面给 God 加了两个方法,softReloadProcessIdreloadProcessIdsoftReloadProcessId 会等待进程同意才会 reload。

pm2 reload 执行的就是 reloadProcessId

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
God.reloadProcessId = function(opts, cb) {
var id = opts.id;
var env = opts.env || {};

if (!(id in God.clusters_db))
return cb(new Error('PM2 ID unknown'));

if (God.clusters_db[id].pm2_env.status == cst.ONLINE_STATUS &&
God.clusters_db[id].pm2_env.exec_mode == 'cluster_mode') {

Utility.extend(God.clusters_db[id].pm2_env.env, opts.env);
Utility.extendExtraConfig(God.clusters_db[id], opts);

var wait_msg = God.clusters_db[id].pm2_env.wait_ready ? 'ready' : 'listening';
return hardReload(God, id, wait_msg, cb);
}
else {
console.log('Process %s in a stopped status, starting it', id);
return God.restartProcessId(opts, cb);
}
};

好吧,又调用了 hardReload,传入了对应的进程 id。

hardReload

这个函数是最终执行 reload 的方法,他做了以下几件事情:

  1. 根据要重启的进程 id,生成一个新的 key:'_old_' + id
  2. 把旧进程的索引 key,从索引中删掉,替换成新的 key。
  3. 复制当前进程的参数,调用 executeApp 方法新建一个进程。
  4. 进程创建完成后,调用 deleteProcessId 方法将旧进程删除。