diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 8f4f18e..231d070 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,5 +1,15 @@ - + tests diff --git a/src/Examples/request-events-with-persistent-connection.php b/src/Examples/request-events-with-persistent-connection.php new file mode 100644 index 0000000..5408feb --- /dev/null +++ b/src/Examples/request-events-with-persistent-connection.php @@ -0,0 +1,62 @@ +getId(); + + // Create filter for text notes + $filter = new Filter(); + $filter->setKinds([1]); // kind 1 = text note + $filter->setLimit(25); + + // Create request message + $requestMessage = new RequestMessage($subscriptionId, [$filter]); + + // Create persistent connection. + $connection = new PersistentConnection($relay, $requestMessage); + + $startTime = time(); + $timeoutSeconds = 5; + + // Set callback to handle received events. + $connection->onReceive(function ($response) use ($startTime, $timeoutSeconds, $connection) { + if (isset($response->event->content)) { + $content = $response->event->content; + $timestamp = date('Y-m-d H:i:s', $response->event->created_at); + // Output received content from event. + print "[$timestamp] New event (note) received:" . PHP_EOL; + print $content . PHP_EOL; + + // Timeout limit reached, so we close the connection and exit the script here. + if (time() - $startTime >= $timeoutSeconds) { + print PHP_EOL; + print "Reached timeout of {$timeoutSeconds} seconds, closing connection..." . PHP_EOL; + $connection->close(); + exit(0); + } + } + }); + + // Disable automatic printing since we're handling it in callbacks. + // Could be handy when debugging. + $connection->setPrintMessages(false); + + // Now start transmitting and new received events will be printed. + print "Starting to listen for messages..." . PHP_EOL; + $connection->transmit(); + +} catch (Exception $e) { + print 'Exception error: ' . $e->getMessage() . PHP_EOL; +} diff --git a/src/Request/PersistentConnection.php b/src/Request/PersistentConnection.php new file mode 100644 index 0000000..1ad670a --- /dev/null +++ b/src/Request/PersistentConnection.php @@ -0,0 +1,198 @@ +payload = $message->generate(); + $this->websocketClient = $relay->getClient(); + $this->setPersistent(); + } + + /** + * Adds a callback function to be called when messages are received. + * + * The callback will receive the RelayResponse object as its parameter. + * + * @param callable $callback (Closure) function to be called with the RelayResponse + * @return self + */ + public function onReceive(callable $callback): self + { + $this->messageCallbacks[] = $callback; + return $this; + } + + /** + * Enable or disable printing messages to stdout. + * + * @param bool $enable Whether to print messages + * @return self + */ + public function setPrintMessages(bool $enable = true): self + { + $this->printMessages = $enable; + return $this; + } + + /** + * For transmitting messages between the client and relay. + * + * @return array + */ + public function transmit(): array + { + if (!$this->websocketClient->isConnected()) { + $this->websocketClient->connect(); + } + $this->websocketClient->setPersistent($this->isPersistent()); + try { + $this->websocketClient->text($this->payload); + $this->websocketClient->onText(function (Client $client, Connection $connection, Text $message) { + $relayResponse = RelayResponse::create(json_decode($message->getContent())); + $this->responses[] = $relayResponse; + + // Print message if enabled + if ($this->printMessages && isset($relayResponse->event->content)) { + print $relayResponse->event->content . PHP_EOL; + } + + // Call all registered callbacks + foreach ($this->messageCallbacks as $callback) { + $callback($relayResponse); + } + })->start(); + } catch (\Exception $e) { + throw new \RuntimeException($e->getMessage()); + } catch (\Throwable $e) { + throw new \RuntimeException($e->getMessage()); + } + return $this->responses; + } + + /** + * Clear all registered message callbacks. + * + * @return self + */ + public function clearCallbacks(): self + { + $this->messageCallbacks = []; + return $this; + } + + private function setPersistent(bool $persistent = true): void + { + $this->persistent = $persistent; + } + + private function isPersistent(): bool + { + return $this->persistent; + } + + /** + * Pause the socket connection receiving messages. + * + * @return void + */ + public function pause(): void + { + if ($this->websocketClient->isRunning()) { + $this->websocketClient->stop(); + } + } + + /** + * Resume paused socket connection to start receiving messages again. + * + * @return void + * @throws \Throwable + */ + public function resume(): void + { + if (!$this->websocketClient->isRunning()) { + $this->websocketClient->start(); + } + } + + /** + * Disconnect and close the socket connection. + * + * @return void + */ + public function close(): void + { + if ($this->websocketClient->isConnected()) { + $this->websocketClient->disconnect(); + } + $this->websocketClient->close(); + } +} diff --git a/tests/PersistentConnectionTest.php b/tests/PersistentConnectionTest.php new file mode 100644 index 0000000..e1f09af --- /dev/null +++ b/tests/PersistentConnectionTest.php @@ -0,0 +1,98 @@ +startTime = time(); + + // Set up basic test components + $this->relay = new Relay(self::TEST_RELAY_URL); + + $subscription = new Subscription(); + $filter = new Filter(); + $filter->setKinds([1]); + $filter->setLimit(1); + + $requestMessage = new RequestMessage($subscription->getId(), [$filter]); + $this->connection = new PersistentConnection($this->relay, $requestMessage); + } + + public function testMessageCallbacksAreInvoked(): void + { + $callbackInvoked = false; + + // Set up a test callback + $this->connection->onReceive(function ($response) use (&$callbackInvoked) { + $callbackInvoked = true; + $this->assertInstanceOf(RelayResponse::class, $response); + $this->assertTrue($this->relay->getClient()->isConnected()); + $this->assertTrue($this->relay->getClient()->isRunning()); + // Disconnect and close after 3 seconds + if (time() - $this->startTime >= $this->timeoutSeconds) { + $this->connection->close(); + } + }); + + $this->connection->transmit(); + + $this->assertTrue($callbackInvoked, 'Callback should have been invoked'); + } + + public function testClearCallbacks(): void + { + $callbackInvoked = false; + + $this->connection->onReceive(function ($response) use (&$callbackInvoked) { + $callbackInvoked = true; + $this->assertFalse($this->relay->getClient()->isConnected()); + $this->assertFalse($this->relay->getClient()->isRunning()); + $this->assertInstanceOf(RelayResponse::class, $response); + // Disconnect and close after 3 seconds + if (time() - $this->startTime >= $this->timeoutSeconds) { + $this->connection->close(); + } + }); + + $this->connection->clearCallbacks(); + + $this->assertFalse($callbackInvoked, 'Callback should not have been invoked after clearing'); + } + + public function testPrintMessagesFlag(): void + { + ob_start(); + $this->connection->onReceive(function ($response) { + // Disconnect and close after 3 seconds + print $this->relay->getClient()->isConnected(); + if (time() - $this->startTime >= $this->timeoutSeconds) { + $this->connection->close(); + } + }); + + $this->connection->setPrintMessages(true); + $this->connection->transmit(); + + $output = ob_get_clean(); + $this->assertIsString($output); + $this->assertNotEmpty($output); + } +}