Node.js v11.7.0 发布后,worker_threads API 基本进入稳定状态,运行时不再需要加上 --experimental-worker 标识。
worker 对于执行 CPU 密集型操作非常有用,但对 I/O 密集型操作没有多大帮助,使用内置异步 I/O 操作效率更高。
API 简介
创建一个 worker 线程很简单:
1 2 3
| const { Worker } = require("worker_threads"); const worker = new Worker(`${__dirname}/worker.js`); // run ./worker.js (same directory as this file) in a new thread
|
这里假设同一目录下有一个 worker.js 文件,通过加载这个文件来创建线程,这与浏览器中的 Web Worker 基本一致。此外,Node.js 也支持在同一个文件中创建线程:
1 2 3 4 5 6 7 8
| const { Worker, isMainThread } = require("worker_threads");
if (!isMainThread) {
  // Running inside the worker thread: worker-side logic goes here.
} else {
  // Running on the main thread: spawn a worker that executes this same file.
  const worker = new Worker(__filename);
}
|
如果要向 worker 线程传递数据,可以调用 postMessage,以消息的形式发送:
1
| worker.postMessage(value); // value is copied with the structured clone algorithm, not shared
|
value 可以是合法的 JS 数据,不过传递时会拷贝数据,拷贝逻辑遵循 HTML 结构化克隆算法(structured clone algorithm)。
结构化克隆算法是由 HTML5 规范定义的用于复制复杂 JavaScript 对象的算法。通过 Worker 的 postMessage() 传递数据或使用 IndexedDB 存储对象时,内部都会使用它。它通过递归遍历输入对象来构建克隆,同时维护一个已访问引用的映射,以避免循环引用导致无限遍历。结构化克隆所不能做到的:1、Error 以及 Function 对象不能被结构化克隆算法复制;如果尝试这样做,会抛出 DATA_CLONE_ERR 异常。2、尝试克隆 DOM 节点同样会抛出 DATA_CLONE_ERR 异常。3、对象的某些特定属性不会被保留:RegExp 对象的 lastIndex 字段不会被保留;属性描述符、setter 以及 getter(以及其他类似元数据的功能)同样不会被复制。例如,如果一个对象的属性通过属性描述符被标记为只读(read-only),复制后它将变为可读写(read-write),因为这是默认情况;原型链上的属性也不会被追踪以及复制。https://developer.mozilla.org/zh-CN/docs/Web/Guide/API/DOM/The_structured_clone_algorithm
相应地,主线程可以通过 worker.on("message", ...) 接收 worker 线程发回的数据:
1 2 3
| worker.on("message", resp => { }); // resp is the value the worker sent with parentPort.postMessage()
|
Worker 线程收发数据的方式和主线程基本一致,只是通信对象换成了 parentPort,它可以直接从 worker_threads 模块中获取。worker 线程使用 parentPort.on("message", ...) 接收数据,并用 parentPort.postMessage(...) 回复:
1 2 3 4 5 6 7 8
| const { parentPort } = require("worker_threads");
// Echo every message from the main thread straight back to it.
parentPort.on("message", (buf) => {
  parentPort.postMessage(buf);
});
|
除了直接调用 postMessage,主线程也可以在创建 worker 时通过 workerData 选项传递数据:
1 2 3
| const worker = new Worker(__filename, { workerData: "data" }); // the worker reads this value via require("worker_threads").workerData
|
相应地,worker 线程通过 workerData 读取这份数据:
1
| const { workerData } = require("worker_threads");
|
介绍完最基本的 API 之后,就可以写一个简单的 demo:在 worker 线程中生成随机数据。
index.js 暴露随机数据接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| const { Worker } = require("worker_threads");
/**
 * Generate `size` random bytes in a freshly spawned worker thread (./worker.js).
 * A new thread is created and terminated for every call.
 *
 * @param {number} size number of random bytes to generate
 * @returns {Promise<Buffer>} resolves with the bytes; rejects with the worker's
 *   reported error, an uncaught worker error, or an early-exit error
 */
