博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
将Socket.io添加到多线程Node.js
阅读量:2523 次
发布时间:2019-05-11

本文共 13742 字,大约阅读时间需要 45 分钟。

One of the disadvantages of Node is that it is single-threaded. Of course, there is a way around it — namely a module called cluster. Cluster enables us to spread our application over multiple threads.

Node的缺点之一是它是单线程的。 当然,有一种解决方法-即称为cluster的模块 集群使我们能够将应用程序分布在多个线程上。

Now, however, a new problem presents itself. See, our code being run across multiple instances actually has some significant downsides. One of them is not having global states.

但是,现在出现了一个新问题。 看到,我们在多个实例上运行的代码实际上有一些重大缺点。 其中之一没有全球状态。

Normally, in a single-threaded instance, this would not be much of a worry. For us now it changes everything.

通常,在单线程实例中,不必担心。 现在对我们来说,它改变了一切。

Let’s see why.

让我们看看为什么。

那么,有什么问题呢? (So, what is the problem?)

Our application is a simple online chat running on four threads. This enables a user to be logged in at the same time on their phone and computer.

我们的应用程序是在四个线程上运行的简单在线聊天。 这使用户可以同时在其手机和计算机上登录。

Imagine that we have sockets set up exactly the way we would have set them for one thread. In other words we now have one big global state with sockets.

想象一下,我们完全按照设置一个线程的方式来设置套接字。 换句话说,我们现在有了一个带有套接字的全局状态。

When the user logs in on their computer, the website opens up the connection with a Socket.io instance on our server. The socket is stored in the state of thread #3.

当用户登录其计算机时,该网站将打开与我们服务器上的Socket.io实例的连接。 套接字以线程#3的状态存储。

Now, imagine the user goes to the kitchen to grab a snack and takes their phone with them — naturally wanting to keep texting with their friends online.

现在,假设用户去厨房拿点心,并随身携带手机-自然是想与朋友在线保持发短信。

Their phone connects to thread #4, and the socket is saved in the thread’s state.

他们的电话连接到线程4,套接字被保存为线程状态。

Sending a message from their phone will do the user no good. Only people from thread #3 are going to be able to see the message. That is because the sockets saved on thread #3 are not somehow magically stored on threads #1, #2 and #4 as well.

从他们的手机发送消息对用户没有好处。 只有线程#3的人才能看到该消息。 那是因为保存在线程#3上的套接字也没有以某种方式神奇地存储在线程#1,#2和#4中。

Funny enough, even the user themself is not going to see their messages on their computer once they come back from the kitchen.

有趣的是,即使他们自己从厨房回来,他们自己也不会在计算机上看到他们的消息。

Of course, when they refresh the website, we could send a GET request and fetch the last 50 messages, but we cannot really say it is the ‘dynamic’ way, can we?

当然,当他们刷新网站时,我们可以发送GET请求并获取最后50条消息,但是我们不能真正说这是“动态”方式,可以吗?

为什么会这样呢? (Why is this happening?)

Spreading our server over multiple threads is in some way tantamount to having several separate servers. They do not know about each other’s existence and certainly do not share any memory. This means that an object on one instance does not exist on the other.

在多个线程上分布我们的服务器在某种意义上等于拥有几个单独的服务器。 他们不知道彼此的存在,当然也不共享任何记忆。 这意味着一个实例上的对象在另一实例上不存在。

Sockets saved in thread #3 are not necessarily all the sockets that the user is using at the moment. If the user’s friends are on different threads, they are not going to see the user’s messages unless they refresh the website.

保存在线程#3中的套接字不一定是用户当前正在使用的所有套接字。 如果用户的朋友位于不同的线程上,则除非刷新网站,否则他们将不会看到用户的消息。

Ideally, we would like to notify other instances about an event for the user. This way we can be sure that every connected device is receiving live updates.

理想情况下,我们想通知其他实例有关用户的事件。 这样,我们可以确保每个连接的设备都在接收实时更新。

一个办法 (A solution)

We can notify other threads by using ’ publish/subscribe (pubsub).

我们可以使用的发布/订阅 ( pubsub )通知其他线程。

Redis is an open source (BSD-licensed) in-memory data structure store. It can be used as a database, cache and message broker.

