基于Hyperf实现RabbitMQ+WebSocket消息推送
php中文网最新课程
每日17点准时技术干货分享
// Install the WebSocket server component first (shell command, not PHP):
//   composer require hyperf/websocket-server
//
// Server configuration excerpt. `Server` and `SwooleEvent` are referenced
// unqualified, so the real config file must import them with `use`
// statements that this excerpt does not show.
// NOTE(review): presumably this lives in config/autoload/server.php and has
// further keys (e.g. settings) that the excerpt omits; confirm.
return [
    'mode' => SWOOLE_PROCESS,
    'servers' => [
        // HTTP server listening on port 11111.
        [
            'name' => 'http',
            'type' => Server::SERVER_HTTP,
            'host' => '0.0.0.0',
            'port' => 11111,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
            ],
        ],
        // WebSocket server listening on port 12222.
        [
            'name' => 'ws',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 12222,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
            ],
        ],
    ],
];
declare(strict_types=1);

/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */

namespace App\Controller;

use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;

/**
 * WebSocket endpoint that tracks connected client fds in a Redis set so that
 * other code can look up every connected client.
 *
 * NOTE(review): $this->container is assumed to be provided by the parent
 * Controller class, which is not visible here.
 */
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /** Redis set holding the fd of every connected client. */
    private const CLIENT_SET_KEY = 'websocket_sjd_1';

    /** Lifetime of the client set in seconds; renewed on open and on every heartbeat. */
    private const CLIENT_SET_TTL = 7200;

    /**
     * Handles an incoming frame: renews the client set's TTL and echoes the frame back.
     *
     * @param WebSocketServer $server
     * @param Frame $frame
     */
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
        $redis = $this->container->get(\Redis::class);

        // Heartbeat: only a client that is already registered keeps the set alive.
        // A direct membership test avoids downloading the whole set on every frame,
        // and re-adding a value that is already a member would be a no-op.
        if ($redis->sIsMember(self::CLIENT_SET_KEY, $frame->fd)) {
            $redis->expire(self::CLIENT_SET_KEY, self::CLIENT_SET_TTL);
        }

        $server->push($frame->fd, 'Recv: ' . $frame->data);
    }

    /**
     * Handles a client disconnect by removing its fd from the client set.
     *
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        $redis = $this->container->get(\Redis::class);
        $redis->sRem(self::CLIENT_SET_KEY, $fd);

        // Debug output kept from the tutorial.
        var_dump('closed');
    }

    /**
     * Handles a new client connection: registers its fd and greets the client.
     *
     * @param WebSocketServer $server
     * @param Request $request
     */
    public function onOpen(WebSocketServer $server, Request $request): void
    {
        $redis = $this->container->get(\Redis::class);

        $added = $redis->sAdd(self::CLIENT_SET_KEY, $request->fd);
        var_dump($added); // debug output kept from the tutorial

        $renewed = $redis->expire(self::CLIENT_SET_KEY, self::CLIENT_SET_TTL);
        var_dump($renewed); // debug output kept from the tutorial

        $server->push($request->fd, 'Opened');
    }
}
/**
 * Opens a WebSocket to ws://127.0.0.1:12222, keeps it alive with a ping
 * every 5 seconds, and renders pushed records into the page with jQuery.
 *
 * Message handling (frames are expected to be JSON objects with a `code`):
 *   - code 300: writes `address` into the `.address` element.
 *   - code 200: inserts `data` as a new `.item` row after `.tableHead`,
 *     keeping at most 7 rows.
 * Frames that are not valid JSON are logged and ignored.
 */
