Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node源码粗读(11):透过console.log来看stream.write #24

Open
xtx1130 opened this issue Apr 23, 2018 · 0 comments
Open

node源码粗读(11):透过console.log来看stream.write #24

xtx1130 opened this issue Apr 23, 2018 · 0 comments

Comments

@xtx1130
Copy link
Owner

xtx1130 commented Apr 23, 2018

这篇文章从console.log入手,对流的写入进行层层剖析。

console.log浅析

console.log的实现浅析很容易,直接转到console.js中:

// ./lib/console.js

// ...
// Formats the given arguments with this[kFormatForStdout] and hands the
// result to the module-level write() helper, targeting this._stdout.
// The instance's _ignoreErrors flag, _stdoutErrorHandler callback and
// current kGroupIndent value are forwarded unchanged.
Console.prototype.log = function log(...args) {
  write(this._ignoreErrors,
        this._stdout,
        this[kFormatForStdout](args),
        this._stdoutErrorHandler,
        this[kGroupIndent]);
};

其中write函数定义如下:

// Writes `string` to `stream`, passing `errorhandler` as the write callback.
// A temporary no-op 'error' listener is attached for the duration of the
// call and always removed in `finally`. Exceptions thrown by stream.write()
// are swallowed unless isStackOverflowError(e) is true, in which case they
// are rethrown.
// NOTE(review): `ignoreErrors` and `groupIndent` are only used in the part of
// the function elided by the excerpt ("// ...").
function write(ignoreErrors, stream, string, errorhandler, groupIndent) {
 // ...
  try {
    // Add and later remove a noop error handler to catch synchronous errors.
    stream.once('error', noop);

    stream.write(string, errorhandler);
  } catch (e) {
    // console is a debugging utility, so it swallowing errors is not desirable
    // even in edge cases such as low stack space.
    if (isStackOverflowError(e))
      throw e;
    // Sorry, there's no proper way to pass along the error here.
  } finally {
    // Runs on both the success and the exception path.
    stream.removeListener('error', noop);
  }
}

stream则是在创建实例的时候传递进去的:

// The module's export is a Console instance constructed with
// process.stdout and process.stderr as its output streams.
module.exports = new Console({
  stdout: process.stdout,
  stderr: process.stderr
});

由此可见,console.log实质是process.stdout.write

process.stdout的实现

接下来,我们直接溯源到process.stdout,相关源码在stdio.js中:

// ./lib/internal/process/stdio.js
// ...
// Lazily creates and caches the stdout stream (file descriptor 1).
// Subsequent calls return the cached `stdout` variable from the enclosing
// scope.
function getStdout() {
    if (stdout) return stdout;
    stdout = createWritableStdioStream(1);
    // destroySoon is aliased to destroy.
    stdout.destroySoon = stdout.destroy;
    // _destroy always reports an error to its callback: the one passed in,
    // or a fresh ERR_STDOUT_CLOSE when none was given.
    stdout._destroy = function(er, cb) {
      // Avoid errors if we already emitted
      er = er || new ERR_STDOUT_CLOSE();
      cb(er);
    };
    if (stdout.isTTY) {
      // On SIGWINCH, ask the stream to refresh its size.
      process.on('SIGWINCH', () => stdout._refreshSize());
    }
    return stdout;
  }
// ...
// Exposes process.stdout as a configurable, enumerable accessor property
// whose getter is getStdout, so the stream is only created on first access.
Object.defineProperty(process, 'stdout', {
    configurable: true,
    enumerable: true,
    get: getStdout
  });