const random = (size /** number */) => {
  return new Promise((resolve, reject) => {
    const worker = new Worker(`${__dirname}/worker.js`);
    worker.postMessage(size);

    worker.on("message", (resp) => {
      const { data, error } = resp;
      if (error) {
        reject(error);
      } else {
        // Buffers posted between threads arrive as plain Uint8Arrays; re-wrap
        // (no copy) so callers can use Buffer methods such as toString("hex").
        resolve(Buffer.from(data.buffer, data.byteOffset, data.byteLength));
      }
      worker.terminate();
    });

    worker.on("error", (err) => {
      reject(err);
      worker.terminate();
    });

    // If the worker stops without replying, fail instead of hanging forever.
    // After a resolve/reject above, this reject is a no-op.
    worker.on("exit", (code) => {
      reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
};

module.exports = random;
|
worker.js worker 线程逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// worker.js: runs inside the worker thread.
const { parentPort } = require("worker_threads");
const { randomBytes } = require("crypto");

// For each requested size, generate that many random bytes and reply with
// `{ error, data }`, the shape the main thread destructures.
parentPort.on("message", (size) => {
  randomBytes(size, (err, buf) => {
    const response = { error: null, data: null };
    if (err) {
      // Must be `error`: the main thread reads `resp.error`, never `resp.err`.
      response.error = err;
    } else {
      response.data = buf;
    }
    parentPort.postMessage(response);
  });
});
|
test.js 测试调用
1 2 3 4 5 6 7
// test.js: time one call of the worker-based random() helper exported by index.js.
const random = require("./index");

(async () => {
  console.time("Worker mode");
  const result = await random(32);
  result.toString("hex"); // encode only; mirrors the "Normal mode" benchmark
  console.timeEnd("Worker mode");
})().catch(console.error);
|
结果数据:Worker mode: 63.023ms
再看看不使用 worker、直接在主线程中调用的耗时:
1 2 3 4 5 6 7 8 9 10
// Baseline: generate the same 32 random bytes directly on the main thread.
const { randomBytes } = require("crypto");

const label = "Normal mode";
console.time(label);
randomBytes(32, (error, bytes) => {
  if (error) {
    console.error(error);
  } else {
    bytes.toString("hex"); // encode only; the string is intentionally discarded
  }
  console.timeEnd(label);
});
|
结果数据:Normal mode: 0.383ms
Worker 模式的耗时要高得多。原因是上面的例子每次调用都会创建并销毁一个线程,这是最主要的开销;此外,线程间拷贝传递数据也会耗时。
因此官方推荐使用线程池模式,并使用 SharedArrayBuffer 传递数据以避免拷贝。
线程池 Demo
下面是我写的一个线程池 demo,还不是很完善。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
| const EventEmitter = require("events"); const { Worker } = require("worker_threads");
// Lifecycle of a single pool slot.
const WorkerStates = Object.freeze({ TODO: 0, READY: 1, DOING: 2, OFF: 3 });
// Lifecycle of the pool as a whole.
const WorkerPoolStates = Object.freeze({ TODO: 0, READY: 1, OFF: 2 });

/**
 * A fixed-size pool of worker threads, each running `worker.js` from this
 * directory. Call `init()` once, then `digest(data)` per job, and
 * `terminate()` on shutdown.
 *
 * Protocol assumed by `digest`: the main thread posts `data` and the worker
 * answers with one `{ error, data }` message per job.
 * NOTE(review): the pool's worker.js is not shown here; confirm it replies this way.
 */
class SHA256 {
  constructor() {
    this.size = 10; // number of threads in the pool
    // One slot per thread: { state, instance: Worker, task: { resolve, reject } | null }
    this.workers = [];
    this.state = WorkerPoolStates.TODO;
  }

  /**
   * Spawn `size` workers. Waits until every worker has either come online or
   * failed; resolves if at least one came online, otherwise rejects with the
   * last spawn error.
   * @returns {Promise<void>}
   */
  init() {
    return new Promise((resolve, reject) => {
      if (this.state === WorkerPoolStates.READY) {
        resolve();
        return;
      }
      if (this.size < 1) {
        reject(new Error("Pool size must be at least 1"));
        return;
      }

      let successCount = 0;
      let failedCount = 0;
      const event = new EventEmitter();
      event.on("spawning", (isSuccess, errorReason) => {
        if (isSuccess) {
          successCount += 1;
        } else {
          failedCount += 1;
        }
        // Only decide once every worker has reported in.
        if (successCount + failedCount !== this.size) {
          return;
        }
        if (successCount === 0) {
          this.state = WorkerPoolStates.OFF;
          // Keep the original Error (and its stack) instead of re-wrapping it.
          reject(errorReason instanceof Error ? errorReason : new Error(String(errorReason)));
        } else {
          this.state = WorkerPoolStates.READY;
          resolve();
        }
      });

      for (let i = 0; i < this.size; i += 1) {
        this.recreate(i, (isSuccess, errorReason) => event.emit("spawning", isSuccess, errorReason));
      }
    });
  }

  /**
   * Send `data` to the first idle worker. Resolves with the reply's `data`
   * field, or rejects with its `error` field. Also rejects if that worker
   * errors or exits first, if the pool is not ready, or if no worker is idle.
   * Slots whose worker died are respawned in the background on each call.
   * @param {*} [data=""] value posted to the worker (copied via structured clone)
   * @returns {Promise<*>}
   */
  digest(data = "") {
    return new Promise((resolve, reject) => {
      if (this.state !== WorkerPoolStates.READY) {
        reject(new Error("Create threads failed or not ready yet"));
        return;
      }

      this.workers.forEach((slot, i) => {
        if (slot.state === WorkerStates.OFF) {
          this.recreate(i);
        }
      });

      const index = this.workers.findIndex((slot) => slot.state === WorkerStates.READY);
      if (index === -1) {
        // Fail fast rather than leaving the caller waiting forever.
        reject(new Error("No idle worker available"));
        return;
      }

      const slot = this.workers[index];
      slot.state = WorkerStates.DOING;
      slot.task = { resolve, reject };
      try {
        slot.instance.postMessage(data);
      } catch (err) {
        // e.g. `data` is not cloneable; the worker itself is still usable.
        this.free(index, false);
        reject(err);
      }
    });
  }

  /**
   * (Re)spawn the worker for slot `i` and attach the listeners that keep the
   * slot in sync for that worker's whole lifetime.
   * @param {number} i slot index
   * @param {(isSuccess: boolean, error?: Error) => void} [onSpawned] called
   *   once: when the worker comes online, or when it errors or exits before
   *   that. Defaults to logging spawn failures.
   */
  recreate(i, onSpawned = (isSuccess, error) => {
    if (!isSuccess) {
      console.error(error);
    }
  }) {
    const worker = new Worker(`${__dirname}/worker.js`);
    if (!this.workers[i]) {
      this.workers[i] = { state: WorkerStates.TODO, instance: null, task: null };
    }
    const slot = this.workers[i];
    slot.state = WorkerStates.TODO;
    slot.instance = worker;
    slot.task = null;

    let reported = false;
    const report = (isSuccess, error) => {
      if (!reported) {
        reported = true;
        onSpawned(isSuccess, error);
      }
    };
    // Events from a worker that has since been replaced must not touch the slot.
    const isCurrent = () => slot.instance === worker;

    worker.once("online", () => {
      if (isCurrent() && slot.state === WorkerStates.TODO) {
        slot.state = WorkerStates.READY;
      }
      report(true);
    });

    worker.on("message", (msg) => {
      const task = slot.task;
      if (!isCurrent() || !task) {
        return;
      }
      this.free(i, false);
      const { error, data } = msg || {};
      if (error) {
        task.reject(error);
      } else {
        task.resolve(data);
      }
    });

    // Keep an 'error' listener for the worker's whole life. An 'error' event
    // with no listener would be re-thrown in the main thread.
    worker.on("error", (err) => {
      report(false, err);
      if (!isCurrent()) {
        return;
      }
      const task = slot.task;
      this.free(i, true);
      if (task) {
        task.reject(err);
      }
    });

    worker.on("exit", (code) => {
      const error = new Error(`Worker stopped with exit code ${code}`);
      report(false, error);
      if (!isCurrent()) {
        return;
      }
      const task = slot.task;
      this.free(i, true);
      if (task) {
        task.reject(error);
      }
    });
  }

  /**
   * Release slot `i` after a job. It goes back to READY, or to OFF when the
   * worker failed; OFF slots are respawned by the next `digest` call.
   * @param {number} i slot index
   * @param {boolean} hasError whether the worker failed
   */
  free(i, hasError) {
    const slot = this.workers[i];
    slot.task = null;
    slot.state = hasError ? WorkerStates.OFF : WorkerStates.READY;
  }

  /**
   * Mark the pool OFF and stop every live worker. The workers' 'exit'
   * listeners reject any pending `digest` calls.
   * @returns {Promise<void>} resolves once all live workers have exited
   */
  terminate() {
    this.state = WorkerPoolStates.OFF;
    const exits = this.workers
      .filter((slot) => slot.state !== WorkerStates.OFF)
      .map(({ instance }) => new Promise((resolve, reject) => {
        instance.once("exit", () => resolve());
        // terminate() returns a Promise on newer Node versions and undefined on
        // older ones; Promise.resolve handles both.
        Promise.resolve(instance.terminate()).catch(reject);
      }));
    return Promise.all(exits).then(() => undefined);
  }
}

module.exports = new SHA256();
|