[GAE] [Python] Google App EngineのChannel APIを試してみた

[GAE] [Python] Google App EngineのChannel APIを試してみた

Google App EngineのChannel APIを試してみました。

プッシュ型情報配信について

プッシュ型情報配信について何かいい方法はないかと考えたとき、まず始めに、JavaのSocket通信を使えばいいかなと思って、無料でJavaが使えそうなサーバーを探したとき、Google App Engineを思いついた。でも、Google App EngineではSocketクラスが使えないみたい。
その代わり、Channel APIというものでプッシュ型情報配信の実装が可能になるらしい。

ちなみに、Google App Engine の登録からHello Worldまでの実装はこのサイトが参考になります。

Google App Engineを使って無料でサイトを立ち上げる方法

あと、Google App Engine は無料で使える制限がかなりシビアなので、実際にお仕事で使う際は注意が必要です。

Google App Engine – Pricing and Features

サーバー側の処理 (Python)

早速、Channel APIを使ってみたけど、何故か参考になりそうなコードはPythonが多かったので、とりあえずPythonで試してみた。

クライアントIDとトークンを作成してシステムにキャッシュする

サーバー側とクライアント側で対になるclient_idとtokenを作成します。

# Client IDの作成。64文字以内の一意なASCII文字列であればなんでもいい
client_id = str(uuid.uuid4())
# Client IDからTokenを作成する。TokenはClient IDと一対一
token = channel.create_channel(client_id)

現在アクセスされているユーザー一覧をオブジェクトとしてシステムにキャッシュするために、Google App Engine で用意されている memcache に保存します。
その際、オブジェクトとしてユーザークラスを作成しています。

# 同時接続しているユーザーのClient ID一覧を取得
users = memcache.get('users')
if not users:
    users = {}

# ユーザークラスを作成する
user = User(client_id, token)

# 新しいClient IDを追加する
users[client_id] = user
memcache.set(USER_KEY, users)

ユーザークラス

class User:
    def __init__(self, client_id, token):
        self.client_id = client_id
        self.token = token
        self.datetime = datetime.datetime.now()
        self.ip = ''

ポストされたときに同時接続しているユーザーに通知する

クライアントからメッセージがPOSTされたら、クライアントIDとメッセージの値を取得します。

client_id = str(self.request.POST['client_id'])
value = self.request.POST['value']
try:
    # UTF-8 へのエンコード追加
    value = value.encode('utf-8');
except UnicodeDecodeError:
    pass

配信する情報をJSONコードにして、memcache に保存されているユーザーへメッセージを通知します。

response = '{"message":{"value":"' + value + '","client_id":"' + client_id + '"}}'

# 同時接続中ユーザーのClient ID一覧を取得
users = memcache.get(USER_KEY)
if client_id in users:
    for id in users:
        # 一人ずつ更新を通知する
        channel.send_message(id, response)

クロスドメイン通信を許可する

Google App Engineを通信用として、自分のサーバーからメッセージをアクセスしたいときは、クロスドメイン通信を許可する必要があります。

Access-Control-Allow-Origin
許可するアクセス元のドメインを指定。もしくは*でワイルドカードを指定。
Access-Control-Allow-Headers
データ受け取り側のサーバーのヘッダを指定。とりあえず*でワイルドカードを指定しても動くみたいです。
Access-Control-Allow-Methods
アクセスするMethod(GET、POST、OPTIONS)の指定。
複数指定する場合はカンマ区切りで指定。例:「GET, POST, OPTIONS」
self.response.headers['Access-Control-Allow-Origin'] = '*'
self.response.headers['Access-Control-Allow-Headers'] = '*'
self.response.headers['Access-Control-Allow-Methods'] = 'GET'

Google App Engineでクロスドメイン通信

Channel APIで切断の検知を受け取る

Channel APIで切断の検知を受け取る事ができます。ブラウザを閉じる等で切断されたときは「/_ah/channel/disconnected/」が呼ばれます。また、接続されたときは「/_ah/channel/connected/」が呼ばれます。

webapp.WSGIApplication にハンドラを指定することで、イベントを受け取ります。

# Channel APIの接続
class ChannelConnectHandler(webapp.RequestHandler):
    def post(self):
        pass

# Channel APIの切断
class ChannelDisconnectHandler(webapp.RequestHandler):
    def post(self):
        client_id = self.request.get('from')

        # 切断されたユーザーを取り除く
        users = memcache.get(USER_KEY)
        users = dict([(id, last) for id, last in users.items() if last.client_id != client_id])
        memcache.set('users', users)

def main():
    application = webapp.WSGIApplication([('/', MainHandler),('/login', LoginHandler),('/post', PostHandler),('/_ah/channel/connected/', ChannelConnectHandler),('/_ah/channel/disconnected/', ChannelDisconnectHandler)],debug=True)
    util.run_wsgi_app(application)

Python アプリケーションの設定