// ...
// Creates a writable stream for the stdio file descriptor `fd`, choosing the
// implementation from the handle type reported by tty_wrap.guessHandleType:
//   'TTY'          -> tty.WriteStream          (_type 'tty')
//   'FILE'         -> fs.SyncWriteStream       (_type 'fs', autoClose off)
//   'PIPE' / 'TCP' -> write-only net.Socket    (_type 'pipe')
// Any other type throws ERR_UNKNOWN_STREAM_TYPE.
// NOTE(review): this excerpt stops after the switch statement; the remainder
// of the function (including its return) is not shown here.
function createWritableStdioStream(fd) {
  var stream;
  const tty_wrap = process.binding('tty_wrap');

  // Note stream._type is used for test-module-load-list.js

  switch (tty_wrap.guessHandleType(fd)) {
    case 'TTY':
      // Modules are required lazily, only in the branch that needs them.
      var tty = require('tty');
      stream = new tty.WriteStream(fd);
      stream._type = 'tty';
      break;

    case 'FILE':
      var fs = require('internal/fs');
      stream = new fs.SyncWriteStream(fd, { autoClose: false });
      stream._type = 'fs';
      break;

    case 'PIPE':
    case 'TCP':
      var net = require('net');
      stream = new net.Socket({
        fd: fd,
        readable: false,
        writable: true
      });
      stream._type = 'pipe';
      break;

    default:
      // Probably an error in uv_guess_handle()
      throw new ERR_UNKNOWN_STREAM_TYPE();
  }

由此可见,process.stdout本质上是一个以getStdout为getter的属性,而getStdout调用createWritableStdioStream方法来创建stdout。当标准输出是终端(TTY)时,最终由tty.WriteStream创建一个stream并返回给getStdout;输出到文件或管道时,则分别由SyncWriteStream和net.Socket创建。

net.socket介绍

调用栈就不再一一溯源了,在这里简单介绍一下。
tty中,会调用:

// ./lib/tty.js
  // Run the net.Socket constructor on this WriteStream instance, passing
  // `tty` as the handle and marking the socket write-only.
  net.Socket.call(this, {
    handle: tty,
    readable: false,
    writable: true
  });
// WriteStream inherits from net.Socket.
inherits(WriteStream, net.Socket);

进而在net.js中通过:

// Run the Duplex constructor on this instance with the given options.
stream.Duplex.call(this, options);

最终生成了一个Duplex流(可读写流,网上资料很多不做过多介绍)。而Duplex流的write操作则来自_stream_writable.js:

// ./lib/_stream_duplex.js
// Duplex constructor (excerpt). Runs the Writable constructor on `this`,
// then clears the `readable` / `writable` flags when the corresponding
// option is explicitly `false`.
function Duplex(options) {
  // ...
  Writable.call(this, options);
  //...
  // Only a strict `false` disables a side; a missing option leaves the
  // flag untouched.
  if (options && options.readable === false)
    this.readable = false;

  if (options && options.writable === false)
    this.writable = false;

}

至此,可以列出基本的观光路线了:

console.js -> stdio.js -> tty.js -> net.js -> _stream_duplex.js -> _stream_writable.js

接下来才真正开始进行源码的解读。

process.stdout.write在js层对数据的写入

我们重点放到_stream_writable.js中,在创建Writable类的时候会进行如下操作:

// ./lib/_stream_writable.js
// Writable constructor (excerpt). When an options object is supplied, any of
// its `write`, `writev`, `destroy` and `final` members that are functions
// are installed on the instance as `_write`, `_writev`, `_destroy` and
// `_final` respectively.
function Writable(options) {
// ...
if (options) {
    // Non-function values are ignored, leaving whatever `_write` etc. the
    // instance already resolves to.
    if (typeof options.write === 'function')
      this._write = options.write;

    if (typeof options.writev === 'function')
      this._writev = options.writev;

    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;

    if (typeof options.final === 'function')
      this._final = options.final;
  }
// ...
}

重点注意一下this._write = options.write;这一行:Writable在写入时最终调用的是this._write。在这里我就不详细溯源了,对于net.Socket而言,这个_write并不是通过options.write传入的,而是直接定义在net.js的原型上:

// ../lib/net.js
// Socket write implementation (excerpt). Creates a write request bound to
// this socket's handle with afterWrite as its completion callback, then
// dispatches it through writevGeneric or writeGeneric.
// NOTE(review): `writev` is defined in the part elided by "// ..."; it
// selects between the two dispatch paths.
Socket.prototype._write = function(data, encoding, cb) {
 // ...
  var req = createWriteWrap(this._handle, afterWrite);
  if (writev)
    writevGeneric(this, req, data, cb);
  else
    writeGeneric(this, req, data, encoding, cb);
  // When the request is flagged async, remember its byte count on the socket.
  if (req.async)
    this[kLastWriteQueueSize] = req.bytes;
};

writev是处理多数据流块用的,在这里我们只关注writeGeneric(this, req, data, encoding, cb);

createWriteWrap以及writeGeneric来自模块stream_base_commons.js

// ./lib/internal/stream_base_commons.js
const { WriteWrap } = process.binding('stream_wrap');
// Builds a new WriteWrap request object, records the target `handle` and the
// `oncomplete` callback on it, initializes `async` to false, and returns it.
function createWriteWrap(handle, oncomplete) {
  var req = new WriteWrap();

  req.handle = handle;
  req.oncomplete = oncomplete;
  req.async = false;

  return req;
}

// Performs a single write: handleWriteReq() is called with the request, data
// and encoding, and its result `err` is then passed, together with `self`,
// `req` and `cb`, to afterWriteDispatched().
// NOTE(review): both helpers are defined outside this excerpt; presumably
// `err` is the status code of the native write -- confirm against them.
function writeGeneric(self, req, data, encoding, cb) {
  var err = handleWriteReq(req, data, encoding);

  afterWriteDispatched(self, req, err, cb);
}

代码只有简单的两行,却是console.log最终能打印到控制台的关键代码。

stream.write的底层实现

我们先详细解读一下WriteWrap,WriteWrap来自内部stream_wrap,视线直接转移到stream_wrap.cc中,其中有对WriteWrap的定义:

// Binding initializer (excerpt). The visible part registers the "WriteWrap"
// constructor on `target`:
//   - creates a FunctionTemplate `ww` with is_construct_call_callback,
//   - reserves StreamReq::kStreamReqField + 1 internal fields per instance,
//   - names the class "WriteWrap" and adds the AsyncWrap methods to it,
//   - exposes the constructor function on `target` under that name,
//   - stores the instance template in the environment via
//     set_write_wrap_template().
void LibuvStreamWrap::Initialize(Local<Object> target,
                                 Local<Value> unused,
                                 Local<Context> context) {
  // ...
  Local<FunctionTemplate> ww =
      FunctionTemplate::New(env->isolate(), is_construct_call_callback);
  ww->InstanceTemplate()->SetInternalFieldCount(StreamReq::kStreamReqField + 1);
  Local<String> writeWrapString =
      FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap");
  ww->SetClassName(writeWrapString);
  AsyncWrap::AddWrapMethods(env, ww);
  target->Set(writeWrapString, ww->GetFunction());
  env->set_write_wrap_template(ww->InstanceTemplate());
  // ...
}

在这里稍微注意下AsyncWrap::AddWrapMethods,这是AsyncWrap专门针对异步操作增加getAsyncId方法用的API。我们转到stream_wrap的头文件stream_wrap.h中看看LibuvStreamWrap是如何定义的:

// ./src/stream_wrap.h
// Class declaration (excerpt). LibuvStreamWrap derives publicly from both
// HandleWrap and StreamBase; every member shown below is declared `override`,
// i.e. it replaces a virtual declared in a base class.
class LibuvStreamWrap : public HandleWrap, public StreamBase {
 public:
  // ...
  int GetFD() override;
  bool IsAlive() override;
  bool IsClosing() override;
  bool IsIPCPipe() override;

  // JavaScript functions
  int ReadStart() override;
  int ReadStop() override;

  // Resource implementation
  int DoShutdown(ShutdownWrap* req_wrap) override;
  // Takes the buffer array and its count by pointer so both can be updated.
  int DoTryWrite(uv_buf_t** bufs, size_t* count) override;
  int DoWrite(WriteWrap* w,
              uv_buf_t* bufs,
              size_t count,
              uv_stream_t* send_handle) override;
 // ...
  // Factory overrides for the request wrapper objects.
  ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object) override;
  WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object) override;
}

