blog
blog copied to clipboard
GraphQL Subscription多用户订阅与通知(二)
GraphQL Subscription 多用户订阅与通知(二)
问题
在上一篇文章中,讲解了如何使用GraphQL Subscription实现多用户订阅与通知,但是有个问题,我们是在单个服务器应用实例(instance)的基础上实现的,什么意思?来讲下,我们启动一个Node.js服务器应用,比如常见的使用Express.js, Koa.js创建的Web Server应用,计算机系统都会分配资源(CPU,内存等)给一个进程,然后我们的Web Server应用就运行在这个进程中。在上一篇文章中,是通过如下命令启动了一个实例的GraphQL web server:
☁ apollo-graphql-tutorial [master] ⚡ npx ts-node src/subscriptions/main.ts
🚀 Server ready at http://localhost:3000/
🚀 Subscriptions ready at ws://localhost:3000/graphql
然后使用ps命令查看这个GraphQL web server的进程:
☁ apollo-graphql-tutorial [master] ⚡ ps aux | grep ts-node
ldu020 93228 0.0 0.0 4267752 632 s013 R+ 8:03PM 0:00.00 grep --color=auto --exclude-dir=.bzr --exclude-dir=CVS --exclude-dir=.git --exclude-dir=.hg --exclude-dir=.svn ts-node
ldu020 93174 0.0 0.9 4703956 146972 s011 S+ 8:02PM 0:07.11 node /usr/local/bin/npx ts-node src/subscriptions/main.ts
PID为93174的进程就是计算机系统分配给GraphQL web server应用程序的进程。
在上一篇文章开头提到过,Apollo Server 2.x提供的PubSub实现是基于Node.js的EventEmitter内部模块。因此,只能在单个应用实例(单个进程)中进行事件的发布订阅,或者说消息的发布与订阅。
然而却无法在多应用实例间(多个进程间)进行消息的发布与订阅,因为,每个进程都有自己的一部分独立的系统资源,彼此是隔离的 。什么意思?举个例子,由于进程间是系统资源隔离的,这里以内存为例,内存有栈内存和堆内存。我们使用JavaScript在应用程序中声明的变量,存在内存中,一个进程是无法获取其他进程内存中的变量的。同理,如果仅仅使用EventEmitter模块,A应用实例通过PubSub发送的消息,B应用实例是无法订阅到这条消息的。
因此,我们需要一种进程间通信(IPC)方案,来解决进程间(多个应用实例间)通信的问题。先给出本文GraphQL Subscription在多应用实例下的示意图:

图画的应该比较明白了,可供我们选择的IPC方案有很多,比如RabbitMQ,GCP的Cloud Pub/Sub,kafka等消息队列,PostgreSQL的NOTIFY和LISTEN命令。都可以实现在一个应用的多个实例,多个应用多个实例间,甚至应用集群和应用集群之间传递消息和数据的功能。
解决方案
本文为了示例简单,在本地环境通过pm2启动多个GraphQL web server的多个实例。并且手动指定客户端连接不同的web server实例,为了测试使用IPC后,GraphQL Subscription使用websocket进行消息推送的功能。不过在使用正确的方法之前,会先使用上一篇文章基于EventEmitter模块实现的PubSub方案,来验证下多服务器应用实例下,客户端收不到消息的场景。
使用pm2配合ecosystem文件启动4个GraphQL web server instances,ecosystem.config.js文件配置如下:
module.exports = {
apps: [
{
name: 'graphql-subscription-backend-server-instance-1',
interpreter: 'ts-node',
script: './src/subscriptions/main.ts',
port: 3001,
},
{
name: 'graphql-subscription-backend-server-instance-2',
interpreter: 'ts-node',
script: './src/subscriptions/main.ts',
port: 3002,
},
{
name: 'graphql-subscription-backend-server-instance-3',
interpreter: 'ts-node',
script: './src/subscriptions/main.ts',
port: 3003,
},
{
name: 'graphql-subscription-backend-server-instance-4',
interpreter: 'ts-node',
script: './src/subscriptions/main.ts',
port: 3004,
},
],
};
启动的4个GraphQL web server域名是localhost, 分别监听3001, 3002, 3003, 3004端口。这里为了演示,直接通过ts-node运行TypeScript编写的代码,生产环境请不要这么做。可以看到4个应用实例有各自的pid,表示运行在不同的进程中。

然后我们修改Altair GraphQL Client中每一个client(客户端用户)连接GraphQL web server的Subscription url地址端口号,让客户端按照示意图的方式与GraphQL web server各个实例连接,比如:

然后写好客户端Subscription:
subscription {
templateAdded {
id
name
shareLocationIds
}
}
点击"Send Request",成功建立websocket连接。如图所示:

可以通过pm2 logs命令查看各个 GraphQL web server instance 的日志:

