php-sq
php-sq是sq的php客户端,采用c扩展编写,性能和稳定性。
安装:请提前安装libevet
Depedecies: libevet (apt-get istall libevet-dev ,yum istall libevet-devel)1. sudo phpize2. ./cofigure 3. make 4. make istall add i your php.ii:extesio = sq.so; pub例子:$sqdAddr = array( "127.0.0.1:4150", "127.0.0.1:4154");$sq = ew Nsq();$isTrue = $sq->coectNsqd($sqdAddr);for($i = 0; $i < 10000; $i++){ $sq->publish("test", "ihao");}$sq->closeNsqdCoectio();// Deferred publish //fuctio : deferredPublish(strig topic,strig message, it millisecod); //millisecod default : [0 < millisecod < 3600000]$deferred = ew Nsq();$isTrue = $deferred->coectNsqd($sqdAddr);for($i = 0; $i < 20; $i++){ $deferred->deferredPublish("test", "message daly", 3000); }$deferred->closeNsqdCoectio(); sub例子:<?php //sub$sq_lookupd = ew NsqLookupd("127.0.0.1:4161"); //the sqlookupd http addr$sq = ew Nsq();$cofig = array( "topic" => "test", "chael" => "struggle", "rdy" => 2, //optioal , default 1 "coect_um" => 1, //optioal , default 1 "retry_delay_time" => 5000, //optioal, default 0 , if ru callback failed, after 5000 msec, message will be retried "auto_fiish" => true, //default true);$sq->subscribe($sq_lookupd, $cofig, fuctio($msg,$bev){ echo $msg->payload; echo $msg->attempts; echo $msg->message_id; echo $msg->timestamp;});Nsq类方法:coectNsqd($sqdAddrArr) pub的时候连接sq,你也可以利用此函数做健康检查
closeNsqdCoectio() 关闭sq的连接
publish($topic,$msg) 消息发送
deferredPublish($topic,$msg,$msec) 延迟消息发送
subscribe($sq_lookupd,$cofig,$callback) 消息订阅
Message类方法与属性:timestamp 消息时间戳
attempts 消息的重试次数,(从1开始)
message_id 消息id
payload 消息内容
fiish($bev,$msg->message_id) 主动的ack消息方法
touch($bev,$msg->message_id) 如果你消息执行太长,可以利用次函数告知sq你还活着,一般用于执行频率比较规律的场景。
Tips:1.如果callback内需要外部变量,可以采用以下use的写法:
$sq->subscribe($sq_lookupd, $cofig, fuctio($msg,$bev) use ($you_variable){ echo $msg->payload; echo $msg->attempts; echo $msg->message_id; echo $msg->timestamp;});2.消息重试,只要抛异常就可以,切记不要陷入死循环,超过自己觉得可以的次数要retur:
subscribe($sq_lookupd, $cofig, fuctio($msg){ try{ echo $msg->payload . " " . "attempts:".$msg->attempts."\"; //do somethig }catch(Exceptio $e){ if($msg->attempts < 3){ //the message will be retried after you cofigure retry_delay_time throw ew Exceptio(""); }else{ echo $e->getMessage(); retur; } }});3.如果你想增加客户端的心跳时间与消息的超时时间:
第一步 在sqd启动时要加入相关参数,这个参数是最大的限制,比如--max-heartbeat-iterval=1m30s 心跳时间最大不能超过1分30秒: sqd --lookupd-tcp-address=127.0.0.1:4160 --max-heartbeat-iterval=1m30s --msg-timeout=10m30s第二步 因为第一步是指定最大时间,所以还需要第二步在客户端指定所需要的值 具体请看 example目录中的idetify开头的文件例子。4.如果你想增强消费能力,可以加大rdy参数
5.你可以用supervisor管理,但是因为是多进程消费,你需要在supervisorjob的配置文件 添加:
stopasgroup=true killasgroup=trueChages3.0
修复因libevet超过4096消息被截断问题
增加idetify指令功能,可以增加客户端心跳时间与消息超时时间
2.4.0
修复pubbug
修复subcoredump
修覆盖touchbug
增加等待,当刚初始化的topic没消息时
2.3.1
pub支持域名
修复pubcoredump
评论