LibuvStreamWrap继承于HandleWrap和StreamBase,这样就很好理解了。HandleWrap包裹的是libuv handle,而StreamBase则完全是关于stream的操作。StreamBase提供基本的读写能力的同时,开放出来一些方法让子类去覆写,实现定制化的需求,例如DoTryWrite就是StreamBase中在写入流的时候调用的方法,而这个方法是专门提供出来让子类去覆写的。LibuvStreamWrap在构建的时候,通过覆写一些方法,实现对流定制化的操作。

stream.write的写入过程

接下来我们看一下流的写入过程,js方面调用的API就不过多介绍了,通过js可以定位到stream_base.cc中的函数:

// String write entry point (excerpt; braces do not balance because parts of
// the body are elided). The visible branch handles the `try_write` case:
// the string is encoded into `stack_storage`, wrapped in a single uv_buf_t,
// and handed to DoTryWrite(); the number of bytes written synchronously is
// then added to bytes_written_.
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
  //..
  if (try_write) {
    data_size = StringBytes::Write(env->isolate(),
                                   stack_storage,
                                   storage_size,
                                   string,
                                   enc);
    buf = uv_buf_init(stack_storage, data_size);

    // DoTryWrite receives pointers so it can update both the buffer pointer
    // and the remaining-buffer count.
    uv_buf_t* bufs = &buf;
    size_t count = 1;
    err = DoTryWrite(&bufs, &count);

    // count == 0 is treated as "everything was written"; otherwise the
    // written amount is data_size minus the length left in `buf`.
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
    bytes_written_ += synchronously_written;
    // ...
    }
  }
  //..
}

