Stream API入門

この記事は最終更新日から1年以上が経過しています。

Nodeのアドベントカレンダー、既に終わった枠が空いていて、この際書きたいネタがあったんで参加しました。宜しくお願いします。

アドベントカレンダーの時期だけ出没する弱い日曜Haskellerです。普段の実務ではNode.jsにお世話になってます。宜しくお願いします。

さて、みなさんStream API使ってますか?Node.jsといったら非同期ですよね、やっぱり。しかしながら、JavaScriptでも他の言語でも、非同期処理自体は注目されているものの、まだexperimentalという感じで様々なAPIが考案されては消えていき、また元々そういう文化が根強くなかったところから来た人たちにとって、こういう文化はちょっと立ち入りづらいところもあるかもしれませんね。

今日は、主にそういう人たちに向けて、まず非同期の色々なAPIの紹介、そしてその中でのストリームのメリット、そして実際のStream APIの使い方の紹介、そしてRxJSのススメをちょこっとだけやっておきます。

非同期処理とは

さて、非同期処理とはそもそもどんなものでしょうか?I/Oで便利とか、メモリの圧迫を防げるとか、色々言われるわけですが、実際にはそれらは非同期処理の実装による恩恵であって、非同期処理自体は、あなたがNode.jsを使っていれば身近にある、次のような処理のことです。

const fs = require('fs');

fs.readFile('/path/to/some', (err, data) => {
  if (err) {
    console.error(err);
    return;
  }
  console.log(`size: ${data.length}B`);
});

このプログラムは、/path/to/someというファイルを読み込んで、そのファイルサイズを出力してくれるものです。しかしながら、注意しなければいけないのは、多くのプログラミング言語と違い、このプログラムのフロー自体はfs.readFileでブロックされるわけではなく、そのまま流れていくということです。どういうことかは動作を見てもらった方が分かりやすいと思うので、以下のようなプログラムに修正して実行結果を見てみましょう。

// test.js

const fs = require('fs');

fs.readFile('/path/to/some', (err, data) => {
  if (err) {
    console.error(err);
    return;
  }
  console.log(`size: ${data.length}B`);
});

console.log('readFile done');

/path/to/someに16byteのファイルがある時、このプログラムの実行結果は、以下のようになります。

$ node test.js
readFile done
size: 16B

これが非同期の処理の特徴になります。このようなAPIが標準で提供されている処理系はNode.js以外にはそうそう無いでしょう。多くの言語の処理系で提供されているような処理も、もちろんNode.jsでも可能です。それは次のような処理です。

// test-sync.js

const fs = require('fs');

const data = fs.readFileSync('/path/to/some');
console.log(`size: ${data.length}B`);

console.log('readFileSync done');

このプログラムの実行結果は、以下のようになります。

$ node test-sync.js
size: 16B
readFileSync done

さっきと出力順が逆です。このような処理は、非同期処理と対応して同期処理と呼ばれます。つまり、同期処理とはプログラムの処理が同期され順に実行されていく処理のこと、非同期処理とは互いの処理が同期されず、いつ呼ばれるかのタイミングが分からずにそれぞれ実行される処理のことです。

さて、ここまでの説明で、あなたは、「非同期処理とはようは並列処理のことか」と思ったかもしれません。しかしながら、Node.jsの多くのプログラムはシングルスレッドで動いています。つまり、一つの仕事を行なっている間、別の仕事をする術は、標準APIには用意されていません。

どういうことでしょうか?あなたは上のプログラムの実行イメージを次のように考えたでしょう。

node-async-bad-image.png

しかしながら、実際の動作はあなたの考えた図とは異なります。Node.jsの動作は以下のような図になっているのです。

node-async-eventloop-image.png

そして、fs.readFileは定期pollによって1イベントを監視するイベント処理を投入するのです。もっと簡単に言えば、大体のI/O操作はシステム(OS)に任せることができ、readFileなどの操作はシステムに任せきることで、Node.jsでは本来の処理に戻り、システムから通知があった段階でコールバックを走らせるということを行います。これはループ処理によってキューがなくなるまで行われる上のような循環図になりますが、誤解を恐れずもっと分かりやすく書くなら、以下のような処理になっているということです。

fsreadfile-node-async-image.png

そしてそれぞれのイベント処理がどのタイミングで呼ばれるかは、実装とキューの中身の構成に依存します。

もっと詳細を知りたい方は、今となっては少々機能が足され、実装が変わっていたりしますが、大枠の仕組みは変わっていないので、大津さんのこの資料が参考になると思います2

さて、これにより、あなたは分かり難いスレッドモデルなどよりもよっぽど簡単に、非同期な処理を実現できます。シングルスレッド内で、容易にインスタンスを共有できますし3、スレッドの発行や終了タイミングについて特に気にかける必要はありません。

非同期処理のAPI色々

さて、ここまでで、あなたはNode.jsにおける非同期処理の実装について、多少の理解ができたと思います。問題なのは、パフォーマンスが優秀でも、やはり非同期処理は並列処理と同様にプログラミングを行う上で扱いにくいということです。私たちは、多くの場合同期的な処理を求めていますし、パフォーマンスに問題がなければ、全てを同期的な処理で書いたでしょう。

Node.jsは、そのパフォーマンスを最大限に引き出すために、古来から様々なAPIを用意してきました。代表的なものがコールバックを受け取る関数です。そしてもう一つがEventEmitterとStreamです。また、それに合わせて、JavaScriptでも近代的な手法を取り入れるべく、様々なAPIが取り入れられてきました。PromiseとGeneratorは既にNodeには搭載されており、それに加え次期Node v8ではasync/awaitの搭載が予定されています。

それぞれの手法が一体どういう用途に向けてのものなのか、それぞれどのように使うのかをまとめておきましょう。

コールバックを受け取るAPI

Node.jsの標準の実装は、多くの場合この形式のAPIで提供されています。引数に、コールバック関数と呼ばれるクロージャを渡し、それをイベント度に呼び出すAPIデザインのことです。上述で使った次の形のAPIですね。

const fs = require('fs');

fs.readFile('/path/to/some', (err, data) => {
  if (err) {
    console.error(err);
    return;
  }
  console.log(`size: ${data.length}B`);
});

他にも、setTimeoutなどのネイティブなAPIや、Node.jsの主だったライブラリでは、このデザインが用いられています。全てのコールバック関数を受け取るAPIが、非同期処理でないことに気をつけて下さい。例えば、Array.prototype.forEachなどはコールバック関数を受け取りますが、同期処理用のAPIです。つまり、

// test-foreach.js

[1, 2].forEach(n => {
  console.log(n);
});
console.log('forEach done');

これの出力結果は以下のようになります。

$ node test-foreach.js
1
2
forEach done

コールバックを受け取る形式のAPIは、古くから使われ、とくにくせがあるわけでは無いため、非常に多くの場所で使われています。しかしながら、表現力が弱いことも確かです。よく槍玉に挙げられるのが、コールバック地獄というものです。以下の例を見てください。

const fs = require('fs');

fs.readFile('/path/to/some', (err, data) => {
  if (err) {
    console.error(err);
    return;
  }
  data.trim().split('\n').forEach(path => {
    fs.appendFile(path, 'done', (err) => {
      if (err) {
        console.error(err);
        return;
      }
    });
  });
});

これはファイルパスの一覧表が書かれているファイルを読み込み、それらのファイルそれぞれの最後に、doneと書き込むプログラムです。見て分かるように、とても単純なプログラムであっても、このようにネストが深く読みづらく書きづらいプログラムになってしまいます。またエラー処理に関しても、冗長で、同じような処理を幾度も書くことになってしまうのです。

また、他にもこのデザインには問題があります。例えば、よくあることとして、データの取得時と全てのデータの取得が完了した場合の通知を受け取るようなAPIがあったとします。これをコールバックを使って以下のように実装したとしましょう。

function readContents(onNext, onEnd = () => { return; }, onError = err => { throw err; }) {
  ...
}

これを使うようにプログラムを書いてみます。

readContents(
  data => {
    console.log(data);
  },
  () => {
    console.log('end');
  },
  err => {
    console.error(err);
  },
);

控えめに言っても、見やすいとも使いやすいとも言えませんね。このように非同期処理において、コールバックを受け取るようなAPIデザインは、素朴なアイデアであり、特殊な扱いをしなくて済みますが、その分使い勝手が非常に悪いという欠点がありました。

EventEmitter

Node.jsで複数のイベントをコールバックで扱うのではなく、専用のプロキシオブジェクトを通すことで、使い勝手を向上させようと、考案されたのがEventEmitterです。このオブジェクトは、eventsモジュールから利用できます。では、先ほどのような関数をEventEmitterを利用するよう、書き換えてみましょう。

const EventEmitter = require('events');

class ReadContentsEmitter extends EventEmitter {
  ...
}

function readContents() {
  return new ReadContentsEmitter();
}

readContents()
  .on('data', data => {
    console.log(data);
  })
  .on('end', () => {
    console.log('end');
  })
  .on('error', err => {
    console.error(err);
  })
  ;

これによってコードの見通しは、コールバック版より随分良くなりました。EventEmitterはイベント駆動の非同期処理を行う場合に非常に便利です。Node.jsでは、netモジュールのServerクラスや、streamモジュールのストリームなど多くのクラスがEventEmitterをベースに作られています。

ところで大事なことですが、EventEmitterは非同期処理をサポートしてくれるようなAPIであって、EventEmitter自体が非同期であるわけではないことには注意が必要です。次の例をみてください。

// test-eventemitter.js

const EventEmitter = require('events');

class MyEmitter extends EventEmitter {}

const emitter = new MyEmitter();

emitter.on('data', data => console.log(`data: ${data}`));

emitter.emit('data', 'sample');
console.log('emit done');

このプログラムの出力は必ず以下の様になります。

$ node test-eventemitter.js
data: sample
emit done

