Node.js StreamAPI から逃げちゃダメだ。逃げちゃダメだ。。

業務でnode.jsを使ったシステムを保守エンハンスをしていますが。
長い間理解できなかった、重い腰を上げてStreamAPIについて勉強したので、覚え書きをします。

Node.jsの世界で有名な格言として
「Stream を制するものは Node.jsを制す」
と言われております。さあNodeを我が物にしましょう。

※ StreamAPIのバージョンはStream1/Stream2/Stream3とありますが、今回はStream1の話になります。
※ 業務で使っているnode.jsのバージョンがv0.8なので。
※ v0.10系からStream2。v0.12系からStream3になっています。背景違いなどは追ってキャッチアップする予定。

StreamAPIの情報

勉強に用いたソース。

公式Doc

参考記事

Streamとは

データの流れを扱うAPIです。
Node.jsが出る前に存在した概念で、もろもろ調べたが下記問題を解決する概念らしい。

  • 大量データをメモリ上に展開する際に生じる問題
  • 一連のI/O処理の最適化問題
  • コールバック地獄 ※ node.jsに限る

アイディアとしては、データを一気に処理するのではなく、ある単位(破片)で読み込み、
後工程に回して処理するようにしたこと。

メモリ上に展開する読み込みデータも抑えられ、かつI/O待ちをしながらも後続の処理を実行できるので
処理全体のスピードを最適化できるのがStreamAPIのスゴイところです。

要はバケツリレー的な感じです。

Node.js における StreamAPIの実装方法

Node.js本体にStreamインターフェースが用意されております。
もちろん、インターフェースなので必要なイベント/プロパティ/メソッドをStreamAPIの概念を理解して実装する必要があります。
実装に必要なメソッドについて書いていきます。

データの読み込みを行う ReadableStream

全てのストリームはEventEmitterのインスタンスです。

基本的に覚えておくのは下記イベントとメソッド

  • Event:data
    • function (data) {} データ読み込み時に発生するdataイベント。断片的に読み込んだデータを扱います。
  • Event:end
    • function () {} データ読み込み完了時に発生する endイベント。データ読み込み完了時に行いたい処理、リソースの開放などを記述します。
  • Stream.pause()
    • データの読み込みを止める時に呼び出します。
  • Stream.resume()
    • データの読み込みを再開する時に呼び出します。
  • Stream.pipe(writableStream)
    • 自身の読み込みストリームを書き込みストリームに接続する時に呼び出します。Unixの|(パイプ)でデータを次に渡して処理する概念を踏襲している。 (例: $ cat hoge.txt | grep HOGE )

WritableStream

Readable Stream で断片的に読み込まれるデータを応用する時に用いるストリーム。
書き込み可能なストリームと言うらしいが、個人的にはピンときていないので、読み込んだデータを使って応用するものだと解釈している。

基本的に覚えておくのは下記イベントとメソッド

  • Event:drain
    • function () {} 読み込みストリームをpauseしていた状態から、メモリが空いたなどでデータを読み込み可能になった状態を示す。
  • Event:pipe
    • function (src) {} 読み込みストリームsrcが自身のwritableストリームにpipe()された時に発生するイベント。
  • Stream.write(data)
    • 読み込みストリームでdataイベントが発生した時に呼ばれるメソッド。このメソッドでデータを加工します。これ以上読み込みたくない時はfalseを返す。引き続きデータを読み込むときにはtrueを返すのが決まり。

練習問題

実際コードを書いてみないと理解できないので、練習問題を作りました。
ldjson形式の読み込みファイルを用意。

練習1. ReadableStream

問題

practice1.txt
fs.createReadStream を用いて、sample.ldjson を10byteづつ読み込んで、
dataイベント発生時に、読み込んだ内容を 標準出力(console.log) してください。

Readableストリームを使ってみましょうという問題。

解答例

practice1.js

"use strict"; var fs = require('fs'); var fileName = '100.ldjson'; var fileReaderStream = fs.createReadStream(fileName, {bufferSize: 10});//내부적으로 10byte씩 읽어낼 때마다 events.emit를 보냄


// bufferSize 옵션사라짐. 아래 사이트 참고 https://github.com/nodejs/node-v0.x-archive/issues/5098 // highWaterMark사용할 것. 아래 사이트 참고 https://stackoverflow.com/questions/11784172/nodejs-readstream-not-reading-buffersize-bytes-at-a-time