很容易注意到,真正的写入过程是刚才所提到的DoTryWrite。我们跟进一下DoTryWrite,由于这个函数被子类覆写了,所以我们直接去看stream_wrap.cc:

// Attempts a write via uv_try_write() and reports what is left to write.
//
// bufs  - in/out: pointer to the buffer array; on success it is advanced to
//         the first buffer that still has unwritten data.
// count - in/out: number of buffers; on success it becomes the number of
//         buffers that remain.
//
// Returns 0 when nothing could be written right now (UV_ENOSYS / UV_EAGAIN,
// with *bufs and *count left unchanged) or after a successful full or
// partial write; returns the negative error code for any other failure.
int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
  int err;
  size_t written;
  uv_buf_t* vbufs = *bufs;
  size_t vcount = *count;

  err = uv_try_write(stream(), vbufs, vcount);
  if (err == UV_ENOSYS || err == UV_EAGAIN)
    return 0;
  if (err < 0)
    return err;

  // A non-negative result is the number of bytes written.
  written = err;
  for (; vcount > 0; vbufs++, vcount--) {
    // Slice
    // This buffer was only partially written: advance its base past the
    // written bytes, shrink its length, and stop.
    if (vbufs[0].len > written) {
      vbufs[0].base += written;
      vbufs[0].len -= written;
      written = 0;
      break;
    // Discard
    // This buffer was written completely: consume its length and move on.
    } else {
      written -= vbufs[0].len;
    }
  }
  *bufs = vbufs;
  *count = vcount;
  return 0;
}

可以看到,其实最终写入调用的函数是libuv的uv_try_write。uv_try_write本质和uv_write是一样的,只不过uv_try_write会预先判断这个流能不能立即完成写入,如果不能立即完成,则不会将写入请求排入queue中。可以简单看一下uv_try_write代码,在stream.c中:

/* uv_try_write (excerpt). Returns UV_EAGAIN immediately when the stream has
 * a pending connect request or a non-empty write queue. Otherwise it records
 * whether POLLOUT is currently active on the stream's io watcher and issues
 * a uv_write() using a stack-allocated request with uv_try_write_cb as the
 * callback; a non-zero result from uv_write() is returned as-is. The rest of
 * the function is elided in this excerpt. */
int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  int r;
  int has_pollout;
  size_t written;
  size_t req_size;
  uv_write_t req;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);

  r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
  if (r != 0)
    return r;
  // ...
}

可以看到,最终还是通过r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);这一行,由uv_write实现了流的写入操作。

console.log写入之后的回调

流的写入流程已经分析完了,接下来看一下写入之后做了什么。视线回到_stream_writable.js中:

