Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Koa-spring: Node进程通信的实践 #42

Open
closertb opened this issue Dec 19, 2019 · 0 comments
Open

Koa-spring: Node进程通信的实践 #42

closertb opened this issue Dec 19, 2019 · 0 comments
Labels
Server Node服务及其他后端服务相关

Comments

@closertb
Copy link
Owner

closertb commented Dec 19, 2019

关于这篇文章

  • Node多进程
  • 进程通信
  • 闭包与立即执行

如果你感兴趣,可以fork项目,自己体验一下
Koa-springhttps://github.com/closertb/koa-spring
related-client: https://github.com/closertb/koa-spring-client
技术栈:koa + Sequelize + routing-controllers + typescript
上篇:Koa-spring:后端太忙,让我自己写服务(上)
下篇:Koa-spring:后端太忙,让我自己写服务(下)

Node多进程

进程与线程

  • 进程:是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础,进程是线程的容器(来自百科),进程是资源分配的最小单位。每个进程都拥有自己的独立空间地址、数据栈,一个进程无法访问另外一个进程里定义的变量、数据结构,只有建立了 IPC 通信,进程之间才可数据共享。
  • 线程:线程是操作系统能够进行运算调度的最小单位,首先我们要清楚线程是隶属于进程的,被包含于进程之中。一个线程只能隶属于一个进程,但是一个进程是可以拥有多个线程的。

这里只立个概念,有很多文章是讲Node的进程与线程的,这里推荐(很多文章年代久远,但都是精华):

开始前,先强调一个点:

开启多进程不是为了解决高并发,主要是解决了单进程单线程模式下 Node.js CPU 利用率不足的情况,充分利用多核 CPU 的性能。

进程通信

需求是这样的,在第一篇提到过鉴权中间件,系统登录后,我需要缓存用户的登录状态,下一次用户发起请求,根据缓存直接校验token是否有效,简单粗暴,依赖的库是memory-cache,没有上高大上的Redis。开始时我的系统是单进程的,操作简单:

  1. 用户登录时,缓存cache,cache.put(id, user, expirations);
  2. 鉴权时,根据请求携带的id和token, 校验是否和缓存中缓存的一致:token === cache.get(id).token;

简单粗暴高效,直到我为了程序更健壮,用cluster模式开启了多进程,显然,上面的方案不再奏效,进程与进程间的cache是分别存在不同的内存块的,所以,只有换方案,仍然没有上redis,而是考虑用进程间的通信来解决。下面是一个简单的示意图:

image

简单来讲,就是利用主进程与工作进程之间的IPC通道进行通信,登录成功后,工作进程将鉴权信息发送给主进程,主进程进行存储。下一次请求发起鉴权时,工作进程给主进程发送一个读取缓存的消息,并开启监听;主进程监听到这个消息,并读取缓存中的鉴权信息,然后再发送给工作进程;工作进程收到后,判断携带的token是否和主进程缓存的token一致,一致则鉴权通过,部分示例代码:

// index.ts
const worker = cluster.fork();
//监听message事件
worker.on('message', (msg: ActionBody) => {
  const { type, payload } = msg;
  if(type == 'readCache') {
    const { uid, id } = payload;
    const res = cache.get(id) || {};
    worker.send({ type: 'sendCache', uid, payload: res })
  }
  if(type == 'saveCache') {
    cache.put(payload.id, payload, ExpiredTime);
  }
});

// userController.ts, 发起缓存鉴权信息消息
process.send({ type: 'saveCache', payload: res });

// AuthCheckMiddleWare.ts
function readCache(id: string) {
  return new Promise((res, rej) => {
    process.on('message', (m) = {
      if (m.uid === id) {
          res(m.payload);
      } else {
        rej({});
      }
    });
    process.send({ type: 'readCache', payload: { id, uid: id }});
  });
}


const user: any = await readCache(uid);
if(user && user.token === token) {
    ctx.user = user;
    await next();
} else {
  ctx.body = {
    code: 120001,
    message: uid ? '登录超时,请重新登录' : '请先登录',
    status: 'error'
  };
}

上面展示的是一个简易版的进程通信,在低并发请求时,运行是没有问题的。但如果你是老江湖,就会发现很多bug。

