您现在的位置是:群英 > 开发技术 > PHP语言
PHP实现RabbitMQ消息列队的方法是怎样
Admin发表于 2022-05-16 17:45:27770 次浏览
相信很多人对“PHP实现RabbitMQ消息列队的方法是怎样”都不太了解,下面群英小编为你详细解释一下这个问题,希望对你有一定的帮助


业务场景

项目公司是主php做开发的,框架为thinkphp。众所周知,php本身的运行效率存在一定的缺陷,所以如果有一个很复杂很耗时的业务时,必须开发一个常驻内存的程序。首先我想到了php的workerman与swoole,但是这里应上面的标题哈,想将耗时任务交给另一个服务器,同时列队处理。所以这里我想独立部署一个rabbitmq服务器用于处理列队任务。

当rabbitmq服务器我们准备好了,建立了一个持久化命名为ceshi的列队,如下:

项目上生产者和消费者的开发我这里全部采用tinkphp6+workerman,为便于管理。这里这么做也是因为发现workerman中对rabbitmq的文档解释太少了!

所以开始踩坑!

1、首先部署好thinkphp6框架

过程去看

2、安装workerman扩展

过程去看

3、生产者

配置一个workerman类

创建的send类代码如下:

<?php

namespace app\workerman;
use bunny\channel;
use workerman\rabbitmq\client;
use think\worker\server;
class send extends server
{
    //websocket地址,一会用于测试。
    protected $socket = 'websocket://127.0.0.1:2345';

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onmessage($connection, $data)
{
        //websocket发送过来的消息
        $connection->send('我收到你的信息了:'.$data);
        //rabbitmq配置
        $options = [
            'host'=>'127.0.0.1',//rabbitmq ip
            'port'=>5672,//rabbitmq 通讯端口
            'user'=>'admin',//rabbitmq 账号
            'password'=>'123456'//rabbitmq 密码
        ];
        (new client($options))->connect()->then(function (client $client) {
            return $client->channel();
        })->then(function (channel $channel) {
            /**
             * 创建队列(queue)
             * name: ceshi         // 队列名称
             * passive: false      // 如果设置true存在则返回ok,否则就报错。设置false存在返回ok,不存在则自动创建
             * durable: true       // 是否持久化,设置false是存放到内存中rabbitmq重启后会丢失,
             *                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的queue
             * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
             *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
             */
            return $channel->queuedeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (channel $channel) use($data){
            echo "发送消息内容:".$data."\n";

            /**
             * 发送消息
             * body 发送的数据
             * headers 数据头,建议 ['content_type' => 'text/plain'],这样消费端是springboot注解接收直接是字符串类型
             * exchange 交换器名称
             * routingkey 路由key
             * mandatory
             * immediate
             * @return bool|promiseinterface|int
             */

            return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (channel $channel) {
            //echo " [x] sent 'hello world!'\n";
            $client = $channel->getclient();
            return $channel->close()->then(function () use ($client) {
                return $client;
            });
        })->then(function (client $client) {
            $client->disconnect();
        });
    }

    /**
     * 当连接建立时触发的回调函数
     * @param $connection
     */
    public function onconnect($connection)
{

    }

    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public function onclose($connection)
{

    }
    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onerror($connection, $code, $msg)
{
        echo "error $code $msg\n";
    }

    /**
     * 每个进程启动
     * @param $worker
     */
    public function onworkerstart($worker)
{


    }
}

上述都ok以后咱们可以项目路径下通过命令启动这个生产者:

php think worker:server

测试发送数据:

通过这个网站

连接【ws://127.0.0.1:2345】后发送数据!

前往rabbitmq控制台

列队中有一条消息产生并且等待了!

这个时候你可能问,如果我发送数据不想通过ws发送而是接口发送怎么办?

笨思路呗:接口给内置服务器发消息->内置服务去发消息给rabbitmq

将协议改为tcp

然后重新启动服务

然后去tp6创建一个路由接口

接口代码

<?php
namespace app\controller;

use app\basecontroller;

class index extends basecontroller
{
    public function index(string $msg)
{
        //连接本地tcp服务
        $client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1);
        //发送字符串
        fwrite($client, $msg."\n");
        //断开服务
        fclose($client);
        return 'ok';
    }

}

执行结果:

说明接口成功的将数据发送给了本地内置的tcp服务。

同时,内置服务将收到的数据给了rabbitmq服务列队中。

生产者完成。

4、消费者

同生产者一样新创建一个thinkphp6及安装workerman扩展,注意端口别和生产者冲突!这里我设置的是2346端口

创建的receive类代码如下:

<?php

namespace app\workerman;
use bunny\channel;
use bunny\message;
use workerman\rabbitmq\client;
use think\worker\server;
class receive extends server
{
    protected $socket = 'tcp://127.0.0.1:2346';

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onmessage($connection, $data)
{

    }

