
ウェブでストリームサーバを作るのは、とてもかんたんです。
概要
ウェブによるストリームサービスも、最近けっこう話題になりますね。[※1]
ウェブサーバとクライアントの継続したやり取りには、いろいろなやり方がありますーーとはいえ、たんにサーバ側から情報を継続して取得するだけなら、ここで取り上げる「サーバ送信イベント」で十分だったります。[※2][※3]
- ※1
- ChatGPT が採用したからでしょうかーーなおTwitter も以前からストリームでAPIを提供していますが、ここで取り上げるしくみ(SSE )とは違う、独自のやり方を採用しています。
- ※2
- ほかには、セッションIDを管理して継続したやり取りにみせかけるやり方(Ajax)、じっさいにコネクションを張りつつサーバもクライアントも情報を送信できるやり方(WebSocket )、などがあります。
- ※3
- この「サーバ送信イベント」(SSE )は、ウェブの標準化団体W3Cによる推奨規格です。なお規格自体は2012年に勧告されていますが、当時の仕様はすでに非推奨になっています:
- ・
- https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events
- ・
- https://www.w3.org/TR/2012/WD-eventsource-20120426/
作成:サーバ(JavaScript (Node.js (http)))
JSのウェブライブラリ(Node.js (http))なら、たとえば次のように数行のスクリプトを書くだけで、ストリームサービスを実装できます:[※1][※2]
- ◯
- a.js
const web = require('http');
const srv = web.createServer(function (req, res) {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache'
});
setInterval(function() {
res.write('data: 1\n\n');
}, 1000);
});
srv.listen(80);
- ※1
- あくまで実験用としてですがーーじっさいの運用では、イベントや異常処理を組み込む必要があります。
- ※2
- クライアントがオリジン間リソース共有(CORS)を要求するときは、ヘッダ部分に次を追加しておきます(最近のウェブブラウザは、セキュリティ上この要求を既定にしています)ーー実験するだけなら、URL部分は「* 」でもかまいせんが……:
- ・
- Access-Control-Allow-Origin: <url_server>
作成:サーバ(Apache, Python)
ウェブサーバ(Apache)の場合でも、次のように数行のCGIスクリプトを書くだけです:[※1][※2]
- ◯
- nph-a.cgi
#!/usr/bin/python3 -u
import time
print('HTTP/1.1 200 OK')
print('Content-type: text/event-stream')
print('Cache-Control: no-cache')
print('')
while True:
print('data: 1')
print('')
time.sleep(1)
- ※1
- Apacheの場合、スクリプトは「nph-」で始まる名前にします(NPH: non-parsed headers )ーーこうすることで、最初のステータス行をふくめサーバ側が返すのでなく、(すべての応答を)スクリプトに任せるようになります。
- ※2
- Pythonの場合、引数に「-u」をつけることで、標準出力(をふくめすべての入出力)のバッファリングを無効にします。
作成:クライアント:フロントエンド(JavaScript (EventSource))
サーバが出力するストリームを、(ChatGPT のように)ウェブブラウザ上でつなげていくなら、次のようなスクリプトになります:
- ◯
- b.html
<html>
<body>
<div id="001"/>
<script>
const div = document.getElementById("001");
const cnn = new EventSource("http://<server>:<port>/<path>");
cnn.onmessage = function(eve) {
div.appendChild(document.createTextNode(eve.data));
}
</script>
</body>
</html>
11...
作成:クライアント:バックエンド(Python (PycURL))
バックエンド側で処理するなら、次のようなスクリプトになります:[※1][※2]
- ◯
- b.py
#!/usr/bin/python3 import pycurl def eve(dat): print(dat) cnn = pycurl.Curl() cnn.setopt(pycurl.URL, 'http://<server>:<port>/<path>') cnn.setopt(pycurl.WRITEFUNCTION, eve) cnn.perform()
$ b.py
b'data: 1\n\n' b'data: 1\n\n' ...
- ※1
- SSLで自己証明書を使って実験するときは、次のオプションを付けます:
- ・
- setopt(pycurl.SSL_VERIFYPEER, 0)
- ・
- setopt(pycurl.SSL_VERIFYHOST, 0)
- ※2
- コンテナ経由で実行するときは、出力をバッファリングしない処理も必要になります:
- ・
- print(dat, flush=True)
作成:クライアント:バックエンド(Python (Requests))
モジュール「Requests」を使うこともできます(処理効率は落ちますが):
- ◯
- b.py
#!/usr/bin/python3
import requests
with requests.get('http://<server>:<port>/<path>', stream=True) as cnn:
for dat in cnn.iter_lines():
if dat:
print(dat)
作成:クライアント:バックエンド(sh, curl)
コマンドで取得していくなら、1行で済みます:[※1][※2]
$ curl -N http://<server>:<port>/<path>
data: 1 data: 1 ...
- ※1
- 引数「-N」は、バッファリングを抑制するオプションです。
- ※2
- SSLで自己証明書を使って実験するときは、引数「--insecure」を付けます。
作成:サーバ(Apache, Python, JSON)
ちなみに出力をJSON形式にする場合は、次のようになります(ここでは、項目にUNIXタイムを出力しています):
- ◯
- nph-c.cgi
#!/usr/bin/python3 -u
import time
print('HTTP/1.1 200 OK')
print('Content-type: text/event-stream')
print('Cache-Control: no-cache')
print('')
while True:
print('data: {"date":' + str(time.time()) + '}')
print('')
time.sleep(1)
作成:クライアント:バックエンド(Python (PycURL) , JSON)
クライアント側では、次のように処理します:
- ◯
- d.py
#!/usr/bin/python3
import pycurl
import json
import re
def eve(dat):
res = re.sub('^data: ', '', dat.decode('utf-8'))
jsn = json.loads(res)
print(jsn['date'])
cnn = pycurl.Curl()
cnn.setopt(pycurl.URL, 'http://<server>:<port>/<path>')
cnn.setopt(pycurl.WRITEFUNCTION, eve)
cnn.perform()
$ d.py
1688384110.7515016 1688384111.7526295 ...
