php-nsq NSQ 的 PHP 客户端开源项目

我要开发同款
匿名用户2018年06月21日
28阅读
开发技术C/C++
所属分类服务器软件、JMS/消息中间件
授权协议未知

作品详情

php-nsq

php-nsq是nsq的php客户端,采用c扩展编写,性能和稳定性。

安装:

请提前安装libevent

Dependencies: libevent  (apt-get install libevent-dev ,yum install libevent-devel)1. sudo phpize2. ./configure 3. make  4. make install  add in your php.ini:extension = nsq.so; pub例子:$nsqdAddr = array(    "127.0.0.1:4150",    "127.0.0.1:4154");$nsq = new Nsq();$isTrue = $nsq->connectNsqd($nsqdAddr);for($i = 0; $i < 10000; $i++){    $nsq->publish("test", "nihao");}$nsq->closeNsqdConnection();// Deferred publish //function : deferredPublish(string topic,string message, int millisecond); //millisecond default : [0 < millisecond < 3600000]$deferred = new Nsq();$isTrue = $deferred->connectNsqd($nsqdAddr);for($i = 0; $i < 20; $i++){    $deferred->deferredPublish("test", "message daly", 3000); }$deferred->closeNsqdConnection(); sub例子:<?php //sub$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd http addr$nsq = new Nsq();$config = array(    "topic" => "test",    "channel" => "struggle",    "rdy" => 2,                //optional , default 1    "connect_num" => 1,        //optional , default 1       "retry_delay_time" => 5000,  //optional, default 0 , if run callback failed, after 5000 msec, message will be retried    "auto_finish" => true, //default true);$nsq->subscribe($nsq_lookupd, $config, function($msg,$bev){     echo $msg->payload;    echo $msg->attempts;    echo $msg->message_id;    echo $msg->timestamp;});Nsq类方法:

connectNsqd($nsqdAddrArr) pub的时候连接nsq,你也可以利用此函数做健康检查

closeNsqdConnection() 关闭nsq的连接

publish($topic,$msg) 消息发送

deferredPublish($topic,$msg,$msec) 延迟消息发送

subscribe($nsq_lookupd,$config,$callback) 消息订阅

Message类方法与属性:

timestamp 消息时间戳

attempts 消息的重试次数,(从1开始)

message_id 消息id 

payload 消息内容

finish($bev,$msg->message_id) 主动的ack消息方法

touch($bev,$msg->message_id) 如果你消息执行太长,可以利用次函数告知nsq你还活着,一般用于执行频率比较规律的场景。

Tips:

1.如果callback内需要外部变量,可以采用以下use的写法: 

$nsq->subscribe($nsq_lookupd, $config, function($msg,$bev) use ($you_variable){     echo $msg->payload;    echo $msg->attempts;    echo $msg->message_id;    echo $msg->timestamp;});

 2.消息重试,只要抛异常就可以,切记不要陷入死循环,超过自己觉得可以的次数要return: 

subscribe($nsq_lookupd, $config, function($msg){     try{        echo $msg->payload . " " . "attempts:".$msg->attempts."\n";        //do something    }catch(Exception $e){        if($msg->attempts < 3){            //the message will be retried after you configure retry_delay_time             throw new Exception("");         }else{            echo $e->getMessage();            return;        }    }});

3.如果你想增加客户端的心跳时间与消息的超时时间:

 第一步 在nsqd启动时要加入相关参数,这个参数是最大的限制,比如--max-heartbeat-interval=1m30s 心跳时间最大不能超过1分30秒:      nsqd --lookupd-tcp-address=127.0.0.1:4160 --max-heartbeat-interval=1m30s --msg-timeout=10m30s第二步  因为第一步是指定最大时间,所以还需要第二步在客户端指定所需要的值 具体请看 example目录中的identify开头的文件例子。

4.如果你想增强消费能力,可以加大rdy参数

5.你可以用supervisor管理,但是因为是多进程消费,你需要在supervisorjob的配置文件 添加: 

    stopasgroup=true    killasgroup=trueChanges

3.0

修复因libevent超过4096消息被截断问题

增加identify指令功能,可以增加客户端心跳时间与消息超时时间

2.4.0

修复pubbug

修复subcoredump

修覆盖touchbug

增加等待,当刚初始化的topic没消息时

2.3.1

pub支持域名

修复pubcoredump

声明:本文仅代表作者观点,不代表本站立场。如果侵犯到您的合法权益,请联系我们删除侵权资源!如果遇到资源链接失效,请您通过评论或工单的方式通知管理员。未经允许,不得转载,本站所有资源文章禁止商业使用运营!
下载安装【程序员客栈】APP
实时对接需求、及时收发消息、丰富的开放项目需求、随时随地查看项目状态

评论