つまり、あくまでEventEmitterは同期的であり、emit()が非同期に行われる場合には非同期になるというだけなのです。

Promise

コールバック地獄を抜け出す方法として、古くから活用されていたのが、Promiseです。

Promiseを広く多用してきた有名なJavaScriptライブラリに、jQueryがあります4。名高い、jQueryのajaxAPIではPromiseがふんだんに活用されていますね。

さて、非同期処理は、非同期とはいうものの、同期したい箇所もあります。以下の例を見てください。

const fs = require('fs');

fs.readFile('/path/to/some', (err, data) => {
  if (err) {
    console.error(err);
    return;
  }
  fs.appendFile('/path/to/some2', data, (err) => {
    if (err) {
      console.error(err);
      return;
    }
  });
});

このプログラムでは、二つの非同期処理、readFileappendFileがありますが、この二つは完全に独立して扱われているわけではありません。appendFileは、readFileが出力した値を使っているため、readFileが実行された後、同期的に、非同期処理appendFileが呼ばれるということになります。このように、非同期処理の中で小さな単位で同期的な処理を必要とする場合があります。非同期処理において、コールバックのネストが深くなるのは、主にそのような場所においてです。

Promiseは、このような非同期処理の同期的関係を内部で管理してくれるオブジェクトです。実際にES2015のPromiseを使って、上のようなプログラムがどのように書けるか見てみましょう。

const fs = require('fs');

new Promise((resolve, reject) => {
  fs.readFile('/path/to/some', (err, data) => {
    if (err) {
      return reject(err);
    }
    return resolve(data);
  });
})
  .then(data => new Promise((resolve, reject) => {
    fs.appendFile('/path/to/some2', data, (err) => {
      if (err) {
        return reject(err);
      }
      return resolve();
    });
  })
  .catch(err => console.error(err))
  ;

Promiseを使うことにより、同期的な関係をthenというチェインで繋げていくことができます。これによって、ネストはどんなにチェインを繋げても深くなりません。Promiseが考慮されたAPIを用意することによって、記述量もコールバックと大差なく書けるようになるでしょう。

しかし、現状Promiseを考慮したAPIを提供しているライブラリはあまりありません。一つには、あまり処理系においての対応状況がよろしくなかったことなどが理由としてあります。もう一つは、async/await対応が目に見えていたからでしょう。おそらくasync/awaitが本格的に組み込まれれば、Promiseを考慮したAPIは格段と増えるでしょう。

async/await

さて、async/awaitは、Promiseを命令的なコードにおいて扱いやすくしたものです。私たちは多くの場合、非同期処理より同期処理の方が書き慣れています。async/awaitは、いわば、イベント駆動なものを除いた、非同期なコールバック形式のAPIの、完全な代替手段と言ってよいでしょう。async/awaitを使えば、さっきのPromiseのコード例は、次のように書くことができます。

const fs = require('fs');

async function asyncTest() {
  try {
    const data = await new Promise((resolve ,reject) => {
      fs.readFile('/path/to/some', (err, data) => {
        if (err) {
          return reject(err);
        }
        return resolve(data);
      });
    });

    await new Promise((resolve, reject) => {
      fs.appendFile('/path/to/some2', data, (err) => {
        if (err) {
          return reject(err);
        }
        return resolve();
      });
    });
  } catch (err) {
    console.error(err);
  }
}

asyncTest();

そして、async/awaitは、Generatorの5、Promise向けに特化したものです。つまり、Promiseを扱うGeneratorであり、Promiseを返す関数です。例えば、上のプログラムは、以下のようにGeneratorとPromiseを使った関数に変換することが可能です。

// helper
function generator_to_promise(gen) {
  return new Promise((resolve, reject) => {
    const generator = gen();

    function step(result) {
      if (result.done) {
        return resolve(result.value);
      }
      return new Promise(resolveNext => resolveNext(result.value))
        .then(
          value => {
            try {
              step(generator.next());
            } catch (err) {
              reject(err);
            }
          },
          err => {
            try {
              step(generator.throw(err));
            } catch (nerr) {
              reject(nerr);
            }
          }
        )
        ;
    }
    step(generator.next());
  });
}

// main
const fs = require('fs');

function asyncTest() {
  return generator_to_promise(function* asyncTest_generator() {
    try {
      const data = yield new Promise((resolve ,reject) => {
        fs.readFile('/path/to/some', (err, data) => {
          if (err) {
            return reject(err);
          }
          return resolve(data);
        });
      });

      yield new Promise((resolve, reject) => {
        fs.appendFile('/path/to/some2', data, (err) => {
          if (err) {
            return reject(err);
          }
          return resolve();
        });
      });
    } catch (err) {
      console.error(err);
    }
  });
}

asyncTest();

少し長いですが、基本を押さえればなんということは無いです。まずawaitは全てyieldに変換できます。Generatorのnext()によって、yieldまで実行した後返ってきた値をPromiseのresolveに投げてやれば、その値がPromiseだろうと普通の値であろうとPromiseオブジェクトの値に変換されます。catchには代替としてgenerator.throwを割り当てることができ、Generator内のtry-catchが処理できれば順次次の処理へ、処理できなければrejectとして返ることになります。あとは、これらを再帰関数でGeneratorのdoneまで順次実行していけば、最終的なPromiseが得られます。generator_to_promiseヘルパーはそのようなことをやっています6

このように、Generatorと同じようなAPIによって、非同期処理を、あたかも同期的な処理のように記述でき、非同期処理の中でも同期的な部分において、コールバック形式の問題点やPromiseの特殊な記法を払拭できるようになります。

なお、せっかくなので、Promiseとの対応をまとめておきましょう。

Promiseasync/await
Promise.resolve(obj)await obj
Promise.reject(err)throw err
promise.then(data => ...)const data = await promise; ...
promise.then(() => ...)await promise; ...
promise.catch(err => ...)try { await promise } catch (err) { ... }

Generator

Generatorは実際には、同期的な処理を担当する機能です。しかしながら、そのアイデアは、非同期処理にも利用できることは、前回見ました。ここで、もう一度Generatorを出しておきたかったのは、Streamの対となる存在としてです。

フライングして、async/awaitでGeneratorの話を出しましたが、今更ながらGeneratorのおさらいをしておきましょう。GeneratorはES2015で取り入れられた機能です。これを使えば、メモリ空間を抑えたり、イテレータの記述を楽に行えたりするようになります。例えば、フィボナッチ数列を求めるプログラムをGeneratorで書いてみましょう。

function* gen_fibonacci() {
  let state = {
    target: 1,
    post: 1,
  };
  for (;;) {
    yield state.target;
    const pre = state.target;
    state.target = state.post;
    state.post = pre + state.target;
  }
}

Generatorは、function*(アスタリスク)を付けることで、生成できます。yieldで実行が一度止まるようになっており、yieldを返したい値につけることで、値を返せるようになっています。この関数を使ってみましょう。

function get_fibonacci(n) {
  const gen = gen_fibonacci();
  for (let i = 1; i < n; i++) {
    gen.next();
  }
  return gen.next().value;
}

get_fibonacci(1)  // -> 1
get_fibonacci(2)  // -> 1
get_fibonacci(3)  // -> 2
get_fibonacci(10) // -> 55

このGeneratorによって、あなたは好きな段階でフィボナッチ数列の次の項の計算を計算機に通知でき、フィボナッチ数列の必要な項を取捨選択して保持しておくような処理ができますし、何よりこのジェネレータから取得できる項数には限りがありません7。あなたは、フィボナッチ数列のある項が欲しい時に、何項までを計算させるかを関数に通達する必要は無いのです。

現実には、フィボナッチ数列に関してそこまでの要求することは無いでしょうが、Generatorが色々な面で応用できそうだということは分かってもらえたでしょう。

Generatorのnext()には値を渡せるようになっており、値が渡されるとyieldの返り値になります。最初のnext()の引数は意味がありません。また、next()は返り値を表すvalueと、ステータスを表すdoneというプロパティを持っており、Generatorが終了した場合、donetrueになります。

ところで、Generatorの性質は、Streamの性質と対を成しています。Generatorは、 その出力を求められた時 、出力を求める計算を同期的に行い、結果を出します。対して、Streamは、出力を求める計算を行って その出力が求まった時 、その結果を非同期に出すのです。この関係は、GeneratorとStreamの理解を促進し、使い分けを考える上で重要でしょう。

stream-and-generator.png

Stream

さて、いよいよStreamについて考える番です。Streamはどんなものでしょうか?Node.jsのStream APIには、Streamに該当するものが大きく三つあるのですが、俗にStreamで大事になるのがReadableStreamというものの特徴です。後の二つは、まあオマケみたいなものです。

ReadableStreamは、いわばPromiseの出力を複数に対応させたものです。といっても、配列を出力するPromiseというわけではなく、出力が複数で、且つそれぞれが非同期になるようなものです。具体的にどういったものなのか、コードを見てもらった方が早いと思うので、見てみましょう。以下は、ファイルをgzipで圧縮して出力するサンプルです。

const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('/path/to/source')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('/path/to/dest'))
  ;

注目したいのは、createReadStreamの部分です。createReadStreamは、指定されたファイルの内容を一定バイトごとに取得し、それぞれの出力に対しイベントを流すのです。なぜそのようなことをするのでしょうか?

最初に紹介したサンプルでは、「readFileはシステムにファイルの読み込みを任せ、本業を行いながらシステムからの通知を待つことで、効率の良い処理ができる。そしてそれがNode.jsの非同期処理の強みだ」と紹介しました。しかしながら、その後、非同期処理の中には同期的な部分、つまり出力が取得できないと先に進めない処理があること、そしてそれを扱いやすくしたAPIを幾つか紹介してきました。

今回のサンプルプログラムの例を考えてみましょう。ファイルをgzip圧縮して、別ファイルに出力する様なプログラムです。これは、完全に処理が同期しています。ファイルの読み込みが行えなければ圧縮はできませんし、圧縮できなければ出力ができません。つまり、非同期処理をしても何の効率も引き出せないことになります。しかしながら、この様な処理は多くの箇所で出てくるでしょう。

