Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update PersistentConnection class with onReceive callback method (Clo…
…sure) to listen to
  • Loading branch information
Sebastix committed May 27, 2025
commit a6937dd33210e3c12f3d236a05fe62e4f9cc803f
104 changes: 98 additions & 6 deletions src/Request/PersistentConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,21 @@ class PersistentConnection
*
* @var array
*/
protected array $responses;
protected array $responses = [];

/**
* Array of callback functions (closures) to be called when messages are received.
*
* @var callable[]
*/
protected array $messageCallbacks = [];

/**
* Whether to print received messages to stdout.
*
* @var bool
*/
protected bool $printMessages = false;

/**
* https://github.com/sirn-se/websocket-php/blob/v3.4-main/docs/Client.md#persistent-connection
Expand All @@ -62,6 +76,32 @@ public function __construct(Relay|RelaySet $relay, MessageInterface $message)
$this->setPersistent();
}

/**
* Adds a callback function to be called when messages are received.
*
* The callback will receive the RelayResponse object as its parameter.
*
* @param callable $callback (Closure) function to be called with the RelayResponse
* @return self
*/
public function onReceive(callable $callback): self
{
$this->messageCallbacks[] = $callback;
return $this;
}

/**
* Enable or disable printing messages to stdout.
*
* @param bool $enable Whether to print messages
* @return self
*/
public function setPrintMessages(bool $enable = true): self
{
$this->printMessages = $enable;
return $this;
}

/**
* For transmitting messages between the client and relay.
*
Expand All @@ -73,10 +113,17 @@ public function transmit(): array
try {
$this->websocketClient->text($this->payload);
$this->websocketClient->onText(function (Client $client, Connection $connection, Text $message) {
$this->responses[] = RelayResponse::create(json_decode($message->getContent()));
$res = end($this->responses);
if (isset($res->event->content)) {
print $res->event->content . PHP_EOL;
$relayResponse = RelayResponse::create(json_decode($message->getContent()));
$this->responses[] = $relayResponse;

// Print message if enabled
if ($this->printMessages && isset($relayResponse->event->content)) {
print $relayResponse->event->content . PHP_EOL;
}

// Call all registered callbacks
foreach ($this->messageCallbacks as $callback) {
$callback($relayResponse);
}
})->start();
} catch (\Exception $e) {
Expand All @@ -87,6 +134,17 @@ public function transmit(): array
return $this->responses;
}

/**
* Clear all registered message callbacks.
*
* @return self
*/
public function clearCallbacks(): self
{
$this->messageCallbacks = [];
return $this;
}

private function setPersistent(bool $persistent = true): void
{
$this->persistent = $persistent;
Expand All @@ -97,4 +155,38 @@ private function isPersistent(): bool
return $this->persistent;
}

}
/**
* Pause the socket connection receiving messages.
*
* @return void
*/
public function pause(): void {
if ($this->websocketClient->isRunning()) {
$this->websocketClient->stop();
}
}

/**
* Resume paused socket connection to start receiving messages again.
*
* @return void
* @throws \Throwable
*/
public function resume(): void {
if (!$this->websocketClient->isRunning()) {
$this->websocketClient->start();
}
}

/**
* Disconnect and close the socket connection.
*
* @return void
*/
public function close(): void {
if ($this->websocketClient->isConnected()) {
$this->websocketClient->disconnect();
}
$this->websocketClient->close();
}
}
Loading