function WebSocketTest() {
    if (!("WebSocket" in window)) {
        alert("您的浏览器不支持 WebSocket!");
        return;
    }
    console.log("您的浏览器支持 WebSocket!");

    var num = 0;               // number of rows currently rendered
    var heartbeatTimer = null; // interval handle, set only while the socket is open
    var ws = new WebSocket("ws://127.0.0.1:12222");

    ws.onopen = function () {
        // Start the heartbeat only once the socket is open: calling send()
        // while it is still connecting throws an InvalidStateError.
        heartbeatTimer = window.setInterval(function () {
            if (ws.readyState === WebSocket.OPEN) {
                var ping = {"type": "ping"};
                ws.send(JSON.stringify(ping));
            }
        }, 5000);
    };

    ws.onmessage = function (evt) {
        var d;
        try {
            d = JSON.parse(evt.data);
        } catch (err) {
            // Plain-text frames are not JSON; log them instead of letting
            // the handler throw.
            console.log(evt.data);
            return;
        }
        console.log(d);
        if (!d) {
            return; // e.g. the JSON literal null
        }

        if (d.code == 300) {
            $(".address").text(d.address);
        }

        if (d.code == 200 && d.data) {
            var v = d.data;
            console.log(v);
            num++;

            // Build the row with text nodes so field values cannot inject markup.
            // String() keeps the original rendering of missing fields and stops
            // jQuery from treating text(undefined) as a getter call.
            var item = $('<div class="item"></div>');
            [v.recordOutTime, v.userOutName, v.userOutNum, v.doorOutName].forEach(function (field) {
                $("<p></p>").text(String(field)).appendTo(item);
            });
            $(".tableHead").after(item);

            // Cap the table at 7 rows by dropping the last one.
            if (num > 7) {
                num--;
                $(".table .item:nth-last-child(1)").remove();
            }
        }
    };

    // The handler property is `onerror`; assigning to `ws.error` never fires.
    ws.onerror = function (e) {
        console.log(e);
        alert(e);
    };

    ws.onclose = function () {
        // Stop the heartbeat so nothing is sent on a closed socket.
        if (heartbeatTimer !== null) {
            window.clearInterval(heartbeatTimer);
            heartbeatTimer = null;
        }
        alert("连接已关闭...");
    };
}
// Install the AMQP component first (shell command, not PHP):
//   composer require hyperf/amqp
//
// AMQP connection configuration for the "default" connection.
// NOTE(review): presumably this lives in config/autoload/amqp.php; confirm.
return [
    'default' => [
        // Broker address and credentials (local RabbitMQ, guest account).
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        // Connection pool limits and timeouts.
        'pool' => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
        ],
        // Low-level connection parameters.
        'params' => [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
            'connection_timeout' => 3.0,
            'read_write_timeout' => 6.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 3,
        ],
    ],
];
declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;

/**
 * RabbitMQ consumer that forwards every consumed message to all WebSocket
 * clients whose fd is stored in the Redis client set.
 *
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /** Redis set holding the fd of every connected WebSocket client. */
    private const CLIENT_SET_KEY = 'websocket_sjd_1';

    /**
     * Pushes the consumed message to every connected WebSocket client.
     *
     * @param mixed $data Message payload; a string is pushed as-is, any other
     *                    value is JSON-encoded first.
     * @return string Result::ACK once the message has been fanned out.
     * @throws \RuntimeException When a non-string payload cannot be JSON-encoded.
     */
    public function consume($data): string
    {
        // Debug output kept from the tutorial.
        print_r($data);

        // push() only accepts a string frame body; under strict_types an
        // array payload would otherwise raise a TypeError.
        $payload = is_string($data) ? $data : json_encode($data);
        if ($payload === false) {
            throw new \RuntimeException('Unable to encode message payload: ' . json_last_error_msg());
        }

        $redis = $this->container->get(\Redis::class);
        $server = $this->container->get(ServerFactory::class)->getServer()->getServer();

        foreach ($redis->sMembers(self::CLIENT_SET_KEY) as $member) {
            $fd = (int) $member;
            if ($fd <= 0) {
                continue; // empty or non-numeric entry
            }

            // The set can outlive a connection, so drop fds that are no longer
            // established instead of pushing to them.
            if (! $server->isEstablished($fd)) {
                $redis->sRem(self::CLIENT_SET_KEY, $member);
                continue;
            }

            $server->push($fd, $payload);
        }

        return Result::ACK;
    }
}
/**
 * Demo endpoint: publishes one sample record to RabbitMQ through
 * DemoProducer, then echoes the request method and the `user` input.
 *
 * @return array{method: string, message: string}
 * @throws \RuntimeException When the sample payload cannot be JSON-encoded.
 */
public function test()
{
    // Sample record with a `code` and a `data` object.
    $payload = json_encode([
        'code' => 200,
        'data' => [
            'userOutName' => 'ccflow',
            'userOutNum' => '9999',
            'recordOutTime' => date('Y-m-d H:i:s'),
            'doorOutName' => '教师公寓',
        ],
    ]);
    // Native json_encode() reports failure by returning false, so turn that
    // into an exception rather than publishing a bogus message.
    if ($payload === false) {
        throw new \RuntimeException('Unable to encode demo payload: ' . json_last_error_msg());
    }

    $message = new DemoProducer($payload);
    $producer = ApplicationContext::getContainer()->get(Producer::class);
    $result = $producer->produce($message);
    var_dump($result); // debug output kept from the tutorial

    $user = $this->request->input('user', 'Hyperf');
    $method = $this->request->getMethod();

    return [
        'method' => $method,
        'message' => "{$user}.",
    ];
}
▼请点击下方:“阅读原文”,在线查看!