Redis是一种开源( BSD许可)的内存数据结构 商店。 它可以用作数据库,缓存和消息代理。

This means that we can use Redis to have events distributed between our instances.

这意味着我们可以使用Redis在实例之间分配事件。

Note that normally we would probably store our entire structure inside Redis. However, since the structure is not serializable and needs to be kept “alive” inside the memory, we are going to store part of it on each instance.

请注意,通常我们可能会将整个结构存储在Redis中。 但是,由于该结构不可序列化,需要在内存中保持“有效”状态,因此我们将在每个实例上存储其一部分。

(The flow)

Let’s now think about the steps in which we are going to handle an incoming event.

现在让我们考虑一下处理传入事件的步骤。

  1. The event called message comes to one of our sockets — this way, we do not have to listen for every possible event.

    名为message的事件到达了我们的套接字之一-这样,我们不必侦听每个可能的事件。

  2. Inside the object passed to the handler of this event as an argument, we can find the name of the event. For example, sendMessage.on('message', ({ event }) =>{}).

    在作为参数传递给该事件的处理程序的对象内,我们可以找到事件的名称。 例如, sendMessage.on('message', ({ event }) =>{})

  3. If there is a handler for this name, we are going execute it.

    如果有该名称的处理程序,我们将执行它。
  4. The handler may execute dispatch with a response.

    处理程序可以通过响应执行调度

  5. The dispatch sends the response event to our Redis pubsub. From there it gets emitted to each one of our instances.

    调度将响应事件发送到我们的Redis pubsub 从那里时发出我们的实例中的每一个。

  6. Each instance emits it to their socketsState, ensuring every connected client is going to receive the event.

    每个实例都将其发送到其socketsState,以确保每个连接的客户端都将接收事件。

Seems complicated, I know, but bear with me.

似乎很复杂,我知道,但是请忍受我。

实作 (Implementation)

Here is the with the environment ready, so that we do not have to install and set everything up ourselves.

这是已经准备好环境的 ,因此我们不必自己安装和设置所有内容。

First, we are going to set up a server with Express.

首先,我们将使用Express设置服务器。

import * as moduleAlias from 'module-alias';moduleAlias.addAliases({  src: __dirname,});import * as express from 'express';import * as http from 'http';import * as socketio from 'socket.io';const port = 7999;const app = express();const server = http.createServer(app);const io = initSocket(socketio(server).of('/socket'));server.listen(port, () => {  console.log(`Listening on port ${port}.`);});

We create an Express app, HTTP server and init sockets.

我们创建一个Express应用,HTTP服务器和初始化套接字。

Now we can focus on adding sockets.

现在我们可以集中精力添加套接字。

We pass the Socket.io’s server instance to our function in which we set the middlewares.

我们通过 Socket.io的服务器实例是我们设置中间件的函数。

const initSocket = (instance: socketio.Namespace): socketio.Namespace =>  instance.use(onAuth).use(onConnection);

onAuth (onAuth)

The onAuth function simply imitates a mock authorization. In our case it is token-based.

onAuth函数只是模仿一个模拟授权。 在我们的情况下,它是基于令牌的。

Personally, I would probably replace it with in the future, but it is not enforced in any way.

就个人而言,将来我可能会用代替它,但是它不会以任何方式强制执行。