// Write-completion handler (excerpt; braces do not balance because part of
// the body is elided). Reads the stream's writable state, then runs
// afterWrite either on the next tick (when state.sync is truthy) or
// immediately.
// NOTE(review): `finished` is computed in the elided section.
function onwrite(stream, er) {
  var state = stream._writableState;
  var sync = state.sync;
  var cb = state.writecb;
  // ...
    if (sync) {
      // Deferred via process.nextTick when the write was flagged sync.
      process.nextTick(afterWrite, stream, state, finished, cb);
    } else {
      afterWrite(stream, state, finished, cb);
    }
  }
}

js的代码我就不一一溯源了,onwrite函数可以从Writable.prototype.write中追溯过来。由于state.sync为true,所以会走入到process.nextTick(afterWrite, stream, state, finished, cb);这块的逻辑。而nextTick在创建TickObject时,会有如下逻辑:

// ./lib/internal/process/next_tick.js
  // Record for one queued nextTick callback. The constructor stores the
  // callback and its arguments, assigns a fresh async id plus the given
  // trigger async id, and, when init hooks exist, emits an init event with
  // resource type 'TickObject' and this object as the resource.
  class TickObject {
    constructor(callback, args, triggerAsyncId) {
      // this must be set to null first to avoid function tracking
      // on the hidden class, revisit in V8 versions after 6.2
      this.callback = null;
      this.callback = callback;
      this.args = args;

      const asyncId = newAsyncId();
      this[async_id_symbol] = asyncId;
      this[trigger_async_id_symbol] = triggerAsyncId;

      // Only notify async hooks when at least one init hook is registered.
      if (initHooksExist()) {
        emitInit(asyncId,
                 'TickObject',
                 triggerAsyncId,
                 this);
      }
    }
  }

TickObject在创建之初,便会被分配asyncId,并在存在init钩子时通过emitInit通知AsyncHooks,其资源类型为'TickObject'。所以在任何AsyncHooks回调中调用console.log,都会再次创建TickObject并再次触发AsyncHooks回调,从而直接导致AsyncHooks无限递归调用。
nextTick说完了,那么大家可能还会好奇,console.log通过nextTick执行了什么回调。简单地从上面代码来看是afterWrite,afterWrite代码如下:

// ./lib/_stream_writable.js
// Runs after a write has completed: calls onwriteDrain() unless `finished`
// is truthy, decrements the pending-callback counter, invokes the write
// callback `cb` with no arguments, and finally calls finishMaybe().
function afterWrite(stream, state, finished, cb) {
  if (!finished)
    onwriteDrain(stream, state);
  state.pendingcb--;
  cb();
  finishMaybe(stream, state);
}

可以看到最终调用的回调函数是cb,这个cb在哪里进行定义的呢?经过溯源,这个cb的定义出现在console.js中:

// Shared property descriptor: writable and configurable, but not enumerable.
var prop = {
    writable: true,
    enumerable: false,
    configurable: true
  };
// Install the error handler created for `stdout` as the
// `_stdoutErrorHandler` property of this instance.
prop.value = createWriteErrorHandler(stdout);
Object.defineProperty(this, '_stdoutErrorHandler', prop);
// Returns a write callback for `stream`. The callback does nothing unless it
// is called with an `err` other than null while the stream's writable state
// has not yet flagged errorEmitted; in that case, if the stream currently
// has no 'error' listeners, a no-op listener is attached.
function createWriteErrorHandler(stream) {
  return (err) => {
    if (err !== null && !stream._writableState.errorEmitted) {
      // Only add the noop listener when nobody else is listening.
      if (stream.listenerCount('error') === 0) {
        stream.on('error', noop);
      }
    }
  };
}

createWriteErrorHandler()是这个cb的真面目。代码意图也很明显,意即有错误的时候才会触发其中的逻辑,并且如果在错误没有被监听的情况下,增加错误监听函数noop(空函数)。

结语

至此,整个console.log的过程分析完毕。由于其中涉及到的部分很多,和整体逻辑关系不大的部分(例如tty_wrap.cc等)没有逐步分析,而且js的调用栈由于篇幅关系,也没有一一分析。本篇文章可以帮助读者把console.log、stream以及AsyncHook串联起来,其中省略一些溯源步骤还望读者见谅。

by 小菜

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant