ウェブでストリームサーバを作るのは、とてもかんたんです。
概要
ウェブによるストリームサービスも、最近けっこう話題になりますね。[※1]
ウェブサーバとクライアントの継続したやり取りには、いろいろなやり方がありますーーとはいえ、たんにサーバ側から情報を継続して取得するだけなら、ここで取り上げる「サーバ送信イベント」で十分だったります。[※2][※3]
- ※1
- ChatGPT が採用したからでしょうかーーなおTwitter も以前からストリームでAPIを提供していますが、ここで取り上げるしくみ(SSE )とは違う、独自のやり方を採用しています。
- ※2
- ほかには、セッションIDを管理して継続したやり取りにみせかけるやり方(Ajax)、じっさいにコネクションを張りつつサーバもクライアントも情報を送信できるやり方(WebSocket )、などがあります。
- ※3
- この「サーバ送信イベント」(SSE )は、ウェブの標準化団体W3Cによる推奨規格です。なお規格自体は2012年に勧告されていますが、当時の仕様はすでに非推奨になっています:
- ・
- https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events
- ・
- https://www.w3.org/TR/2012/WD-eventsource-20120426/
作成:サーバ(JavaScript (Node.js (http)))
JSのウェブライブラリ(Node.js (http))なら、たとえば次のように数行のスクリプトを書くだけで、ストリームサービスを実装できます:[※1][※2]
- ◯
- a.js
const web = require('http'); const srv = web.createServer(function (req, res) { res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache' }); setInterval(function() { res.write('data: 1\n\n'); }, 1000); }); srv.listen(80);
- ※1
- あくまで実験用としてですがーーじっさいの運用では、イベントや異常処理を組み込む必要があります。
- ※2
- クライアントがオリジン間リソース共有(CORS)を要求するときは、ヘッダ部分に次を追加しておきます(最近のウェブブラウザは、セキュリティ上この要求を既定にしています)ーー実験するだけなら、URL部分は「* 」でもかまいせんが……:
- ・
- Access-Control-Allow-Origin: <url_server>
作成:サーバ(Apache, Python)
ウェブサーバ(Apache)の場合でも、次のように数行のCGIスクリプトを書くだけです:[※1][※2]
- ◯
- nph-a.cgi
#!/usr/bin/python3 -u import time print('HTTP/1.1 200 OK') print('Content-type: text/event-stream') print('Cache-Control: no-cache') print('') while True: print('data: 1') print('') time.sleep(1)
- ※1
- Apacheの場合、スクリプトは「nph-」で始まる名前にします(NPH: non-parsed headers )ーーこうすることで、最初のステータス行をふくめサーバ側が返すのでなく、(すべての応答を)スクリプトに任せるようになります。
- ※2
- Pythonの場合、引数に「-u」をつけることで、標準出力(をふくめすべての入出力)のバッファリングを無効にします。
作成:クライアント:フロントエンド(JavaScript (EventSource))
サーバが出力するストリームを、(ChatGPT のように)ウェブブラウザ上でつなげていくなら、次のようなスクリプトになります:
- ◯
- b.html
<html> <body> <div id="001"/> <script> const div = document.getElementById("001"); const cnn = new EventSource("http://<server>:<port>/<path>"); cnn.onmessage = function(eve) { div.appendChild(document.createTextNode(eve.data)); } </script> </body> </html>
11...
作成:クライアント:バックエンド(Python (PycURL))
バックエンド側で処理するなら、次のようなスクリプトになります:[※1][※2]
- ◯
- b.py
#!/usr/bin/python3 import pycurl def eve(dat): print(dat) cnn = pycurl.Curl() cnn.setopt(pycurl.URL, 'http://<server>:<port>/<path>') cnn.setopt(pycurl.WRITEFUNCTION, eve) cnn.perform()
$ b.py
b'data: 1\n\n' b'data: 1\n\n' ...
- ※1
- SSLで自己証明書を使って実験するときは、次のオプションを付けます:
- ・
- setopt(pycurl.SSL_VERIFYPEER, 0)
- ・
- setopt(pycurl.SSL_VERIFYHOST, 0)
- ※2
- コンテナ経由で実行するときは、出力をバッファリングしない処理も必要になります:
- ・
- print(dat, flush=True)
作成:クライアント:バックエンド(Python (Requests))
モジュール「Requests」を使うこともできます(処理効率は落ちますが):
- ◯
- b.py
#!/usr/bin/python3 import requests with requests.get('http://<server>:<port>/<path>', stream=True) as cnn: for dat in cnn.iter_lines(): if dat: print(dat)
作成:クライアント:バックエンド(sh, curl)
コマンドで取得していくなら、1行で済みます:[※1][※2]
$ curl -N http://<server>:<port>/<path>
data: 1 data: 1 ...
- ※1
- 引数「-N」は、バッファリングを抑制するオプションです。
- ※2
- SSLで自己証明書を使って実験するときは、引数「--insecure」を付けます。
作成:サーバ(Apache, Python, JSON)
ちなみに出力をJSON形式にする場合は、次のようになります(ここでは、項目にUNIXタイムを出力しています):
- ◯
- nph-c.cgi
#!/usr/bin/python3 -u import time print('HTTP/1.1 200 OK') print('Content-type: text/event-stream') print('Cache-Control: no-cache') print('') while True: print('data: {"date":' + str(time.time()) + '}') print('') time.sleep(1)
作成:クライアント:バックエンド(Python (PycURL) , JSON)
クライアント側では、次のように処理します:
- ◯
- d.py
#!/usr/bin/python3 import pycurl import json import re def eve(dat): res = re.sub('^data: ', '', dat.decode('utf-8')) jsn = json.loads(res) print(jsn['date']) cnn = pycurl.Curl() cnn.setopt(pycurl.URL, 'http://<server>:<port>/<path>') cnn.setopt(pycurl.WRITEFUNCTION, eve) cnn.perform()
$ d.py
1688384110.7515016 1688384111.7526295 ...