    /**
     * 当连接建立时触发的回调函数
     * @param $connection
     */
    public function onconnect($connection)
{

    }

    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public function onclose($connection)
{

    }
    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onerror($connection, $code, $msg)
{
        echo "error $code $msg\n";
    }

    /**
     * 每个进程启动
     * @param $worker
     */
    public function onworkerstart($worker)
{
        //rabbitmq配置
        $options = [
            'host'=>'127.0.0.1',//rabbitmq ip
            'port'=>5672,//rabbitmq 通讯端口
            'user'=>'admin',//rabbitmq 账号
            'password'=>'123456'//rabbitmq 密码
        ];
        (new client($options))->connect()->then(function (client $client) {
            return $client->channel();
        })->then(function (channel $channel) {
            /**
             * 创建队列(queue)
             * name: ceshi         // 队列名称
             * passive: false      // 如果设置true存在则返回ok,否则就报错。设置false存在返回ok,不存在则自动创建
             * durable: true       // 是否持久化,设置false是存放到内存中rabbitmq重启后会丢失,
             *                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的queue
             * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
             *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
             */
            return $channel->queuedeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (channel $channel) {
            echo ' [*] waiting for messages. to exit press ctrl+c', "\n";
            $channel->consume(
                function (message $message, channel $channel, client $client) {
                    echo "接收消息内容:", $message->content, "\n";
                },
                'ceshi',
                '',
                false,
                true
            );
        });

    }
}

都ok以后咱们可以项目路径下通过命令启动这个消费者:

php think worker:server

此时应该会自动消费掉rabbitmq中等待的消息!

到这里消费者也就结束啦!

5、整体测试

接下来我用cmd来启动两个服务,然后用接口发送消息和消费测试!

至于具体怎么灵活应用自行开拓大脑哦~

比如php项目有些业务吃力,可以去做个java的消费端,让java来完成任务~



到此这篇关于“PHP实现RabbitMQ消息列队的方法是怎样”的文章就介绍到这了,感谢各位的阅读,更多相关PHP实现RabbitMQ消息列队的方法是怎样内容,欢迎关注群英网络资讯频道,小编将为大家输出更多高质量的实用文章!

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。

相关信息推荐
2022-08-01 17:56:40 
摘要:去除步骤:1、循环遍历二维数组的外层数组元素,语法“foreach($arr as $k=>$v){循环体代码}”;2、在循环体中,用unset()去除指定值,语法“if(is_array($v)){$ck=array_search(值,$v);if($ck){unset($arr[$k][$ck]);}}else{if($v===值){unset($arr[$k]);}}”。
2022-10-14 17:56:29 
摘要:这篇文章主要为大家详细介绍了如何利用Java语言实现一个简易的中文分词系统,文中的示例代码讲解详细,感兴趣的小伙伴可以尝试一下
2022-04-27 16:18:11 
摘要:python求100内的所有素数的方法:使用判断该数除了1和它本身以外不再有其他因数即可,代码为【i=2 for i in range(2,100): if(i%j==0):break else:num.append(i)】。
云活动
推荐内容
热门关键词
热门信息
群英网络助力开启安全的云计算之旅
立即注册,领取新人大礼包
  • 联系我们
  • 24小时售后:4006784567
  • 24小时TEL :0668-2555666
  • 售前咨询TEL:400-678-4567

  • 官方微信

    官方微信
Copyright  ©  QY  Network  Company  Ltd. All  Rights  Reserved. 2003-2019  群英网络  版权所有   茂名市群英网络有限公司
增值电信经营许可证 : B1.B2-20140078   粤ICP备09006778号
免费拨打  400-678-4567
免费拨打  400-678-4567 免费拨打 400-678-4567 或 0668-2555555
微信公众号
返回顶部
返回顶部 返回顶部