さて、この様な処理においても非同期処理の強みを活かして、もっと効率の良い様な体系を作れないか、と考えてみましょう。gzip圧縮は、簡単に言えば、文字列を前から辿っていき、以前に共通部分があるような箇所を見つけた場合それをポインタに置き換えるようなアルゴリズムです。つまり、いきなり全体像を掴む必要はなく、ある程度文字列の先頭部分を取得できれば、それに対しての前処理を行うことが可能です。つまり、ある程度ファイルの内容が先読みできれば、ある程度処理を短縮できる可能性があります。これが、Streamのモチベーションです。

createReadStreamが行うことはとても単純です。簡単な流れは次の様になります。

stream-flow-image.png

詳しいカスタマイズ方法などは後ほど紹介しましょう。これによって、その後の.pipe(zlib.createGzip())などは、バッファをちょっとずつ受け取り前処理を行いながら次の入力を待てる様になり、処理を前もって行うことができます。これは、非同期の強みをさらに活かせますね。

さて、今までは速さに注目してきたわけですが、Streamにおいては、管理機構によるコストなどから、よほど中間処理が重かったりファイルサイズが大きいというわけでもなければ、一気にreadFileしてしまった方が速度的には速い場合が多いです。想像してみてください。システムに移譲する処理は、普通結構重いわけです。それを細かくやっているわけですから、一回しか移譲処理を行わなくていい場合に比べ、遅くなるというのは当たり前のことですよね。あれ?と思った方が多いでしょう。では、なぜわざわざStream APIが使われるのでしょうか?Promiseでいいはずですよね。

さて、非同期によってI/O待ちをいくらか解消できるというのは、何も速度を最適化するだけが強みではありません。プログラミングにおいては、速度とそれに加えて空間も重要なトピックになります。そしてそれこそが、StreamがPromiseに比べて非同期処理の強みとなる部分です。

あなたは、例えばとても容量の大きなファイルを扱うことが、得てしてあるでしょう。今回の処理で、1Gのファイルを圧縮するといったことは多分に起こります。その様な場合に、一気にファイルを読み込み、圧縮をかけ、新たにファイルに書き込むことを想定してみてください。圧縮といえど、圧縮されたファイルサイズは小さくなるかもしれませんが、中間処理には処理するバッファのサイズ分空間が必要になります。また、1Gのファイルはそのままバッファになるわけですから、少なくとも1G+αのメモリ空間が消費されることになります。これはあまり許容できるものではありません。

一方で、Stream APIを使う場合は、一定量のバッファに対して、次のバッファが来るまでに前処理を行いバッファは捨てることができます。これによって、例え1Gのファイルでも、あなたは少ないメモリ量で、Promiseより少し遅いとはいえ、ある程度のパフォーマンスが出る様なコードが書けるのです。また、細かな単位で処理ができるということは、一気に読み込む場合に比べ、トータルの消費メモリは変わらなくても大きなメモリ消費を行っている時間を短縮できます。1Gのファイルサイズは(最適化を考えない場合!)一気に読み込み処理をする場合、圧縮処理をかけている間ずっと保持されます。それに比べStreamでは、短かな前処理の間細かなバッファを保持しておくだけですみます。Streamの強みはまさにそこにあるわけです。

ところで、async/awaitがただのPromiseを返すのに特化したGeneratorと見ることができた様に、PromiseもStreamが一出力に特化したものと見ることができます。つまり、PromiseはStreamで実装ができます。もちろん、Generatorでasync/awaitを実装するよりもネイティブのasync/awaitの方が最適化がかかっていますし、Promiseも同じです。しかしながら、同じ性質を持つ部分に着目することは、理解の上でも、そしてロジックを再利用する上でも役に立つことは、覚えておいて損は無いでしょう。

stream-and-promise.png

ここまでのまとめ

さて、StreamがPromiseと違う性質を持っていること、そしてStreamが非同期処理においてどのように役に立つのかは分かってもらえたでしょうか?あなたは、そもそもの非同期処理の利点、そして非同期APIそれぞれの利点と、それぞれのAPIによって受ける恩恵が違うことを理解したでしょう。さらに、あなたは一概に非同期にすればいいわけでは無いこと、一概にPromiseやStreamを使えばいいというわけでは無いことを理解したはずです。例えば、あなたはコンフィグを読み込んでmain部分を起動する様なプログラムを書くとき、fs.readFileよりもfs.readFileSyncを活用するべきです。そこで、わざわざイベントループを回すことには、何の意味もありません。

const fs = require('fs');
const main = require('./main');

/** Bad:
fs.readFile('/path/to/config', (err, config) => {
  if (err) {
    throw err;
  }
  main(JSON.parse(config));
});
*/

const config = JSON.parse(fs.readFileSync('/path/to/config'));
main(config);

非同期がただのイベントループであることを理解したあなたは、単にreadFileによってイベントをループに登録し待機させるよりも、全く同じことをループにイベントを登録せずに同期APIを使用して行った方が効率が良いと分かるはずです。またここでStreamを使うことにも何の意味もありませんね。configはそれほど大きいファイルではないはずですから。

次の様なプログラムでは、あなたはStreamを使用するのがいいかもしれません。

const fs = require('fs');
const Base64Encoder = require('b64').Encoder;
const Sender = require('./send-anything');

const sender = new Sender();

/** Maybe Bad:
fs.readFile('/path/to/data', (err, data) => {
  if (err) {
    sender.rollback();
    throw err;
  }
  sender.write(data.toString('base64'));
});
*/

fs.createReadStream('/path/to/data')
  .pipe(new Base64Encoder())
  .on('data', data => {
    sender.write(data);
  })
  .on('error', err => {
    sender.rollback();
    throw err;
  })
  ;

base64エンコーディングは、先頭のある程度の位置までが分かれば、そこまでは完全にエンコーディングできる形式ですから、Stream化をうまく行えます。b64はbase64のエンコードとデコードをストリーム上で行える様にしてくれるライブラリです。あなたがどの様なデータを受け取るのかわかりませんが、もしデータの形式が不明な場合はStream化をしておけば汎用的に使えるAPIにできます。対して、もし速度が重要で、データが小さいサイズだと分かっている場合は、単純にasync形式で提供した方がいいかもしれません。

同期的なAPIを使うか非同期的なAPIを使うか、はたまたそのAPIの中でもどの様な形式のものを使うかを、時と場合によって上手に判断することが大事なのです。

Stream APIの使い方

Streamの種類とそれぞれの機能

では、いよいよ本題のStreamの使い方についてです。さて、Streamの名を冠するオブジェクトには、大きく分けて三種類あります。一つはReadable、二つ目はWritable、そして三つ目がその両方の機能を持つStream、Duplexです。

Node.jsのStreamの基本的な概念はとても単純です。それは、ReadableなものとWritableなものはpipeで繋げられる、ということです。

readable-and-writable.png

Duplexはどちらの機能も持っているため、どちらとも繋げることができます。想像しやすいのは、蛇口とバケツのイメージです。

stream-image.png

Duplexは蛇口を持ちつつ、バケツの様に蛇口から水を受け取ることもできます。ReadableとWritableはどういったものがあるか想像できるでしょうが、Duplexはパッと思いつかない人もいるかもしれません。Node.jsでは、SocketがDuplexで実装されています。何かを相手に流したり、逆に何かを相手から受け取ったりということが、それぞれReadable、Writableの機能として実装されています。また、一番利用されやすいのがTransformというDuplexから派生したものです。これは、タンクのような役割をするDuplexをユーザーが作りやすくするための基底クラスで、上流のReadableから受け取ったものを加工して、下流のWritableに流すという役割を受け持ちます。

transform-image.png

前に紹介したcreateGzipが返すストリームは、Transformを使って実装されています。それぞれの基底クラスを使って、あなたが何かクラスを作ろうという場合、それぞれ実装する必要のあるメソッドがあります。

基底クラス実装する必要のあるメソッド
Readable_read
Writable_write
Duplex_read/_write
Transform_transform

例えば、カスタムのReadableの作り方を示しておきましょう。

// test-custom-readable.js

const Readable = require('stream').Readable;

class CustomReadable extends Readable {
  constructor(max) {
    super();
    this.state = 0;
    this.max = max;
  }
  _read(n) {
    for(let l = Math.min(this.state + n, this.max); this.state < l; this.state++) {
      this.push(this.state.toString());
    }
    if (this.state === this.max) {
      this.push(null);
    }
  }
}

new CustomReadable(10)
  .pipe(process.stdout) // process.stdout implements Writable
  ;

このプログラムの実行結果は、以下の様になります。

$ node test-custom-readable.js 
0123456789

0から与えられた数字までを繋げた文字列を流すストリームです。このように、それぞれのメソッドを実装することで、後は基底クラスの処理に任せることができます。他のクラスの実装の仕方については、Node.jsのStreamモジュールのドキュメントStream Handbookを読んでみるのがいいでしょう。

さて、StreamクラスはEventEmitterを実装しており、それぞれのイベントを受け取ることもできます。イベントの一覧は次の様になります。

  • Readableのイベント
    • data: データが取得できるようになった時にそのデータとともに発火
    • readable: 取得できるデータがストリーム内に発生した時に発火
    • end: データを全て流し終えた時に発火
    • error: エラーが発生した時にエラーとともに発火
    • close: ストリームのリソースが閉じられた時に発火(全てのストリームがこのイベントを流すわけではありません)
  • Writableのイベント
    • drain: writeがfalseを返した時に準備ができてから発火
    • pipe: Readableからpipeされた時にそのReadableと共に発火
    • unpipe: Readableからunpipeされた時にそのReadableと共に発火
    • finish: endが呼ばれ、全てのデータを処理し終えた時に発火
    • error: エラーが発生した時にエラーとともに発火
    • close: ストリームのリソースが閉じられた時に発火(全てのストリームがこのイベントを流すわけではありません)