Channel APIを使うために、app.yaml に inbound_services を設定します。

app.yaml

inbound_services:
- channel_presence

クライアント側の処理 (JavaScript)

Channel API接続状況を受け取るには、Googleが用意したJSクラスをインポートする必要があります。

<script type="text/javascript" src="/_ah/channel/jsapi"></script>

でもこれ、ローカルだと上手くいくんだけど本番などなんかおかしい。
あと、別ドメイン(絶対パスで指定)からだと動かなかったりするので、とりあえず本番ではこっち↓を使ってる。

<script type="text/javascript" src="https://talkgadget.google.com/talkgadget/channel.js"></script>

ソケット通信の初期設定

まず最初に、loginイベントを呼び出してログイン状態にします。

document.loginForm.onsubmit=login;

/**
 * ソケット通信のログイン
 */
function login(event){
    var url='http://{アプリケーション名}.appspot.com/login';
    var http=new XMLHttpRequest();
    http.open('GET',url,true);
    http.onreadystatechange=function(){
        //4で受信完了
        if (http.readyState==4){
            try{
                var json=eval('('+http.responseText+')');
                token=String(json.token);
                client_id=String(json.client_id);
                initialize();
            }catch(err){
                console.error(err);
                console.log(http.responseText);
            }
        }
    }
    http.send();
    return false;
}

ログインが成功したら、受け取ったトークンからchannelオブジェクトを生成して、ソケット通信の準備をします。

channelオブジェクトには、それぞれ「onopen」「onmessage」「onerror」「onclose」のイベントが用意されていて、サーバーからのリクエストを受信します。
別のクライアントからメッセージがあったときは「onmessage」が呼ばれます。

/**
 * ソケット通信の準備をする
 */
function initialize(){
    var channel=new goog.appengine.Channel(token);
    var socket=channel.open({
        onopen:function(){
            //ソケットopen完了時(受信可となったタイミング)にコールされる処理
            console.log('onopen');
        },
        onmessage:receive,
        onerror:function(error){
            //エラー時に発生するイベント
            console.error('onerror');
        },
        onclose:function(){
            //接続が終了したときのイベント
            console.log('onclose');
        }
    });
}

/**
 * 新しいメッセージを受け取ったときに発生するイベント
 * @param    event
 */
function receive(event){
    var json=null;
    try{
        json=eval('('+event.data+')');
    }catch(err){
        console.error(err);
        console.log(event.data);
    }
    if(json!=null){
        if(json.message){
            var msg=json.message;

            //値を取得
            var value=String(msg.value);

            //出力
            var txt=document.getElementById("output");
            txt.innerHTML=(txt.innerHTML.length>0)?txt.innerHTML+"n"+value:value;
            txt.scrollTop=txt.scrollHeight;
        }
    }
}

メッセージを送信する

メッセージを送信するときは、postイベントを呼び出してログイン時に受け取ったクライアントIDとメッセージの内容を送信します。

document.sendForm.onsubmit=send;

/**
 * 新しいメッセージを送信する
 * @param    event
 */
function send(event){
    var url='http://{アプリケーション名}.appspot.com/post';
    var value=document.getElementById('message').value;
    var params='value='+encodeURIComponent(value)+'&client_id='+client_id;
    var path=url+'?time='+(new Date().getTime());
    var http=new XMLHttpRequest();
    http.open('POST',path,true);
    //Tokenの受信時に呼ばれる関数
    http.onreadystatechange=function(){
        //4で受信完了
        if (http.readyState==4){
        }
    }
    http.setRequestHeader("content-type","application/x-www-form-urlencoded;charset=UTF-8");
    http.send(params);
    return false;
}

サンプル

Pythonコード

上記の説明のものも踏まえて、サンプルコードを公開しました。そのままデプロイすればGoogle App Engine上動くはず。

main.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2012 Hiroaki Komatsu
#
import uuid
import datetime
import os
from django.utils import simplejson
from google.appengine.api import channel, memcache
from google.appengine.ext import webapp
from google.appengine.ext.webapp import util, template

USER_KEY = 'users'
TIME_OUT = 600

# 開発用サーバかどうかを判定する
def is_dev():
    return os.environ.get('SERVER_SOFTWARE', '').startswith('Development')

# タイムアウトしたユーザーを取り除く
def timeout_check(users):
    now = datetime.datetime.now()
    users = dict([(id, last) for id, last in users.items() if (now - last.datetime) < datetime.timedelta(seconds=TIME_OUT)])
    return users

# 同時接続しているユーザー一覧を取得
def get_users_list(users):
    list = []
    for id in users:
        list.append({
            'client_id' : str(id),
            'ip' : str(users[id].ip),
        })
    return list

# 同時接続しているユーザー一覧を通知する
def push_users(users):
    response = '{"users":' + simplejson.dumps(get_users_list(users), ensure_ascii=False) + '}'
    for id in users:
        # 一人ずつアクセス情報を通知する
        channel.send_message(id, response)