ZOWI 用户(client 1, client 5)与 GraphQL web server instance-1 建立 websocket 连接,以及通过 HTTP 协议发送GraphQL mutation创建template
ZOLO 用户(client 2)与 GraphQL web server instance-2 建立 websocket 连接
ZEWI 用户(client 3)与 GraphQL web server instance-3 建立 websocket 连接
ZELO 用户(client 4)与 GraphQL web server instance-4 建立 websocket 连接
使用基于PostgreSQL的PubSub实现
接下来使用基于PostgreSQL NOTIFY/LISTEN命令的PubSub实现graphql-postgres-subscriptions,用来在多个GraphQL web server实例间进行消息传递(通信)。
本地启动PostgreSQL数据库服务,我这里使用Docker启动,启动后使用docker ps查看启动的PostgreSQL数据库服务容器

使用PostgreSQL实现的PubSub替换使用EventEmitter实现的PubSub,

在server.ts的createServer方法中,初始化postgresPubSub
const postgresPubSub = await createPostgresPubSub();
postgresPubSub.subscribe('error', console.error);
将postgresPubSub注入到templateConnector中
templateConnector: new TemplateConnector<IMemoryDB>(
memoryDB,
// pubsub,
postgresPubSub,
),
注入到SubscriptionContext中
const context: ISubscriptionContext = {
pubsub: postgresPubSub,
// pubsub,
subscribeUser: user,
userConnector,
locationConnector: new LocationConnector<IMemoryDB>(memoryDB),
};
在templateIterator方法中,可以拿到SubscriptionContext,从而拿到postgresPubSub
function templateIterator(__, ___, { pubsub }: ISubscriptionContext) {
return pubsub.asyncIterator([TriggerNameType.TEMPLATE_ADDED]);
}
关键的几处代码修改完毕。
接下来,使用ZOWI用户(client 5)开始发起一个addTemplate的GraphQL mutation。如下图所示:

先看下各个GraphQL web server实例的日志:

可以看到红色框标出来的日志,templateFilter方法在各个GraphQL web server实例上分别执行了一次,我在templateFilter方法中加了console.count('templateFilter'),用来进行debug,如下图:

说一下流程:ZOWI用户发起addTemplate的GraphQL mutation,创建template实体成功后,通过PubSub发送TEMPLATE_ADDED的消息,如下:

templateConnector.publish方法的实现为:
public publish(payload: any) {
this.pubsub.publish(TriggerNameType.TEMPLATE_ADDED, payload);
}
4个GraphQL web server实例都监听该消息,执行subscribe方法,如下图:

然后在templateFilter方法中,通过上下文可以拿到订阅用户的信息,以及发起addTemplate这个GraphQL mutation的用户信息,当然不仅限于这些信息,理论上可以通过上下文获取到任何信息,需要开发者自己去设计。根据这些信息,就可以实现过滤逻辑,什么消息,满足什么条件推送给客户端,不满足的则不推送。本例的过滤逻辑是:
- 不将新创建的
template推送给发起addTemplateGraphQLmutation这个用户自己。 - 新创建的
template上有sharelocationIds字段,推送给用户实体上locationId字段的值与sharelocationIds有交集;或者通过findLocationIdsByOrgId方法,通过orgId找到与之相关联的所有location的locationIds,与sharelocationIds有交集的用户。
基本流程和逻辑就说到这里,再次执行两次addTemplate,来看看客户端接收到的websocket信息情况。
首先是ZOWI用户

由于是他自己,所以被"过滤"掉,不会收到websocket消息。
ZOLO用户

ZOLO用户实体上的locationId为location 3,[location 3]与sharelocationIds([location 3])有交集,因此,要将新创建的template推送给该用户,可以看到收到的websocket消息,包含新创建的template。
同理,ZEWI用户

以及ZELO用户

使用基于EventEmitter的PubSub实现
这里简单看下结果就可以,发起addTemplate mutation后,查看各个web server实例的日志

只有web server instance 1自己订阅到了TEMPLATE_ADDED消息,并执行了templateFilter方法。其他web server instances无法订阅到web server instance 1发送的消息,因为他们运行在不同的进程中,无法使用EventEmitter进行通信。
ZOLO用户没有收到websocket推送的消息

ZEWI用户也没有收到消息

这篇文章针对的是开发阶段,在本地环境启动多个服务器应用实例,进行开发测试。对于线上环境,比如上到云环境,使用Cloud Kubernetes Engine水平扩展到多个Node,多个Pod,多个Container,App Engine,Compute Engine多实例,原理也是一样的。线上环境相对复杂,有很多额外的工作要做,如果有下一篇文章,会介绍如何使用云平台的这些服务,将这个GraphQL Subscription应用程序示例运行在这些服务上。
源码
https://github.com/mrdulin/apollo-graphql-tutorial/tree/master/src/subscriptions
参考
- https://www.postgresql.org/docs/9.6/sql-notify.html
- https://github.com/emilbayes/pg-ipc
- GraphQL Subscription多用户订阅与通知(一)
- https://github.com/apollographql/graphql-subscriptions
- https://github.com/GraphQLCollege/graphql-postgres-subscriptions
good,好哥哥