const onAuth: SocketMiddleware = (socket, next) => {  const { token, id }: { token: string; id: string } =    socket.request._query || socket.request.headers;  if (!token) {    return next(new Error('Authorization failed, no token has been provided!'));  }  // mock  const user = checkToken(token, id);  socket.user = user;  return next();};

Now, let’s move on to the onConnection middleware.

现在,让我们继续使用onConnection中间件。

onConnection (onConnection)

const onConnection: SocketMiddleware = (socket, next) => {  if (!socket.user) {    return next(new Error('Something went wrong.'));  }  const { id } = socket.user;  socketsState.add(id, socket);  socket.on('message', ({ event, args }) => {    const handler = handlers[event];    if (!handler) {      return null;    }    return handler && handler({ id, args });  });  socket.on('disconnect', () => {    return socketsState.remove(id, socket);  });  return next();};

Here we see that we retrieve the user’s id, which was set in the previous middleware, and save it in our socketsState, with the key being the id and the value being an array of sockets.

在这里,我们看到我们检索了用户的id ,它是在先前的中间件中设置的,并将其保存在我们的socketsState中,键是id,值是套接字数组。

Next, we listen for the message event. Our entire logic is based on that — every event the frontend sends us is going to be called: message.

接下来,我们监听消息事件。 我们的整个逻辑是基于这一点的-前端发送给我们的每个事件都将被称为: message

The name of the event will be sent inside the arguments object — as stated above.

如上所述,事件名称将在arguments对象内部发送。

处理程序 (Handlers)

As you can see in onConnection, specifically in the listener for the message event, we are looking for a handler based on the event’s name.

如您在onConnection中看到的,特别是在消息事件的侦听器中,我们正在根据事件的名称寻找处理程序。

Our handlers is simply an object in which the key is the event name and the value is the function. We will use it to listen for events and respond accordingly.

我们的处理程序只是一个对象,其中键是事件名称,值是函数。 我们将使用它来监听事件并做出相应的响应。

const dispatchTypes = {  MESSAGE_SENT: 'message_sent',  POST_UPDATED_NOTIFICATION: 'post_updated_notification',};interface Handlers {  [key: string]: ({ id, args }: { id: string; args: any }) => any;}const handlers: Handlers = {  sendMessage: async ({ id, args }) => {    // await sendMessageToUser();    dispatch({      id,      event: dispatchTypes.MESSAGE_SENT,      args: {        message: `A message from user with id: ${id} has been send`,      },    });  },  postUpdated: async ({ id, args }) => {    dispatch({      id,      event: dispatchTypes.POST_UPDATED_NOTIFICATION,      args: {        message: 'A post you have been mentioned in has been updated',      },    });  },};export = handlers;

Also, later on, we are going to add the dispatch function and use it to send the event across the instances.

另外,稍后,我们将添加调度功能,并使用它在实例之间发送事件。

套接字状态 (SocketsState)

We know the interface of our state, but we have yet to implement it.

我们知道状态的接口,但尚未实现。

We add methods for adding and removing a socket, as well as for emitting an event.

我们添加了添加和删除套接字以及发出事件的方法。

import * as socketio from 'socket.io';interface SocketsState {  [id: string]: socketio.Socket[];}const socketsState: SocketsState = {};const add = (id: string, socket: socketio.Socket) => {  if (!socketsState[id]) {    socketsState[id] = [];  }  socketsState[id] = [...socketsState[id], socket];  return socketsState[id];};const remove = (id: string, socket: socketio.Socket) => {  if (!socketsState[id]) {    return null;  }  socketsState[id] = socketsState[id].filter((s) => s !== socket);  if (!socketsState[id].length) {    socketsState[id] = undefined;  }  return null;};const emit = ({  event,  id,  args,}: {  event: string;  id: string;  args: any;}) => {  if (!socketsState[id]) {    return null;  }  socketsState[id].forEach((socket) =>    socket.emit('message', { event, id, args }),  );  return null;};export { add, remove, emit };

The add function checks whether the state has a property which is equal to the user’s id. If that is the case, then we simply add it to our already existing array. Otherwise, we create a new array first.

add函数检查状态是否具有等于用户ID的属性。 如果是这种情况,那么我们只需将其添加到我们现有的数组中即可。 否则,我们首先创建一个新数组。

The remove function also checks if the state has the user’s id in its properties. If not — it does nothing. Otherwise, it filters the array to remove the socket from the array. Then if the array is empty it removes it from the state, setting the property to undefined.

删除功能还检查状态是否在其属性中具有用户的ID。 如果没有,它什么也不做。 否则,它将过滤阵列以从阵列中移除套接字。 然后,如果数组为空,则将其从状态中删除,并将属性设置为undefined

Redis的pubsub (Redis’ pubsub)

For creating our pubsub we are going to use the package called node-redis-pubsub.

为了创建我们的pubsub,我们将使用名为node-redis-pubsub的包。

import * as NRP from 'node-redis-pubsub';const client = new NRP({  port: 6379,  scope: 'message',});export = client;

添加调度 (Adding dispatch)

Ok, now all that’s left to do is to add the dispatch function…

好的,现在剩下要做的就是添加调度功能…

const dispatch = ({  event,  id,  args,}: {  event: string;  id: string;  args: any;}) => pubsub.emit('outgoing_socket_message', { event, id, args });

…and add a listener for outgoing_socket_message. This way, each instance receives the event and sends it to the user’s sockets.

…并为outbound_socket_message添加一个侦听器。 这样,每个实例都会接收事件并将其发送到用户的套接字。

pubsub.on('outgoing_socket_message', ({ event, id, args }) =>  socketsState.emit({ event, id, args }),);

使它们全部成为多线程 (Making it all multi-threaded)

Finally, let’s add the code needed for our server to be multi-threaded.

最后,让我们添加服务器多线程所需的代码。

import * as os from 'os';import * as cluster from 'cluster';const spawn = () => {  const numWorkes = os.cpus().length;  for (let i = 0; i < numWorkes; i += 1) {    cluster.fork();  }  cluster.on('online', () => {    console.log('Worker spawned');  });  cluster.on('exit', (worker, code, status) => {    if (code === 0 || worker.exitedAfterDisconnect) {      console.log(`Worker ${worker.process.pid} finished his job.`);      return null;    }    console.log(      `Worker ${        worker.process.pid      } crashed with code ${code} and status ${status}.`,    );    return cluster.fork();  });};export { spawn };
import * as moduleAlias from 'module-alias';moduleAlias.addAliases({  src: __dirname,});import * as express from 'express';import * as http from 'http';import * as cluster from 'cluster';import * as socketio from 'socket.io';import * as killPort from 'kill-port';import { initSocket } from 'src/common/socket';import { spawn } from 'src/clusters';const port = 7999;if (cluster.isMaster) {  killPort(port).then(spawn);} else {  const app = express();  const server = http.createServer(app);  const io = initSocket(socketio(server).of('/socket'));  server.listen(port, () => {    console.log(`Listening on port ${port}.`);  });}

Note: We have to kill the port, because after quitting our Nodemon process with Ctrl + c it just hangs there.

注意:我们必须终止该端口,因为使用Ctrl + c退出Nodemon进程后,该端口仅挂在该端口上。

With a little tweaking, we now have working sockets across all instances. As a result: a much more efficient server.

稍作调整,我们现在可以在所有实例上使用工作套接字。 结果:服务器效率更高。

Thank you very much for reading!

非常感谢您的阅读!

I appreciate that it all might seem overwhelming at first and strenuous to take it all in at once. With that in mind, I highly encourage you to read the code again in its entirety and ponder it as a whole.

我赞赏这一切乍一看似乎让人不知所措,并且想一次全部收下就很费劲。 考虑到这一点,我强烈建议您再次阅读该代码,并从整体上考虑它。

If you have any questions or comments feel free to put them in the comment section below or send me a .

如果您有任何问题或评论,请随时将其放在下面的评论部分或给我发送 。

Check out my !

看看我的 !

!

Originally published at on September 10, 2018.

最初于年9月10日发布在上。

翻译自:

转载地址:http://cdwzd.baihongyu.com/

你可能感兴趣的文章
静态链接与动态链接的区别
查看>>
Android 关于悬浮窗权限的问题
查看>>
如何使用mysql
查看>>
linux下wc命令详解
查看>>
敏捷开发中软件测试团队的职责和产出是什么?
查看>>
在mvc3中使用ffmpeg对上传视频进行截图和转换格式
查看>>
python的字符串内建函数
查看>>
Spring - DI
查看>>
微软自己的官网介绍 SSL 参数相关
查看>>
Composite UI Application Block (CAB) 概念和术语
查看>>
64位MATLAB和C混合编程以及联合调试
查看>>
原生js大总结二
查看>>
PHP基础
查看>>
UVa 11488 超级前缀集合(Trie的应用)
查看>>
Django 翻译与 LANGUAGE_CODE
查看>>
[转]iOS教程:SQLite的创建数据库,表,插入查看数据
查看>>
【转载】OmniGraffle (一)从工具栏开始
查看>>
初识ionic
查看>>
java 中打印调用栈
查看>>
开发 笔记
查看>>