YAPC::Asia 2012 - 09/28 at 東京大学
Yuji Shimada (xaicron)
CPAN Author
デザインセンスがとてもよい
New! 最近ぎっくり腰になりました
春頃に「Mobage を支える技術」という本を書きました。
⇡これね
他にうちの部では
@zigorou
@nekokak
の二人が書いています。
JPA 理事
JPA 理事
路傍の石
はい
marqs さんには寿司ビールをおごろうと思います。
みんな買ってね♡
宣伝終わり
発表中に dankogai ばりにガンガン質問してもらって大丈夫です。
今日は
今日は
今日は
というような話はしないのでご了承ください
@Yappo
「個人的には worker でのシグナル処理入ってて欲しかった。」
なるほど、
話しましょう
の前に
Parallel::Prefork を使った worker の実装例
use strict; use warnings; use DBI; use Parallel::Prefork; my $queue_table = 'neko_queue'; my $connect_info = [ ... ]; my $pm = Parallel::Prefork->new({ max_workers => 10, trap_signals => { TERM => 'TERM', HUP => 'TERM', }, }); while ($pm->signal_received ne 'TERM') { $pm->start(sub { my $q4m = DBI->connect(@$connect_info); my $index = $q4m->selectrow_array( 'SELECT queue_wait(?)', undef, $queue_table, ); return unless $index; # queue not found my $queue = $q4m->selectrow_hashref( 'SELECT * FROM ' . $queue_table, ); # do something $q4m->do('SELECT queue_end()'); }); } $pm->wait_all_children;
やったーできたよー!
...
Q. いまのコードの問題点はなにか?
Q. いまのコードの問題点はなにか?
A. シグナルを受け取ると子プロセスが処理中でも死ぬ
DEMO
... while ($pm->signal_received ne 'TERM') { $pm->start(sub { # 追加 my $signal_received = 0; $SIG{TERM} = sub { $signal_received = 1; }; my $q4m = DBI->connect(@$connect_info); my $index = $q4m->selectrow_array( 'SELECT queue_wait(?)', undef, $queue_table, ); return unless $index; # queue not found # シグナルを受け取っていたら終了する return if $signal_received; my $queue = $q4m->selectrow_hashref( 'SELECT * FROM ' . $queue_table, ); # do something $q4m->do('SELECT queue_end()'); }); } ...
Q. これで問題ないか?
Q. これで問題ないか?
A. 全然ダメですね
DEMO
→ Sys::SigAction を使うと良い
use POSIX qw(:signal_h); use Sys::SigAction qw(set_sig_handler); ... while ($pm->signal_received ne 'TERM') { $pm->start(sub { my $signal_received = 0; # Sys::SigAction を使う my $h = set_sig_handler( 'TERM', sub { $signal_received = 1; }, { flags => SA_RESTART }, ); my $q4m = DBI->connect(@$connect_info); my $index = $q4m->selectrow_array( 'SELECT queue_wait(?)', undef, $queue_table, ); return unless $index; # queue not found # シグナルを受け取っていたら終了する return if $signal_received; my $queue = $q4m->selectrow_hashref( 'SELECT * FROM ' . $queue_table, ); # do something $q4m->do('SELECT queue_end()'); }); } ...
Q. これで完璧か?
Q. これで完璧か?
A. 相変わらず queue_wait() でブロック
DEMO
... while ($pm->signal_received ne 'TERM') { $pm->start(sub { my $signal_received = 0; my $h = set_sig_handler( 'TERM', sub { $signal_received = 1; # 追加 my $sth = $DBI::lasth; if ($sth && $sth->{Database}{private_in_queue_wait}) { die 'RECEIVED TERM SIGNAL into queue_wait()'; } }, { flags => SA_RESTART }, ); my $q4m = DBI->connect(@$connect_info); $q4m->{private_in_queue_wait} = 1; # 追加 my $index = $q4m->selectrow_array( 'SELECT queue_wait(?)', undef, $queue_table, ); $q4m->{private_in_queue_wait} = 0; # 追加 return unless $index; # queue not found # シグナルを受け取っていたら終了する return if $signal_received; my $queue = $q4m->selectrow_hashref( 'SELECT * FROM ' . $queue_table, ); # do something $q4m->do('SELECT queue_end()'); }); } ...
ちなみに、queue_wait('table') はデフォルトでは 60秒で timeout しますが、
queue_wait('table', 10) とかやると 10秒で timeout します。
でも俺は即死して欲しいんだ!!!!!11
Q. これで完璧か?
Q. これで完璧か?
A. まぁまぁいいけど、プロセスのライフサイクルが短すぎる
無駄
無駄
無駄
無駄
無駄
無駄
DEMO
# 追加 my $max_requests_per_child = 10000; ... while ($pm->signal_received ne 'TERM') { $pm->start(sub { my $signal_received = 0; my $h = set_sig_handler( 'TERM', sub { $signal_received = 1; my $sth = $DBI::lasth; if ($sth && $sth->{Database}{private_in_queue_wait}) { die 'RECEIVED TERM SIGNAL into queue_wait()'; } }, { flags => SA_RESTART }, ); # ここでの接続を使いまわす my $q4m = DBI->connect(@$connect_info); # シグナルを受け取るか、max_requests_per_child までループ my $i = 0; while (!$signal_received && $max_requests_per_child > $i++) { $q4m->{private_in_queue_wait} = 1; my $index = $q4m->selectrow_array( 'SELECT queue_wait(?)', undef, $queue_table, ); $q4m->{private_in_queue_wait} = 0; return unless $index; # queue not found return if $signal_received; my $queue = $q4m->selectrow_hashref( 'SELECT * FROM ' . $queue_table, ); # do something $q4m->do('SELECT queue_end()'); } }); } ...
というわけで、だいたいこんなかんじで worker を書いて使ってます。
はい
Remote Notification API の裏側
実際の処理の説明の前に
APNs と GCM (C2DM) の概要
いろんなモジュール
いろんなモジュール
というわけで作りました
use Net::APNs::Extended; my $device_token = 'xxxxxxxxx'; # あとで説明 my $apns = Net::APNs::Extended->new( is_sandbox => 1, cert_file => 'xxx.pem', # 証明書 ); my $apns = Net::APNs::Extended->new( is_sandbox => 1, cert_file => 'apns.pem', ); # send notification to APNs $apns->send($device_token, { aps => { alert => "Hello, APNs!", badge => 1, sound => "default", }, foo => [qw/bar baz/], }); # if you want to handle the error if (my $error = $apns->retrive_error) { die Dumper $error; }
なんで HTTP じゃないんや... (帯域とかわかるけどさ...)
の前に
今年の Google I/O で突然の終了宣言\(^o^)/
とか書いたんだけどね...
_人人人人人人 人人人人人人_
> そこで颯爽と GCM の登場 <
 ̄Y^Y^Y^Y^Y^Y^Y^Y^Y^Y ̄
さっそくモジュール書きました
use WWW::Google::Cloud::Messaging; my $api_key = 'Your API Key'; my $gcm = WWW::Google::Cloud::Messaging->new(api_key => $api_key); my $res = $gcm->send({ registration_ids => [ $reg_id, ... ], collapse_key => $collapse_key, data => { message => 'blah blah blah', }, }); die $res->error unless $res->is_success; my $results = $res->results; while (my $result = $results->next) { my $reg_id = $result->target_reg_id; if ($result->is_success) { say sprintf 'message_id: %s, reg_id: %s', $result->message_id, $reg_id; } else { warn sprintf 'error: %s, reg_id: %s', $result->error, $reg_id; } if ($result->has_canonical_id) { say sprintf 'reg_id %s is old! refreshed reg_id is %s', $reg_id, $result->registration_id; } }
HTTP 素晴らしいですね
APNs と GCM についての説明終わり
↓の図重要
大まかに、
の二種類がある。
API はここでそれぞれのリクエストに応じて q4m に enqueue する。
APNs と GCM でそれぞれ処理が違う
APNs
GCM
Net::APNs::Extended::Feedback を使うと簡単に無効な device token 一覧が取れる
use Net::APNs::Extended::Feedback; my $feedback = Net::APNs::Extended::Feedback->new( is_sandbox => 1, cert_file => 'xxx', ); my $feedbacks = $feedback->retrieve_feedback; # [ # { # time_t => ..., # token_bin => ..., # token_hex => ..., # }, # { # time_t => ..., # token_bin => ..., # token_hex => ..., # }, # ... # ]
Remote Notification 終わり
Leaderboard API の裏側
@hirose31 さんが書いた神の書があるので安心
などなど大量にある
最初は Redis::hiredis が高速だったので検討していたが、XS レベルで multi がバグっていた!!
パッチを送ったものの取り込まれない
仕方ないので、やりたいことが一応全部できる RedisDB を利用することに
しかし、当初は RedisDB は PP だったが、いつの間にか XS に!!
まだ XS 版は使っていません。
use RedisDB; my $redis = RedisDB->new(host => 'localhost', port => 6379); say $redis->set(foo => 'bar'); # 1 say $redis->get('foo'); # 'bar' say '-'x80; say $redis->send_command('SET', foo => 'hoge'); # 1 say $redis->send_command('GET', 'foo'); # 1 my @results = $redis->get_all_replies; say Dumper \@results; # [ 'OK', 'hoge' ]
新しくスコアを登録したり、更新する場合は Redis と MySQL 両方に
Redis で発行しているコマンド
use RedisDB; my $redis = RedisDB->new(...); $redis->send_command('MULTI'); $redis->send_command('ZADD', 'score', $score, $user_id); # このへんで付加情報的な奴の更新 $redis->send_command('EXEC');
実際は、小さいスコアを許容するかどうかで処理を変えていたりと複雑
だけど、基本的には ZADD してるだけです!
use RedisDB; # score が高い人が 1位の場合 $redis->send_command('MULTI'); $redis->send_command('ZSCORE', 'score', $user_id); $redis->send_command('ZREVRANK', 'score', $user_id); $redis->send_command('EXEC'); @results = $redis->get_all_replies; say Dumper { score => $results[-1]->[0], rank => $results[-1]->[1] + 1, };
use RedisDB; my $redis = RedisDB->new(host => 'localhost', port => 6379); # score が高い人が 1位の場合 my $start = 1; my $end = 100; $redis->send_command('MULTI'); $redis->send_command('ZCOUNT', $score_key, '-inf', '+inf'); $redis->send_command('ZREVRANGE', $score_key, $start, $end, 'WITHSCORES'); $redis->send_command('EXEC'); @results = $redis->get_all_replies; my $data = $results[0]; my $total = $results[-1][0]; my $data = $results[-1][1]; my $rs = []; my $rank = $start; for (my $i = 0; $i < @$data; $i += 2) { push @$rs, { rank => $rank++, user_id => $data->[$i], score => $data->[$i+1], }; } say Dumper [ $total, $rs ];
ご清聴ありがとうございました。
Question?
j or →: next
k or ←: prev
h or ↑: list
l or ↓: return
o or ↵: open
? or /: toggle this help