Writableのdrain以外は、分りやすいと思います。drainに関してですが、このイベントは少し特殊です。一般に、書き込みが読み込みより速いとは限りません。その様な場合に、読み込みが書き込み速度を上回る場合があります。もし上回った状態をそのまま続けた場合、もちろん書き込みは行われないわけですから、Writableの内部バッファに読み込みから流れてきたものが溜まり続ける状態になってしまいます。その様な状況はあまりよろしくありません。そのため、Writableのwriteは、内部バッファが必要になった(つまり書き込みが追いつかなくなった)状態ではfalseを返す様にできています。そして、書き込み待機がなくなった時にdrainを呼び出すのです。通常は、これらを監視して、Readableに的確に処理を待つように通知することが必要です。心配しないでください、まさにpipeがそれを行ってくれるため、通常あなたはこのイベントに関して余計な気を回す必要はありません!

さて、Duplexは両方の機能をもっているわけですが、その機能それぞれに対して、的確にイベントを処理する能力も実装しています。もちろん、Duplexの派生クラスであるTransformもです。

これがNode.jsのStream APIの概略です。簡単ですね!

二つのモード

さて、少しAdvancedな内容として、ReadableStreamの二つの状態について話をしておきましょう。あなたがStream APIをより効果的に活用するなら、知っておかなければならないトピックです。といってもそれほど難しいことはありません。ただ、少し暗黙的な動作が入ってくるため、はたから見ればこれらのAPIの動作は難しく見えることがあります。

さて、ReadableStreamの読み込みモードには二種類の状態があります。二つの状態はそれぞれ以下のものです。

  • pausedモード
    • イベントループにイベントは投げられますが、読み込みが終わった後もストリーム内のバッファにデータは保持されます
    • あなたがもしデータを取得したい場合、readを手動で呼び出せば、バッファからデータを取り出すことができます
  • flowingモード
    • データの取得後、自動的にデータがイベントに流れます
    • あなたがもしデータを取得したい場合、'data'イベントと'end'イベントを監視するか、pipeでWritableを繋げるのがいいでしょう

デフォルトでは、ReadableStreamはpausedモードで取得されます。もし、flowingモードがデフォルトだった場合、あなたがデータを取得したいと思った時には、最初の幾つかのデータがどこかに消えているかもしれません。そのような事態を防ぐため、Readableは内部で取得した分を保持してくれているのです。さて、このモードは次のようなことをするとflowingモードに切り替わります。

  • 'data'イベントにハンドラを登録する
  • resume()メソッドを呼ぶ
  • pipe()メソッドにWritableを渡す

このため、あなたは特にこの二つのモードを意識せず、データを取得することができます。あなたがもしpausedモードに戻したければ、次に該当することを行えば切り替わります。

  • pipe先が無い場合は、pause()メソッドを呼ぶ
  • pipe先が存在する場合、全ての'data'イベントハンドラを外し、全てのpipe先をunpipeする

flowingモードへの切り替えに比べ、少々厄介ですね。まあ、大体のケースではflowingモードからpausedモードに差し戻す必要はないため、あなたはそれほど心配をしなくていいでしょう。

詳細については、Node.jsのドキュメントを見てみるといいでしょう。もし、あなたのストリームがうまく作動しない場合、上記のことを思い出してみてください。あなたがちゃんとストリームを扱っている場合、実装側のバグによるところが多いでしょう。その場合そのライブラリがちゃんと二つのモードを扱えているか、確認してみるのがいいでしょう。

オブジェクトモード

さて、Streamの本来の内部データは、Bufferが基本です。I/O操作はシステムとのバイナリ通信が基本だからです。しかしながら、ストリームの内部データとしてオブジェクトを持ちたい時があります。特に、次の様なことをしたい時は頻繁に起こるでしょう。

+----------------+  Buffer  +-------------+  Object  +----------+
| FileReadStream | -------> | ParseStream | -------> | Analyzer | ...
+----------------+          +-------------+          +----------+

つまり、ファイルのある一定の位置まで分かっていればパース処理ができるような、データフォーマットのデータを解析し、そのデータに対してそれぞれ独立したピュアな解析処理ができる場合です。例えばCSVの各行をJSONに変換し、その上で順に画面に表示していく場合、オブジェクトをストリームで扱えば独自に処理をしなくて済みます。しかしながら、Stream APIはBufferが基本であり、Bufferのために処理に最適化を入れている部分もあるため、当初Stream APIがオブジェクトを扱えるべきかについては慎重でした。Bufferは複数のBufferを結合してもBufferです。それに対してオブジェクトは複数扱う場合一旦配列にしなければなりません。

ただし、需要が大きかったこともあり、現在はobjectModeというものがStream APIに搭載されています。Stream APIの各基底クラスにobjectModeを有効にしたオプションを投げれば、Streamがオブジェクトを扱う様になります。デフォルトでは無効になっており、オブジェクトは扱えないことに注意してください!

const Readable = require('stream').Readable;

class RangeReadable extends Readable {
  constructor(n) {
    super({
      objectMode: true,
    });
    this.state = 0;
    this.max = n;
  }

  _read() {
    if (this.max === this.state) {
      this.push(null);
      return;
    }
    this.push(this.state++);
  }
}

/** Bad: 
new RangeReadable(10)
  .pipe(process.stdout)
  ;
*/

new RangeReadable(10)
  .on('data', n => console.log(n))
  ;

オブジェクトを流す場合は、流すストリームはもちろんのこと、受け取るWritableもobjectModeである必要があることに注意してください!このため、process.stdoutに直接データを流すことはできません。もしデータを流したいなら、データを文字列に変換するTransformを作るといいでしょう。objectModeは他にも通常のストリームとAPIの仕様が少し異なったりするので注意が必要です。

しかしながら、あなたのストリームプログラミングをより豊かで便利なものにしてくれるでしょう。このobjectModeを使った強力なライブラリも幾つかあります。通常のストリームとの違いには注意しつつも活用していくといいでしょう。

Streamの活用例とライブラリ

Node.jsのStream APIは、Node.jsの至る所で活用されています。また、Stream APIのサポートを行っているライブラリも多くありますし、よりStream APIを便利に使うためのライブラリも多数用意されています。ここでは、そのうちの幾つかを紹介しておきましょう。

Node.jsが標準でストリームによって機能を提供しているモジュールは以下のものがあります。

色々ありますね。これで全てではありません。他にもいくつかあります。もちろん、ストリームでなくコールバック形式のAPIも同様に用意されています。これらをうまく使い分けていきましょう。

Stream APIの対応をしている有名なライブラリとしては、csv-stringifyrequestなどがあります。また、gulpはオブジェクトストリームのヘビィユーザーで、vinylというオブジェクトをストリームに流すデザインを設計することで、とても扱いやすいAPIを提供しているビルドツールとして有名です。

また、ストリームの機能を拡張するものとして、次の様なライブラリも提供されています。

  • merge-stream: ストリームのマージを行う
  • through2: Transformをより扱いやすくしたもの
  • event-stream: 様々なストリームユーティリティライブラリをまとめたもの

さらに詳細なことが知りたければ、Node.jsのStreamモジュールのドキュメントStream Handbookを読んでみるのがいいでしょう。

RxJSのススメ

さて、Stream APIの使い方を今まで紹介してきましたが、Stream APIには、使用上注意しなければいけないことも、幾つかあります。

現状のStream APIの大きな問題点は、エラーハンドリングに関してです。次のコードを見てください。

const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('/path/to/some')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('/path/to/dest')
  .on('error', err => {
    // handling error
    ...
  })
  ;

あなたは、エラーが発生した場合、そのエラーを受け取って特殊なエラーハンドリングを行わなければ、システムが壊れてしまうようなプログラムを書いていたとしましょう。あなたは、onでエラーをハンドリングしたと思っていますが、実際に何かしらのハードディスクトラブルでreadに失敗した場合、あなたのシステムは壊れるでしょう。Node.jsのStream APIがエラーを伝搬しないことによって、エラーハンドリングが直感的でないことは幾人かから既に指摘を受けていますが、諸事情により簡単に直せる問題ではありません。

Node.jsのエラーハンドリング事情については、StackOverflowのこのスレッドが参考になるでしょう。また、どういう経緯でこうなったのか、Stream APIの変遷についてはこの記事を読んでみると良いと思います(コメントで補足をもらいました。そちらも参照してください)。

また、もう一つの問題として使いにくさがあります。非同期コールバック、Promise、async/awaitと進化していった、一つの出力を持つ非同期体系に対し、Node.jsのStream APIは簡単に操作できるものではありませんし、様々なライブラリをimportする必要が出てきます。また、分岐や取得タイミングの調整についても、暗黙的な動作を採用してしまったために、かえって適切に調整することが難しくなっています。

そこで今日最後に紹介しておくのが、RxJSです。標準でサポートされている機能を蹴ってまで、RxJSを使う魅力があるのか、と思われる方も多いでしょうが、RxJSを操れるようになればその考え方は大きく変わるはずです。

RxJSの特徴は、多数のプリミティブなオペレータというものを提供していることと、HotとColdという概念によって、より効率良く的確にストリームのリソースを管理する方法を提供していることです。RxJSの主なストリーム処理のイメージは以下の様になります。

+------------+     +----------+             +-----------+
| Observable | >>> | Operator | >>> ... >>> | subscribe |
+------------+     +----------+             +-----------+

Node.jsのStream APIと同じ様な概念に見えますね!ObservableはReadable、OperatorはTransform、subscribeはWritableに対応します。では、それぞれを繋ぐ、pipeの役割はどのようなメソッドが持つのでしょうか?ここで、あなたはRxJSのサンプルコードを見た時、少し驚くかもしれません。

const rx = require('rx');
const rxNode = require('rx-node');
const fs = require('fs');

const source = rxNode.fromReadableStream(process.stdin)
  .map(s => s.toString())
  .map(s => s.replace(/\s/g, ''))
  .filter(s => s !== '')
  .map(s => Buffer.from(s, 'utf8'))
  ;

rxNode.writeToStream(source, process.out, 'utf8');

そう、オペレータとは、RxJSが常備しているメソッドのことです。そして、これらはとても多くのものが提供されています。このため、あなたはNode.jsのStream APIよりもRxJSがとてもユーザーフレンドリーで扱いやすいと思うようになるでしょう。

また、RxJSではありがたいことに、Stream APIで問題だった、エラーが伝播しない問題を解決します。あなたはPromiseのように好きな時にエラーを回収し、またエラーを投げ、はたまた最後にまとめてエラーハンドリングを行うことも可能なのです。さらにデフォルトでオブジェクトを扱う様できているため、あなたはobjectModeに関して気にする必要は全くなくなります。また、Hot/Coldの概念やSubjectをうまく使えば、効率良くストリームを分岐し、リソースを管理することができます。

もし、あなたがRxJSに興味を持ったなら、ぜひ使ってみてください。なお、RxJSは現在バージョン5が出ており(まだ5.0.0リリースから一ヶ月も経ってない気がしますが。ついでに、このサンプルはバージョン4のものです :innocent: )、多数のAPIの変更がありました。おそらく古い資料も多いので、あなたがもし5から始めたいのなら、Learn RxJSを参照してください。

最後に

Node.jsにおける、非同期処理のためのAPIを色々見てきました。そして、その中でのStream APIと使い方、そしてStream APIのデメリットとRxJSが解決してくれる点について書きました。

それぞれの使い分けをしながら、高パフォーマンスで且つ使いやすいプログラムを書いていけるといいですね。

なお、この記事では非同期処理だけを念頭に置いてきましたが、上述の通り非同期処理は魔法でもなんでもなく、ただ単なるシングルスレッドで動くイベントループです。あなたがもし非常に重たい処理を行う場合、非同期処理は何の助力にもなってくれないでしょう。

そのような場合、あなたは計算のパフォーマンスを最大限引き出すため、クラスタ化を行っておく必要があるでしょう。clusterモジュールthrongなどの導入を検討する必要があります。

Node.jsが、シングルスレッドで非同期処理を、簡単に利用できるようにしているからといって、並列処理が必要なくなるわけではありません。並列処理と非同期処理、それぞれを使い分けながらプログラミングを行うことは、あなたのプログラムをより快適に動作させるでしょう。

では、よい非同期ライフを!

(イラストには、いらすとやさんの画像を遣わしていただきました。毎度お世話になっています。ありがとうございます)


  1. 親しみやすいようpollにしましたが、実際には環境にあった最適のシステムコールが実行されることになります 

  2. 最近、最新のに追いついてる分かりやすい記事を読んだんですがURLを忘れてしまった・・・誰か、もっといい記事があったら紹介してもらえると助かります 

  3. 実際にはデッドロックやメモリリーク、インスタンスの破壊などのバグが発生しないよう、共有インスタンスの扱いには注意する必要があるでしょう 

  4. jQueryではDeferredという名前で、現在のES2015に取り入れられているPromiseと、中身が少し異なりますが、広義にはPromiseとして扱われます 

  5. Generatorがどういうものか分からないという方は、まず次の項目を見てから戻ってきた方がいいかもしれません 

  6. このヘルパーは、TypeScriptのものを、余計なところを省いて、この処理向けに特殊化し、分かりやすくしたものです 

  7. もちろん、データの表現値としての限界はありますが 



외부에서 들어오는 값들은 유효성 체크가 필요하다.


1. 외부에서 들어오는 값의 종류

 ・파라미터

 ・redis, DB 등

 ・API rreturn


2. 값의 유효성을 체크하는 방법론 : 표로 체크하기


1
2
3
4
5
6
7
8
function extractEntity(sentence,userId,isFinal){
    //Redis
    let lastWexResult = yield SessionWex.getField(userId,'wex_result');
    //external API call and return
    let result = yield sendRequest(options);
 
    ...
}
cs



위와같은 함수가 존재할 경우,


 ・파라미터 : sentence,userId,isFinal

 ・redis, DB 등 : lastWexResult

 ・API rreturn : result


가 외부에서 들어오는 값으로 인식될 수 있다.


이때 표를 만들어 각 값의 유효성을 체크하면 편리하다.


 

 sentence

 userId

 isFinal

 lastWexResult

 result

 return

 type

 parameter

parameter 

parameter 

Redis 

API return 

 X

 is it can be undefined?

 X

 X 

 X 

 X

 X 

 

 is it can be null?

 X

 

 X 

 O 

 X 

 

 datatype

 String 

 String 

 String 

 Object 

 Object 

 X 


이렇게 정리하면 null이나 undefined의 값이 외부로 부터 들어올 때 구멍없이 처리할 수 있다.




3. 유효성 처리를 어떻게 할 것인가?


・디폴트 파라미터 값을 주어, null이나 undefined가 들어올 경우 default값을 주도록 한다.

・함수 내에 조건문을 통해 null이나 undefined를 처리한다. 이때 ()?true:false 의 oneline 조건문을 쓰면 편리하다.




Async 함수 (ES2017)-https://github.com/nhnent/fe.javascript/wiki/%23165:-%EB%AA%A8%EB%8D%98-%EC%9E%90%EB%B0%94%EC%8A%A4%ED%81%AC%EB%A6%BD%ED%8A%B8%EC%99%80-%EB%B9%84%EB%8F%99%EA%B8%B0-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D:-Generator-Yield-vs.-Async-Await

자, 제너레이터 함수와 yield가 async/await과 어떤 관련이 있을까? Async/await은 ES2017에서 정식으로 채택된 Javascript에 새롭게 제안된 것이다. 제너레이터보다 더 특별한 방식으로 함수의 실행을 잠시 멈추는 함수를 작성할 수 있다. Async/await을 사용하면 제너레이터의 일부 사용 사례를 더 쉽게 구현할 수 있다는 것을 기억해라. 제너레이터 함수/yield와 Async 함수/await은 모두 "기다리는" 비동기 코드를 작성하는 데 사용된다. 그래서 비동기 함수이지만 동기 함수처럼 보이게 한다; 콜백을 사용하지도 않는다.

제너레이터에서부터 시작해보자: yield는 제너레이터의 실행을 어떤 시점에서 멈출 수 있기 때문에, 비동기 요청이 끝날 때까지 기다린 후 다음 코드가 실행되게 할 수 있다. 다음 예제에서 생각해보자:

우리의 애플리케이션이 백엔드로부터 필요한 정보를 받는 "init" 함수를 갖고 있다고 하자. 예를 들어 사용자 목록을 받아올 때는, XHR 요청을 필요로 하기 때문에 비동기 메서드일 것이다. 프로미스를 사용한다면, 우리는 반드시 프로미스가 끝났을 때 실행될 콜백함수를 정의해야 한다.

img04-async-function-using-promise

제너레이터 함수에서는 비동기 함수인 "getUsersFromDataBase"의 종료를 기다리기 위해 yield를 사용하면 된다. 그리고 users에 반환 값을 넣어주면 된다.

img04-async-function-using-yield

이 방식이 확실히 읽기 쉽지만, 엄청 단순하게 동작하는 것은 아니다. 용어가 암시하듯이, "yield" 키워드는 정말 실행 권한을 제너레이터 함수의 호출자에게 맡긴다. 이것은 프로미스를 양도받는 역할을 하는 외부 함수가 있어야 한다는 것을 의미한다. 프로미스가 끝날 때까지 기다렸다가 제너레이터 함수에 반환 값을 넘겨주어, 함수의 실행이 재개되고 이 값이 users라는 변수에 할당되도록 한다.

위 예제는 우리가 이전에 알지 못했던 제너레이터 함수의 특징을 알게 해준다: 제너레이터 함수는 "멈춤 지점"이라면 외부 함수로 값을 반환할 뿐만 아니라, 외부함수로부터 값을 받을 수 있다. 이는 두 코드 간의 back-and-forth 커뮤니케이션을 허용한다.

이터레이터의 "next" 메서드의 변수로 전달된 값은 제너레이터 함수로 전달된다. 다음 예제는 어떻게 제너레이터 함수를 호출하는 "외부" 코드가 프로미스를 끝내고, 반환된 값을 제너레이터 함수로 보내는 역할을 하는지에 대한 설명이다:

img05-how-exteranl-code-get-generators-return-value

getUsersFromDatabase의 구현은 중요하지 않다. 이 함수는 2초 후에 "Test Users"라는 문자열을 반환하면서 종료되는 프로미스를 반환한다. 어떻게 외부 코드(6-30번째 줄)가 제너레이터의 마지막 값을 얻기 위해 제너레이터를 호출하고(6-7번째 줄), 프로미스가 반환한 값을 처리해서(16번째 줄에서 시작하는 콜백), 제너레이터에 전달하는지(20번째 줄)에 주목하자. 13, 18, 28번째 줄은 각 시점의 변수의 값을 보여주기 위한 곳으로, 그 값은 어두운 파란색 글자로 표시되며, 인라인 실행 도구에 의해 추가됐다.

이 예제는 제너레이터 함수가 하나의 값만 양도(yield)하는 특정한 경우에 대한 것으로 단순한 시나리오이다. 이론적으로, 외부 함수는 제너레이터의 마지막 return 구문에 다다르기 까지 지나치는 모든 프로미스를 반환해야 한다. 서드 파티 라이브러리에서도 같은 관계이다. 외부 함수가 프로미스를 어떻게 처리하고 해결하는지 상관하지 않고, 비동기로 기다리는 코드를 generators/yield 방식으로 작성할 수 있게 한다. 이 라이브러리들은 제너레이터 함수를 argument로 받아서, 제너레이터를 실행하고, yield 된 프로미스를 다루는 기능 제공한다.

