MkItYs

MkItYs > ネットワークとサーバを作る > 

images

ウェブのストリームサーバを作る:SSE, Node.js (http) , Apache (NPH) , JavaScript (EventSource) , Python (PycURL, Requests) , curl

images

ウェブでストリームサーバを作るのは、とてもかんたんです。

概要


ウェブによるストリームサービスも、最近けっこう話題になりますね。[※1]

ウェブサーバとクライアントの継続したやり取りには、いろいろなやり方がありますーーとはいえ、たんにサーバ側から情報を継続して取得するだけなら、ここで取り上げる「サーバ送信イベント」で十分だったります。[※2][※3]


※1
ChatGPT が採用したからでしょうかーーなおTwitter も以前からストリームでAPIを提供していますが、ここで取り上げるしくみ(SSE )とは違う、独自のやり方を採用しています。
※2
ほかには、セッションIDを管理して継続したやり取りにみせかけるやり方(Ajax)、じっさいにコネクションを張りつつサーバもクライアントも情報を送信できるやり方(WebSocket )、などがあります。
※3
この「サーバ送信イベント」(SSE )は、ウェブの標準化団体W3Cによる推奨規格です。なお規格自体は2012年に勧告されていますが、当時の仕様はすでに非推奨になっています:
https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events
https://www.w3.org/TR/2012/WD-eventsource-20120426/

作成:サーバ(JavaScript (Node.js (http)))


JSのウェブライブラリ(Node.js (http))なら、たとえば次のように数行のスクリプトを書くだけで、ストリームサービスを実装できます:[※1][※2]

a.js
const web = require('http');
const srv = web.createServer(function (req, res) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache'
  });
  setInterval(function() {
    res.write('data: 1\n\n');
   }, 1000);
});
srv.listen(80);

※1
あくまで実験用としてですがーーじっさいの運用では、イベントや異常処理を組み込む必要があります。
※2
クライアントがオリジン間リソース共有(CORS)を要求するときは、ヘッダ部分に次を追加しておきます(最近のウェブブラウザは、セキュリティ上この要求を既定にしています)ーー実験するだけなら、URL部分は「* 」でもかまいせんが……:
Access-Control-Allow-Origin: <url_server>

作成:サーバ(Apache, Python)


ウェブサーバ(Apache)の場合でも、次のように数行のCGIスクリプトを書くだけです:[※1][※2]

nph-a.cgi
#!/usr/bin/python3 -u

import time

print('HTTP/1.1 200 OK')
print('Content-type: text/event-stream')
print('Cache-Control: no-cache')
print('')

while True:
  print('data: 1')
  print('')
  time.sleep(1)

※1
Apacheの場合、スクリプトは「nph-」で始まる名前にします(NPH: non-parsed headers )ーーこうすることで、最初のステータス行をふくめサーバ側が返すのでなく、(すべての応答を)スクリプトに任せるようになります。
※2
Pythonの場合、引数に「-u」をつけることで、標準出力(をふくめすべての入出力)のバッファリングを無効にします。

作成:クライアント:フロントエンド(JavaScript (EventSource))


サーバが出力するストリームを、(ChatGPT のように)ウェブブラウザ上でつなげていくなら、次のようなスクリプトになります:

b.html
<html>
  <body>
    <div id="001"/>
    <script>

      const div = document.getElementById("001");
      const cnn = new EventSource("http://<server>:<port>/<path>");
      cnn.onmessage = function(eve) {
        div.appendChild(document.createTextNode(eve.data));
      }

    </script>
  </body>
</html>
11...

作成:クライアント:バックエンド(Python (PycURL))


バックエンド側で処理するなら、次のようなスクリプトになります:[※1][※2]

b.py
#!/usr/bin/python3

import pycurl

def eve(dat):
  print(dat)

cnn = pycurl.Curl()
cnn.setopt(pycurl.URL, 'http://<server>:<port>/<path>')
cnn.setopt(pycurl.WRITEFUNCTION, eve)
cnn.perform()
$ b.py
b'data: 1\n\n'
b'data: 1\n\n'
...

※1
SSLで自己証明書を使って実験するときは、次のオプションを付けます:
setopt(pycurl.SSL_VERIFYPEER, 0)
setopt(pycurl.SSL_VERIFYHOST, 0)
※2
コンテナ経由で実行するときは、出力をバッファリングしない処理も必要になります:
print(dat, flush=True)

作成:クライアント:バックエンド(Python (Requests))


モジュール「Requests」を使うこともできます(処理効率は落ちますが):

b.py
#!/usr/bin/python3

import requests

with requests.get('http://<server>:<port>/<path>', stream=True) as cnn:
  for dat in cnn.iter_lines():
    if dat:
      print(dat)

作成:クライアント:バックエンド(sh, curl)


コマンドで取得していくなら、1行で済みます:[※1][※2]

$ curl -N http://<server>:<port>/<path>
data: 1

data: 1

...

※1
引数「-N」は、バッファリングを抑制するオプションです。
※2
SSLで自己証明書を使って実験するときは、引数「--insecure」を付けます。

作成:サーバ(Apache, Python, JSON)


ちなみに出力をJSON形式にする場合は、次のようになります(ここでは、項目にUNIXタイムを出力しています):

nph-c.cgi
#!/usr/bin/python3 -u

import time

print('HTTP/1.1 200 OK')
print('Content-type: text/event-stream')
print('Cache-Control: no-cache')
print('')

while True:
  print('data: {"date":' + str(time.time()) + '}')
  print('')
  time.sleep(1)

作成:クライアント:バックエンド(Python (PycURL) , JSON)


クライアント側では、次のように処理します:

d.py
#!/usr/bin/python3

import pycurl
import json
import re

def eve(dat):
  res = re.sub('^data: ', '', dat.decode('utf-8'))
  jsn = json.loads(res)
  print(jsn['date'])

cnn = pycurl.Curl()
cnn.setopt(pycurl.URL, 'http://<server>:<port>/<path>')
cnn.setopt(pycurl.WRITEFUNCTION, eve)
cnn.perform()
$ d.py
1688384110.7515016
1688384111.7526295
...