闭包与立即执行函数

上一节展示的代码,有多少bug呢?简单列举一下:

  • process.on重复订阅,即每发起一次鉴权,就会添加一次订阅。和浏览器的监听一样,如果重复去订阅监听,那就会重复响应;
  • 由于通信是异步的,所以发起的订阅在同一时间,并发量高一点,就会存在响应堆积,所以维护一个响应队列才是更好的做法;
  • 存在reject的情形,程序并没有相应的错误处理;
  • 会不会存在主进程一直未响应或工作进程未收到响应的情况?

所以基于以上问题,对readCache函数作了如下改动:

type Callback = (m: ActionBody) => boolean;

function generateUid() {
  const random = Math.floor(26 * Math.random() + 65);
  return `${Date.now()}-${String.fromCharCode(random)}`;
}

// 回调队列
const callbackList: Array<Callback> = [];

process.on('message', (m: ActionBody = { type: 'sendCache' }) => {
  let tempList = callbackList.slice();
  tempList.forEach((callback, i) => {
    callback = tempList[i];
    if (callback && callback(m)) {
      callbackList.splice(i, 1); // 成功处理了响应的或则响应已过期,移除这个回调;
    }
  });
});

function addCallback(callback: Callback) {
  callbackList.push(callback);
}

function readCache(id: string) {
  return new Promise((res, rej) => {
    try {
      const uid = generateUid();
      addCallback(((uid: string) => {
        let status = false;
        let timeout = setTimeout(() => { // 5秒超时读取,防止永久未回调,导致回调一直存在于回调列表中: 可能性很小
          rej({ message: '授权验证超时' });
          status = true;
        }, 5000);
        return (m: ActionBody) => {
        // 必须在过期前响应
        if (!status && m.uid === uid) {
          clearTimeout(timeout);  
          res(m.payload);
          return true;
        }
        return status;
      }
      })(uid));
      process.send({ type: 'readCache', payload: { id, uid }});
    } catch (error) {
      rej(error);
    }
  });
}

可以看到,针对上面提到的问题,作了如下几方面的改进:

  • 维护了一个callbackList响应回调列表;
  • 每次收到响应,对回调列表一一执行,响应成功的,从回调列表删除;
  • 生成唯一的uid,保证消息的发送与响应一一对应;
  • 利用setTimeout逻辑,来处理超时响应;

通信的优化就讲完了,但为什么和章节标题闭包与立即执行半毛钱关系没有。是有的,我又在闭包上跌了一跤,为了长记性,我故意用了这样一个标题。事情经过是这样的,开始添加回调函数不是上面那样写的,而是下面这样:

try {
  const uid = generateUid();
  addCallback((m: ActionBody) => {
    // 必须在过期前响应
    if (m.uid === uid) {
      res(m.payload);
      return true;
    }
    return false;
  }
  });
  process.send({ type: 'readCache', payload: { id, uid }});
}

看起来没什么毛病,无并发运行起来也没毛病。但当我加大并发量时,就会偶尔报权限错误,最后一排查,发现是m.uid === uid这一行的uid与期望的不一致,被篡改成了下一次请求生成的uid(抱头思考N分钟,哦,原来,闭包的锅)。解决闭包最好的方法是什么,立即执行函数,然后才有了上面的写法。如果对还原这个过程有兴趣,可以在主进程的响应里,添加一个500ms的延迟。

至此,我的这一次前端向Node服务端的探索告一段落,但好多想做的还没实现。愿下一次机会来临时,有能力让Graphql在我的服务中落地,用了Github的API V4后, 发现Graphql对架构要求极高,这能力怎么学,还得倒腾,倒腾。

系列索引

上篇:Koa-spring:后端太忙,让我自己写服务(上)
下篇:Koa-spring:后端太忙,让我自己写服务(下)

@closertb closertb added JS 原生JS相关 Server Node服务及其他后端服务相关 labels Dec 19, 2019
@closertb closertb removed the JS 原生JS相关 label Dec 19, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Server Node服务及其他后端服务相关
Projects
None yet
Development

No branches or pull requests

1 participant