이 긴 예제는 Async/await이 엄청 유용한 기능인지 보여주기 위한 코드이다: 비동기 코드를 generator 함수 예제(1-4번째 줄)와 비슷한 데, 심지어는 프로미스를 다루기 위한 외부의 헬퍼 함수가 필요하지도 않는다! async/await을 쓰면, 이렇게만 쓰면 된다:

img05-async-function-using-async-await

제너레이터의 별표(*)는 함수 선언부 앞에 오는 async 키워드로, "yield" 키워드는 "await"으로 대체되었다. Await은 프로미스를 반환하는 구문이기만 하면 그 앞에 놓일 수 있다. 그리고 await 키워드는 그 자신보다 먼저 선언된, async 키워드가 있는 함수 안에서만 사용할 수 있다. 이제 테스트 함수가 실행될 때, 다른 함수의 도움없이 await 키워드에서 멈추고, 프로미스가 끝나길 기다렸다 자동으로 프로미스에서 반환된 값을 users라는 const 변수에 할당할 것이다.

Async/await은 프로미스를 .then() 메서드나 콜백 정의, 그리고 이것들의 중첩현상(악명높은 죽음의 피라미드) 없이 다루는 코드를 작성하게 한다. 좋은 해결방법인 것처럼 보이나 이것을 사용하기 전에 반드시 생각해야 할 몇 가지 중요한 점이 있다. 때때로, 이전의 .then()의 프로미스 방식을 고수하는 것이 더 좋은 방법일 수 있다. 다음에 오는 내용을 꼭 생각해봐야 한다:

1) 비동기 함수는 항상 프로미스를 반환한다: 비동기 함수는 모든 "await" 키워드에서 실행을 잠깐 멈추고 비동기 구문이 종료되길 기다린다. 그래서 await이 붙는 함수 자체가 비동기적이다(이 때문에 비동기 함수 앞에 async 키워드가 붙는 것이다). 이는 async 키워드를 가진 함수는 무엇을 반환하든 간에 항상 리졸브되거나 에러를 던지는 프로미스를 반환한다는 것이다. 이전 예제에서 "test" 함수는 문자열 "Test Users Correctly received"라는 문자열을 반환했다. 그러나 실제로는 이 문자열과 함께 해결되는 프로미스가 반환됐다. 그래서 코드를 설계할 때나 코드의 다른 부분이 주어진 함수와 어떻게 상호작용하도록 고민할 때, 프로미스를 받기 원하는지 아닌 지를 꼭 생각해야 한다.

2) Await은 항상 프로미스를 병렬적이 아닌, 순차적으로 기다린다 await 키워드는 한 번에 여러 개 아닌, 하나의 프로미스만 기다릴 수 있다. 그래서 만약 여러 개의 프로미스를 다루면서, 이 각각이 await 키워드를 통해 기다리길 바란다면, 하나의 동작이 완전히 끝나야 다음 동작으로 넘어갈 수 있다.

동시에 수백 개의 요청을 보내 네트워크에 부하를 주는 것을 방지하고 싶을 때처럼 이 방식이 최선일 때가 있지만, 이 방식은 동시에 여러 개의 프로미스를 처리하는 것보다 훨씬 느리다.

img06-sequencial-async-functions

사용자 목록에 대한 배열인 "users"가 이전에 선언되어 있다고 가정하자. 그리고 "getProfileImage"가 프로필 이미지를 반환하는 프로미스를 반환한다고 가정하자. 이 예제는 각각의 사용자들을 순회하는데, "profileImages" 배열에 프로필 이미지를 넣기 위해 매 차례에서 잠깐 멈춘다. 이것은 현재 이터레이션의 프로미스가 끝났을 때만 다음 이터레이션으로 이동한다.

동시에 여러 프로미스를 처리하는 것에 대한 대안은 await과 프로미스를 같이 쓰는 것이다. 예를 들어, "Promise.all"로 프로미스 그룹을 관리할 수 있는 데, 이는 그룹의 모든 프로미스가 끝날(또는 실패 시 에러를 반환) 때 끝나는 하나의 프로미스를 반환한다. 그러면, 이 하나의 통합된 프로미스에 await을 사용하면 된다.

img07-parallel-async-functions

이 예제는 "map"을 사용해서 users 배열을 순회한다: 각각의 차례에서 "getProfileImage"가 실행되고, 실행되지 않는 프로미스를 반환한다. Map은 각 차례에서 일시정시하지 않고, 모든 프로미스를 가지는 배열을 반환한다는 것에 주목하자. 그러고 나서 우리는 이 모든 프로미스들을 "Promise.all"로 묶고, 이 한 개의 프로미스가 해결되는 것을 기다리기 위해 딱 이 시점에서만 await을 사용한다.

기억하자 - 제너레이터와 비동기 함수는 항상 특별한 타입의 객체를 반환한다.

  • Generator 함수: 값 X를 yield/return하면, 이것은 항상 {value: X, done: Boolean} 형태의 이터레이션 객체를 반환한다.
  • 비동기 함수: 값 X를 반환하면, X를 반환하면서 끝나거나 에러를 던지는 프로미스를 반환한다.

결론

제너레이터는 실행을 잠깐 멈출 수 있는 함수이다. 이터레이터 객체가 다음 값을 요청할 때마다 위임된(yield) 값을 생성한다. 이런 의미에서, 제너레이터는 수동적인 생산자인 반면 이터레이터는 적극적인 소비자이다(왜냐하면 값을 요청하는 것에 대한 주도권을 갖고 있기 때문이다). 이것은 일반적인 옵저버 패턴과 대조적이다. 옵저버 패턴에서는 적극적인 생산자(옵저버블, 주체)가 필요할 때 그 값을 반환하고, 하나 이상의 수동적인 소비자(옵저버)가 있어서 값이 반환되기를 기다리고 있다. 제너레이터 함수는 리스트에서 한 번에 하나의 값만 반환하는 데 사용할 수 있다. 아마도 필요에 따라 무한한 값을 생성하는 데 사용할 수 있다.

제너레이터 함수의 특별한 사용법 중의 하나는 프로미스를 양도하고(yielding) 동기식으로 동작하는 것처럼 보이는("기다리는") 비동기 코드를 작성하는 것이다. 그런데 이 방식은 반환되는 프로미스들을 다룰 다른 함수의 도움이 필요하다. 이런 경우에는 헬퍼 함수가 필요없는 async/await을 사용하는 것이 더 나은 방법이다.

비동기 함수와 await 키워드는 비동기 코드를 "기다리는" 방식으로 작성하기 위한 훌륭한 방법이다. 그러나 한 번에 여러 개의 프로미스를 기다릴 수 없기 때문에, 이런 한계 상황에서는 이전의 프로미스 폴백 방식을 사용하는 것이 낫다.







async-await를 사용한  비동기처리와 generator-yield를 사용한 비동기처리의 차이

Link to section

generator-yield의 경우 generator의 리턴값이 iterator으로, 각 yield에서 작업이 중단되면 .next()를 통해 다음의 작업으로 넘어가야 하는 불편함이 있었다. co 모듈이나 aa모듈은 그러한 불편함을 줄이기 위해 사용된 모듈이다.

그러나 async-await의 경우 await 이후에  promise가 오게 되면 promise의 resolve가 반환될때 까지 기다려주기 때문에 별도의 모듈을 사용할 필요가 없다. 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
const co = require('co');
 
const func1 = function() {
    return  new Promise(function(resolve, reject){
        setTimeout(function(){
            console.log('a');
            resolve();
            //reject('func1 fail');
        }, 2000);
    });
  };
 
const func2 = function() {
    return  new Promise(function(resolve, reject){
        setTimeout(function(){
            console.log('b');
            resolve();
            //reject('func2 fail');
        }, 3000);
    });
};
 
const func3 = function() {
    console.log('c');
};
 
//func1().then(func2).then(func3)
    
co(function*(){
    yield func1();
    yield func2();
    yield func3();
    })
    
 
async function add1(){
    await func1();
    await func2();
    await func3();    
}
cs


'C Lang > JS Technic' 카테고리의 다른 글

Stream API입문  (0) 2018.09.05
파라미터의 유효성체크(validation check of parameter)  (0) 2018.08.31
in 연산자  (0) 2018.08.23
for in vs for of 반복문  (0) 2018.08.23
array.shift()  (0) 2018.08.15

in 연산자는 명시된 속성이 명시된 객체에 존재하면 true를 반환합니다.

Link to section구문

속성 in 객체명

Link to section인자

속성
속성의 이름이나 배열의 인덱스를 뜻하는 문자열 또는 수 값입니다.
객체명
객체의 이름입니다.

Link to section설명

 다음 예제들은 in 연산자의 용도를 보여 줍니다.

// 배열
var trees = new Array("redwood", "bay", "cedar", "oak", "maple");
0 in trees         // true를 반환합니다.
3 in trees         // true를 반환합니다.
(1 + 2) in trees   // true를 반환합니다. 연산자 우선 순위에 의하여 이 구문의 괄호는 없어도 됩니다.
6 in trees         // false를 반환합니다.
"bay" in trees     // false를 반환합니다. 당신은 배열의 내용이 아닌, 인덱스 값을 명시하여야 합니다.
"length" in trees  // true를 반환합니다. length는 Array(배열) 객체의 속성입니다.

// 미리 정의된 객체
"PI" in Math       // true를 반환합니다.
"P" + "I" in Math  // true를 반환합니다.

// 사용자가 정의한 객체
var myCar = {company: "Lamborghini", model: "Lamborghini Veneno Roadster", year: 2014};
"company" in myCar // true를 반환합니다.
"model" in myCar   // true를 반환합니다.

 당신은 반드시 in 연산자의 오른쪽에 객체를 명시하여야 합니다. 예컨대 당신은 String 생성자로 만들어진 문자열을 명시할 수 있지만 문자열 리터럴은 명시할 수 없습니다.

var color1 = new String("green");
"length" in color1 // true를 반환합니다. string 객체는 length, anchor, big등의 프로퍼티를 가지고 있기때문에 true

var color2 = "coral";
"length" in color2 // color2는 String 객체가 아니리 원시자료형이기 때문에 오류를 냅니다.

Link to section제거되었거나 정의되지 않은 속성에 대하여 in 연산자 사용하기

 in 연산자는 delete 연산자로 제거된 속성에 대하여 false를 반환합니다.

var myCar = {company: "Lamborghini", model: "Lamborghini Veneno Roadster", year: 2014};
delete myCar.company;
"company" in myCar; // false를 반환합니다.

var trees = new Array("redwood", "bay", "cedar", "oak", "maple");
delete trees[3];
3 in trees; // false를 반환합니다.

 만약 당신이 속성을 undefined로 설정하였는데 그것을 제거하지 않으면, in 연산자는 그 속성에 대하여 true를 반환합니다.

var myCar = {company: "Lamborghini", model: "Lamborghini Veneno Roadster", year: 2014};
myCar.company = undefined;
"company" in myCar; // true를 반환합니다.
var trees = new Array("redwood", "bay", "cedar", "oak", "maple");
trees[3] = undefined;
3 in trees; // true를 반환합니다.

Link to section상속된 속성

 in 연산자는 프로토타입 체인에 의하여 접근할 수 있는 속성에 대하여 true를 반환합니다.

"toString" in {}; // true를 반환합니다.

Link to section명세

명세StatusComment
ECMAScript Latest Draft (ECMA-262)
The definition of 'Relational Operators' in that specification.
Draft 
ECMAScript 2015 (6th Edition, ECMA-262)
The definition of 'Relational Operators' in that specification.
Standard 
ECMAScript 5.1 (ECMA-262)
The definition of 'The in Operator' in that specification.
Standard 
ECMAScript 3rd Edition (ECMA-262)
The definition of 'The in Operator' in that specification.
Standard초기의 정의가 담겨 있습니다. JavaScript 1.4에 추가되었습니다.

Link to section브라우저 호환성

We're converting our compatibility data into a machine-readable JSON format. This compatibility table still uses the old format, because we haven't yet converted the data it contains. Find out how you can help!

기능ChromeFirefox (Gecko)Internet ExplorerOperaSafari
지원(Yes)(Yes)(Yes)(Yes)(Yes)

Link to section관련 문서



 for in vs for of 반복문



forEach반복문

forEach반복문은 오직 Array 객체에서만 사용가능한 메서드입니다.(ES6부터는 Map, Set 등에서도 지원됩니다) 배열의 요소들을 반복하여 작업을 수행할 수 있습니다. forEach구문의 인자로 callback 함수를 등록할 수 있고, 배열의 각 요소들이 반복될 때 이 callback 함수가 호출됩니다. callback 함수에서 배열 요소의 인덱스와 값에 접근할 수 있습니다.

var items = ['item1', 'item2', 'item3'];

items.forEach(function(item) {
    console.log(item);
});
// 출력 결과: item, item2, item3




for …in 반복문

for in 반복문은 객체의 속성들을 반복하여 작업을 수행할 수 있습니다. 모든 객체에서 사용이 가능합니다. for in 구문은 객체의 key 값에 접근할 수 있지만, value 값에 접근하는 방법은 제공하지 않습니다. 자바스크립트에서 객체 속성들은 내부적으로 사용하는 숨겨진 속성들을 가지고 있습니다. 그 중 하나가 [[Enumerable]]이며, for in 구문은 이 값이 true로 셋팅되어 속성들만 반복할 수 있습니다. 이러한 속성들을 열거형 속성이라고 부르며, 객체의 모든 내장 메서드를 비롯해 각종 내장 프로퍼티 같은 비열거형 속성은 반복되지 않습니다.

var obj = {
    a: 1, 
    b: 2, 
    c: 3
};

for (var prop in obj) {
    console.log(prop, obj[prop]); // a 1, b 2, c 3
}




for …of 반복문

for of 반복문은 ES6에 추가된 새로운 컬렉션 전용 반복 구문입니다. for of 구문을 사용하기 위해선 컬렉션 객체가 [Symbol.iterator] 속성을 가지고 있어야만 합니다(직접 명시 가능).

따라서, 객체 {}는 [Symbol.iterator] 속성을 가지고 있지않으므로 for of 구문을 사용할 수 없습니다.

var iterable = [10, 20, 30];

for (var value of iterable) {
  console.log(value); // 10, 20, 30
}

for in 반복문과 for of 반복문의 차이점

  • for in 반복문 : 객체의 모든 열거 가능한 속성에 대해 반복
  • for of 반복문 : [Symbol.iterator] 속성을 가지는 컬렉션 전용
Object.prototype.objCustom = function () {};
Array.prototype.arrCustom = function () {};

var iterable = [3, 5, 7];
iterable.foo = "hello";

for (var key in iterable) {
  console.log(key); // 0, 1, 2, "foo", "arrCustom", "objCustom"
}

for (var value of iterable) {
  console.log(value); // 3, 5, 7
}



출처: http://itstory.tk/entry/Javascript-for-in-vs-for-of-반복문 [덕's IT Story]

shift () 메소드는 배열에서 첫 번째 요소를 제거하고, 제거된 요소를 반환합니다. 이 메소드는 배열의 길이를 변하게 합니다.

Link to section문법(Syntax)

arr.shift()

Link to section반환 값

배열에서 제거된 요소를 반환합니다. 빈 배열의 경우 undefined 를 반환합니다.

Link to section설명

shift 메소드는 0번째 위치의 요소를 제거 하고 연이은 나머지 값들의 위치를 한칸 씩 앞으로 당깁니다. 그리고 제거된 값을 반환 합니다.  만약 배열의 length가 0이라면 undefined를 리턴 합니다.

shift 는 의도적인 일반형태로써; 이 메소드는 배열과 유사한 형태의 객체에서  호출 하거나 적용할 수 있습니다.  연속된 일련의 마지막 항목을 나타내는 길이 속성을 가지고 있지 않은 객체의 제로베이스 수치 속성에는 의미 있는 작동을 하지 않을 수 있습니다. (Objects which do not contain a length property reflecting the last in a series of consecutive, zero-based numerical properties may not behave in any meaningful manner.)

Link to section예시

Link to section배열에서 한 요소 제거하기

아래 코드는 myFish 라는 배열에서 첫번째 요소를 제거 하기 전과 후를 보여 줍니다.  그리고 제거된 요소도 보여줍니다.

var myFish = ['angel', 'clown', 'mandarin', 'surgeon'];

console.log('myFish before: ' + myFish);
// "제거전 myFish 배열: angel,clown,mandarin,surgeon"

var shifted = myFish.shift(); 

console.log('myFish after: ' + myFish); 
// "제거후 myFish 배열: clown,mandarin,surgeon" 

console.log('Removed this element: ' + shifted); 
// "제거된 배열 요소: angel"

 

Link to sectionwhile 문 안에서 shift() 사용하기

shift() 메서드는 while 문의 조건으로 사용되기도 합니다. 아래 코드에서는 while 문을 한번 돌 때 마다 배열의 다음 요소를 제거하고, 이는 빈 배열이 될 때까지 반복됩니다.

var names = ["Andrew", "Edward", "Paul", "Chris" ,"John"];

while( (i = names.shift()) !== undefined ) {
    console.log(i);
}
// Andrew, Edward, Paul, Chris, John


Node.jsでSocket.ioを使わずにWebSocket

この記事は最終更新日から1年以上が経過しています。

ちょっとWebSocketを使う話があったので試してみます。

考えてみるとNode.jsでリアルタイム通信をする時はSocket.ioMilkcocoaを使ってました。

調べるとそこまでサンプルっぽいサンプルが見当たらないのでメモしてみます。

websocket-node

npmなどで調べるとこのパッケージがメジャーっぽいですね。
https://github.com/theturtle32/WebSocket-Node

$ npm i websocket

でインストールできます。

試してみる

サーバー側

公式サンプルそのままです。

app.js
#!/usr/bin/env node
var WebSocketServer = require('websocket').server;
var http = require('http');

var server = http.createServer(function(request, response) {
    console.log((new Date()) + ' Received request for ' + request.url);
    response.writeHead(404);
    response.end();
});
server.listen(8080, function() {
    console.log((new Date()) + ' Server is listening on port 8080');
});

wsServer = new WebSocketServer({
    httpServer: server,
    // You should not use autoAcceptConnections for production
    // applications, as it defeats all standard cross-origin protection
    // facilities built into the protocol and the browser.  You should
    // *always* verify the connection's origin and decide whether or not
    // to accept it.
    autoAcceptConnections: false
});

function originIsAllowed(origin) {
  // put logic here to detect whether the specified origin is allowed.
  return true;
}

wsServer.on('request', function(request) {
    if (!originIsAllowed(request.origin)) {
      // Make sure we only accept requests from an allowed origin
      request.reject();
      console.log((new Date()) + ' Connection from origin ' + request.origin + ' rejected.');
      return;
    }

    var connection = request.accept('echo-protocol', request.origin);
    console.log((new Date()) + ' Connection accepted.');
    connection.on('message', function(message) {
        if (message.type === 'utf8') {
            console.log('Received Message: ' + message.utf8Data);
            connection.sendUTF(message.utf8Data);
        }
        else if (message.type === 'binary') {
            console.log('Received Binary Message of ' + message.binaryData.length + ' bytes');
            connection.sendBytes(message.binaryData);
        }
    });
    connection.on('close', function(reasonCode, description) {
        console.log((new Date()) + ' Peer ' + connection.remoteAddress + ' disconnected.');
    });
});

クライアント側

