From 70158a00a1c2e5a873927dad511d095fba119ed2 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Wed, 2 Jun 2021 17:42:23 +0800 Subject: [PATCH 01/32] ensure that channel exists across instances when using redis manager so it can propagate message across instances --- src/ChannelManagers/RedisChannelManager.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index f96aff2567..bdf873f82d 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -429,6 +429,11 @@ public function onMessage(string $redisChannel, string $payload) $channel->broadcastLocallyToEveryoneExcept($payload, $socketId, $appId); } + public function find($appId, string $channel) + { + return $this->findOrCreate($appId, $channel); + } + /** * Build the Redis connection URL from Laravel database config. * From c218a95e9ec29cb9e689d66114407dc9e88c2431 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Wed, 2 Jun 2021 17:53:52 +0800 Subject: [PATCH 02/32] handle missing channel on server1 not propagating message to server2 --- src/ChannelManagers/RedisChannelManager.php | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index bdf873f82d..81f05e1801 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -431,7 +431,13 @@ public function onMessage(string $redisChannel, string $payload) public function find($appId, string $channel) { - return $this->findOrCreate($appId, $channel); + if (! $channelInstance = parent::find($appId, $channel)) { + $class = $this->getChannelClassName($channel); + + $this->channels[$appId][$channel] = new $class($channel); + } + + return $this->channels[$appId][$channel]; } /** From 2fe5fca6e6816f48a64019b5767d2b874b8bf656 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Wed, 2 Jun 2021 18:05:11 +0800 Subject: [PATCH 03/32] check against the list of channels in redis --- src/ChannelManagers/RedisChannelManager.php | 26 ++++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 81f05e1801..8743f04f20 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -431,13 +431,17 @@ public function onMessage(string $redisChannel, string $payload) public function find($appId, string $channel) { - if (! $channelInstance = parent::find($appId, $channel)) { - $class = $this->getChannelClassName($channel); + return $this->isChannelInSet($appId, $channel)->then(function($isInSet) use($appId, $channel){ + if($isInSet){ + if (! $channelInstance = parent::find($appId, $channel)) { + $class = $this->getChannelClassName($channel); - $this->channels[$appId][$channel] = new $class($channel); - } - - return $this->channels[$appId][$channel]; + $this->channels[$appId][$channel] = new $class($channel); + } + return $this->channels[$appId][$channel]; + } + return null; + }); } /** @@ -612,6 +616,16 @@ public function removeChannelFromSet($appId, string $channel): PromiseInterface ); } + /** + * Check if channel is on the list. + */ + public function isChannelInSet($appId, string $channel): PromiseInterface + { + return $this->publishClient->sismember( + $this->getChannelsRedisHash($appId), $channel + ); + } + /** * Set data for a topic. Might be used for the presence channels. * From 5c21b8ac77e18f212bb212045fc96c0885b96779 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Wed, 2 Jun 2021 18:12:35 +0800 Subject: [PATCH 04/32] trying to fix --- src/ChannelManagers/RedisChannelManager.php | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 8743f04f20..874a27e440 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -431,17 +431,16 @@ public function onMessage(string $redisChannel, string $payload) public function find($appId, string $channel) { - return $this->isChannelInSet($appId, $channel)->then(function($isInSet) use($appId, $channel){ + $this->isChannelInSet($appId, $channel)->then(function($isInSet) use($appId, $channel){ if($isInSet){ if (! $channelInstance = parent::find($appId, $channel)) { $class = $this->getChannelClassName($channel); - $this->channels[$appId][$channel] = new $class($channel); } - return $this->channels[$appId][$channel]; } - return null; }); + + return parent::find($appId, $channel); } /** From 96f53bc9cdbb0f0da00b6e963f7c1b9dd7b24038 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Wed, 2 Jun 2021 18:16:56 +0800 Subject: [PATCH 05/32] sync channel list across servers --- src/ChannelManagers/RedisChannelManager.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 874a27e440..25c81f15cf 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -437,6 +437,8 @@ public function find($appId, string $channel) $class = $this->getChannelClassName($channel); $this->channels[$appId][$channel] = new $class($channel); } + }else{ + unset($this->channels[$appId][$channel]); } }); From 7fbd37cf42bdbbdd5459c54b3327c32cbc1cb114 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Wed, 2 Jun 2021 18:19:53 +0800 Subject: [PATCH 06/32] added documentation --- src/ChannelManagers/RedisChannelManager.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 25c81f15cf..fcf206c587 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -619,6 +619,10 @@ public function removeChannelFromSet($appId, string $channel): PromiseInterface /** * Check if channel is on the list. + * + * @param string|int $appId + * @param string $channel + * @return PromiseInterface */ public function isChannelInSet($appId, string $channel): PromiseInterface { From 374d68697bc4c04fb99ad16e4ee90414991287e8 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Wed, 2 Jun 2021 19:27:21 +0800 Subject: [PATCH 07/32] channel should always be registered to propagate across servers --- src/ChannelManagers/RedisChannelManager.php | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index fcf206c587..80be978749 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -431,16 +431,10 @@ public function onMessage(string $redisChannel, string $payload) public function find($appId, string $channel) { - $this->isChannelInSet($appId, $channel)->then(function($isInSet) use($appId, $channel){ - if($isInSet){ - if (! $channelInstance = parent::find($appId, $channel)) { - $class = $this->getChannelClassName($channel); - $this->channels[$appId][$channel] = new $class($channel); - } - }else{ - unset($this->channels[$appId][$channel]); - } - }); + if (! $channelInstance = parent::find($appId, $channel)) { + $class = $this->getChannelClassName($channel); + $this->channels[$appId][$channel] = new $class($channel); + } return parent::find($appId, $channel); } From fb27ded6c7962afb94d6cf71681b1dfbb6138073 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 11:21:35 +0800 Subject: [PATCH 08/32] fixes on broadcasting to everyone --- src/Channels/Channel.php | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 7cc7f375a2..90018465e9 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -190,20 +190,16 @@ public function broadcastLocally($appId, stdClass $payload): bool */ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, $appId, bool $replicate = true) { - if ($replicate) { - $this->channelManager->broadcastAcrossServers($appId, $socketId, $this->getName(), $payload); - } - - if (is_null($socketId)) { - return $this->broadcast($appId, $payload, $replicate); - } - collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); } }); + if ($replicate) { + $this->channelManager->broadcastAcrossServers($appId, null, $this->getName(), $payload); + } + return true; } From e12aae507e3c8c6692538cbe008c7e84d5027ea2 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 11:26:44 +0800 Subject: [PATCH 09/32] do not replicate on call to broadcast as it is already broadcasted across server prior the call --- src/Channels/Channel.php | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 90018465e9..9927b53691 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -190,16 +190,20 @@ public function broadcastLocally($appId, stdClass $payload): bool */ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, $appId, bool $replicate = true) { + if ($replicate) { + $this->channelManager->broadcastAcrossServers($appId, $socketId, $this->getName(), $payload); + } + + if (is_null($socketId)) { + return $this->broadcast($appId, $payload, false); + } + collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); } }); - if ($replicate) { - $this->channelManager->broadcastAcrossServers($appId, null, $this->getName(), $payload); - } - return true; } From d255a4e0e8b88337dfca3e20c19d9ed7a4ade7d8 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 11:40:00 +0800 Subject: [PATCH 10/32] include serverId in socketId --- src/Server/WebSocketHandler.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Server/WebSocketHandler.php b/src/Server/WebSocketHandler.php index 855532dd3f..2e2f35559c 100644 --- a/src/Server/WebSocketHandler.php +++ b/src/Server/WebSocketHandler.php @@ -233,7 +233,7 @@ protected function limitConcurrentConnections(ConnectionInterface $connection) */ protected function generateSocketId(ConnectionInterface $connection) { - $socketId = sprintf('%d.%d', random_int(1, 1000000000), random_int(1, 1000000000)); + $socketId = sprintf('%s.%d.%d', $this->channelManager->getServerId(), random_int(1, 1000000000), random_int(1, 1000000000)); $connection->socketId = $socketId; From 7ba243b720359c61fa03fee46a29bd9ee532872a Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 11:51:38 +0800 Subject: [PATCH 11/32] Revert "include serverId in socketId" This reverts commit d255a4e0e8b88337dfca3e20c19d9ed7a4ade7d8. --- src/Server/WebSocketHandler.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Server/WebSocketHandler.php b/src/Server/WebSocketHandler.php index 2e2f35559c..855532dd3f 100644 --- a/src/Server/WebSocketHandler.php +++ b/src/Server/WebSocketHandler.php @@ -233,7 +233,7 @@ protected function limitConcurrentConnections(ConnectionInterface $connection) */ protected function generateSocketId(ConnectionInterface $connection) { - $socketId = sprintf('%s.%d.%d', $this->channelManager->getServerId(), random_int(1, 1000000000), random_int(1, 1000000000)); + $socketId = sprintf('%d.%d', random_int(1, 1000000000), random_int(1, 1000000000)); $connection->socketId = $socketId; From 92eb8c35fe77714b801d9071a166d4e83163317a Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 12:41:58 +0800 Subject: [PATCH 12/32] remove channel from set only if connections is less than 1 --- src/ChannelManagers/RedisChannelManager.php | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 80be978749..b70ca64418 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -145,25 +145,14 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan */ public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface { - return $this->getGlobalConnectionsCount($connection->app->id, $channelName) + return $this->decrementSubscriptionsCount($connection->app->id, $channelName) ->then(function ($count) use ($connection, $channelName) { - if ($count === 0) { - // Make sure to not stay subscribed to the PubSub topic - // if there are no connections. + // If the total connections count gets to 0 after unsubscribe, + // try again to check & unsubscribe from the PubSub topic if needed. + if ($count < 1) { $this->unsubscribeFromTopic($connection->app->id, $channelName); + $this->removeChannelFromSet($connection->app->id, $channelName); } - - $this->decrementSubscriptionsCount($connection->app->id, $channelName) - ->then(function ($count) use ($connection, $channelName) { - // If the total connections count gets to 0 after unsubscribe, - // try again to check & unsubscribe from the PubSub topic if needed. - if ($count < 1) { - $this->unsubscribeFromTopic($connection->app->id, $channelName); - } - }); - }) - ->then(function () use ($connection, $channelName) { - return $this->removeChannelFromSet($connection->app->id, $channelName); }) ->then(function () use ($connection) { return $this->removeConnectionFromSet($connection); From 2964c3f65f02058e35e40c9c7550fb28a51002ed Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 15:18:19 +0800 Subject: [PATCH 13/32] update lastPongedAt whenever we send message to the connection to avoid cleaning up when no ping was called due to continuous stream of events --- src/Channels/Channel.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 9927b53691..49853f7e50 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -158,6 +158,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo collect($this->getConnections()) ->each(function ($connection) use ($payload) { $connection->send(json_encode($payload)); + $this->channelManager->connectionPonged($connection); }); if ($replicate) { @@ -201,6 +202,7 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); + $this->channelManager->connectionPonged($connection); } }); From 5619003099e3741937ae1eff0489d97181ade4cf Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 16:04:44 +0800 Subject: [PATCH 14/32] consider connection as ponged for every message sent to connection --- src/Channels/Channel.php | 2 -- src/Server/Messages/PusherClientMessage.php | 6 ++++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 49853f7e50..9927b53691 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -158,7 +158,6 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo collect($this->getConnections()) ->each(function ($connection) use ($payload) { $connection->send(json_encode($payload)); - $this->channelManager->connectionPonged($connection); }); if ($replicate) { @@ -202,7 +201,6 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); - $this->channelManager->connectionPonged($connection); } }); diff --git a/src/Server/Messages/PusherClientMessage.php b/src/Server/Messages/PusherClientMessage.php index 7b4dc64d8a..359788f576 100644 --- a/src/Server/Messages/PusherClientMessage.php +++ b/src/Server/Messages/PusherClientMessage.php @@ -5,6 +5,7 @@ use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\Contracts\PusherMessage; use BeyondCode\LaravelWebSockets\DashboardLogger; +use BeyondCode\LaravelWebSockets\Events\ConnectionPonged; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; use stdClass; @@ -69,6 +70,11 @@ public function respond() $this->payload, $this->connection->socketId, $this->connection->app->id ); + $this->channelManager->connectionPonged($this->connection) + ->then(function () { + ConnectionPonged::dispatch($this->connection->app->id, $this->connection->socketId); + }); + DashboardLogger::log($this->connection->app->id, DashboardLogger::TYPE_WS_MESSAGE, [ 'socketId' => $this->connection->socketId, 'event' => $this->payload->event, From 1be80f14bf69e71856353fe73f4681dda433b584 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 16:06:01 +0800 Subject: [PATCH 15/32] mark connection as ponged to update lastPongedAt and avoid cleanup for active connections --- src/Server/Messages/PusherClientMessage.php | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Server/Messages/PusherClientMessage.php b/src/Server/Messages/PusherClientMessage.php index 359788f576..73afe15d4a 100644 --- a/src/Server/Messages/PusherClientMessage.php +++ b/src/Server/Messages/PusherClientMessage.php @@ -54,6 +54,11 @@ public function __construct(stdClass $payload, ConnectionInterface $connection, */ public function respond() { + $this->channelManager->connectionPonged($this->connection) + ->then(function () { + ConnectionPonged::dispatch($this->connection->app->id, $this->connection->socketId); + }); + if (! Str::startsWith($this->payload->event, 'client-')) { return; } @@ -70,11 +75,6 @@ public function respond() $this->payload, $this->connection->socketId, $this->connection->app->id ); - $this->channelManager->connectionPonged($this->connection) - ->then(function () { - ConnectionPonged::dispatch($this->connection->app->id, $this->connection->socketId); - }); - DashboardLogger::log($this->connection->app->id, DashboardLogger::TYPE_WS_MESSAGE, [ 'socketId' => $this->connection->socketId, 'event' => $this->payload->event, From 5ad16c1cc8f2d1ccc5fc84a65951543c27872a1a Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 16:27:11 +0800 Subject: [PATCH 16/32] trying to fix issues with pong on server message --- src/Channels/Channel.php | 9 +++++++++ src/Server/Messages/PusherClientMessage.php | 6 ------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 9927b53691..c82d91082d 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -4,6 +4,7 @@ use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\DashboardLogger; +use BeyondCode\LaravelWebSockets\Events\ConnectionPonged; use BeyondCode\LaravelWebSockets\Events\SubscribedToChannel; use BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; @@ -158,6 +159,10 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo collect($this->getConnections()) ->each(function ($connection) use ($payload) { $connection->send(json_encode($payload)); + $this->channelManager->connectionPonged($connection) + ->then(function() use($connection){ + ConnectionPonged::dispatch($connection->app->id, $connection->socketId); + }); }); if ($replicate) { @@ -202,6 +207,10 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); } + $this->channelManager->connectionPonged($connection) + ->then(function() use($connection){ + ConnectionPonged::dispatch($connection->app->id, $connection->socketId); + }); }); return true; diff --git a/src/Server/Messages/PusherClientMessage.php b/src/Server/Messages/PusherClientMessage.php index 73afe15d4a..7b4dc64d8a 100644 --- a/src/Server/Messages/PusherClientMessage.php +++ b/src/Server/Messages/PusherClientMessage.php @@ -5,7 +5,6 @@ use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\Contracts\PusherMessage; use BeyondCode\LaravelWebSockets\DashboardLogger; -use BeyondCode\LaravelWebSockets\Events\ConnectionPonged; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; use stdClass; @@ -54,11 +53,6 @@ public function __construct(stdClass $payload, ConnectionInterface $connection, */ public function respond() { - $this->channelManager->connectionPonged($this->connection) - ->then(function () { - ConnectionPonged::dispatch($this->connection->app->id, $this->connection->socketId); - }); - if (! Str::startsWith($this->payload->event, 'client-')) { return; } From eee83fe7fa40bda44681e8ef1f1dd07920b477c0 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 16:46:09 +0800 Subject: [PATCH 17/32] mark connection as ponged prior sending message --- src/Channels/Channel.php | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index c82d91082d..40ca915319 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -158,10 +158,9 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo { collect($this->getConnections()) ->each(function ($connection) use ($payload) { - $connection->send(json_encode($payload)); $this->channelManager->connectionPonged($connection) - ->then(function() use($connection){ - ConnectionPonged::dispatch($connection->app->id, $connection->socketId); + ->then(function() use($connection, $payload){ + $connection->send(json_encode($payload)); }); }); @@ -204,12 +203,11 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, } collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { - if ($connection->socketId !== $socketId) { - $connection->send(json_encode($payload)); - } $this->channelManager->connectionPonged($connection) - ->then(function() use($connection){ - ConnectionPonged::dispatch($connection->app->id, $connection->socketId); + ->then(function() use($connection, $payload, $socketId){ + if ($connection->socketId !== $socketId) { + $connection->send(json_encode($payload)); + } }); }); From c70b032d5300a19bf9a285a190802e8300d9937d Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 16:49:07 +0800 Subject: [PATCH 18/32] mark as ponged prior sending message --- src/Channels/Channel.php | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 40ca915319..5792651d83 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -82,16 +82,22 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b { $this->saveConnection($connection); - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->getName(), - ])); + $this->channelManager->connectionPonged($connection) + ->then(function() use($connection, $payload){ + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->getName(), + ])); + }); + DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [ 'socketId' => $connection->socketId, 'channel' => $this->getName(), ]); + ConnectionPonged::dispatch($connection->app->id, $connection->socketId); + SubscribedToChannel::dispatch( $connection->app->id, $connection->socketId, @@ -161,6 +167,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo $this->channelManager->connectionPonged($connection) ->then(function() use($connection, $payload){ $connection->send(json_encode($payload)); + ConnectionPonged::dispatch($connection->app->id, $connection->socketId); }); }); @@ -207,6 +214,7 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, ->then(function() use($connection, $payload, $socketId){ if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); + ConnectionPonged::dispatch($connection->app->id, $connection->socketId); } }); }); From 93bdc9de0207f10aed017280f741cda7c4d4a07e Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 17:08:51 +0800 Subject: [PATCH 19/32] add space after function keyword --- src/Channels/Channel.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 5792651d83..17e46788eb 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -83,7 +83,7 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b $this->saveConnection($connection); $this->channelManager->connectionPonged($connection) - ->then(function() use($connection, $payload){ + ->then(function () use($connection, $payload){ $connection->send(json_encode([ 'event' => 'pusher_internal:subscription_succeeded', 'channel' => $this->getName(), @@ -165,7 +165,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo collect($this->getConnections()) ->each(function ($connection) use ($payload) { $this->channelManager->connectionPonged($connection) - ->then(function() use($connection, $payload){ + ->then(function () use($connection, $payload){ $connection->send(json_encode($payload)); ConnectionPonged::dispatch($connection->app->id, $connection->socketId); }); @@ -211,7 +211,7 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { $this->channelManager->connectionPonged($connection) - ->then(function() use($connection, $payload, $socketId){ + ->then(function () use($connection, $payload, $socketId){ if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); ConnectionPonged::dispatch($connection->app->id, $connection->socketId); From 2ca34a721f9ccea77f1725eb5fbd8ba81cbcbd74 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 17:19:42 +0800 Subject: [PATCH 20/32] update connection on presence channel to be ponged upon subscription --- src/Channels/PresenceChannel.php | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/Channels/PresenceChannel.php b/src/Channels/PresenceChannel.php index 614fe8da50..316eeca9a0 100644 --- a/src/Channels/PresenceChannel.php +++ b/src/Channels/PresenceChannel.php @@ -40,19 +40,22 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b $hash[$user->user_id] = $user->user_info ?? []; } - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->getName(), - 'data' => json_encode([ - 'presence' => [ - 'ids' => collect($users)->map(function ($user) { - return (string) $user->user_id; - })->values(), - 'hash' => $hash, - 'count' => count($users), - ], - ]), - ])); + $this->channelManager->connectionPonged($connection) + ->then(function () use($connection, $users, $hash) { + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->getName(), + 'data' => json_encode([ + 'presence' => [ + 'ids' => collect($users)->map(function ($user) { + return (string) $user->user_id; + })->values(), + 'hash' => $hash, + 'count' => count($users), + ], + ]), + ])); + }); }); }) ->then(function () use ($connection, $user, $payload) { From 1bb434dc1fcb7a3dbc5fb0bd462e1343c894dd81 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 17:20:25 +0800 Subject: [PATCH 21/32] update connection on presence channel to be ponged upon subscription --- src/Channels/PresenceChannel.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Channels/PresenceChannel.php b/src/Channels/PresenceChannel.php index 316eeca9a0..6ab0c16df4 100644 --- a/src/Channels/PresenceChannel.php +++ b/src/Channels/PresenceChannel.php @@ -3,6 +3,7 @@ namespace BeyondCode\LaravelWebSockets\Channels; use BeyondCode\LaravelWebSockets\DashboardLogger; +use BeyondCode\LaravelWebSockets\Events\ConnectionPonged; use BeyondCode\LaravelWebSockets\Events\SubscribedToChannel; use BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; @@ -55,6 +56,7 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b ], ]), ])); + ConnectionPonged::dispatch($connection->app->id, $connection->socketId); }); }); }) From 86e73b589724849c633c9376a1db1bfa63af75ce Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 20:57:28 +0800 Subject: [PATCH 22/32] fixes linting errors --- src/Channels/Channel.php | 6 +++--- src/Channels/PresenceChannel.php | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 17e46788eb..4d233e0425 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -83,7 +83,7 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b $this->saveConnection($connection); $this->channelManager->connectionPonged($connection) - ->then(function () use($connection, $payload){ + ->then(function () use ($connection, $payload){ $connection->send(json_encode([ 'event' => 'pusher_internal:subscription_succeeded', 'channel' => $this->getName(), @@ -165,7 +165,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo collect($this->getConnections()) ->each(function ($connection) use ($payload) { $this->channelManager->connectionPonged($connection) - ->then(function () use($connection, $payload){ + ->then(function () use ($connection, $payload){ $connection->send(json_encode($payload)); ConnectionPonged::dispatch($connection->app->id, $connection->socketId); }); @@ -211,7 +211,7 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { $this->channelManager->connectionPonged($connection) - ->then(function () use($connection, $payload, $socketId){ + ->then(function () use ($connection, $payload, $socketId){ if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); ConnectionPonged::dispatch($connection->app->id, $connection->socketId); diff --git a/src/Channels/PresenceChannel.php b/src/Channels/PresenceChannel.php index 6ab0c16df4..ae8323f099 100644 --- a/src/Channels/PresenceChannel.php +++ b/src/Channels/PresenceChannel.php @@ -42,7 +42,7 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b } $this->channelManager->connectionPonged($connection) - ->then(function () use($connection, $users, $hash) { + ->then(function () use ($connection, $users, $hash) { $connection->send(json_encode([ 'event' => 'pusher_internal:subscription_succeeded', 'channel' => $this->getName(), From 5ccae68e3203febae9a6bb9316649f27bbf7f934 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Thu, 3 Jun 2021 20:58:43 +0800 Subject: [PATCH 23/32] fixes linting errors --- src/Channels/Channel.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 4d233e0425..826c553e69 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -83,7 +83,7 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b $this->saveConnection($connection); $this->channelManager->connectionPonged($connection) - ->then(function () use ($connection, $payload){ + ->then(function () use ($connection, $payload) { $connection->send(json_encode([ 'event' => 'pusher_internal:subscription_succeeded', 'channel' => $this->getName(), @@ -165,7 +165,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo collect($this->getConnections()) ->each(function ($connection) use ($payload) { $this->channelManager->connectionPonged($connection) - ->then(function () use ($connection, $payload){ + ->then(function () use ($connection, $payload) { $connection->send(json_encode($payload)); ConnectionPonged::dispatch($connection->app->id, $connection->socketId); }); @@ -211,7 +211,7 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { $this->channelManager->connectionPonged($connection) - ->then(function () use ($connection, $payload, $socketId){ + ->then(function () use ($connection, $payload, $socketId) { if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); ConnectionPonged::dispatch($connection->app->id, $connection->socketId); From 7cae46adc56cba7d890c23e91168a52d228fb3d9 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Fri, 4 Jun 2021 19:40:19 +0800 Subject: [PATCH 24/32] use pubsub to update lastPongedAt across servers --- src/ChannelManagers/LocalChannelManager.php | 26 +++++++++++-- src/ChannelManagers/RedisChannelManager.php | 25 ++++++++++++ src/Channels/Channel.php | 42 ++++++++++----------- src/Channels/PresenceChannel.php | 31 +++++++-------- 4 files changed, 81 insertions(+), 43 deletions(-) diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index 913744baea..da3918374e 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -429,9 +429,7 @@ public function getMemberSockets($userId, $appId, $channelName): PromiseInterfac */ public function connectionPonged(ConnectionInterface $connection): PromiseInterface { - $connection->lastPongedAt = Carbon::now(); - - return $this->updateConnectionInChannels($connection); + return $this->pongConnectionInChannels($connection); } /** @@ -460,6 +458,28 @@ public function removeObsoleteConnections(): PromiseInterface ); } + /** + * Pong connection in channels. + * + * @param ConnectionInterface $connection + * @return PromiseInterface[bool] + */ + + public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface + { + return $this->getLocalChannels($connection->app->id) + ->then(function ($channels) use ($connection) { + foreach ($channels as $channel) { + if ($conn = $channel->getConnection($connection->socketId)) { + $conn->lastPongedAt = Carbon::now(); + $channel->saveConnection($conn); + } + } + + return true; + }); + } + /** * Update the connection in all channels. * diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index b70ca64418..24a44b22e0 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -352,6 +352,15 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf { // This will update the score with the current timestamp. return $this->addConnectionToSet($connection, Carbon::now()) + ->then(function () use ($connection) { + $payload = [ + 'socketId' => $connection->socketId, + 'appId' => $connection->app->id, + 'serverId' => $this->getServerId() + ]; + return $this->publishClient + ->publish($this->getPongRedisHash($connection->app->id), json_encode($payload)); + }) ->then(function () use ($connection) { return parent::connectionPonged($connection); }); @@ -393,6 +402,12 @@ public function onMessage(string $redisChannel, string $payload) return; } + if($redisChannel == $this->getPongRedisHash($payload->appId)){ + $connection = $this->fakeConnectionForApp($payload->appId, $payload->socketId); + + return parent::connectionPonged($connection); + } + $payload->channel = Str::after($redisChannel, "{$payload->appId}:"); if (! $channel = $this->find($payload->appId, $payload->channel)) { @@ -742,6 +757,16 @@ public function getRedisKey($appId = null, string $channel = null, array $suffix return $hash; } + /** + * Get the pong Redis hash. + * + * @param string|int $appId + */ + public function getPongRedisHash($appId): string + { + return $this->getRedisKey($appId, null, ['pong']); + } + /** * Get the statistics Redis hash. * diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 826c553e69..788ed6082c 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -4,7 +4,6 @@ use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; use BeyondCode\LaravelWebSockets\DashboardLogger; -use BeyondCode\LaravelWebSockets\Events\ConnectionPonged; use BeyondCode\LaravelWebSockets\Events\SubscribedToChannel; use BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; @@ -60,6 +59,18 @@ public function getConnections() return $this->connections; } + /** + * Get connection by socketId. + * + * @param string socketId + * @return ?ConnectionInterface + */ + + public function getConnection(string $socketId): ?ConnectionInterface + { + return $this->connections[$socketId] ?? null; + } + /** * Check if the channel has connections. * @@ -82,13 +93,10 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b { $this->saveConnection($connection); - $this->channelManager->connectionPonged($connection) - ->then(function () use ($connection, $payload) { - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->getName(), - ])); - }); + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->getName(), + ])); DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [ @@ -96,8 +104,6 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b 'channel' => $this->getName(), ]); - ConnectionPonged::dispatch($connection->app->id, $connection->socketId); - SubscribedToChannel::dispatch( $connection->app->id, $connection->socketId, @@ -164,11 +170,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo { collect($this->getConnections()) ->each(function ($connection) use ($payload) { - $this->channelManager->connectionPonged($connection) - ->then(function () use ($connection, $payload) { - $connection->send(json_encode($payload)); - ConnectionPonged::dispatch($connection->app->id, $connection->socketId); - }); + $connection->send(json_encode($payload)); }); if ($replicate) { @@ -210,13 +212,9 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, } collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { - $this->channelManager->connectionPonged($connection) - ->then(function () use ($connection, $payload, $socketId) { - if ($connection->socketId !== $socketId) { - $connection->send(json_encode($payload)); - ConnectionPonged::dispatch($connection->app->id, $connection->socketId); - } - }); + if ($connection->socketId !== $socketId) { + $connection->send(json_encode($payload)); + } }); return true; diff --git a/src/Channels/PresenceChannel.php b/src/Channels/PresenceChannel.php index ae8323f099..614fe8da50 100644 --- a/src/Channels/PresenceChannel.php +++ b/src/Channels/PresenceChannel.php @@ -3,7 +3,6 @@ namespace BeyondCode\LaravelWebSockets\Channels; use BeyondCode\LaravelWebSockets\DashboardLogger; -use BeyondCode\LaravelWebSockets\Events\ConnectionPonged; use BeyondCode\LaravelWebSockets\Events\SubscribedToChannel; use BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; @@ -41,23 +40,19 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b $hash[$user->user_id] = $user->user_info ?? []; } - $this->channelManager->connectionPonged($connection) - ->then(function () use ($connection, $users, $hash) { - $connection->send(json_encode([ - 'event' => 'pusher_internal:subscription_succeeded', - 'channel' => $this->getName(), - 'data' => json_encode([ - 'presence' => [ - 'ids' => collect($users)->map(function ($user) { - return (string) $user->user_id; - })->values(), - 'hash' => $hash, - 'count' => count($users), - ], - ]), - ])); - ConnectionPonged::dispatch($connection->app->id, $connection->socketId); - }); + $connection->send(json_encode([ + 'event' => 'pusher_internal:subscription_succeeded', + 'channel' => $this->getName(), + 'data' => json_encode([ + 'presence' => [ + 'ids' => collect($users)->map(function ($user) { + return (string) $user->user_id; + })->values(), + 'hash' => $hash, + 'count' => count($users), + ], + ]), + ])); }); }) ->then(function () use ($connection, $user, $payload) { From ad6d34593aaa3452f2c590fca99361e37007efc0 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Fri, 4 Jun 2021 20:05:07 +0800 Subject: [PATCH 25/32] when a message is sent or received, let's update lastPongedAt --- src/Channels/Channel.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 788ed6082c..106ff206e1 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -7,6 +7,7 @@ use BeyondCode\LaravelWebSockets\Events\SubscribedToChannel; use BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; +use Carbon\Carbon; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; use stdClass; @@ -171,6 +172,8 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo collect($this->getConnections()) ->each(function ($connection) use ($payload) { $connection->send(json_encode($payload)); + $connection->lastPongedAt = Carbon::now(); + $this->saveConnection($connection); }); if ($replicate) { @@ -212,9 +215,11 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, } collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { + $connection->lastPongedAt = Carbon::now(); if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); } + $this->saveConnection($connection); }); return true; From e3f0add044eabe7ce7f952b0241683b79adf8a8c Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Mon, 7 Jun 2021 13:32:10 +0800 Subject: [PATCH 26/32] trying to fix issues with socket not in set --- src/Channels/Channel.php | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 106ff206e1..0ea2f87c2b 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -172,8 +172,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo collect($this->getConnections()) ->each(function ($connection) use ($payload) { $connection->send(json_encode($payload)); - $connection->lastPongedAt = Carbon::now(); - $this->saveConnection($connection); + $this->channelManager->connectionPonged($connection); }); if ($replicate) { @@ -215,11 +214,10 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, } collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { - $connection->lastPongedAt = Carbon::now(); if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); } - $this->saveConnection($connection); + $this->channelManager->connectionPonged($connection); }); return true; From ebf00d3d1d10a3e9b79452f891cabd3150c84ce6 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Mon, 7 Jun 2021 15:09:48 +0800 Subject: [PATCH 27/32] pong only the receiver for broadcasting to avoid having to keep the member_removed initiator --- src/Channels/Channel.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 0ea2f87c2b..7821b5587f 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -216,8 +216,8 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); + $this->channelManager->connectionPonged($connection); } - $this->channelManager->connectionPonged($connection); }); return true; From b9ce798d3a662a12b9b4451aa4032d1034bbb96a Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Mon, 7 Jun 2021 17:09:45 +0800 Subject: [PATCH 28/32] fixes spacing issues --- src/ChannelManagers/LocalChannelManager.php | 1 - src/ChannelManagers/RedisChannelManager.php | 4 ++-- src/Channels/Channel.php | 2 -- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index da3918374e..b3fa8307df 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -464,7 +464,6 @@ public function removeObsoleteConnections(): PromiseInterface * @param ConnectionInterface $connection * @return PromiseInterface[bool] */ - public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface { return $this->getLocalChannels($connection->app->id) diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 24a44b22e0..d4e403b67c 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -356,7 +356,7 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf $payload = [ 'socketId' => $connection->socketId, 'appId' => $connection->app->id, - 'serverId' => $this->getServerId() + 'serverId' => $this->getServerId(), ]; return $this->publishClient ->publish($this->getPongRedisHash($connection->app->id), json_encode($payload)); @@ -402,7 +402,7 @@ public function onMessage(string $redisChannel, string $payload) return; } - if($redisChannel == $this->getPongRedisHash($payload->appId)){ + if ($redisChannel == $this->getPongRedisHash($payload->appId)) { $connection = $this->fakeConnectionForApp($payload->appId, $payload->socketId); return parent::connectionPonged($connection); diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 7821b5587f..e3e4711a93 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -7,7 +7,6 @@ use BeyondCode\LaravelWebSockets\Events\SubscribedToChannel; use BeyondCode\LaravelWebSockets\Events\UnsubscribedFromChannel; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; -use Carbon\Carbon; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; use stdClass; @@ -66,7 +65,6 @@ public function getConnections() * @param string socketId * @return ?ConnectionInterface */ - public function getConnection(string $socketId): ?ConnectionInterface { return $this->connections[$socketId] ?? null; From 00568fc473fb0ba741ce1be11e979c88dd5235bc Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Mon, 7 Jun 2021 17:11:41 +0800 Subject: [PATCH 29/32] fixes more spacing issues --- src/ChannelManagers/RedisChannelManager.php | 1 + src/Channels/Channel.php | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index d4e403b67c..71cc71abe1 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -358,6 +358,7 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf 'appId' => $connection->app->id, 'serverId' => $this->getServerId(), ]; + return $this->publishClient ->publish($this->getPongRedisHash($connection->app->id), json_encode($payload)); }) diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index e3e4711a93..2150e6091f 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -97,7 +97,6 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b 'channel' => $this->getName(), ])); - DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [ 'socketId' => $connection->socketId, 'channel' => $this->getName(), From d481ab59fbad81e20bfe63504acd393438c0f9a9 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Tue, 8 Jun 2021 09:48:43 +0800 Subject: [PATCH 30/32] handle locks properly for connection cleanup --- src/ChannelManagers/LocalChannelManager.php | 29 ++++++++++++--------- src/ChannelManagers/RedisChannelManager.php | 27 +++++++++++-------- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index b3fa8307df..54e9ef1fe3 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -439,23 +439,26 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf */ public function removeObsoleteConnections(): PromiseInterface { - if (! $this->lock()->acquire()) { - return Helpers::createFulfilledPromise(false); - } + $lock = $this->lock(); + try{ + if (! $lock->acquire()) { + return Helpers::createFulfilledPromise(false); + } - $this->getLocalConnections()->then(function ($connections) { - foreach ($connections as $connection) { - $differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now()); + $this->getLocalConnections()->then(function ($connections) { + foreach ($connections as $connection) { + $differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now()); - if ($differenceInSeconds > 120) { - $this->unsubscribeFromAllChannels($connection); + if ($differenceInSeconds > 120) { + $this->unsubscribeFromAllChannels($connection); + } } - } - }); + }); - return Helpers::createFulfilledPromise( - $this->lock()->forceRelease() - ); + return Helpers::createFulfilledPromise(true); + } finally { + optional($lock)->forceRelease(); + } } /** diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 71cc71abe1..a873520e1e 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -374,18 +374,23 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf */ public function removeObsoleteConnections(): PromiseInterface { - $this->lock()->get(function () { - $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) - ->then(function ($connections) { - foreach ($connections as $socketId => $appId) { - $connection = $this->fakeConnectionForApp($appId, $socketId); - - $this->unsubscribeFromAllChannels($connection); - } - }); - }); + $lock = $this->lock(); + try{ + $lock->get(function () { + $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($connections) { + foreach ($connections as $socketId => $appId) { + $connection = $this->fakeConnectionForApp($appId, $socketId); + + $this->unsubscribeFromAllChannels($connection); + } + }); + }); - return parent::removeObsoleteConnections(); + return parent::removeObsoleteConnections(); + } finally { + optional($lock)->forceRelease(); + } } /** From a298fcf406b1d7a6c995459d2e03e08dbf9aca6d Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Tue, 8 Jun 2021 09:48:43 +0800 Subject: [PATCH 31/32] added space after try --- src/ChannelManagers/LocalChannelManager.php | 29 ++++++++++++--------- src/ChannelManagers/RedisChannelManager.php | 27 +++++++++++-------- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index b3fa8307df..54e9ef1fe3 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -439,23 +439,26 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf */ public function removeObsoleteConnections(): PromiseInterface { - if (! $this->lock()->acquire()) { - return Helpers::createFulfilledPromise(false); - } + $lock = $this->lock(); + try{ + if (! $lock->acquire()) { + return Helpers::createFulfilledPromise(false); + } - $this->getLocalConnections()->then(function ($connections) { - foreach ($connections as $connection) { - $differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now()); + $this->getLocalConnections()->then(function ($connections) { + foreach ($connections as $connection) { + $differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now()); - if ($differenceInSeconds > 120) { - $this->unsubscribeFromAllChannels($connection); + if ($differenceInSeconds > 120) { + $this->unsubscribeFromAllChannels($connection); + } } - } - }); + }); - return Helpers::createFulfilledPromise( - $this->lock()->forceRelease() - ); + return Helpers::createFulfilledPromise(true); + } finally { + optional($lock)->forceRelease(); + } } /** diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 71cc71abe1..a873520e1e 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -374,18 +374,23 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf */ public function removeObsoleteConnections(): PromiseInterface { - $this->lock()->get(function () { - $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) - ->then(function ($connections) { - foreach ($connections as $socketId => $appId) { - $connection = $this->fakeConnectionForApp($appId, $socketId); - - $this->unsubscribeFromAllChannels($connection); - } - }); - }); + $lock = $this->lock(); + try{ + $lock->get(function () { + $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($connections) { + foreach ($connections as $socketId => $appId) { + $connection = $this->fakeConnectionForApp($appId, $socketId); + + $this->unsubscribeFromAllChannels($connection); + } + }); + }); - return parent::removeObsoleteConnections(); + return parent::removeObsoleteConnections(); + } finally { + optional($lock)->forceRelease(); + } } /** From 7bc0957b11eaaa9a7ebff408ed9612177f5325f3 Mon Sep 17 00:00:00 2001 From: "Melvin D. Protacio" Date: Tue, 8 Jun 2021 10:40:14 +0800 Subject: [PATCH 32/32] space after the try brace --- src/ChannelManagers/LocalChannelManager.php | 2 +- src/ChannelManagers/RedisChannelManager.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index 54e9ef1fe3..642f9ea968 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -440,7 +440,7 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf public function removeObsoleteConnections(): PromiseInterface { $lock = $this->lock(); - try{ + try { if (! $lock->acquire()) { return Helpers::createFulfilledPromise(false); } diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index a873520e1e..aed05c0a46 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -375,7 +375,7 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf public function removeObsoleteConnections(): PromiseInterface { $lock = $this->lock(); - try{ + try { $lock->get(function () { $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) ->then(function ($connections) {