Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 5 additions & 19 deletions src/Api/ErrorParser/AbstractErrorParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,6 @@ abstract protected function payload(
StructureShape $member
);

protected function extractPayload(
StructureShape $member,
ResponseInterface $response
) {
if ($member instanceof StructureShape) {
// Structure members parse top-level data into a specific key.
return $this->payload($response, $member);
} else {
// Streaming data is just the stream from the response body.
return $response->getBody();
}
}

protected function populateShape(
array &$data,
ResponseInterface $response,
Expand All @@ -57,16 +44,15 @@ protected function populateShape(
if (!empty($data['code'])) {

$errors = $this->api->getOperation($command->getName())->getErrors();
foreach ($errors as $key => $error) {
foreach ($errors as $error) {

// If error code matches a known error shape, populate the body
if ($this->errorCodeMatches($data, $error)) {
$modeledError = $error;
$data['body'] = $this->extractPayload(
$modeledError,
$response
$data['body'] = $this->payload(
$response,
$error
);
$data['error_shape'] = $modeledError;
$data['error_shape'] = $error;

foreach ($error->getMembers() as $name => $member) {
switch ($member['location']) {
Expand Down
20 changes: 15 additions & 5 deletions src/Api/ErrorParser/JsonParserTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ private function genericHandler(ResponseInterface $response): array

$parsedBody = null;
$body = $response->getBody();
if (!$body->isSeekable() || $body->getSize()) {
$parsedBody = $this->parseJson((string) $body, $response);
if ($body->isSeekable()) {
$body->rewind();
}

$rawBody = $body->getContents();
if (!empty($rawBody)) {
$parsedBody = $this->parseJson($rawBody, $response);
}

// Parse error code from response body
Expand Down Expand Up @@ -133,10 +138,15 @@ protected function payload(
StructureShape $member
) {
$body = $response->getBody();
if (!$body->isSeekable() || $body->getSize()) {
$jsonBody = $this->parseJson($body, $response);
if ($body->isSeekable()) {
$body->rewind();
}

$rawBody = $body->getContents();
if (!empty($rawBody)) {
$jsonBody = $this->parseJson($rawBody, $response);
} else {
$jsonBody = (string) $body;
$jsonBody = $rawBody;
}

return $this->parser->parse($member, $jsonBody);
Expand Down
11 changes: 11 additions & 0 deletions src/Api/ErrorParser/JsonRpcErrorParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use Aws\Api\Parser\JsonParser;
use Aws\Api\Service;
use Aws\CommandInterface;
use GuzzleHttp\Psr7\CachingStream;
use GuzzleHttp\Psr7\Response;
use Psr\Http\Message\ResponseInterface;

/**
Expand All @@ -25,6 +27,15 @@ public function __invoke(
ResponseInterface $response,
?CommandInterface $command = null
) {
if (!$response->getBody()->isSeekable()) {
$response = new Response(
$response->getStatusCode(),
$response->getHeaders(),
new CachingStream($response->getBody()),
$response->getProtocolVersion(),
$response->getReasonPhrase()
);
}
$data = $this->genericHandler($response);

// Make the casing consistent across services.
Expand Down
14 changes: 13 additions & 1 deletion src/Api/ErrorParser/XmlErrorParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,24 @@ protected function payload(
ResponseInterface $response,
StructureShape $member
) {
$xmlBody = $this->parseXml($response->getBody(), $response);
$body = $response->getBody();
if ($body->isSeekable()) {
$body->rewind();
}

$rawBody = $body->getContents();
if (empty($rawBody)) {
return $rawBody;
}

$xmlBody = $this->parseXml($rawBody, $response);
$prefix = $this->registerNamespacePrefix($xmlBody);
$errorBody = $xmlBody->xpath("//{$prefix}Error");

if (is_array($errorBody) && !empty($errorBody[0])) {
return $this->parser->parse($member, $errorBody[0]);
}

return $rawBody;
}
}
2 changes: 1 addition & 1 deletion src/Api/Operation.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public function getOutput()
/**
* Get an array of operation error shapes.
*
* @return Shape[]
* @return StructureShape[]
*/
public function getErrors()
{
Expand Down
62 changes: 50 additions & 12 deletions src/Api/Parser/AbstractRestParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
namespace Aws\Api\Parser;

use Aws\Api\DateTimeResult;
use Aws\Api\ResponseWrapper;
use Aws\Api\Shape;
use Aws\Api\StructureShape;
use Aws\Result;
use Aws\CommandInterface;
use GuzzleHttp\Psr7\CachingStream;
use GuzzleHttp\Psr7\Response;
use Psr\Http\Message\ResponseInterface;

/**
Expand Down Expand Up @@ -39,6 +42,30 @@ public function __invoke(

if ($payload = $output['payload']) {
$this->extractPayload($payload, $output, $response, $result);
} else {
$body = $response->getBody();
if (!$body->isSeekable()) {
$response = new Response(
$response->getStatusCode(),
$response->getHeaders(),
new CachingStream($body),
$response->getProtocolVersion(),
$response->getReasonPhrase()
);
}

$body = $response->getBody();
if ($body->isSeekable()) {
$body->rewind();
}

$rawBody = $body->getContents();
if (!empty($rawBody)
&& count($output->getMembers()) > 0
) {
// if no payload was found, then parse the contents of the body
$this->payload($response, $output, $result);
}
}

foreach ($output->getMembers() as $name => $member) {
Expand All @@ -55,15 +82,6 @@ public function __invoke(
}
}

$body = $response->getBody();
if (!$payload
&& (!$body->isSeekable() || $body->getSize())
&& count($output->getMembers()) > 0
) {
// if no payload was found, then parse the contents of the body
$this->payload($response, $output, $result);
}

return new Result($result);
}

Expand All @@ -75,17 +93,37 @@ private function extractPayload(
) {
$member = $output->getMember($payload);
$body = $response->getBody();

if (!empty($member['eventstream'])) {
$result[$payload] = new EventParsingIterator(
$body,
$member,
$this
);
} elseif ($member instanceof StructureShape) {

return;
}

if (!$body->isSeekable()) {
$response = new Response(
$response->getStatusCode(),
$response->getHeaders(),
new CachingStream($response->getBody()),
$response->getProtocolVersion(),
$response->getReasonPhrase()
);
}

if ($member instanceof StructureShape) {
//Unions must have at least one member set to a non-null value
// If the body is empty, we can assume it is unset
if (!empty($member['union']) && ($body->isSeekable() && !$body->getSize())) {
$body = $response->getBody();
if ($body->isSeekable()) {
$body->rewind();
}

$rawBody = $body->getContents();
if (!empty($member['union'])
&& empty($rawBody)) {
return;
}

Expand Down
13 changes: 9 additions & 4 deletions src/Api/Parser/JsonRpcParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,16 @@ private function parseResponse(ResponseInterface $response, Operation $operation
}
}

$body = $response->getBody();
if ($body->isSeekable()) {
$body->rewind();
}

$result = $this->parseMemberFromStream(
$response->getBody(),
$operation->getOutput(),
$response
);
$body,
$operation->getOutput(),
$response
);

return new Result(is_null($result) ? [] : $result);
}
Expand Down
6 changes: 4 additions & 2 deletions src/Api/Parser/QueryParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public function __invoke(
ResponseInterface $response
) {
$output = $this->api->getOperation($command->getName())->getOutput();
$body = $response->getBody();
$xml = !$body->isSeekable() || $body->getSize()
// Read the full payload, even in non-seekable streams
$body = $response->getBody()->getContents();
// Just parse when the body is not empty
$xml = !empty($body)
? $this->parseXml($body, $response)
: null;

Expand Down
14 changes: 9 additions & 5 deletions src/Api/Parser/RestJsonParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@ protected function payload(
StructureShape $member,
array &$result
) {
$responseBody = (string) $response->getBody();
$body = $response->getBody();
if ($body->isSeekable()) {
$body->rewind();
}

$rawBody = $body->getContents();

// Parse JSON if we have content
$parsedJson = null;
if (!empty($responseBody)) {
$parsedJson = $this->parseJson($responseBody, $response);
if (!empty($rawBody)) {
$parsedJson = $this->parseJson($rawBody, $response);
} else {
// An empty response body should be deserialized as null
$result = $parsedJson;
$result = null;
return;
}

Expand Down
7 changes: 6 additions & 1 deletion src/Api/Parser/RestXmlParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ protected function payload(
StructureShape $member,
array &$result
) {
$result += $this->parseMemberFromStream($response->getBody(), $member, $response);
$body = $response->getBody();
if ($body->isSeekable()) {
$body->rewind();
}

$result += $this->parseMemberFromStream($body, $member, $response);
}

public function parseMemberFromStream(
Expand Down
28 changes: 28 additions & 0 deletions src/AwsClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ public function __construct(array $args)
$args['with_resolved']($config);
}
$this->addUserAgentMiddleware($config);
$this->addEventStreamHttpFlagMiddleware();
}

public function getHandlerList()
Expand Down Expand Up @@ -643,6 +644,33 @@ private function addUserAgentMiddleware($args)
);
}

/**
* Enables streaming the response by using the stream flag.
*
* @return void
*/
private function addEventStreamHttpFlagMiddleware(): void
{
$this->getHandlerList()
-> appendInit(
function (callable $handler) {
return function (CommandInterface $command, $request = null) use ($handler) {
$operation = $this->getApi()->getOperation($command->getName());
$output = $operation->getOutput();
foreach ($output->getMembers() as $memberProps) {
if (!empty($memberProps['eventstream'])) {
$command['@http']['stream'] = true;
break;
}
}

return $handler($command, $request);
};
},
'event-streaming-flag-middleware'
);
}

/**
* Retrieves client context param definition from service model,
* creates mapping of client context param names with client-provided
Expand Down
26 changes: 0 additions & 26 deletions src/CloudWatchLogs/CloudWatchLogsClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -192,36 +192,10 @@
* @method \GuzzleHttp\Promise\Promise updateLogAnomalyDetectorAsync(array $args = [])
*/
class CloudWatchLogsClient extends AwsClient {
static $streamingCommands = [
'StartLiveTail' => true
];

public function __construct(array $args)
{
parent::__construct($args);
$this->addStreamingFlagMiddleware();
}

private function addStreamingFlagMiddleware()
{
$this->getHandlerList()
-> appendInit(
$this->getStreamingFlagMiddleware(),
'streaming-flag-middleware'
);
}

private function getStreamingFlagMiddleware(): callable
{
return function (callable $handler) {
return function (CommandInterface $command, $request = null) use ($handler) {
if (!empty(self::$streamingCommands[$command->getName()])) {
$command['@http']['stream'] = true;
}

return $handler($command, $request);
};
};
}

/**
Expand Down
Loading