# メイン
class MainHandler(webapp.RequestHandler):
    def get(self):

        if is_dev():
            script = '/_ah/channel/jsapi'
        else:
            # 本番で「/_ah/channel/jsapi」を読み込むと警告が出るので、本番用はこっちを使う
            script = 'http://talkgadget.google.com/talkgadget/channel.js'
            values = {
                'ip': str(self.request.remote_addr),
                'script': script,
            }

        path = os.path.join(os.path.dirname(__file__), 'index.html')
        self.response.out.write(template.render(path, values))

# ログイン
class LoginHandler(webapp.RequestHandler):
    def get(self):
        
        # クロスドメイン通信を許可
        self.response.headers['Access-Control-Allow-Origin'] = '*'
        self.response.headers['Access-Control-Allow-Headers'] = '*'
        self.response.headers['Access-Control-Allow-Methods'] = 'GET'
        
        # Client IDの作成。64文字以内の一意なASCII文字列であればなんでもいい
        client_id = str(uuid.uuid4())
        
        # Client IDからTokenを作成する。TokenはClient IDと一対一
        token = channel.create_channel(client_id)
        
        # 同時接続しているユーザーのClient ID一覧を取得
        users = memcache.get(USER_KEY)
        if not users:
            users = {}
        
        # タイムアウトしたユーザーを取り除く
        users = timeout_check(users)
        
        # ユーザークラスを作成する
        user = User(client_id, token)
        user.ip = str(self.request.remote_addr)
        
        # 新しいClient IDを追加する
        users[client_id] = user
        memcache.set(USER_KEY, users)
        
        # 同時接続しているユーザー一覧を通知する
        push_users(users)
        
        # レスポンス
        values = {
            'client_id': client_id,
            'token': token,
            'users': get_users_list(users),
        }
        self.response.out.write(values)

# メッセージ受信
class PostHandler(webapp.RequestHandler):
    def post(self):
        # クロスドメイン通信を許可
        self.response.headers['Access-Control-Allow-Origin'] = '*'
        self.response.headers['Access-Control-Allow-Headers'] = '*'
        self.response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
        
        client_id = str(self.request.POST['client_id'])
        value = self.request.POST['value']
        ip = str(self.request.remote_addr)
        try:
            # UTF-8 へのエンコード追加
            value = value.encode('utf-8');
        except UnicodeDecodeError:
            pass
        
        message = {
            'value': value,
            'client_id': str(client_id),
            'datetime': str(datetime.datetime.now(JST())),
            'ip': str(ip),
        }
        response = '{"message":' + simplejson.dumps(message, ensure_ascii=False) + '}'
        
        # 同時接続中ユーザーのClient ID一覧を取得
        users = memcache.get(USER_KEY)
        if client_id in users:
            
            # タイムアウトしたユーザーを取り除く
            users = timeout_check(users)
            users[client_id].datetime = datetime.datetime.now()
            memcache.set(USER_KEY, users)
            
            for id in users:
                # 一人ずつ更新を通知する
                channel.send_message(id, response)
        
        # レスポンス
        self.response.out.write(response)

# Channel APIの接続
class ChannelConnectHandler(webapp.RequestHandler):
    def post(self):
        pass

# Channel APIの切断
class ChannelDisconnectHandler(webapp.RequestHandler):
    def post(self):
        client_id = self.request.get('from')
        
        # 切断されたユーザーを取り除く
        users = memcache.get(USER_KEY)
        users = dict([(id, last) for id, last in users.items() if last.client_id != client_id])
        memcache.set(USER_KEY, users)
        
        # 同時接続しているユーザー一覧を通知する
        push_users(users)

# 日本時間
class JST(datetime.tzinfo):
    def utcoffset(self, dt):
        return datetime.timedelta(hours=9)
    def dst(self, dt):
        return datetime.timedelta(0)
    def tzname(self, dt):
        return "JST"

# ユーザークラス
class User:
    def __init__(self, client_id, token):
        self.client_id = client_id
        self.token = token
        self.datetime = datetime.datetime.now()
        self.ip = ''

def main():
    application = webapp.WSGIApplication([('/', MainHandler),('/login', LoginHandler),('/post', PostHandler),('/_ah/channel/connected/', ChannelConnectHandler),('/_ah/channel/disconnected/', ChannelDisconnectHandler)],debug=True)
    util.run_wsgi_app(application)

if __name__ == '__main__':
    main()

サンプルビュー

同時接続を確認するには複数ウィンドウを立ち上げて試して下さい。

ちなみに、Channel APIを使う上で無料の制限があって、1日辺りに接続できるクライアント数が100までだそうなので、それを超えるとつながらなくなるかも。
なので、お手柔らかに。

Datastoreを使ったサンプル

追加でGoogle App EngineのDatastoreを使って、保存できるようにしました。