client-node.js
#!/usr/bin/env node
var WebSocketClient = require('websocket').client;

var client = new WebSocketClient();

client.on('connectFailed', function(error) {
    console.log('Connect Error: ' + error.toString());
});

client.on('connect', function(connection) {
    console.log('WebSocket Client Connected');
    connection.on('error', function(error) {
        console.log("Connection Error: " + error.toString());
    });
    connection.on('close', function() {
        console.log('echo-protocol Connection Closed');
    });
    connection.on('message', function(message) {
        if (message.type === 'utf8') {
            console.log("Received: '" + message.utf8Data + "'");
        }
    });

    function sendNumber() {
        if (connection.connected) {
            var number = Math.round(Math.random() * 0xFFFFFF);
            connection.sendUTF(number.toString());
            setTimeout(sendNumber, 1000);
        }
    }
    sendNumber();
});

client.connect('ws://localhost:8080/', 'echo-protocol');

実行

右がサーバー側、左がクライアント側

どっちもNode.jsで実行です。

ブラウザ側も書いてみる

以下を参考

WebSocket の導入: ウェブにソケットを実装する

var ws = new WebSocket('ws://localhost:8080/',['echo-protocol','soap', 'xmpp']);

ws.onopen = function() {//WS接続確立
   ws.send('hello');
 };

// Log errors
ws.onerror = function (error) {
  console.log('WebSocket Error ' + error);
};

// Log messages from the server
ws.onmessage = function (e) {
  console.log('Server: ' + e.data);
};

ちなみにecho-protocolを書いてないとエラーでNode.js側が死んじゃいましたね

WebSocketRequest.js:289
            throw new Error('Specified protocol was not requested by the client.');
            ^

Error: Specified protocol was not requested by the client.
    at WebSocketRequest.accept (/Users/sugawara_nobisuke/n0bisuke/lig/devrel/#18.NTT_AT/src/wssample/node_modules/websocket/lib/WebSocketRequest.js:289:19)
    at WebSocketServer.<anonymous> (/Users/sugawara_nobisuke/n0bisuke/lig/devrel/#18.NTT_AT/src/wssample/app.js:37:30)
    at emitOne (events.js:90:13)
    at WebSocketServer.emit (events.js:182:7)
    at WebSocketServer.handleUpgrade (/Users/sugawara_nobisuke/n0bisuke/lig/devrel/#18.NTT_AT/src/wssample/node_modules/websocket/lib/WebSocketServer.js:213:14)
    at emitThree (events.js:110:13)
    at Server.emit (events.js:188:7)
    at onParserExecuteCommon (_http_server.js:400:14)
    at HTTPParser.onParserExecute (_http_server.js:368:5)

パッケージ側でうまいことやってくれたらいいんですけど、この辺はsocket.ioさすがです。

まとめ

Websocketのブラウザ対応状況って久々にみたらけっこう対応してるんですね。

古いAndroidやIEを使わなければ素のWebsocketでいいんじゃないかとも思ったり。でも少し試してみて、Socket.ioの使い勝手はかなりいいと改めて思いました。

簡単なサンプルだったらMilkcocoa使おう。


wsモジュール

wsモジュールは、WebSocketプロトコル(RFC-6455に準拠する)の実装ライブラリです。
socket.ioのように多機能ではありませんが、シンプルな作りで非常に高速に動作するのが特徴です。
※socket.ioも内部でwsを使用しています

環境構築方法

今回使用した動作環境は以下のとおりです。

  • OS : MacOS X 10.7.4
  • Node.js : v0.8.15
  • npm : 1.1.66

適当なディレクトリを作成し、そこでnpmを使用して必要モジュールをインストールします。
今回はexpressも使用するので、いっしょにインストールしましょう。

1
2
3
% mkdir ws
% cd ws
% npm install ws express

wsモジュールを使ったチャット- server:nodejs & client:html

ありふれた例ですが、wsモジュールとexpressモジュールを使用してシンプルなチャットをつくってみましょう。
まずはサーバ側のモジュールをapp.jsという名前で作成します。

1
2
3
4
5
6
7
8
9
//app.js
var WebSocketServer = require('ws').Server
    , http = require('http')
    , express = require('express')
    , app = express();
 
app.use(express.static(__dirname + '/'));
var server = http.createServer(app);
var wss = new WebSocketServer({server:server});
cs

expressモジュールからHttpサーバを作成し、wsモジュールのServerの引数にしてWebSocket用サーバオブジェクトを作成します。

WebSocketの接続を保存しておく変数、connectionsを用意しておきます。
WebSocketServerのonメソッドを使用し、接続時、切断時、メッセージ受信時の処理を記述します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var connections = [];
 
wss.on('connection'function (ws,req) { //wss.on('connection'...)은 웹소켓을 연결하는 ws와 클라이언트 쪽의 데이터를 담고 있는 req를 콜백으로 반환한다.
    connections.push(ws);
    ws.on('close'function () {
        connections = connections.filter(function (conn, i) {
            return (conn === ws) ? false : true;
        });
    });
    ws.on('message'function (message) {
        console.log('message:', message);
        broadcast(JSON.stringify(message));
    });
});
cs

なお、接続時にはconnections変数へ接続オブジェクトを保存し、切断時にはconnections変数から削除します。
broadcastはこの後定義する関数です。

wsモジュールには、Websocketで接続しているユーザー全員にブロードキャストする機能はありません。
接続時に保存している配列を精査して全員にsendメソッドでメッセージを送ります。

1
2
3
4
5
6
7
8
function broadcast(message) {
    connections.forEach(function (con, i) {
        con.send(message);
    });
};
 
server.listen(3000);
 
cs

最後に3000番ポートでサーバを起動しています。

app.js全文です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//app.js
var WebSocketServer = require('ws').Server
    , http = require('http')
    , express = require('express')
    , app = express();
 
app.use(express.static(__dirname + '/'));
var server = http.createServer(app);
var wss = new WebSocketServer({server:server});
 
//Websocket接続を保存しておく
var connections = [];
 
//接続時
wss.on('connection'function (ws) {
    //配列にWebSocket接続を保存
    connections.push(ws);
    //切断時
    ws.on('close'function () {
        connections = connections.filter(function (conn, i) {
            return (conn === ws) ? false : true;
        });
    });
    //メッセージ送信時
    ws.on('message'function (message) {
        console.log('message:', message);
        broadcast(JSON.stringify(message));
    });
});
 
//ブロードキャストを行う
function broadcast(message) {
    connections.forEach(function (con, i) {
        con.send(message);
    });
};
 
server.listen(3000);
cs

次はindex.htmlファイルです。
sendボタンを押すと、テキストフィールドの文字をブロードキャストします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<!DOCTYPE html>
<html>
<head>
    <script>
        function send() {
            ws.send(document.getElementById('msg').value);
        }
 
        var host = window.document.location.host.replace(/:.*/'');
        var ws = new WebSocket('ws://' + host + ':3000');
        ws.onmessage = function (event) {
            document.getElementById("messages").innerHTML += "<div>" + JSON.parse(event.data) + "</div>";
            console.log(JSON.parse(event.data));
        };
    </script>
</head>
<body>
<strong>ws chat</strong><br>
<input type="text" id="msg"/>
<input type="button" value="send" onclick="send()"/>
<br>
<hr>
<div id="messages"/>
</body>
</html>
cs


サーバを起動し、ブラウザでhttp://localhost:3000/index.htmlを複数ブラウザで開いた後、文字を送信してみてください。

チャットができてますね。

1
% node app.js


5分で動かせるWebsocket

https://qiita.com/okumurakengo/items/a8ccea065f5659d1a1de


wsモジュールを使ったチャット- server:nodejs & client:nodejs





server-side.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
let WebSocketServer = require('ws').Server
    , http = require('http')
    , express = require('express')
    , app = express();
 
 
let server = http.createServer(app);
let wss = new WebSocketServer({server:server});
 
 
wss.on('connection'function(ws) {
 
    // When connection is success
    console.dir('connection on');
    console.log('Data sending from server to client started');
    _sendDataTimeGap(ws);
 
    // Processing data from client socket
    ws.on('message'function(message) {
        console.log(' message from client : ', message);
    });
    ws.on('error',function(error){
        console.log('error from client : ',error);
    });
    ws.on('close',function(code,reason){
        console.log('close from client : ',`code:${code}`,` reason:${reason}`);
    });
 
    // ws.close();
});
 
server.listen(3000);
 
 
/**
 * Function
 * @params {filePath:string} path of uploaded image file
 * @params {langs:array} array of langs from front
 */
 
function _sendDataTimeGap(ws){
    setTimeout(function() {
        ws.send('Server said Hello, Client' + new Date());
    }, 2000);
}
cs


client-side

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const fs = require('fs');
const WebSocket = require('ws');
 
var ws = new WebSocket('ws://localhost:3000');
 
// When connection is success
ws.on('open'function() {
    console.log('socket connected');
    console.log('Data sending from client to server started');
    _sendDataTimeGap(ws);
});
 
// Processing data from server socket
ws.on('close'function close(e) {
    console.log('close from server : ',e);
});
 
ws.on('message'function message(e) {
    console.log('message from server : ', e);
});
 
ws.on('error'function error(e) {
    console.log('error from server :',e);
});
 
 
/**
 * Function
 * @params {filePath:string} path of uploaded image file
 * @params {langs:array} array of langs from front
 */
 
function _sendDataTimeGap(ws){
    setTimeout(function() {
        ws.send('Client said Hello, Server' + new Date());
    }, 2000);
}
cs




https://www.npmjs.com/package/ws

まとめ

node.jsでWebSocketを使いたい場合、ほとんどのケースでsocket.ioを使用しているのではないでしょうか。
しかし、socket.ioほどの機能は必要なかったり(ブロードキャスト機能もありませんが・・・)、
wsをベースにしてライブラリを作成する場合には直接使用することもあるかもしれません。



参考サイトなど

+ Recent posts