fileReaderStream.setEncoding('utf8'); var count = 0; fileReaderStream.on('data', function(data) { //모든 스트림객체는 이벤트 인스턴스이기때문에 on가능. 스트림객체가 읽혀지기 시작하는 것이 emit 이기 때문에 emit은 별도로 쓰지 않아도 된다. count++; console.log(count + ': ' + data); }); fileReaderStream.resume();

出力

断片的にデータが読み込まれることがわかると思います。

練習1.out
1: {"gender":
2:  "male", "
3: age": 51, 
4: "weapon": 
5: "HokoYari"
6: , "power":
7:  222347}
{
...略...

練習2. ReadableStream -> WritableStream

問題

練習2.txt
練習1で作成したStream に1行ずつ改行毎にDataイベントを発生させるWritableなLineStreamを接続(pipe)してください。LineStreamのdataイベント発生時に、読み込んだ内容を 標準出力(console.log) してください。

ReadableストリームとWritableストリームとの連結方法、Writableストリームの使い方を掴みましょう。

practice2.js
var LineStream = require('./line_stream');

var fileName = 'sample.ldjson'
var fileReaderStream = require('fs').createReadStream(fileName, {bufferSize: 10});
fileReaderStream.setEncoding('utf8');

var lineStream = new LineStream()
var index = 0;
lineStream.on('data', function(data) {
  index++;
  console.log(index + ': ' + data);
});

fileReaderStream.pipe(lineStream);
lineStream.resume();
line_stream.js
"use strict";

module.exports = LineStream;

var stream = require('stream');
var util = require('util');

function LineStream() {
    this.writable = true;
    this.readable = true;
    this.ended = false;
    this.paused = true;
    this.encoding = 'utf8';
    this.buf = '';
};

util.inherits(LineStream, stream.Stream);

/* readable stream function */
LineStream.prototype.setEncoding = function (encoding) {
  this.encoding = encoding;
};

LineStream.prototype.pause = function () {
  this.paused = true;
};

LineStream.prototype.resume = function () {
  this.paused = false;
  /* TODO: 'drain' */
};

/* writable stream function */
LineStream.prototype.write = function (data) {
  if (this.ended || this.paused) {
    return false;
  }
  if (Buffer.isBuffer(data)) {
    this.buf += data.toString(this.encoding); 
  } else {
    this.buf += data;
  }
  this.searchLine();
  return true;
};

LineStream.prototype.searchLine = function () {
  while(true) {
    var pos = 0;
    var index = this.buf.indexOf('\n');
    var line;
    if (index > -1) {
      line = this.buf.substring(pos, index);
      this.emit('data', line);
      this.buf = this.buf.substring(index + 1);
      continue;
    }
    break;
  }
};

LineStream.prototype.pipe = function (dest) {
  this.on('data', function(data) {
    dest.write(data);
  });
  return dest;
};

LineStream.prototype.end = function () {
  if (this.buf) {
    this.emit('data', this.buf);
  }
  this.ended = true;
  this.destroy();
};

LineStream.prototype.destroy = function () {
  this.ended = true;
  this.readable = false;
  this.writable = false;
  this.emit('end');
  this.emit('close');
};

解答例

出力
練習2.out
1: {"gender": "male", "age": 51, "weapon": "HokoYari", "power": 222347}
2: {"gender": "male", "age": 69, "weapon": "HokoYari", "power": 562426}
3: {"gender": "female", "age": 70, "weapon": "pike", "power": 871983}
4: {"gender": "female", "age": 94, "weapon": "HokoYari", "power": 204004}
... 略 ...

練習3. ReadableStream -> 流入制限したWritableStream

問題

練習3.txt
練習2のLineStreamを改良して、1秒間に1行ずつ`data`イベントを発生するストリームに変更しましょう。

ストリームの肝である、pause()/resume()/drainイベントの一連の流れを掴みましょう。

解答例

practice3.js
"use strict";
var fs = require('fs');
var LineRestrictStream = require('./line_restrict_stream');

var fileName = '100.ldjson';
var fileReaderStream = fs.createReadStream(fileName, {bufferSize: 10});
fileReaderStream.setEncoding('utf8');

var lineRestrictStream = new LineRestrictStream(100);

var count = 0;
lineRestrictStream.on('data', function(data) {
  count++;
  console.log(count + ' : ' + data);
});

lineRestrictStream.on('drain', function() {
  //console.log('  LineRestrictStream Event \'drain\' occurred.')
  fileReaderStream.resume();
});

lineRestrictStream.on('pipe', function(src) {
  lineRestrictStream.setSrcStream(src);
});

fileReaderStream.pipe(lineRestrictStream);
lineRestrictStream.resume();
line_restrict_stream.js
"use strict";

module.exports = LineRestrictStream;

var stream = require('stream');
var util = require('util');

function LineRestrictStream(interval) {
    this.srcStream;
    this.writable = true;
    this.readable = true;
    this.ended = false;
    this.paused = true;
    this.encoding = 'utf8';
    this.bufStr = '';
    this.buf = [];

    // For Stream Restriction
    this.interval = interval || 1000;
    this.lastDequeuedAt = 0;
    this.hasTimer = false;
};

util.inherits(LineRestrictStream, stream.Stream);

/* readable stream function */
LineRestrictStream.prototype.setEncoding = function (encoding) {
  this.encoding = encoding;
};

LineRestrictStream.prototype.pause = function () {
  this.paused = true;
};

LineRestrictStream.prototype.resume = function () {
  this.paused = false;
  this.dequeue();
};

LineRestrictStream.prototype.setSrcStream = function (srcStream) {
  this.srcStream = srcStream;
};

/* writable stream function */
LineRestrictStream.prototype.write = function (data) {
  if (this.ended || this.paused) {
    return false;
  }
  if (Buffer.isBuffer(data)) {
    data = data.toString(this.encoding);
  }
  this.bufStr += data;
  this.searchLine();
  if (this.buf.length > 5) {
    return false;
  }
  this.dequeue();
  return true;
};

LineRestrictStream.prototype.searchLine = function () {
  while(true) {
    var pos = 0;
    var index = this.bufStr.indexOf('\n');
    var line;
    if (index > -1) {
      line = this.bufStr.substring(pos, index);
      this.enqueue(line);
      this.bufStr = this.bufStr.substring(index + 1);
      continue;
    }
    break;
  }
};

LineRestrictStream.prototype.enqueue = function (line) {
  this.buf.push(line);
};

LineRestrictStream.prototype.dequeue = function (isTimerProcess) {
  if (!isTimerProcess && this.hasTimer) {
      return;
  }
  if (this.buf.length == 0) {
    this.emit('drain');
    return;
  }
  this.startTimer();
};

LineRestrictStream.prototype.startTimer = function () {
  var now = Date.now();
  var diff = now - this.lastDequeuedAt;

  if (diff < this.interval) {
    if (!this.hasTimer) {
      this.setTimer(this.interval - diff);
    }
    return;
  }
  console.log('  \tYou waited ' + diff + 'ms.' + ' (queue_size: ' + this.buf.length +')');
  this.lastDequeuedAt = now;
  this.emit('data', this.buf.shift());
  this.setDequeueTimer(this.interval);
};

LineRestrictStream.prototype.setDequeueTimer = function (time) {
  var self = this;
  setTimeout(function() {
    self.dequeue(true);
    this.hasTimer = false;
    if (self.buf.length == 0) {
      self.emit('drain');
    }
  }, time);
  this.hasTimer = true;
};


LineRestrictStream.prototype.pipe = function (dest) {
  this.on('data', function(data) {
    dest.write(data);
  });
  return dest;
};

LineRestrictStream.prototype.end = function () {
  if (this.buf) {
    this.dequeue();
  }
  this.ended = true;
  this.destroy();
};

LineRestrictStream.prototype.destroy = function () {
  this.ended = true;
  this.readable = false;
  this.writable = false;
  this.emit('end');
  this.emit('close');
};

出力

練習3.out
    You waited 1434933713380ms. (queue_size: 1)
1 : {"gender": "male", "age": 51, "weapon": "HokoYari", "power": 222347}
    You waited 119ms. (queue_size: 6)
2 : {"gender": "male", "age": 69, "weapon": "HokoYari", "power": 562426}
    You waited 101ms. (queue_size: 5)
3 : {"gender": "female", "age": 70, "weapon": "pike", "power": 871983}
    You waited 101ms. (queue_size: 4)
4 : {"gender": "female", "age": 94, "weapon": "HokoYari", "power": 204004}
    You waited 101ms. (queue_size: 3)
5 : {"gender": "male", "age": 74, "weapon": "sword", "power": 549572}
    You waited 101ms. (queue_size: 2)
... 略 ...

まとめ

当初想定していたより、記述量が多い印象を持った。Stream2/Stream3ではどの程度改善されたのかは追って調べたい。

Node.jsは随所にstreamを用いた実装がされているので、この少しnode.jsと仲良くなれた気がします。
理解に相当時間かかってしまった。。ふぅー。


+ Recent posts